add exported free function to abort reading
This commit is contained in:
parent
52f004dad8
commit
574f6b00e0
@ -41,7 +41,7 @@ func (w contentWriter) Write(p []byte) (int, error) {
|
|||||||
|
|
||||||
sm, ok := <-w.r
|
sm, ok := <-w.r
|
||||||
if !ok {
|
if !ok {
|
||||||
return n, ErrContentAbort
|
return n, ErrAbort
|
||||||
}
|
}
|
||||||
|
|
||||||
ni := copy(sm.p, p)
|
ni := copy(sm.p, p)
|
||||||
@ -106,7 +106,7 @@ func (c *content) close() error {
|
|||||||
|
|
||||||
// not using errors.Is, because the writer logic may have combined the abort error with another
|
// not using errors.Is, because the writer logic may have combined the abort error with another
|
||||||
// user logic error, that needs to be reported:
|
// user logic error, that needs to be reported:
|
||||||
if sm.err == ErrContentAbort {
|
if sm.err == ErrAbort {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
21
lib.go
21
lib.go
@ -51,9 +51,10 @@ var (
|
|||||||
// ErrZeroAllocation is returned when the used pool returned a zero length byte slice.
|
// ErrZeroAllocation is returned when the used pool returned a zero length byte slice.
|
||||||
ErrZeroAllocation = errors.New("zero allocation")
|
ErrZeroAllocation = errors.New("zero allocation")
|
||||||
|
|
||||||
// ErrContentAbort is returned to the writer process in case of buffered content, when the reader
|
// ErrAbort is returned to the writer process in case of buffered content, when the reader
|
||||||
// experienced an error.
|
// experienced an error. ErrAbort is returned to the reader process, when the resources were released
|
||||||
ErrContentAbort = errors.New("content pipe aborted")
|
// using Free(), and any of the Read operations is called again.
|
||||||
|
ErrAbort = errors.New("read aborted")
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefultPool initializes a synchronized pool that stores and returns byte slices of allocSize length. It can be
|
// DefultPool initializes a synchronized pool that stores and returns byte slices of allocSize length. It can be
|
||||||
@ -101,7 +102,7 @@ func BufferedReader(in io.Reader, o Options) Reader {
|
|||||||
// The provided WriterTo instances need to be safe to call in goroutines other than they were created in. The
|
// The provided WriterTo instances need to be safe to call in goroutines other than they were created in. The
|
||||||
// writer function returns with nil error, it will be interpreted as EOF on the reader side. When the reader
|
// writer function returns with nil error, it will be interpreted as EOF on the reader side. When the reader
|
||||||
// side experiences an error, and the writer still has content to be written, the passed in io.Writer will
|
// side experiences an error, and the writer still has content to be written, the passed in io.Writer will
|
||||||
// return an ErrContentAbort error.
|
// return an ErrAbort error.
|
||||||
func BufferedContent(c io.WriterTo, o Options) Reader {
|
func BufferedContent(c io.WriterTo, o Options) Reader {
|
||||||
if c == nil {
|
if c == nil {
|
||||||
return Reader{}
|
return Reader{}
|
||||||
@ -209,3 +210,15 @@ func (r Reader) WriteTo(w io.Writer) (int64, error) {
|
|||||||
|
|
||||||
return r.reader.writeTo(w)
|
return r.reader.writeTo(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Free releases the resource held by the Reader, and puts back the underlying byte buffers into the used pool.
|
||||||
|
//
|
||||||
|
// It is used when a multi-step read operation is aborted. When a preceeding read operation resulted in an
|
||||||
|
// error, it is not necessary to call Free().
|
||||||
|
func (r Reader) Free() {
|
||||||
|
if r.reader == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.reader.free()
|
||||||
|
}
|
||||||
|
|||||||
153
lib_test.go
153
lib_test.go
@ -150,6 +150,159 @@ func TestLib(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("free", func(t *testing.T) {
|
||||||
|
t.Run("from blank state", func(t *testing.T) {
|
||||||
|
p := &fakePool{allocSize: 1 << 6}
|
||||||
|
r := buffer.BufferedReader(&gen{max: 1 << 12}, buffer.Options{Pool: p})
|
||||||
|
r.Free()
|
||||||
|
b := make([]byte, 1<<9)
|
||||||
|
n, err := r.Read(b)
|
||||||
|
if n != 0 || !errors.Is(err, buffer.ErrAbort) {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.alloc != 0 || p.free != 0 {
|
||||||
|
t.Fatal(p.alloc, p.free)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with multiple segments", func(t *testing.T) {
|
||||||
|
p := &fakePool{allocSize: 1 << 6}
|
||||||
|
r := buffer.BufferedReader(&gen{max: 1 << 12}, buffer.Options{Pool: p})
|
||||||
|
b, ok, err := r.ReadBytes([]byte("123"), 1<<6+1<<5)
|
||||||
|
if ok || err != nil || p.alloc != 2 {
|
||||||
|
t.Fatal(len(b), ok, err, p.alloc)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Free()
|
||||||
|
b = make([]byte, 1<<9)
|
||||||
|
n, err := r.Read(b)
|
||||||
|
if n != 0 || !errors.Is(err, buffer.ErrAbort) {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.alloc != 2 || p.free != 2 {
|
||||||
|
t.Fatal(p.alloc, p.free)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("from err state", func(t *testing.T) {
|
||||||
|
p := &fakePool{allocSize: 1 << 6}
|
||||||
|
g := &gen{
|
||||||
|
max: 1 << 12,
|
||||||
|
errAfter: []int{1 << 6},
|
||||||
|
}
|
||||||
|
|
||||||
|
r := buffer.BufferedReader(g, buffer.Options{Pool: p})
|
||||||
|
b := make([]byte, 1<<9)
|
||||||
|
n, err := r.Read(b)
|
||||||
|
if n != 1<<6 || err != nil {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err = r.Read(b)
|
||||||
|
if n != 0 || !errors.Is(err, errTest) {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Free()
|
||||||
|
b = make([]byte, 1<<9)
|
||||||
|
n, err = r.Read(b)
|
||||||
|
if n != 0 || !errors.Is(err, errTest) {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.alloc != 1 || p.free != 1 {
|
||||||
|
t.Fatal(p.alloc, p.free)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with content", func(t *testing.T) {
|
||||||
|
c := buffer.ContentFunc(func(w io.Writer) (int64, error) {
|
||||||
|
var n int64
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
ni, err := w.Write([]byte("123456789")[i*3 : i*3+3])
|
||||||
|
n += int64(ni)
|
||||||
|
if err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
p := &fakePool{allocSize: 2}
|
||||||
|
o := buffer.Options{Pool: p}
|
||||||
|
r := buffer.BufferedContent(c, o)
|
||||||
|
b := make([]byte, 3)
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
n, err := r.Read(b)
|
||||||
|
if n != 3 || err != nil {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(b) != "123456789"[i*3:i*3+3] {
|
||||||
|
t.Fatal(string(b))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Free()
|
||||||
|
n, err := r.Read(b)
|
||||||
|
if n != 0 || !errors.Is(err, buffer.ErrAbort) {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.alloc != 1 || p.free != 1 {
|
||||||
|
t.Fatal(p.alloc, p.free)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("with content error", func(t *testing.T) {
|
||||||
|
c := buffer.ContentFunc(func(w io.Writer) (int64, error) {
|
||||||
|
var n int64
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
ni, err := w.Write([]byte("123456789")[i*3 : i*3+3])
|
||||||
|
n += int64(ni)
|
||||||
|
if err != nil {
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, errTest
|
||||||
|
})
|
||||||
|
|
||||||
|
p := &fakePool{allocSize: 2}
|
||||||
|
o := buffer.Options{Pool: p}
|
||||||
|
r := buffer.BufferedContent(c, o)
|
||||||
|
b := make([]byte, 3)
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
|
n, err := r.Read(b)
|
||||||
|
if n != 3 || err != nil {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(b) != "123456789"[i*3:i*3+3] {
|
||||||
|
t.Fatal(string(b))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := r.Read(b)
|
||||||
|
if n != 0 || !errors.Is(err, errTest) {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Free()
|
||||||
|
n, err = r.Read(b)
|
||||||
|
if n != 0 || !errors.Is(err, errTest) {
|
||||||
|
t.Fatal(n, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.alloc != 1 || p.free != 1 {
|
||||||
|
t.Fatal(p.alloc, p.free)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// -- bench
|
// -- bench
|
||||||
|
|||||||
@ -148,11 +148,18 @@ func (r *reader) free() {
|
|||||||
r.segments = r.segments[1:]
|
r.segments = r.segments[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.len = 0
|
||||||
|
r.options.Pool = nil
|
||||||
if c, ok := r.in.(interface{ close() error }); ok {
|
if c, ok := r.in.(interface{ close() error }); ok {
|
||||||
if err := c.close(); err != nil {
|
if err := c.close(); err != nil {
|
||||||
r.err = errors.Join(r.err, err)
|
r.err = errors.Join(r.err, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.in = nil
|
||||||
|
if r.err == nil {
|
||||||
|
r.err = ErrAbort
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r reader) findSegmentUp(i int) (int, int) {
|
func (r reader) findSegmentUp(i int) (int, int) {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user