1
0

fix reader for blocking reads

This commit is contained in:
Arpad Ryszka 2026-04-06 20:41:37 +02:00
parent e2a24dda24
commit 513f9a3585
3 changed files with 693 additions and 708 deletions

View File

@ -128,7 +128,8 @@ func (p *syncedForeverPool[T]) Put(i T) {
}
func TestPoolUsage(t *testing.T) {
for _, cr := range []createReader{buffer.BufferedReader, testContent} {
for title, cr := range map[string]createReader{"reader": buffer.BufferedReader, "content": testContent} {
t.Run(title, func(t *testing.T) {
t.Run("allocate", func(t *testing.T) {
t.Run("read", func(t *testing.T) {
g := &gen{max: 1 << 15}
@ -195,8 +196,8 @@ func TestPoolUsage(t *testing.T) {
t.Fatal("invalid content")
}
if p.alloc != 3 {
t.Fatal("invalid allocation count")
if p.alloc != 1 {
t.Fatal("invalid allocation count", p.alloc)
}
})
@ -550,11 +551,7 @@ func TestPoolUsage(t *testing.T) {
}
if ok {
t.Fatal("invalid delimiter")
}
if !bytes.Equal(b, generate(1<<15)) {
t.Fatal("invalid content")
t.Fatal("invalid delimiter", len(b))
}
})
})
@ -891,6 +888,7 @@ func TestPoolUsage(t *testing.T) {
}
})
})
})
}
}

View File

@ -16,8 +16,7 @@ type reader struct {
err error
}
func (r *reader) fillSegment() (fn int) {
for {
func (r *reader) fillSegment() (fn int, full bool) {
if r.err != nil {
return
}
@ -25,24 +24,20 @@ func (r *reader) fillSegment() (fn int) {
seg := r.segments[len(r.segments)-1]
start := r.offset + r.len - r.lastSegStart
if start == len(seg) {
full = true
return
}
rn, err := r.in.Read(seg[start:len(seg)])
if rn == 0 && err == nil {
rn, err = r.in.Read(seg[start:len(seg)])
fn, r.err = r.in.Read(seg[start:len(seg)])
if fn == 0 && r.err == nil {
fn, r.err = r.in.Read(seg[start:len(seg)])
}
if rn == 0 && err == nil {
r.len += fn
full = fn == len(seg)-start
return
}
fn += rn
r.len += rn
r.err = err
}
}
func (r *reader) allocate() {
segment, err := r.options.Pool.Get()
if err != nil {
@ -62,26 +57,28 @@ func (r *reader) allocate() {
r.segments = append(r.segments, segment)
}
func (r *reader) fill(n int) {
func (r *reader) fill(to int) int {
var n int
for {
if r.err != nil {
return
return n
}
if r.len >= n {
return
if r.len >= to {
return n
}
if len(r.segments) == 0 || r.offset+r.len == r.lastSegStart+len(r.segments[len(r.segments)-1]) {
r.allocate()
if r.err != nil {
return
return n
}
}
fn := r.fillSegment()
if fn == 0 {
return
fn, full := r.fillSegment()
n += fn
if fn == 0 || !full {
return n
}
}
}
@ -309,8 +306,10 @@ func (r *reader) read(p []byte) (int, error) {
}
}
var partialRead bool
if r.len == 0 {
r.fillSegment()
_, full := r.fillSegment()
partialRead = !full
}
ni := r.copy(p)
@ -321,6 +320,9 @@ func (r *reader) read(p []byte) (int, error) {
r.consume(ni)
p = p[ni:]
n += ni
if partialRead {
break
}
}
if r.err != nil && r.len == 0 {
@ -390,30 +392,16 @@ func (r *reader) readUTF8(max int) ([]rune, int, error) {
break
}
var (
b []byte
nullRead bool
)
for {
b = r.view(n, 4)
if len(b) == 4 || utf8.FullRune(b) {
break
}
if r.err != nil {
break
}
var nullRead bool
b := r.view(n, 4)
if len(b) < 4 && !utf8.FullRune(b) {
clen := r.len
r.fill(4 + 2*(max-len(runes)))
nullRead = r.len == clen && r.err == nil
if nullRead {
break
}
r.fill(n + 4)
b = r.view(n, 4)
nullRead = r.len == clen
}
if nullRead || len(b) == 0 {
if nullRead && r.err == nil || len(b) == 0 {
break
}
@ -490,7 +478,7 @@ func (r *reader) writeTo(w io.Writer) (int64, error) {
}
}
fn := r.fillSegment()
fn, _ := r.fillSegment()
if fn == 0 && r.err == nil {
return n, io.ErrNoProgress
}

View File

@ -12,8 +12,8 @@ func TestReadUTF8(t *testing.T) {
t.Run(title, func(t *testing.T) {
t.Run("read all after error", func(t *testing.T) {
g := &gen{
rng: utf8Range,
max: 24,
rng: utf8W2Range,
max: 30,
fastErr: true,
}
@ -25,27 +25,27 @@ func TestReadUTF8(t *testing.T) {
}
if len(runes) != 12 {
t.Fatal("short read", len(runes))
t.Fatal("invalid read 0", len(runes), n0)
}
if string(runes) != string(generateFrom(utf8Range, n0)) {
if string(runes) != string(generateFrom(utf8W2Range, n0)) {
t.Fatal("invalid content")
}
runes, n1, err := r.ReadUTF8(12)
runes1, n1, err := r.ReadUTF8(12)
if err != nil {
t.Fatal(err)
}
if len(runes) != 3 {
t.Fatal("short read", len(runes))
if len(runes1) != 3 {
t.Fatal("invalid read 1", len(runes1), n0, n1)
}
if string(runes) != string(generateFrom(utf8Range, n0+n1)[n0:]) {
t.Fatal("invalid content", string(runes), string(generateFrom(utf8Range, n0+n1)[n0:]))
if string(runes1) != string(generateFrom(utf8W2Range, n0+n1)[n0:]) {
t.Fatal("invalid content", string(runes1), string(generateFrom(utf8W2Range, n0+n1)[n0:]))
}
runes, n2, err := r.ReadUTF8(12)
runes2, n2, err := r.ReadUTF8(12)
if !errors.Is(err, io.EOF) {
t.Fatal(err)
}
@ -54,8 +54,8 @@ func TestReadUTF8(t *testing.T) {
t.Fatal("unexpected consumption")
}
if len(runes) != 0 {
t.Fatal("unexpected read", len(runes))
if len(runes2) != 0 {
t.Fatal("unexpected read", len(runes2))
}
})
@ -106,7 +106,6 @@ func TestReadUTF8(t *testing.T) {
}
o := buffer.Options{Pool: buffer.NoPool(9)}
r := cr(g, o)
runes, n, err := r.ReadUTF8(12)
if err != nil {
@ -149,7 +148,7 @@ func TestReadUTF8(t *testing.T) {
for i := 0; i < 3; i++ {
runes, n, err = r.ReadUTF8(24)
if len(runes) != 0 || n != 1 || err != nil {
t.Fatal("failed to read out broken end")
t.Fatal("failed to read out broken end", len(runes), n, err == nil)
}
}