1
0
buffer/content.go

121 lines
2.2 KiB
Go
Raw Normal View History

2026-02-22 16:30:37 +01:00
package buffer
// TODO:
// - the write internally can get a max in the request, and this way multiple writes can be executed without
// synchronization steps in between
// - the current segment can be passed in to the writer to copy to
// - additional segments can be requested from the writer if necessary
// - a single channel with buffer=1 can be used to exchange
// - the message can be called 'syncMessage'
// - filter out the errors coming from the writer, but originated from the reader. Can errors.Is used for this?
// If yes, replace the captured error
// - check if the existing stdlib interface is good for the content writer
import (
"errors"
"io"
)
type syncMessage struct {
p []byte
n int
err error
}
type contentWriter struct {
w chan<- syncMessage
r <-chan syncMessage
}
type content struct {
wrt io.WriterTo
started bool
w chan syncMessage
r chan syncMessage
}
func mkcontent(wrt io.WriterTo) *content {
return &content{
wrt: wrt,
w: make(chan syncMessage, 1),
r: make(chan syncMessage, 1),
}
}
func (w contentWriter) Write(p []byte) (int, error) {
var n int
for {
if len(p) == 0 {
return n, nil
}
sm, ok := <-w.r
if !ok {
return n, ErrContentAbort
}
ni := copy(sm.p, p)
n += ni
p = p[ni:]
w.w <- syncMessage{n: ni}
}
}
func (c *content) writeTo() {
w := contentWriter{
w: c.r,
r: c.w,
}
var sm syncMessage
_, err := c.wrt.WriteTo(w)
if err != nil {
sm = syncMessage{err: err}
}
if err == nil {
sm = syncMessage{err: io.EOF}
}
w.w <- sm
close(w.w)
}
func (c *content) Read(p []byte) (int, error) {
if !c.started {
go c.writeTo()
c.started = true
}
c.w <- syncMessage{p: p}
sm := <-c.r
return sm.n, sm.err
}
func (c *content) close() error {
if !c.started {
return nil
}
// there can be only zero or one messages to receive, but we need to wait for the channel be closed by
// writeTo:
var errs []error
close(c.w)
for sm := range c.r {
// not using errors.Is, because the writer logic may have combined the abort error with another
// user logic error, that needs to be reported:
if sm.err == ErrContentAbort {
continue
}
if sm.err == io.EOF {
continue
}
errs = append(errs, sm.err)
}
return errors.Join(errs...)
}