110 lines
1.6 KiB
Go
110 lines
1.6 KiB
Go
package buffer
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
)
|
|
|
|
// syncMessage is the unit exchanged over the channels that connect
// content.Read with the contentWriter used by the background WriteTo call.
// Depending on the direction, only some of the fields are set.
type syncMessage struct {
	// p is the destination buffer of a read request; the writer copies the
	// written bytes into it.
	p []byte

	// n is the number of bytes that were copied into the buffer of the
	// request being answered.
	n int

	// err carries the final result of the WriteTo call: io.EOF when it
	// returned without an error, otherwise the error it returned.
	err error
}
|
|
|
|
type contentWriter struct {
|
|
w chan<- syncMessage
|
|
r <-chan syncMessage
|
|
}
|
|
|
|
type content struct {
|
|
wrt io.WriterTo
|
|
started bool
|
|
w chan syncMessage
|
|
r chan syncMessage
|
|
}
|
|
|
|
func mkcontent(wrt io.WriterTo) *content {
|
|
return &content{
|
|
wrt: wrt,
|
|
w: make(chan syncMessage, 1),
|
|
r: make(chan syncMessage, 1),
|
|
}
|
|
}
|
|
|
|
func (w contentWriter) Write(p []byte) (int, error) {
|
|
var n int
|
|
for {
|
|
if len(p) == 0 {
|
|
return n, nil
|
|
}
|
|
|
|
sm, ok := <-w.r
|
|
if !ok {
|
|
return n, ErrContentAbort
|
|
}
|
|
|
|
ni := copy(sm.p, p)
|
|
n += ni
|
|
p = p[ni:]
|
|
w.w <- syncMessage{n: ni}
|
|
}
|
|
}
|
|
|
|
func (c *content) writeTo() {
|
|
w := contentWriter{
|
|
w: c.r,
|
|
r: c.w,
|
|
}
|
|
|
|
var sm syncMessage
|
|
_, err := c.wrt.WriteTo(w)
|
|
if err != nil {
|
|
sm = syncMessage{err: err}
|
|
}
|
|
|
|
if err == nil {
|
|
sm = syncMessage{err: io.EOF}
|
|
}
|
|
|
|
w.w <- sm
|
|
close(w.w)
|
|
}
|
|
|
|
func (c *content) Read(p []byte) (int, error) {
|
|
if !c.started {
|
|
go c.writeTo()
|
|
c.started = true
|
|
}
|
|
|
|
c.w <- syncMessage{p: p}
|
|
sm := <-c.r
|
|
return sm.n, sm.err
|
|
}
|
|
|
|
func (c *content) close() error {
|
|
if !c.started {
|
|
return nil
|
|
}
|
|
|
|
// there can be only zero or one messages to receive, but we need to wait for the channel be closed by
|
|
// writeTo:
|
|
var errs []error
|
|
close(c.w)
|
|
for sm := range c.r {
|
|
|
|
// not using errors.Is, because the writer logic may have combined the abort error with another
|
|
// user logic error, that needs to be reported:
|
|
if sm.err == ErrContentAbort {
|
|
continue
|
|
}
|
|
|
|
if sm.err == io.EOF {
|
|
continue
|
|
}
|
|
|
|
errs = append(errs, sm.err)
|
|
}
|
|
|
|
return errors.Join(errs...)
|
|
}
|