1
0
buffer/reader.go

518 lines
7.9 KiB
Go
Raw Normal View History

2026-02-17 16:58:00 +01:00
package buffer
import (
"errors"
"io"
"unicode/utf8"
)
// reader buffers data read from in inside a list of segments taken from
// options.Pool. The unread bytes lie contiguously across the segments,
// starting at offset within segments[0]; the positions used by the methods in
// this file are measured from the first byte of segments[0].
type reader struct {
	// options provides the segment pool (Pool.Get/Pool.Put) and ReadSize,
	// the minimum size of each Read request made to in (limited by the free
	// space left in the last segment).
	options Options
	// in is the underlying reader the buffer is filled from.
	in io.Reader
	// segments holds the buffer segments in order; new data is always
	// written into the unused tail of the last one.
	segments [][]byte
	// offset is the position of the first unread byte within segments[0].
	offset int
	// len is the number of buffered, unread bytes starting at offset.
	len int
	// lastSegStart is the position of the first byte of the last segment.
	lastSegStart int
	// err holds the error from in.Read or from allocating a segment. No
	// method in this file clears it once set; callers are told about it
	// only after the buffered data has been drained.
	err error
}
// fillSegment reads from the underlying reader into the unused tail of the
// last segment until at least n new bytes have been read by this call. Each
// Read asks for at least options.ReadSize bytes, limited by the space left in
// the segment. It stops early when the segment is full, when an error has
// been recorded in r.err, or when the reader returns neither data nor an
// error on two consecutive calls. It returns the number of bytes read.
func (r *reader) fillSegment(n int) (fn int) {
	for r.err == nil && fn < n {
		tail := r.segments[len(r.segments)-1]
		// Write position inside the last segment.
		from := r.offset + r.len - r.lastSegStart
		want := n - fn
		if want < r.options.ReadSize {
			want = r.options.ReadSize
		}
		to := from + want
		if to > len(tail) {
			to = len(tail)
		}
		if to == from {
			// The segment is full.
			break
		}
		// Tolerate one empty, error-free Read before giving up.
		var (
			got int
			err error
		)
		for try := 0; try < 2 && got == 0 && err == nil; try++ {
			got, err = r.in.Read(tail[from:to])
		}
		if got == 0 && err == nil {
			break
		}
		fn += got
		r.len += got
		r.err = err
	}
	return fn
}
// allocate takes a new segment from the pool and appends it to r.segments,
// moving lastSegStart past the previous last segment. A pool error, or an
// empty segment (ErrZeroAllocation), is recorded in r.err instead and the
// segment list is left unchanged.
func (r *reader) allocate() {
	next, err := r.options.Pool.Get()
	switch {
	case err != nil:
		r.err = err
	case len(next) == 0:
		r.err = ErrZeroAllocation
	default:
		if k := len(r.segments); k > 0 {
			r.lastSegStart += len(r.segments[k-1])
		}
		r.segments = append(r.segments, next)
	}
}
// fill reads from the underlying reader until at least n unread bytes are
// buffered, allocating a new segment from the pool whenever the last one is
// full. It stops early when an error is recorded in r.err or when a read makes
// no progress; callers detect that by comparing r.len before and after.
func (r *reader) fill(n int) {
	for r.err == nil && r.len < n {
		last := len(r.segments) - 1
		if last < 0 || r.offset+r.len == r.lastSegStart+len(r.segments[last]) {
			r.allocate()
			if r.err != nil {
				return
			}
		}
		// fillSegment counts only the bytes read by that call, so request just
		// the missing amount. Passing n (the total target) made it keep calling
		// Read for bytes that were already buffered, which over-reads and can
		// block on a stream long after enough data is available.
		if r.fillSegment(n-r.len) == 0 {
			return
		}
	}
}
// copy copies unread bytes, starting at the read position, into p without
// consuming them, walking the segments in order. It returns the number of
// bytes copied.
func (r reader) copy(p []byte) (n int) {
	end := r.offset + r.len // position just past the unread data
	from := r.offset        // first byte to copy in the current segment
	base := 0               // position of the current segment's first byte
	for _, s := range r.segments {
		if len(p) == 0 {
			break
		}
		to := end - base
		if to > len(s) {
			to = len(s)
		}
		k := copy(p, s[from:to])
		p = p[k:]
		n += k
		base += len(s)
		from = 0
	}
	return n
}
// consume marks n buffered bytes as read. While more than one segment is
// held, leading segments that contain no unread data are returned to the pool
// and offset and lastSegStart are shifted back by their length. When at most
// one segment is left and nothing is buffered, offset is reset to 0 so the
// segment is filled again from its start.
func (r *reader) consume(n int) {
	r.offset += n
	r.len -= n
	for len(r.segments) > 1 && r.offset >= len(r.segments[0]) {
		head := r.segments[0]
		r.offset -= len(head)
		r.lastSegStart -= len(head)
		r.options.Pool.Put(head)
		r.segments = r.segments[1:]
	}
	if len(r.segments) <= 1 && r.len == 0 {
		r.offset = 0
	}
}
// free returns every segment to the pool and leaves the reader without
// segments. The offset, len and lastSegStart fields are not modified.
func (r *reader) free() {
	for _, seg := range r.segments {
		r.options.Pool.Put(seg)
	}
	r.segments = r.segments[len(r.segments):]
}
// findSegmentUp returns the index and starting position of the segment that
// holds the byte i positions after the read position, scanning forward from
// the first segment. The position must lie inside one of the segments.
func (r reader) findSegmentUp(i int) (int, int) {
	pos := r.offset + i
	start := 0
	for seg := 0; ; seg++ {
		end := start + len(r.segments[seg])
		if start <= pos && pos < end {
			return seg, start
		}
		start = end
	}
}
// findSegmentDown returns the index and starting position of the segment that
// holds the byte i positions after the read position, scanning backwards from
// the last segment.
//
// Any position at or past the start of the last segment resolves to the last
// segment, including the position just past its end. fillToDelimiter asks for
// exactly that position after a mismatch on the final byte of a full segment,
// and steps into the next segment itself once more data has been filled.
// Without this, such a position matched no segment and the scan ran off the
// front of r.segments, panicking with an index out of range.
func (r reader) findSegmentDown(i int) (int, int) {
	pos := r.offset + i
	seg := len(r.segments) - 1
	segStart := r.lastSegStart
	for pos < segStart {
		seg--
		segStart -= len(r.segments[seg])
	}
	return seg, segStart
}
// match reports whether p0 and p1 agree on their common prefix, that is on
// their first min(len(p0), len(p1)) bytes, together with len(p0)-len(p1).
// A positive difference means p1 was too short to compare all of p0.
func match(p0, p1 []byte) (bool, int) {
	diff := len(p0) - len(p1)
	common := p0
	if diff > 0 {
		common = p0[:len(p1)]
	}
	for k, c := range common {
		if p1[k] != c {
			return false, diff
		}
	}
	return true, diff
}
// fillToDelimiter searches the unread data for delimiter, reading more from
// the underlying reader as needed, and returns the number of bytes up to and
// including its first occurrence. The occurrence must end within max bytes of
// the read position. It reports false when that limit is reached or when no
// more data can be buffered (recorded error or no progress). Nothing is
// consumed.
//
// The search compares byte by byte and allows an occurrence to span segment
// boundaries: i is the position being compared (relative to the read
// position), seg and segStart identify the segment holding it, and d is the
// part of delimiter that still has to match.
func (r *reader) fillToDelimiter(delimiter []byte, max int) (int, bool) {
	if len(delimiter) == 0 {
		// An empty delimiter matches at the read position if max allows it.
		// Handled up front because the segment lookup below would index
		// r.segments even when no segment has been allocated yet.
		return 0, max >= 0
	}
	var i, seg, segStart int
	d := delimiter
	for {
		if i+len(d) > max {
			return 0, false
		}
		// Every byte compared below must be buffered. fill may grow r.len by
		// less than requested (e.g. data delivered together with an error),
		// so retry until enough is buffered or a fill makes no progress.
		// Comparing past r.len would read stale bytes left in the segment and
		// could report a match that is not present in the data.
		for r.len < i+len(d) {
			clen := r.len
			r.fill(i + len(d))
			if r.len == clen {
				return 0, false
			}
		}
		// Step into the next segment once i has crossed the boundary.
		if r.offset+i >= segStart+len(r.segments[seg]) {
			segStart += len(r.segments[seg])
			seg++
		}
		first := r.offset + i - segStart
		last := first + len(d)
		if last > len(r.segments[seg]) {
			last = len(r.segments[seg])
		}
		m, c := match(d, r.segments[seg][first:last])
		if !m {
			// Restart one byte after the start of this attempt.
			i += 1 - len(delimiter) + len(d)
			seg, segStart = r.findSegmentDown(i)
			d = delimiter
			continue
		}
		if c > 0 {
			// The segment ended first: len(d)-c bytes matched, and the rest
			// of d is compared against the next segment.
			c = len(d) - c
			i += c
			d = d[c:]
			continue
		}
		return i + len(d), true
	}
}
// view returns up to max unread bytes starting i bytes after the read
// position, without consuming them or reading from the underlying reader.
// It returns nil when no segment is allocated, when max is not positive, or
// when nothing is buffered at i.
//
// If the range lies within one segment, the result shares memory with that
// segment, which is handed back to the pool once consumed. If it spans
// segments, the bytes are copied into a new slice. The result's capacity is
// capped at its length, so appending to it (here or in a caller) can never
// write into segment memory.
func (r reader) view(i, max int) []byte {
	if len(r.segments) == 0 || max <= 0 || i >= r.len {
		return nil
	}
	// Compared as max > r.len-i rather than i+max > r.len to avoid overflow
	// for very large max.
	if max > r.len-i {
		max = r.len - i
	}
	var b []byte
	seg, segStart := r.findSegmentUp(i)
	for len(b) < max {
		start := r.offset - segStart + i
		end := start + max - len(b)
		if end > len(r.segments[seg]) {
			end = len(r.segments[seg])
		}
		if len(b) == 0 {
			// Full slice expression: without it b would keep the segment's
			// spare capacity, and the append below could overwrite pool
			// memory or another segment sharing the same backing array.
			b = r.segments[seg][start:end:end]
		} else {
			b = append(b, r.segments[seg][start:end]...)
		}
		i += end - start
		if r.offset+i == segStart+len(r.segments[seg]) {
			segStart += len(r.segments[seg])
			seg++
		}
	}
	return b
}
// read copies buffered data into p and consumes it. It first fills the buffer
// from the underlying reader until len(p) bytes are buffered, an error is
// recorded, or a read makes no progress.
//
// The recorded error is surfaced only once the buffer is drained. If nothing
// could be copied and no data is left, read returns (0, r.err). In all other
// cases the error is nil. When the buffer is drained after an error, the
// segments are returned to the pool.
func (r *reader) read(p []byte) (int, error) {
	if r.len == 0 && r.err != nil {
		return 0, r.err
	}
	// TODO:
	// - consider optimizing such that it reads to and copies from the existing segment
	// - pool.put errors are not really reader errors
	// - pool.get errors are not really reader errors, if the reader can still finish its task
	// - consider optimizing other places
	// - first implement the pool test cases
	// - the size parameter for the pool contradicts the pool functionality
	// - defining the segment sizes for the pool conflicts with the buffer options
	// - is the read size even necessary? Doesn't it always make sense to fill up the current segment?
	r.fill(len(p))
	copied := r.copy(p)
	r.consume(copied)
	if r.len > 0 || r.err == nil {
		return copied, nil
	}
	// Drained after an error: the segments are no longer needed.
	r.free()
	if copied > 0 {
		return copied, nil
	}
	return 0, r.err
}
// readBytes reads and consumes the data up to and including the first
// occurrence of delimiter, provided the occurrence ends within max bytes.
// The boolean reports whether the delimiter was found.
//
// An empty delimiter yields (nil, true, nil). If the delimiter is not found
// and no error has been recorded (limit reached or no progress), nothing is
// consumed and (nil, false, nil) is returned. If it is not found after the
// underlying reader failed, up to max of the remaining bytes are returned
// instead. The recorded error is returned only when nothing was read and the
// buffer is drained.
func (r *reader) readBytes(delimiter []byte, max int) ([]byte, bool, error) {
	if r.len == 0 && r.err != nil {
		return nil, false, r.err
	}
	if len(delimiter) == 0 {
		return nil, true, nil
	}
	size, found := r.fillToDelimiter(delimiter, max)
	if !found {
		if r.err == nil {
			// Limit reached or no progress: leave the buffer untouched.
			return nil, false, nil
		}
		// No more data can arrive: hand out what is left, up to max bytes.
		size = r.len
		if size > max {
			size = max
		}
	}
	out := make([]byte, size)
	copied := r.copy(out)
	r.consume(copied)
	if r.len == 0 && r.err != nil {
		r.free()
		if copied == 0 {
			return nil, found, r.err
		}
	}
	return out, found, nil
}
// readUTF8 decodes up to max UTF-8 encoded runes from the buffer, filling it
// from the underlying reader as needed, and consumes the decoded bytes. It
// returns the runes and the number of bytes consumed.
//
// Decoding stops early at an invalid or incomplete encoding, or when no more
// data can be buffered. If the very first rune cannot be decoded, its single
// leading byte is consumed and no runes are returned (n == 1). The recorded
// error is returned only when nothing was consumed and the buffer is drained.
func (r *reader) readUTF8(max int) ([]rune, int, error) {
	if r.err != nil && r.len == 0 {
		return nil, 0, r.err
	}
	var (
		runes []rune
		n     int // bytes decoded so far, counted from the read position
	)
	for len(runes) != max {
		var (
			b        []byte
			nullRead bool
		)
		for {
			b = r.view(n, utf8.UTFMax)
			if len(b) == utf8.UTFMax || utf8.FullRune(b) {
				break
			}
			if r.err != nil {
				break
			}
			// fill takes an absolute buffered length, so the n bytes already
			// decoded must be part of the target. Without them the target can
			// already be met; the unchanged r.len below would then be taken
			// for the underlying reader making no progress and end the read
			// early. The 2 bytes per remaining rune are a read-ahead estimate,
			// skipped when computing it would overflow.
			target := n + utf8.UTFMax
			if ahead := 2 * (max - len(runes)); ahead > 0 && target+ahead > target {
				target += ahead
			}
			clen := r.len
			r.fill(target)
			nullRead = r.len == clen && r.err == nil
			if nullRead {
				break
			}
		}
		if nullRead || len(b) == 0 {
			break
		}
		// Named rn rather than r so the receiver is not shadowed.
		rn, size := utf8.DecodeRune(b)
		if rn == utf8.RuneError && size == 1 {
			if len(runes) == 0 {
				// Consume the single undecodable leading byte.
				n = 1
			}
			break
		}
		runes = append(runes, rn)
		n += size
	}
	r.consume(n)
	if r.err != nil && r.len == 0 {
		r.free()
		if n == 0 {
			return nil, 0, r.err
		}
	}
	return runes, n, nil
}
// peek returns up to max unread bytes without consuming them. It first fills
// the buffer from the underlying reader so that max bytes are available when
// possible. The result may share memory with the buffer.
//
// When the buffer is drained after an error, the segments are returned to
// the pool and, if nothing could be returned, the recorded error is reported.
func (r *reader) peek(max int) ([]byte, error) {
	if r.len == 0 && r.err != nil {
		return nil, r.err
	}
	r.fill(max)
	window := r.view(0, max)
	if r.len > 0 || r.err == nil {
		return window, nil
	}
	r.free()
	if len(window) > 0 {
		return window, nil
	}
	return window, r.err
}
// buffered returns all unread bytes currently held in the buffer, without
// reading from the underlying reader or consuming anything. The result may
// share memory with the buffer.
func (r reader) buffered() []byte {
	unread := r.len
	return r.view(0, unread)
}
// writeTo writes the buffered data to w, then keeps filling the buffer from
// the underlying reader and writing it out until that reader fails. It
// returns the number of bytes written.
//
// io.EOF from the underlying reader ends the copy without an error. A write
// error is returned as is. A short write yields io.ErrShortWrite, and a write
// count outside [0, len(p)] is rejected with an error instead of being applied
// to the buffer. If the underlying reader returns neither data nor an error,
// io.ErrNoProgress is returned. Other read errors are returned once all
// buffered data has been written.
func (r *reader) writeTo(w io.Writer) (int64, error) {
	if errors.Is(r.err, io.EOF) && r.len == 0 {
		return 0, nil
	}
	if r.err != nil && r.len == 0 {
		return 0, r.err
	}
	var (
		n    int64
		werr error
	)
	for {
		if r.len == 0 {
			if r.err != nil {
				break
			}
			if len(r.segments) == 0 {
				r.allocate()
				if r.err != nil {
					break
				}
			}
			if r.fillSegment(len(r.segments[len(r.segments)-1])) == 0 {
				if r.err == nil {
					return n, io.ErrNoProgress
				}
				// The reader failed without delivering data. There is nothing
				// to write, so skip the empty Write; the check above ends
				// the loop.
				continue
			}
		}
		wl := len(r.segments[0]) - r.offset
		if wl > r.len {
			wl = r.len
		}
		ni, err := w.Write(r.segments[0][r.offset : r.offset+wl])
		if ni < 0 || ni > wl {
			// io.Writer requires 0 <= ni <= len(p); consuming any other
			// count would corrupt offset and len.
			werr = errors.New("buffer: invalid write result")
			break
		}
		r.consume(ni)
		n += int64(ni)
		if err != nil {
			werr = err
			break
		}
		if ni < wl {
			werr = io.ErrShortWrite
			break
		}
	}
	if r.err != nil && r.len == 0 {
		r.free()
	}
	if werr != nil {
		return n, werr
	}
	if r.err != nil && !errors.Is(r.err, io.EOF) && r.len == 0 {
		return n, r.err
	}
	return n, nil
}