121 lines
2.2 KiB
Go
121 lines
2.2 KiB
Go
package buffer
|
|
|
|
// TODO:
// - the write internally can get a max in the request, and this way multiple writes can be executed without
// synchronization steps in between
// - the current segment can be passed in to the writer to copy to
// - additional segments can be requested from the writer if necessary
// - a single channel with buffer=1 can be used for the exchange
// - the message can be called 'syncMessage'
// - filter out the errors coming from the writer, but originating from the reader. Can errors.Is be used for
// this? If yes, replace the captured error
// - check if the existing stdlib interface is good for the content writer
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
)
|
|
|
|
// syncMessage is the value exchanged over the channels connecting content.Read with the writer goroutine.
// Depending on the direction, only some of the fields are set: a read request carries the destination
// buffer, a write response carries the number of bytes copied, and the final message carries an error.
type syncMessage struct {
	// p is the caller's buffer of a read request, the writer copies its data into it.
	p []byte

	// n is the number of bytes the writer copied into the buffer of the request it is responding to.
	n int

	// err is the terminal status sent once the writer has finished: io.EOF after a successful WriteTo,
	// otherwise the error returned by it.
	err error
}
|
|
|
|
type contentWriter struct {
|
|
w chan<- syncMessage
|
|
r <-chan syncMessage
|
|
}
|
|
|
|
type content struct {
|
|
wrt io.WriterTo
|
|
started bool
|
|
w chan syncMessage
|
|
r chan syncMessage
|
|
}
|
|
|
|
func mkcontent(wrt io.WriterTo) *content {
|
|
return &content{
|
|
wrt: wrt,
|
|
w: make(chan syncMessage, 1),
|
|
r: make(chan syncMessage, 1),
|
|
}
|
|
}
|
|
|
|
func (w contentWriter) Write(p []byte) (int, error) {
|
|
var n int
|
|
for {
|
|
if len(p) == 0 {
|
|
return n, nil
|
|
}
|
|
|
|
sm, ok := <-w.r
|
|
if !ok {
|
|
return n, ErrContentAbort
|
|
}
|
|
|
|
ni := copy(sm.p, p)
|
|
n += ni
|
|
p = p[ni:]
|
|
w.w <- syncMessage{n: ni}
|
|
}
|
|
}
|
|
|
|
func (c *content) writeTo() {
|
|
w := contentWriter{
|
|
w: c.r,
|
|
r: c.w,
|
|
}
|
|
|
|
var sm syncMessage
|
|
_, err := c.wrt.WriteTo(w)
|
|
if err != nil {
|
|
sm = syncMessage{err: err}
|
|
}
|
|
|
|
if err == nil {
|
|
sm = syncMessage{err: io.EOF}
|
|
}
|
|
|
|
w.w <- sm
|
|
close(w.w)
|
|
}
|
|
|
|
func (c *content) Read(p []byte) (int, error) {
|
|
if !c.started {
|
|
go c.writeTo()
|
|
c.started = true
|
|
}
|
|
|
|
c.w <- syncMessage{p: p}
|
|
sm := <-c.r
|
|
return sm.n, sm.err
|
|
}
|
|
|
|
func (c *content) close() error {
|
|
if !c.started {
|
|
return nil
|
|
}
|
|
|
|
// there can be only zero or one messages to receive, but we need to wait for the channel be closed by
|
|
// writeTo:
|
|
var errs []error
|
|
close(c.w)
|
|
for sm := range c.r {
|
|
|
|
// not using errors.Is, because the writer logic may have combined the abort error with another
|
|
// user logic error, that needs to be reported:
|
|
if sm.err == ErrContentAbort {
|
|
continue
|
|
}
|
|
|
|
if sm.err == io.EOF {
|
|
continue
|
|
}
|
|
|
|
errs = append(errs, sm.err)
|
|
}
|
|
|
|
return errors.Join(errs...)
|
|
}
|