diff --git a/content.go b/content.go index 71a7489..7cbcaf7 100644 --- a/content.go +++ b/content.go @@ -41,7 +41,7 @@ func (w contentWriter) Write(p []byte) (int, error) { sm, ok := <-w.r if !ok { - return n, ErrContentAbort + return n, ErrAbort } ni := copy(sm.p, p) @@ -106,7 +106,7 @@ func (c *content) close() error { // not using errors.Is, because the writer logic may have combined the abort error with another // user logic error, that needs to be reported: - if sm.err == ErrContentAbort { + if sm.err == ErrAbort { continue } diff --git a/lib.go b/lib.go index dd1f94e..94dfde4 100644 --- a/lib.go +++ b/lib.go @@ -51,9 +51,10 @@ var ( // ErrZeroAllocation is returned when the used pool returned a zero length byte slice. ErrZeroAllocation = errors.New("zero allocation") - // ErrContentAbort is returned to the writer process in case of buffered content, when the reader - // experienced an error. - ErrContentAbort = errors.New("content pipe aborted") + // ErrAbort is returned to the writer process in case of buffered content, when the reader + // experienced an error. ErrAbort is returned to the reader process, when the resources were released + // using Free(), and any of the Read operations is called again. + ErrAbort = errors.New("read aborted") ) // DefultPool initializes a synchronized pool that stores and returns byte slices of allocSize length. It can be @@ -101,7 +102,7 @@ func BufferedReader(in io.Reader, o Options) Reader { // The provided WriterTo instances need to be safe to call in goroutines other than they were created in. The // writer function returns with nil error, it will be interpreted as EOF on the reader side. When the reader // side experiences an error, and the writer still has content to be written, the passed in io.Writer will -// return an ErrContentAbort error. +// return an ErrAbort error. func BufferedContent(c io.WriterTo, o Options) Reader { if c == nil { return Reader{} @@ -209,3 +210,15 @@ func (r Reader) WriteTo(w io.Writer) (int64, error) { return r.reader.writeTo(w) } + +// Free releases the resource held by the Reader, and puts back the underlying byte buffers into the used pool. +// +// It is used when a multi-step read operation is aborted. When a preceeding read operation resulted in an +// error, it is not necessary to call Free(). +func (r Reader) Free() { + if r.reader == nil { + return + } + + r.reader.free() +} diff --git a/lib_test.go b/lib_test.go index b39c44a..eb85d9e 100644 --- a/lib_test.go +++ b/lib_test.go @@ -150,6 +150,159 @@ func TestLib(t *testing.T) { } }) }) + + t.Run("free", func(t *testing.T) { + t.Run("from blank state", func(t *testing.T) { + p := &fakePool{allocSize: 1 << 6} + r := buffer.BufferedReader(&gen{max: 1 << 12}, buffer.Options{Pool: p}) + r.Free() + b := make([]byte, 1<<9) + n, err := r.Read(b) + if n != 0 || !errors.Is(err, buffer.ErrAbort) { + t.Fatal(n, err) + } + + if p.alloc != 0 || p.free != 0 { + t.Fatal(p.alloc, p.free) + } + }) + + t.Run("with multiple segments", func(t *testing.T) { + p := &fakePool{allocSize: 1 << 6} + r := buffer.BufferedReader(&gen{max: 1 << 12}, buffer.Options{Pool: p}) + b, ok, err := r.ReadBytes([]byte("123"), 1<<6+1<<5) + if ok || err != nil || p.alloc != 2 { + t.Fatal(len(b), ok, err, p.alloc) + } + + r.Free() + b = make([]byte, 1<<9) + n, err := r.Read(b) + if n != 0 || !errors.Is(err, buffer.ErrAbort) { + t.Fatal(n, err) + } + + if p.alloc != 2 || p.free != 2 { + t.Fatal(p.alloc, p.free) + } + }) + + t.Run("from err state", func(t *testing.T) { + p := &fakePool{allocSize: 1 << 6} + g := &gen{ + max: 1 << 12, + errAfter: []int{1 << 6}, + } + + r := buffer.BufferedReader(g, buffer.Options{Pool: p}) + b := make([]byte, 1<<9) + n, err := r.Read(b) + if n != 1<<6 || err != nil { + t.Fatal(n, err) + } + + n, err = r.Read(b) + if n != 0 || !errors.Is(err, errTest) { + t.Fatal(n, err) + } + + r.Free() + b = make([]byte, 1<<9) + n, err = r.Read(b) + if n != 0 || !errors.Is(err, errTest) { + t.Fatal(n, err) + } + + if p.alloc != 1 || p.free != 1 { + t.Fatal(p.alloc, p.free) + } + }) + + t.Run("with content", func(t *testing.T) { + c := buffer.ContentFunc(func(w io.Writer) (int64, error) { + var n int64 + for i := 0; i < 3; i++ { + ni, err := w.Write([]byte("123456789")[i*3 : i*3+3]) + n += int64(ni) + if err != nil { + return n, err + } + } + + return n, nil + }) + + p := &fakePool{allocSize: 2} + o := buffer.Options{Pool: p} + r := buffer.BufferedContent(c, o) + b := make([]byte, 3) + for i := 0; i < 2; i++ { + n, err := r.Read(b) + if n != 3 || err != nil { + t.Fatal(n, err) + } + + if string(b) != "123456789"[i*3:i*3+3] { + t.Fatal(string(b)) + } + } + + r.Free() + n, err := r.Read(b) + if n != 0 || !errors.Is(err, buffer.ErrAbort) { + t.Fatal(n, err) + } + + if p.alloc != 1 || p.free != 1 { + t.Fatal(p.alloc, p.free) + } + }) + + t.Run("with content error", func(t *testing.T) { + c := buffer.ContentFunc(func(w io.Writer) (int64, error) { + var n int64 + for i := 0; i < 3; i++ { + ni, err := w.Write([]byte("123456789")[i*3 : i*3+3]) + n += int64(ni) + if err != nil { + return n, err + } + } + + return n, errTest + }) + + p := &fakePool{allocSize: 2} + o := buffer.Options{Pool: p} + r := buffer.BufferedContent(c, o) + b := make([]byte, 3) + for i := 0; i < 3; i++ { + n, err := r.Read(b) + if n != 3 || err != nil { + t.Fatal(n, err) + } + + if string(b) != "123456789"[i*3:i*3+3] { + t.Fatal(string(b)) + } + } + + n, err := r.Read(b) + if n != 0 || !errors.Is(err, errTest) { + t.Fatal(n, err) + } + + r.Free() + n, err = r.Read(b) + if n != 0 || !errors.Is(err, errTest) { + t.Fatal(n, err) + } + + if p.alloc != 1 || p.free != 1 { + t.Fatal(p.alloc, p.free) + } + }) + }) } // -- bench diff --git a/reader.go b/reader.go index 5346b16..0f346d2 100644 --- a/reader.go +++ b/reader.go @@ -148,11 +148,18 @@ func (r *reader) free() { r.segments = r.segments[1:] } + r.len = 0 + r.options.Pool = nil if c, ok := r.in.(interface{ close() error }); ok { if err := c.close(); err != nil { r.err = errors.Join(r.err, err) } } + + r.in = nil + if r.err == nil { + r.err = ErrAbort + } } func (r reader) findSegmentUp(i int) (int, int) {