package async
import (
"sync"
"sync/atomic"
"github.com/prequel-dev/plz4/internal/pkg/blk"
"github.com/prequel-dev/plz4/internal/pkg/xxh32"
)
// Note: AsyncHash is not thread safe.
// All APIs must be synchronized.
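// Typical lifecycle, sketched from how the async reader drives this type
// (illustrative; variable names are hypothetical):
//
//	h := NewAsyncHash(nParallel) // queue depth
//	go h.Run()                   // single hashing goroutine
//	h.Queue(b)                   // blocks queued in stream order
//	sum := h.Done()              // close the queue, wait, read the xxh32 sum
//
// Done may be called more than once; subsequent calls return 0.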
type AsyncHash struct {
hash xxh32.XXHZero
wg sync.WaitGroup
ch chan *blk.BlkT
done atomic.Bool
}
func NewAsyncHash(sz int) *AsyncHash {
h := &AsyncHash{
ch: make(chan *blk.BlkT, sz),
}
h.wg.Add(1)
return h
}
func (h *AsyncHash) Queue(qBlk *blk.BlkT) {
h.ch <- qBlk
}
func (h *AsyncHash) Done() uint32 {
// Avoid double Done calls
if !h.done.CompareAndSwap(false, true) {
return 0
}
close(h.ch)
h.wg.Wait()
return h.hash.Sum32()
}
func (h *AsyncHash) Run() {
defer h.wg.Done()
for srcBlk := range h.ch {
h.hash.Write(srcBlk.Data())
blk.ReturnBlk(srcBlk)
}
}
type blkRef struct {
blk *blk.BlkT
idx int64
}
type AsyncHashIdx struct {
hash xxh32.XXHZero
wg sync.WaitGroup
ch chan blkRef
next atomic.Int64
done atomic.Bool
}
func NewAsyncHashIdx(sz int) *AsyncHashIdx {
h := &AsyncHashIdx{
ch: make(chan blkRef, sz),
}
h.wg.Add(1)
return h
}
// Blocks must arrive in order
func (h *AsyncHashIdx) Queue(qBlk *blk.BlkT) {
h.ch <- blkRef{qBlk, -1}
}
// Free can be called from any goroutine
func (h *AsyncHashIdx) Free(qBlk *blk.BlkT, idx int) {
if int64(idx) < h.next.Load() {
blk.ReturnBlk(qBlk)
return
}
h.ch <- blkRef{qBlk, int64(idx)}
}
func (h *AsyncHashIdx) Done() uint32 {
// Avoid double Done calls
if !h.done.CompareAndSwap(false, true) {
return 0
}
close(h.ch)
h.wg.Wait()
return h.hash.Sum32()
}
func (h *AsyncHashIdx) Run() {
defer h.wg.Done()
for ref := range h.ch {
switch {
case ref.idx < 0:
h.hash.Write(ref.blk.Data())
h.next.Add(1)
default:
blk.ReturnBlk(ref.blk)
}
}
}
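// Coordination sketch (based on how the async writer uses this type):
// each source block passes through ch twice. Queue sends {blk, -1} in
// stream order so Run can hash it and advance 'next'; Free later sends
// {blk, idx} (or returns the block immediately when idx < next), and Run
// recycles it via blk.ReturnBlk. Because the channel is FIFO and Queue(i)
// always precedes Free(i), a block is never returned to the pool before
// it has been hashed.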
package async
import (
"io"
"sync"
"github.com/prequel-dev/plz4/internal/pkg/blk"
"github.com/prequel-dev/plz4/internal/pkg/compress"
"github.com/prequel-dev/plz4/internal/pkg/header"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type asyncRdrT struct {
frameRdr blk.FrameReader
nextIdx int
inChan chan inBlkT
outChan chan outBlkT
finChan chan struct{}
pending map[int]outBlkT
hasher *AsyncHash
dc compress.Decompressor
wg sync.WaitGroup
}
type inBlkT struct {
idx int
blk *blk.BlkT
dict *blk.BlkT
}
type outBlkT struct {
err error
idx int
srcSz int
blk *blk.BlkT
}
func (b inBlkT) Dict() []byte {
if b.dict == nil {
return nil
}
return b.dict.Data()
}
func NewAsyncReader(rdr io.Reader, hdr header.HeaderT, opt *opts.OptsT) *asyncRdrT {
var dict *compress.DictT
if opt.Dictionary != nil || !hdr.Flags.BlockIndependence() {
dict = compress.NewDictT(opt.Dictionary, !hdr.Flags.BlockIndependence())
}
r := &asyncRdrT{
inChan: make(chan inBlkT),
outChan: make(chan outBlkT),
finChan: make(chan struct{}),
pending: make(map[int]outBlkT, opt.NParallel),
frameRdr: *blk.NewFrameReader(
rdr,
hdr.BlockDesc.Idx().Size(),
hdr.Flags.ContentChecksum(),
hdr.Flags.BlockChecksum(),
),
dc: compress.NewDecompressor(
hdr.Flags.BlockIndependence(),
dict,
),
}
// Note: the control routine must run outside of the worker pool;
// otherwise it could deadlock when there are too many simultaneous requests.
if hdr.Flags.ContentChecksum() && (opt.ReadOffset == 0) && opt.ContentChecksum {
r.hasher = NewAsyncHash(opt.NParallel)
go r.hasher.Run()
}
// Spin up all the goroutines
// Note: the control routine must run outside of the worker pool;
// otherwise it could deadlock when there are too many simultaneous requests.
go r.dispatch()
// Each closure escapes and causes an allocation;
// no reason to do that NParallel times.
task := func() {
r.decompress()
}
r.wg.Add(opt.NParallel)
for i := 0; i < opt.NParallel; i++ {
opt.WorkerPool.Submit(task)
}
return r
}
func (r *asyncRdrT) dispatch() {
curIdx, err := r._readLoop()
// We are done with the input channel; close it down.
// This will signal the decompressor goroutines to exit.
close(r.inChan)
// Send final error packet on the outChan if necessary.
if err != nil {
outBlk := outBlkT{
err: err,
idx: curIdx,
}
select {
case r.outChan <- outBlk:
case <-r.finChan:
}
}
}
func (r *asyncRdrT) _readLoop() (int, error) {
var curIdx int
LOOP:
for {
frame, err := r.frameRdr.Read()
if err != nil {
return curIdx, err
}
if frame.Uncompressed {
// If uncompressed, send directly to outChan;
// decompression is not necessary.
outBlk := outBlkT{
idx: curIdx,
blk: frame.Blk,
srcSz: frame.ReadCnt,
}
select {
case r.outChan <- outBlk:
case <-r.finChan:
blk.ReturnBlk(frame.Blk)
break LOOP
}
} else {
// Otherwise, queue up on inChan for decompressor.
inBlk := inBlkT{
idx: curIdx,
blk: frame.Blk,
}
select {
case r.inChan <- inBlk:
case <-r.finChan:
blk.ReturnBlk(frame.Blk)
break LOOP
}
}
curIdx += 1
// 'select' chooses randomly among ready cases, so it is possible
// that although finChan is ready it is never selected above
// and the loop continues indefinitely. To abort ASAP,
// an additional finChan check is added here.
select {
case <-r.finChan:
break LOOP
default:
}
}
return curIdx, nil
}
func (r *asyncRdrT) decompress() {
defer r.wg.Done()
r._decompressLoop()
}
func (r *asyncRdrT) _decompressLoop() {
LOOP:
for {
srcBlk, ok := <-r.inChan
if !ok {
break LOOP // inChan closed, we are done
}
// Set aside the source size for the outBlkT
srcSz := srcBlk.blk.Len()
// Decompress the source block
dstBlk, err := srcBlk.blk.Decompress(r.dc)
// Return srcBlk; it is no longer necessary.
blk.ReturnBlk(srcBlk.blk)
select {
case r.outChan <- outBlkT{
err: err,
idx: srcBlk.idx,
blk: dstBlk,
srcSz: srcSz,
}:
case <-r.finChan:
if dstBlk != nil {
blk.ReturnBlk(dstBlk)
}
break LOOP
}
}
}
func (r *asyncRdrT) NextBlock(prevBlk *blk.BlkT) (*blk.BlkT, int, error) {
switch {
case prevBlk == nil:
case r.hasher == nil:
blk.ReturnBlk(prevBlk)
default:
r.hasher.Queue(prevBlk)
}
nextBlk, nRead, err := r._nextBlock()
switch {
case err != zerr.EndMark:
case r.hasher == nil:
case r.hasher.Done() != r.frameRdr.ContentChecksum():
err = zerr.ErrContentHash
}
return nextBlk, nRead, err
}
func (r *asyncRdrT) _nextBlock() (*blk.BlkT, int, error) {
// Check if nextIdx is already in pending list.
// If so, remove from list and continue
if p, ok := r.pending[r.nextIdx]; ok {
delete(r.pending, r.nextIdx)
r.nextIdx += 1
return p.blk, p.srcSz, p.err
}
for {
outBlk := <-r.outChan
// Check if outBlk from outChan is the nextIdx,
// If so return it.
if outBlk.idx == r.nextIdx {
r.nextIdx += 1
return outBlk.blk, outBlk.srcSz, outBlk.err
}
// Block came in out of order; add to pending map
r.pending[outBlk.idx] = outBlk
}
}
func (r *asyncRdrT) Close() {
// defend double close
select {
case <-r.finChan:
default:
r._close()
}
}
func (r *asyncRdrT) _close() {
// Send close signal with finChan
close(r.finChan)
// Wait for all decompression workers to exit
r.wg.Wait()
// Drain inChan
for inBlk := range r.inChan {
if inBlk.blk != nil {
blk.ReturnBlk(inBlk.blk)
}
}
// Drain outChan; not closed so have to loop until empty
LOOP:
for {
select {
case outBlk := <-r.outChan:
if outBlk.blk != nil {
blk.ReturnBlk(outBlk.blk)
}
default:
break LOOP
}
}
// Drain pending
for idx, p := range r.pending {
if p.blk != nil {
blk.ReturnBlk(p.blk)
}
delete(r.pending, idx)
}
// Close hasher; could be a noop.
if r.hasher != nil {
r.hasher.Done()
}
}
package async
import (
"io"
"sync"
"sync/atomic"
"github.com/prequel-dev/plz4/internal/pkg/blk"
"github.com/prequel-dev/plz4/internal/pkg/compress"
"github.com/prequel-dev/plz4/internal/pkg/header"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/trailer"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type asyncWriterT struct {
bsz int
srcIdx int
srcOff int
srcBlk *blk.BlkT
inChan chan inBlkT
outChan chan outBlkT
synChan chan int
dict *blk.BlkT
wg sync.WaitGroup
opts *opts.OptsT
hasher *AsyncHashIdx
taskF func()
cmpF compress.CompressorFactory
nTasks int
state atomic.Pointer[error]
report bool
}
func NewAsyncWriter(wr io.Writer, opts *opts.OptsT) *asyncWriterT {
var (
bsz = opts.BlockSizeIdx.Size()
cmpF = opts.NewCompressorFactory()
srcBlk = blk.BorrowBlk(bsz)
)
// Scope it down to our block size
srcBlk.Trim(bsz)
w := &asyncWriterT{
bsz: bsz,
srcBlk: srcBlk,
inChan: make(chan inBlkT),
outChan: make(chan outBlkT),
synChan: make(chan int),
cmpF: cmpF,
opts: opts,
nTasks: 1,
}
// Spin up the writer before the compress tasks in case a
// worker pool is defined that does not have enough slots.
// Need at least 2 slots available (3 if opts.ContentChecksum).
// Note: the control routine must run outside of the worker pool;
// otherwise it could deadlock when there are too many simultaneous requests.
go w.writeLoop(wr)
if opts.ContentChecksum {
w.hasher = NewAsyncHashIdx(opts.NParallel)
go w.hasher.Run()
}
// Bind task function
// Each closure escapes and causes an allocation;
// no reason to do that NParallel times.
w.taskF = func() {
w.compressLoop()
}
// Spin up at least one producer task; we will need at least one
// assuming there is a write at some point.
// Additional tasks are spun up on demand; this conserves
// resources in auto-parallel mode, particularly with small payloads.
// If we do happen to have the content size, initialize based on it:
if opts.ContentSz != nil {
w.nTasks = int(*opts.ContentSz)/bsz + 1
if w.nTasks > opts.NParallel {
w.nTasks = opts.NParallel
}
}
w.wg.Add(w.nTasks)
for i := 0; i < w.nTasks; i++ {
opts.WorkerPool.Submit(w.taskF)
}
return w
}
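// Rough call pattern, as driven by the public plz4 writer (illustrative;
// the wrapping code is not shown here):
//
//	w := NewAsyncWriter(dst, &o)
//	_, err := w.Write(payload) // or w.ReadFrom(r)
//	err = w.Flush()            // optional; blocks until queued data is written
//	err = w.Close()            // writes the trailer and releases buffers
//
// Write, Flush, and Close are expected to be called from a single caller
// goroutine; reportError relies on that assumption.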
func (w *asyncWriterT) Write(src []byte) (int, error) {
var nConsumed int
for len(src) > 0 && !w.errState() {
// Copy the source data into our srcBlk
n := copy(w.srcBlk.Suffix(w.srcOff), src)
w.srcOff += n
nConsumed += n
// Flush block if completely filled
if w.srcOff == w.bsz {
w._flushBlk()
}
// Slide the src buffer over by N for next spin
src = src[n:]
}
return nConsumed, w.reportError()
}
func (w *asyncWriterT) Flush() error {
// Check error before consuming;
if err := w.reportError(); err != nil {
return err
}
// Flush out pending data if any
w.flushBlk()
// If no data has been queued, return.
if w.srcIdx == 0 {
return nil
}
// Force a flush signal.
// Notify writeLoop to respond when it processes up to w.srcIdx
w.synChan <- w.srcIdx
// Now wait for response
<-w.synChan
// Return any error that might have been generated in the meantime
return w.reportError()
}
func (w *asyncWriterT) Close() error {
if w.srcBlk == nil {
return w.reportError()
}
// Flush any outstanding data
w.flushBlk()
// Close down the inChan.
// This will cause the producer goroutines to exit.
close(w.inChan)
// Wait for the producer goroutines to cycle down;
// it is not safe to close w.outChan until all have exited.
w.wg.Wait()
// Close down the outChan. This is safe because
// all the producers have closed down via wg.Wait()
close(w.outChan)
// Wait for the writeLoop goroutine to exit
<-w.synChan
// Use srcBlk as sentinel for Close().
blk.ReturnBlk(w.srcBlk)
w.srcBlk = nil
w.srcOff = 0
// Dump the dict if still around
blk.ReturnBlk(w.dict)
w.dict = nil
var err error
switch {
case w.report:
// Should return no error on Close() if error already reported
case !w.errState():
// If no error, set our internal error to ErrClosed for
// subsequent API calls, but return nil.
w.setError(zerr.ErrClosed)
w.reportError()
default:
// We are in an error state that is not yet reported.
// Mark error as reported, and return it to the caller.
err = w.reportError()
}
return err
}
func (w *asyncWriterT) ReadFrom(r io.Reader) (int64, error) {
var nConsumed int64
LOOP:
for !w.errState() {
n, rerr := io.ReadFull(r, w.srcBlk.Suffix(w.srcOff))
w.srcOff += n
nConsumed += int64(n)
switch rerr {
case nil:
// srcBlk was filled; flush the block to the out channel
w._flushBlk()
case io.ErrUnexpectedEOF:
// Some bytes were read and added to w.srcBlk.
// Defer the flush and spin the loop again;
// expect io.EOF on the next spin.
case io.EOF:
// Exit loop; note we may have bytes left in w.srcBlk.
// Those will get flushed if more data is added on another
// call to ReadFrom/Write, or Flush/Close.
break LOOP
default:
// Unexpected error, set the error state.
// Will break on check at top of loop.
w.setError(rerr)
}
}
return nConsumed, w.reportError()
}
func (w *asyncWriterT) compressLoop() {
defer w.wg.Done()
var (
bsz = w.bsz
cmp = w.cmpF.NewCompressor()
blkCheck = w.opts.BlockChecksum
)
freeSrcBlk := func(srcBlk inBlkT) {
blk.ReturnBlk(srcBlk.dict)
if w.hasher != nil {
// Coordinate block free with hasher
w.hasher.Free(srcBlk.blk, srcBlk.idx)
} else {
blk.ReturnBlk(srcBlk.blk)
}
}
LOOP:
for {
srcBlk, ok := <-w.inChan
if !ok {
// compressLoop only exits on close of w.inChan
break LOOP
}
// Check for error state;
// On error, don't bother compressing, just drop and continue.
if w.errState() {
freeSrcBlk(srcBlk)
continue
}
// Set aside the source size for the outBlkT
srcSz := srcBlk.blk.Len()
dstBlk, err := srcBlk.blk.Compress(cmp, bsz, blkCheck, srcBlk.Dict())
freeSrcBlk(srcBlk)
w.outChan <- outBlkT{
err: err,
idx: srcBlk.idx,
blk: dstBlk,
srcSz: srcSz,
}
}
}
func (w *asyncWriterT) writeLoop(wr io.Writer) {
var (
nextIdx = 0
flushIdx = -1
srcMark = int64(0)
dstMark = int64(0)
pending = make(map[int]outBlkT, w.opts.NParallel)
)
if hdrSz, herr := header.WriteHeader(wr, w.opts); herr != nil {
w.setError(herr)
} else {
dstMark = int64(hdrSz)
}
LOOP:
for {
select {
case flushIdx = <-w.synChan:
case outBlk, ok := <-w.outChan:
// Poll error state in case we get a setError() during ReadFrom
err := w.getError()
switch {
case !ok:
// WriteLoop may exit *ONLY* when the outCh is closed.
break LOOP
case err != nil:
// Drop block on error state; cannot process in an error state
blk.ReturnBlk(outBlk.blk)
case outBlk.idx != nextIdx:
// Deal with pending block later
pending[outBlk.idx] = outBlk
case outBlk.err != nil:
// Error on incoming block puts parser in error state
if outBlk.blk != nil {
blk.ReturnBlk(outBlk.blk)
}
w.setError(outBlk.err)
default:
// Main write loop; write the block just received on w.outChan
for moreData := true; moreData; {
n, werr := wr.Write(outBlk.blk.Data())
w.opts.Handler(srcMark, dstMark)
srcMark += int64(outBlk.srcSz)
dstMark += int64(n)
// Return the block whether we get an error or not
blk.ReturnBlk(outBlk.blk)
// Bump the nextIdx
nextIdx += 1
switch {
case werr != nil:
// On error, put parser in error state
w.setError(werr)
moreData = false
default:
// Check for pending blocks and continue loop if nextIdx available
if outBlk, moreData = pending[nextIdx]; moreData {
delete(pending, nextIdx)
}
}
}
}
}
// If a flush is pending and we hit that index, or in error state, respond.
if flushIdx != -1 && (flushIdx <= nextIdx || w.errState()) {
w.synChan <- nextIdx
flushIdx = -1
}
}
// There could be some pending items left in an error case
for _, outBlk := range pending {
blk.ReturnBlk(outBlk.blk)
}
switch {
case !w.errState():
w.opts.Handler(srcMark, dstMark)
// Write trailer if exiting cleanly
if _, werr := w.writeTrailer(wr); werr != nil {
w.setError(werr)
}
case w.hasher != nil:
// Must close down the asyncHash goroutine on error to avoid leak
w.hasher.Done()
}
// Signal completion by closing the syn channel
close(w.synChan)
}
func (w *asyncWriterT) writeTrailer(wr io.Writer) (int, error) {
if w.hasher == nil {
return trailer.WriteTrailer(wr)
}
xxh := w.hasher.Done()
return trailer.WriteTrailerWithHash(wr, xxh)
}
func (w *asyncWriterT) flushBlk() {
if w.srcOff == 0 {
return
}
// Clip w.srcBlk to whatever we are currently cached to
w.srcBlk.Trim(w.srcOff)
w._flushBlk()
}
func (w *asyncWriterT) _genDict() (outDict *blk.BlkT) {
if !w.opts.BlockLinked {
return nil
}
// Return the previously cached dictionary
outDict = w.dict
// Set aside last (up to) 64K of the previous
// block as dictionary input for the next block.
const maxDict = 64 * 1024
var (
off int
srcSz = w.srcBlk.Len()
)
if srcSz > maxDict {
off = srcSz - maxDict
}
w.dict = blk.BorrowBlk(maxDict)
w.dict.Trim(srcSz - off)
copy(w.dict.Data(), w.srcBlk.View(off, srcSz))
return
}
func (w *asyncWriterT) _flushBlk() {
// Defer content hash to minimize pipeline blockage
if w.hasher != nil {
w.hasher.Queue(w.srcBlk)
}
// Queue the block to a compressor task
w.inChan <- inBlkT{
idx: w.srcIdx,
blk: w.srcBlk,
dict: w._genDict(),
}
// Try to get ahead of the next write by
// spawning a goroutine if we have capacity
if w.nTasks < w.opts.NParallel {
w.nTasks += 1
w.wg.Add(1)
w.opts.WorkerPool.Submit(w.taskF)
}
// Set up next block for write
w.srcBlk = blk.BorrowBlk(w.bsz)
w.srcBlk.Trim(w.bsz)
w.srcOff = 0
w.srcIdx += 1
}
// First error wins. Do not call with a nil error.
func (w *asyncWriterT) setError(err error) bool {
return w.state.CompareAndSwap(nil, &err)
}
// Mark the error as reported;
// i.e. we have returned it to the caller at least once.
// This helps differentiate on the Close() call,
// where we want to return nil on a Close() after the
// error has already been reported.
func (w *asyncWriterT) reportError() error {
err := w.getError()
if err != nil {
// Once we are in an error state, the state is marked reported
// if error is returned on a user facing API.
// Does not require mutex because only called from caller goroutine.
w.report = true
}
return err
}
func (w *asyncWriterT) getError() error {
v := w.state.Load()
if v == nil {
return nil
}
return *v
}
func (w *asyncWriterT) errState() bool {
return w.state.Load() != nil
}
package blk
import (
"encoding/binary"
"github.com/prequel-dev/plz4/internal/pkg/compress"
"github.com/prequel-dev/plz4/internal/pkg/descriptor"
"github.com/prequel-dev/plz4/internal/pkg/xxh32"
)
// Protect the 'data' slice to avoid an accidental
// reslice which changes the capacity and breaks the pool.
type BlkT struct {
data []byte
}
func (b *BlkT) Len() int {
if b == nil {
return 0
}
return len(b.data)
}
func (b *BlkT) Cap() int {
return cap(b.data)
}
func (b *BlkT) Trim(sz int) {
b.data = b.data[:sz]
}
func (b *BlkT) View(start, stop int) []byte {
return b.data[start:stop]
}
func (b *BlkT) Prefix(pos int) []byte {
return b.data[:pos]
}
func (b *BlkT) Suffix(pos int) []byte {
return b.data[pos:]
}
func (b *BlkT) Data() []byte {
return b.data
}
func (b *BlkT) Decompress(dc compress.Decompressor) (*BlkT, error) {
dstBlk := BorrowBlk(b.Cap() - szOverhead)
n, err := dc.Decompress(b.data, dstBlk.data)
if err != nil {
ReturnBlk(dstBlk)
return nil, err
}
dstBlk.Trim(n)
return dstBlk, nil
}
// Compress the block's data into an LZ4 block.
func (b *BlkT) Compress(cmp compress.Compressor, bsz int, checksum bool, dict []byte) (*BlkT, error) {
return CompressToBlk(b.data, cmp, bsz, checksum, dict)
}
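// CompressToBlk compresses 'src' into a freshly borrowed block with the
// on-wire layout (all integers little-endian, per the LZ4 frame format):
//
//	| u32 block size | payload | u32 xxh32 (only when checksum is set) |
//
// The high bit of the size word is set when the payload is stored
// uncompressed, which is the fallback path when Compress fails or
// produces no output.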
func CompressToBlk(src []byte, cmp compress.Compressor, bsz int, checksum bool, dict []byte) (*BlkT, error) {
dstBlk := BorrowBlk(bsz)
n, err := cmp.Compress(src, dstBlk.View(4, bsz+4), dict)
if err != nil {
// If the src data is incompressible for whatever reason,
// write out the src block as non-compressed and keep on truckin'
n = 0
err = nil
}
var blkSz descriptor.DataBlockSize
if n == 0 {
blkSz.SetUncompressed()
n = copy(dstBlk.View(4, bsz+4), src)
}
// Write the data block size to beginning of dstBlk
blkSz.SetSize(n)
binary.LittleEndian.PutUint32(dstBlk.Data(), uint32(blkSz))
if checksum {
xxh := xxh32.ChecksumZero(dstBlk.View(4, n+4))
binary.LittleEndian.PutUint32(dstBlk.Suffix(n+4), xxh)
// Reslice the result buffer to include length + hash trailer
dstBlk.Trim(n + 8)
} else {
// Reslice the result buffer to include length
dstBlk.Trim(n + 4)
}
return dstBlk, nil
}
package blk
import (
"encoding/binary"
"errors"
"io"
"github.com/prequel-dev/plz4/internal/pkg/descriptor"
"github.com/prequel-dev/plz4/internal/pkg/xxh32"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type FrameReader struct {
rdr io.Reader // data source
bsz int // block size as specified in the header
srcSum uint32 // contains content checksum if last frame and enabled
srcCheck bool // parse content checksum on EOF
blkCheck bool // validate hash on blocks
}
func NewFrameReader(rdr io.Reader, bsz int, srcCheck, blkCheck bool) *FrameReader {
return &FrameReader{
rdr: rdr,
bsz: bsz,
srcCheck: srcCheck,
blkCheck: blkCheck,
}
}
type FrameT struct {
Blk *BlkT
ReadCnt int
Uncompressed bool
}
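// Read returns the next block of the frame. Callers loop until it
// returns zerr.EndMark (clean end of frame) or a wrapped read/corruption
// error. A minimal consumer sketch (names are illustrative):
//
//	fr := NewFrameReader(r, bsz, srcCheck, blkCheck)
//	for {
//		frame, err := fr.Read()
//		if err == zerr.EndMark {
//			break // optionally compare fr.ContentChecksum()
//		}
//		if err != nil {
//			return err
//		}
//		// frame.Blk holds raw or compressed data; see frame.Uncompressed
//	}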
func (fr *FrameReader) Read() (FrameT, error) {
var (
dstBlk = BorrowBlk(fr.bsz)
nRead, uncompressed, err = fr._read(dstBlk)
)
if err != nil {
ReturnBlk(dstBlk)
return FrameT{}, err
}
return FrameT{
Blk: dstBlk,
ReadCnt: nRead,
Uncompressed: uncompressed,
}, nil
}
func (fr *FrameReader) _read(dstBlk *BlkT) (nRead int, uncompressed bool, err error) {
var v uint32
nRead, v, err = fr.readUint32(dstBlk.Prefix(4))
if err != nil {
err = errors.Join(zerr.ErrBlockSizeRead, err)
return
}
var (
n int
blkSz = descriptor.DataBlockSize(v)
dataSz = blkSz.Size()
)
switch {
// Zero block size indicates EOF
case blkSz.EOF():
n, err = fr.maybeReadContentHash(dstBlk.Data())
nRead += n
if err == nil {
err = zerr.EndMark
}
return
// Sanity check on size
case dataSz > fr.bsz:
err = zerr.WrapCorrupted(zerr.ErrBlockSizeOverflow)
return
// Adjust block size if there is a hash trailer
case fr.blkCheck:
dataSz += 4
}
// Slice it down to data size
dstBlk.Trim(dataSz)
// Read the frame
n, err = io.ReadFull(fr.rdr, dstBlk.Data())
nRead += n
// Return on failure
if err != nil {
err = errors.Join(zerr.ErrBlockRead, err)
return
}
// Check hash on block if option is enabled
if fr.blkCheck {
if err = checkBlkHash(dstBlk.Data()); err != nil {
return
}
// Strip off blk trailer
dstBlk.Trim(dstBlk.Len() - 4)
}
uncompressed = blkSz.Uncompressed()
return
}
func checkBlkHash(data []byte) error {
var (
hashOff = len(data) - 4
blkData = data[:hashOff]
calcHash = xxh32.ChecksumZero(blkData)
readHash = binary.LittleEndian.Uint32(data[hashOff:])
)
if readHash != calcHash {
return zerr.WrapCorrupted(zerr.ErrBlockHash)
}
return nil
}
func (fr *FrameReader) maybeReadContentHash(buf []byte) (int, error) {
if !fr.srcCheck {
return 0, nil
}
nRead, v, err := fr.readUint32(buf[:4])
if err != nil {
return nRead, errors.Join(zerr.ErrContentHashRead, err)
}
fr.srcSum = v
return nRead, nil
}
func (fr *FrameReader) ContentChecksum() uint32 {
return fr.srcSum
}
// Read block header into pre-allocated buffer;
// binary.Read causes an escape to heap because
// of the underlying io.Read call
func (fr *FrameReader) readUint32(buf []byte) (nRead int, v uint32, err error) {
if nRead, err = io.ReadFull(fr.rdr, buf); err != nil {
return
}
v = binary.LittleEndian.Uint32(buf)
return
}
package blk
import (
"sync"
"github.com/prequel-dev/plz4/internal/pkg/descriptor"
)
// Allocate +4 for the header and +4 for the trailer.
// We will need the extra space for processing
// checksums on read and checksums+headers on write.
const (
szOverhead = 8
szAlloc4MB = descriptor.BlockIdx4MBSz + szOverhead
szAlloc1MB = descriptor.BlockIdx1MBSz + szOverhead
szAlloc256KB = descriptor.BlockIdx256KBSz + szOverhead
szAlloc64KB = descriptor.BlockIdx64KBSz + szOverhead
)
var (
pool4MB = sync.Pool{New: func() any { return &BlkT{data: make([]byte, szAlloc4MB)} }}
pool1MB = sync.Pool{New: func() any { return &BlkT{data: make([]byte, szAlloc1MB)} }}
pool256KB = sync.Pool{New: func() any { return &BlkT{data: make([]byte, szAlloc256KB)} }}
pool64KB = sync.Pool{New: func() any { return &BlkT{data: make([]byte, szAlloc64KB)} }}
)
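// Usage sketch (illustrative): borrow a block for one of the four
// descriptor sizes, trim it to the bytes actually used, and return it
// so the backing array is reused.
//
//	b := BorrowBlk(descriptor.BlockIdx64KBSz) // len == 64 KiB + szOverhead
//	b.Trim(n)                                 // slice down to n bytes
//	defer ReturnBlk(b)                        // restores full capacity
//
// Both BorrowBlk and ReturnBlk panic on a size that does not map to a pool.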
func BorrowBlk(bsz int) *BlkT {
switch bsz {
case descriptor.BlockIdx4MBSz:
return pool4MB.Get().(*BlkT)
case descriptor.BlockIdx1MBSz:
return pool1MB.Get().(*BlkT)
case descriptor.BlockIdx256KBSz:
return pool256KB.Get().(*BlkT)
case descriptor.BlockIdx64KBSz:
return pool64KB.Get().(*BlkT)
}
panic("bad block size")
}
func ReturnBlk(blk *BlkT) {
if blk == nil {
return
}
bsz := cap(blk.data)
blk.data = blk.data[:bsz]
switch bsz {
case szAlloc4MB:
pool4MB.Put(blk)
case szAlloc1MB:
pool1MB.Put(blk)
case szAlloc256KB:
pool256KB.Put(blk)
case szAlloc64KB:
pool64KB.Put(blk)
default:
panic("bad block size")
}
}
package clz4
// #cgo CFLAGS: -O3
// #include "lz4.h"
// #include "lz4hc.h"
import "C"
import (
"errors"
"fmt"
"unsafe"
)
var (
ErrLz4Compress = errors.New("lz4 fail compress; insufficient destination buffer")
ErrLz4Decompress = errors.New("lz4 fail decompress")
)
func byteSliceToCharPointer(b []byte) *C.char {
if len(b) == 0 {
return (*C.char)(unsafe.Pointer(nil))
}
return (*C.char)(unsafe.Pointer(&b[0]))
}
func CompressFast(source, dest []byte, acceleration int) (int, error) {
ret := int(C.LZ4_compress_fast(
byteSliceToCharPointer(source),
byteSliceToCharPointer(dest),
C.int(len(source)),
C.int(len(dest)),
C.int(acceleration),
))
if ret == 0 {
return ret, ErrLz4Compress
}
return ret, nil
}
func DecompressSafe(source, dest []byte) (int, error) {
ret := int(C.LZ4_decompress_safe(
byteSliceToCharPointer(source),
byteSliceToCharPointer(dest),
C.int(len(source)),
C.int(len(dest)),
))
if ret < 0 {
return ret, fmt.Errorf("%w: code %d", ErrLz4Decompress, ret)
}
return ret, nil
}
func DecompressSafeWithDict(source, dest, dict []byte) (int, error) {
ret := int(C.LZ4_decompress_safe_usingDict(
byteSliceToCharPointer(source),
byteSliceToCharPointer(dest),
C.int(len(source)),
C.int(len(dest)),
byteSliceToCharPointer(dict),
C.int(len(dict)),
))
if ret < 0 {
return ret, fmt.Errorf("%w: code %d", ErrLz4Decompress, ret)
}
return ret, nil
}
func CompressHC(source, dest []byte, level int) (int, error) {
ret := int(C.LZ4_compress_HC(
byteSliceToCharPointer(source),
byteSliceToCharPointer(dest),
C.int(len(source)),
C.int(len(dest)),
C.int(level),
))
if ret == 0 {
return ret, ErrLz4Compress
}
return ret, nil
}
type DictCtx struct {
data []byte
strm C.LZ4_stream_t
}
func NewDictCtx(dict []byte) *DictCtx {
// Dupe 'dict' for stability;
// it must remain pinned for the life of the ctx.
dupe := make([]byte, len(dict))
copy(dupe, dict)
ctx := &DictCtx{
data: dupe,
}
C.LZ4_resetStream_fast(&ctx.strm)
C.LZ4_loadDictSlow(
&ctx.strm,
byteSliceToCharPointer(ctx.data),
C.int(len(ctx.data)),
)
return ctx
}
type DictCtxHC struct {
data []byte
strm C.LZ4_streamHC_t
}
func NewDictCtxHC(dict []byte, level int) *DictCtxHC {
// Dupe 'dict' for stability;
// it must remain pinned for the life of the ctx.
dupe := make([]byte, len(dict))
copy(dupe, dict)
ctx := &DictCtxHC{
data: dupe,
}
C.LZ4_resetStreamHC_fast(
&ctx.strm,
C.int(level),
)
C.LZ4_loadDictHC(
&ctx.strm,
byteSliceToCharPointer(ctx.data),
C.int(len(ctx.data)),
)
return ctx
}
type StreamIndieCtx struct {
ctx C.LZ4_stream_t
dict *DictCtx
}
func NewStreamIndieCtx(dict *DictCtx) *StreamIndieCtx {
return &StreamIndieCtx{dict: dict}
}
func (c *StreamIndieCtx) Compress(src, dst []byte) (int, error) {
C.LZ4_resetStream_fast(&c.ctx)
C.LZ4_attach_dictionary(&c.ctx, &c.dict.strm)
ret := int(C.LZ4_compress_fast_continue(
&c.ctx,
byteSliceToCharPointer(src),
byteSliceToCharPointer(dst),
C.int(len(src)),
C.int(len(dst)),
C.int(1),
))
if ret == 0 {
return ret, ErrLz4Compress
}
return ret, nil
}
type StreamCtxHC struct {
ctx C.LZ4_streamHC_t
dict *DictCtxHC
level int
}
func NewStreamCtxHC(level int, dict *DictCtxHC) *StreamCtxHC {
return &StreamCtxHC{level: level, dict: dict}
}
func (c *StreamCtxHC) Compress(src, dst []byte) (int, error) {
C.LZ4_resetStreamHC_fast(&c.ctx, C.int(c.level))
C.LZ4_attach_HC_dictionary(&c.ctx, &c.dict.strm)
ret := int(C.LZ4_compress_HC_continue(
&c.ctx,
byteSliceToCharPointer(src),
byteSliceToCharPointer(dst),
C.int(len(src)),
C.int(len(dst)),
))
if ret == 0 {
return ret, ErrLz4Compress
}
return ret, nil
}
type StreamLinkedCtx struct {
ctx C.LZ4_stream_t
}
func NewStreamLinkedCtx(dict *DictCtx) *StreamLinkedCtx {
c := new(StreamLinkedCtx)
C.LZ4_resetStream_fast(&c.ctx)
if dict != nil {
C.LZ4_attach_dictionary(&c.ctx, &dict.strm)
}
return c
}
func (c *StreamLinkedCtx) Compress(src, dst, dict []byte) (int, error) {
if dict != nil {
C.LZ4_loadDict(
&c.ctx,
byteSliceToCharPointer(dict),
C.int(len(dict)),
)
}
ret := int(C.LZ4_compress_fast_continue(
&c.ctx,
byteSliceToCharPointer(src),
byteSliceToCharPointer(dst),
C.int(len(src)),
C.int(len(dst)),
C.int(1),
))
if ret == 0 {
return ret, ErrLz4Compress
}
return ret, nil
}
type StreamLinkedCtxHC struct {
ctx C.LZ4_streamHC_t
}
func NewStreamLinkedCtxHC(dict *DictCtxHC, level int) *StreamLinkedCtxHC {
c := new(StreamLinkedCtxHC)
C.LZ4_resetStreamHC_fast(&c.ctx, C.int(level))
if dict != nil {
C.LZ4_attach_HC_dictionary(&c.ctx, &dict.strm)
}
return c
}
func (c *StreamLinkedCtxHC) Compress(src, dst, dict []byte) (int, error) {
if dict != nil {
C.LZ4_loadDictHC(
&c.ctx,
byteSliceToCharPointer(dict),
C.int(len(dict)),
)
}
ret := int(C.LZ4_compress_HC_continue(
&c.ctx,
byteSliceToCharPointer(src),
byteSliceToCharPointer(dst),
C.int(len(src)),
C.int(len(dst)),
))
if ret == 0 {
return ret, ErrLz4Compress
}
return ret, nil
}
package compress
import "github.com/prequel-dev/plz4/internal/pkg/clz4"
type Compressor interface {
Compress(src, dst, dict []byte) (int, error)
}
var statelessFast indieCompressorFast
// Memoizing compressors avoids an unnecessary alloc
// at the cost of init time and global RAM.
var memoizeCompressors = []indieCompressorLevel{
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
}
type LevelT int
type CompressorFactory struct {
indie bool
level LevelT
dictCtx *clz4.DictCtx
dictCtxHC *clz4.DictCtxHC
}
func NewCompressorFactory(level LevelT, independent bool, dict *DictT) CompressorFactory {
f := CompressorFactory{
level: level,
indie: independent,
}
switch {
case dict == nil:
case level == 1:
f.dictCtx = clz4.NewDictCtx(dict.Data())
default:
f.dictCtxHC = clz4.NewDictCtxHC(dict.Data(), int(level))
}
return f
}
func (f CompressorFactory) NewCompressor() Compressor {
if f.indie {
return f.newIndie()
}
return f.newLinked()
}
func (f CompressorFactory) newIndie() Compressor {
switch {
case f.dictCtx != nil:
return newIndieCompressorDict(f.dictCtx)
case f.dictCtxHC != nil:
return newIndieCompressorDictHC(f.level, f.dictCtxHC)
case f.level <= 1:
return &statelessFast
case f.level <= 12:
return memoizeCompressors[f.level-2]
default:
panic("invalid level")
}
}
func (f CompressorFactory) newLinked() Compressor {
switch {
case f.level == 1:
return newLinkedCompressor(f.dictCtx)
default:
return newLinkedCompressorHC(f.level, f.dictCtxHC)
}
}
package compress
import (
"errors"
"github.com/prequel-dev/plz4/internal/pkg/clz4"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
var statelessDecompressor indieDecompressor
type Decompressor interface {
Decompress(src, dst []byte) (int, error)
}
func NewDecompressor(independent bool, dict *DictT) Decompressor {
if !independent || dict != nil {
return &indieDecompressorWithDict{dict: dict}
}
return &statelessDecompressor
}
//---
type indieDecompressor struct {
}
func (ctx *indieDecompressor) Decompress(src, dst []byte) (int, error) {
n, err := clz4.DecompressSafe(src, dst)
if err != nil {
return 0, errors.Join(zerr.ErrCorrupted, zerr.ErrDecompress, err)
}
return n, nil
}
//---
type indieDecompressorWithDict struct {
dict *DictT
}
func (ctx *indieDecompressorWithDict) Decompress(src, dst []byte) (int, error) {
n, err := clz4.DecompressSafeWithDict(src, dst, ctx.dict.Data())
if err != nil {
return 0, errors.Join(zerr.ErrCorrupted, zerr.ErrDecompress, err)
}
if ctx.dict.NeedsUpdate() {
ctx.dict.Update(dst[:n])
}
return n, nil
}
package compress
const lz4DictSz = 64 << 10
type DictT struct {
dict []byte
linked bool
}
func NewDictT(data []byte, linked bool) *DictT {
return &DictT{
dict: dupeDict(data),
linked: linked,
}
}
func (r *DictT) Data() []byte {
return r.dict
}
func (r *DictT) NeedsUpdate() bool {
return r.linked
}
// Used in block linked mode
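// Sliding-window sketch: the dictionary always holds the most recent
// lz4DictSz (64 KiB) bytes of output. For example, with a 48 KiB
// dictionary and a 32 KiB block, the oldest 16 KiB is shifted out before
// the block is appended, leaving exactly 64 KiB; a block of 64 KiB or
// more simply replaces the dictionary with its own tail.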
func (r *DictT) Update(dstPtr []byte) {
switch {
case len(dstPtr) >= lz4DictSz:
r.dict = r.dict[:lz4DictSz]
copy(r.dict, dstPtr[len(dstPtr)-lz4DictSz:])
case len(r.dict)+len(dstPtr) > lz4DictSz:
extra := len(r.dict) + len(dstPtr) - lz4DictSz
copy(r.dict, r.dict[extra:])
r.dict = r.dict[:len(r.dict)-extra]
fallthrough
default:
r.dict = append(r.dict, dstPtr...)
}
}
func dupeDict(data []byte) []byte {
dict := make([]byte, 0, lz4DictSz)
if len(data) <= lz4DictSz {
dict = append(dict, data...)
} else {
dict = dict[:lz4DictSz]
extra := len(data) - lz4DictSz
copy(dict, data[extra:])
}
return dict
}
package compress
import (
"errors"
"github.com/prequel-dev/plz4/internal/pkg/clz4"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
//---
type indieCompressorDict struct {
strm *clz4.StreamIndieCtx
}
func newIndieCompressorDict(dict *clz4.DictCtx) *indieCompressorDict {
if dict == nil {
dict = clz4.NewDictCtx([]byte{})
}
strm := clz4.NewStreamIndieCtx(dict)
return &indieCompressorDict{strm: strm}
}
func (ctx *indieCompressorDict) Compress(src, dst, _ []byte) (int, error) {
n, err := ctx.strm.Compress(src, dst)
if err != nil {
return 0, errors.Join(zerr.ErrCompress, err)
}
return n, err
}
//---
type indieCompressorDictHC struct {
strm *clz4.StreamCtxHC
}
func newIndieCompressorDictHC(level LevelT, dict *clz4.DictCtxHC) *indieCompressorDictHC {
if dict == nil {
dict = clz4.NewDictCtxHC([]byte{}, int(level))
}
strm := clz4.NewStreamCtxHC(int(level), dict)
return &indieCompressorDictHC{strm: strm}
}
func (ctx *indieCompressorDictHC) Compress(src, dst, _ []byte) (int, error) {
n, err := ctx.strm.Compress(src, dst)
if err != nil {
return 0, errors.Join(zerr.ErrCompress, err)
}
return n, err
}
//---
type indieCompressorFast struct {
}
func (ctx *indieCompressorFast) Compress(src, dst, _ []byte) (int, error) {
n, err := clz4.CompressFast(src, dst, 1)
if err != nil {
return 0, errors.Join(zerr.ErrCompress, err)
}
return n, err
}
//---
type indieCompressorLevel uint8
func (level indieCompressorLevel) Compress(src, dst, dict []byte) (int, error) {
n, err := clz4.CompressHC(src, dst, int(level))
if err != nil {
return 0, errors.Join(zerr.ErrCompress, err)
}
return n, err
}
package compress
import (
"errors"
"github.com/prequel-dev/plz4/internal/pkg/clz4"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type linkedCompressor struct {
strm *clz4.StreamLinkedCtx
}
func newLinkedCompressor(dict *clz4.DictCtx) Compressor {
return &linkedCompressor{
strm: clz4.NewStreamLinkedCtx(dict),
}
}
func (ctx *linkedCompressor) Compress(src, dst, dict []byte) (int, error) {
n, err := ctx.strm.Compress(src, dst, dict)
if err != nil {
return 0, errors.Join(zerr.ErrCompress, err)
}
return n, err
}
type linkedCompressorHC struct {
strm *clz4.StreamLinkedCtxHC
}
func newLinkedCompressorHC(level LevelT, dict *clz4.DictCtxHC) Compressor {
return &linkedCompressorHC{
strm: clz4.NewStreamLinkedCtxHC(dict, int(level)),
}
}
func (ctx *linkedCompressorHC) Compress(src, dst, dict []byte) (int, error) {
return ctx.strm.Compress(src, dst, dict)
}
package descriptor
type Block uint8
func (m Block) Size() int {
return m.Idx().Size()
}
func (m Block) Valid() (v bool) {
switch {
case m.Idx() < 4: // only 4-7 supported
case m&0x80 != 0: // high bit is reserved
case m&0xF != 0: // 4 low bits are reserved
default:
v = true
}
return
}
func (m *Block) SetIdx(idx BlockIdxT) {
*m = Block(idx&0x7) << 4
}
// Convert to BlockIdx, see spec.
func (m Block) Idx() BlockIdxT {
return BlockIdxT(m >> 4 & 0x7)
}
package descriptor
type DataBlockSize uint32
const (
dbsMask = 0x7FFFFFFF
cmpMask = 0x80000000
endMark = 0x00000000
)
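// Bit layout of the on-wire block size word (little-endian u32): bit 31
// set means the block data is stored uncompressed, bits 0-30 hold the
// data length in bytes, and an all-zero word is the frame EndMark.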
func (s DataBlockSize) Size() int { return int(s & dbsMask) }
func (s DataBlockSize) EOF() bool { return s == endMark }
func (s DataBlockSize) Uncompressed() bool { return (cmpMask & s) != 0 }
func (s *DataBlockSize) SetSize(v int) { *s = *s&^dbsMask | DataBlockSize(v)&dbsMask }
func (s *DataBlockSize) SetUncompressed() { *s |= cmpMask }
package descriptor
const (
bitDictId = 0
bitReserved = 1
bitContentChecksum = 2
bitSize = 3
bitBlockChecksum = 4
bitBlockIndependence = 5
)
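// FLG byte layout (per the LZ4 frame format): bits 7-6 carry the version
// (currently 01), bit 5 block independence, bit 4 block checksum, bit 3
// content size present, bit 2 content checksum, bit 1 reserved, bit 0
// dictionary ID present.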
type Flags uint8
func (m Flags) DictId() bool { return m.isSet(bitDictId) }
func (m Flags) Reserved() bool { return m.isSet(bitReserved) }
func (m Flags) ContentChecksum() bool { return m.isSet(bitContentChecksum) }
func (m Flags) ContentSize() bool { return m.isSet(bitSize) }
func (m Flags) BlockChecksum() bool { return m.isSet(bitBlockChecksum) }
func (m Flags) BlockIndependence() bool { return m.isSet(bitBlockIndependence) }
func (m Flags) Version() uint8 { return uint8(m >> 6 & 0x3) }
func (m *Flags) SetDictId() { m.setBit(bitDictId) }
func (m *Flags) SetReserved() { m.setBit(bitReserved) }
func (m *Flags) SetContentChecksum() { m.setBit(bitContentChecksum) }
func (m *Flags) SetContentSize() { m.setBit(bitSize) }
func (m *Flags) SetBlockChecksum() { m.setBit(bitBlockChecksum) }
func (m *Flags) SetBlockIndependence() { m.setBit(bitBlockIndependence) }
func (m *Flags) SetVersion(v uint8) { *m = *m&^(0x3<<6) | (Flags(v&0x3) << 6) }
func (m *Flags) ClrContentChecksum() { m.clrBit(bitContentChecksum) }
func (m Flags) isSet(pos uint8) bool {
return (m & (1 << pos)) != 0
}
func (m *Flags) setBit(pos uint8) {
*m |= (1 << pos)
}
func (m *Flags) clrBit(pos uint8) {
*m &= ^(1 << pos)
}
package descriptor
type BlockIdxT uint8
const (
BlockIdx64KB BlockIdxT = 4
BlockIdx256KB BlockIdxT = 5
BlockIdx1MB BlockIdxT = 6
BlockIdx4MB BlockIdxT = 7
BlockIdx64KBSz = 64 << 10
BlockIdx256KBSz = 256 << 10
BlockIdx1MBSz = 1 << 20
BlockIdx4MBSz = 4 << 20
blockIdx64KBStr = "64KiB"
blockIdx256KBStr = "256KiB"
blockIdx1MBStr = "1MiB"
blockIdx4MBStr = "4MiB"
)
func (idx BlockIdxT) Valid() bool {
return idx >= 4 && idx <= 7
}
func (idx BlockIdxT) Size() (v int) {
switch idx {
case BlockIdx64KB:
v = BlockIdx64KBSz
case BlockIdx256KB:
v = BlockIdx256KBSz
case BlockIdx1MB:
v = BlockIdx1MBSz
case BlockIdx4MB:
v = BlockIdx4MBSz
}
return
}
func (idx BlockIdxT) Str() (s string) {
switch idx {
case BlockIdx64KB:
s = blockIdx64KBStr
case BlockIdx256KB:
s = blockIdx256KBStr
case BlockIdx1MB:
s = blockIdx1MBStr
case BlockIdx4MB:
s = blockIdx4MBStr
default:
s = "undefined"
}
return
}
package header
import (
"bytes"
"encoding/binary"
"errors"
"io"
"github.com/prequel-dev/plz4/internal/pkg/blk"
"github.com/prequel-dev/plz4/internal/pkg/descriptor"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/xxh32"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type HeaderT struct {
Sz int64
DictId uint32
ContentSz uint64
Flags descriptor.Flags
BlockDesc descriptor.Block
}
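// On-wire frame header layout handled below (sizes in bytes):
//
//	magic(4) | FLG(1) | BD(1) | content size(0 or 8) | dict ID(0 or 4) | HC(1)
//
// which is why a header is at least 7 and at most 19 bytes; HC is the
// second byte of the xxh32 of the descriptor (FLG through dict ID).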
// see lz4_Frame_Format.md
func ReadHeader(rdr io.Reader, cb opts.SkipCallbackT) (nRead int, hdr HeaderT, err error) {
// Use a block instead of a local []byte buffer because the io.ReadFull
// call forces an escape so the local buffer becomes heap allocated.
blkHdr := blk.BorrowBlk(descriptor.BlockIdx64KBSz)
defer blk.ReturnBlk(blkHdr)
// Header is minimally 7 bytes, max 19.
hdrBytes := blkHdr.Prefix(lz4MinHeaderSz)
if nRead, err = io.ReadFull(rdr, hdrBytes); err != nil {
// If no data in the stream, return a clean io.EOF.
// Only considered a header read error if we got > 0 bytes.
if err != io.EOF {
err = errors.Join(zerr.ErrHeaderRead, err)
}
return
}
if !bytes.Equal(hdrBytes[:4], lz4FrameMagic[:]) {
// Check for special case frame skip
var n int
n, err = maybeSkipFrame(rdr, hdrBytes, cb)
nRead += n
return
}
hdr.Flags = descriptor.Flags(hdrBytes[4])
hdr.BlockDesc = descriptor.Block(hdrBytes[5])
if err = sanityCheck(hdr); err != nil {
return
}
// Do we need to grab more data?
if hdr.Flags.ContentSize() {
hdrBytes = blkHdr.Prefix(15)
n, rerr := io.ReadFull(rdr, hdrBytes[7:15])
nRead += n
if rerr != nil {
err = errors.Join(zerr.ErrHeaderRead, rerr)
return
}
hdr.ContentSz = binary.LittleEndian.Uint64(hdrBytes[6:14])
}
if hdr.Flags.DictId() {
hdrSz := len(hdrBytes)
hdrBytes = blkHdr.Prefix(hdrSz + 4)
n, rerr := io.ReadFull(rdr, hdrBytes[hdrSz:hdrSz+4])
nRead += n
if rerr != nil {
err = errors.Join(zerr.ErrHeaderRead, rerr)
return
}
hdr.DictId = binary.LittleEndian.Uint32(hdrBytes[hdrSz-1 : hdrSz+3])
}
// Validate hash, last byte is the checksum.
var (
hdrSz = len(hdrBytes)
xxh32Hash = xxh32.ChecksumZero(hdrBytes[4 : hdrSz-1])
calcHash = byte((xxh32Hash >> 8) & 0xFF)
readHash = hdrBytes[hdrSz-1]
)
if calcHash != readHash {
err = zerr.WrapCorrupted(zerr.ErrHeaderHash)
return
}
hdr.Sz = int64(hdrSz)
return
}
func sanityCheck(hdr HeaderT) (err error) {
switch {
case hdr.Flags.Version() != lz4FrameVers:
err = zerr.ErrVersion
case hdr.Flags.Reserved():
err = zerr.WrapCorrupted(zerr.ErrReserveBitSet)
case !hdr.BlockDesc.Valid():
err = zerr.WrapCorrupted(zerr.ErrBlockDescriptor)
}
return
}
package header
import (
"encoding/binary"
"errors"
"io"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
const skipMagic = uint32(0x184D2A50)
// Write a skip frame header to wr.
// nibble: 4-bit unsigned value OR'd into the low nibble of the magic.
// sz: 32-bit unsigned size of the frame payload.
func WriteSkip(wr io.Writer, nibble uint8, sz uint32) (int, error) {
if nibble > 0xF {
return 0, zerr.ErrNibble
}
var (
magic = skipMagic | uint32(nibble)
payload = make([]byte, 8)
)
binary.LittleEndian.PutUint32(payload[:4], magic)
binary.LittleEndian.PutUint32(payload[4:8], sz)
// Yes, payload will escape. Sigh.
return wr.Write(payload)
}
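// A skippable frame is therefore just:
//
//	u32 magic 0x184D2A5X | u32 frame size | <size> bytes of payload
//
// with both integers little-endian; the low nibble X is user data.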
// Value : 0x184D2A5X, which means any value from 0x184D2A50 to 0x184D2A5F.
// All 16 values are valid to identify a skippable frame.
func maybeSkipFrame(rdr io.Reader, hdr []byte, cb opts.SkipCallbackT) (int, error) {
m := binary.LittleEndian.Uint32(hdr[:4])
if m>>4 != skipMagic>>4 {
return 0, zerr.WrapCorrupted(zerr.ErrMagic)
}
// We read 7 bytes on entry; read the 8th byte into header to determine size
// There is enough capacity in the hdr slice to accommodate a full 19 byte header.
nRead, err := io.ReadFull(rdr, hdr[7:8])
if err != nil {
err = errors.Join(zerr.ErrHeaderRead, err)
return nRead, err
}
bsz := binary.LittleEndian.Uint32(hdr[4:8])
if cb != nil {
var n int
n, err = cb(rdr, uint8(m&0xF), bsz)
nRead += n
} else {
var n int64
n, err = io.CopyN(io.Discard, rdr, int64(bsz))
nRead += int(n)
}
switch err {
case nil:
// If no error processing frame; return EndMark.
// This will shift the reader back into ReadHeader mode
err = zerr.EndMark
default:
err = errors.Join(zerr.ErrSkip, err)
}
return nRead, err
}
package header
import (
"encoding/binary"
"errors"
"io"
"github.com/prequel-dev/plz4/internal/pkg/descriptor"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/xxh32"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
// see lz4_Frame_format.md
const (
lz4FrameVers = uint8(1)
lz4MinHeaderSz = 7
lz4MaxHeaderSz = 19
)
var lz4FrameMagic = [4]byte{0x04, 0x22, 0x4d, 0x18}
func WriteHeader(wr io.Writer, opts *opts.OptsT) (int, error) {
hdrBytes := make([]byte, lz4MinHeaderSz, lz4MaxHeaderSz)
copy(hdrBytes, lz4FrameMagic[:])
var (
flags descriptor.Flags
bsize descriptor.Block
)
flags.SetVersion(lz4FrameVers)
bsize.SetIdx(opts.BlockSizeIdx)
if !opts.BlockLinked {
flags.SetBlockIndependence()
}
if opts.BlockChecksum {
flags.SetBlockChecksum()
}
if opts.ContentChecksum {
flags.SetContentChecksum()
}
if opts.ContentSz != nil {
flags.SetContentSize()
hdrBytes = hdrBytes[:15]
binary.LittleEndian.PutUint64(hdrBytes[6:14], *opts.ContentSz)
}
if opts.DictionaryId != nil {
flags.SetDictId()
hdrSz := len(hdrBytes)
hdrBytes = hdrBytes[:hdrSz+4]
binary.LittleEndian.PutUint32(hdrBytes[hdrSz-1:hdrSz+3], *opts.DictionaryId)
}
hdrBytes[4] = byte(flags)
hdrBytes[5] = byte(bsize)
xxh := xxh32.ChecksumZero(hdrBytes[4 : len(hdrBytes)-1])
hdrBytes[len(hdrBytes)-1] = byte((xxh >> 8) & 0xFF)
n, err := wr.Write(hdrBytes)
if err != nil {
return n, errors.Join(zerr.ErrHeaderWrite, err)
}
return n, err
}
package opts
import (
"io"
"github.com/prequel-dev/plz4/internal/pkg/compress"
"github.com/prequel-dev/plz4/internal/pkg/descriptor"
)
type BlockIdxT = descriptor.BlockIdxT
// The 'nibble' parameter contains the user defined
// low nibble in the skip frame magic prefix.
//
// Callback must return number of bytes read.
type SkipCallbackT func(rdr io.Reader, nibble uint8, sz uint32) (int, error)
// Emits offset in bytes from beginning of stream of src and corresponding block.
type ProgressFuncT func(srcOffset, dstPos int64)
// Emits 'id' embedded in header. Callback may return optional dictionary.
type DictCallbackT func(id uint32) ([]byte, error)
type OptsT struct {
NParallel int
Level compress.LevelT
ContentSz *uint64
ReadOffset int64
BlockChecksum bool
BlockLinked bool
ContentChecksum bool
SkipContentSz bool
Dictionary []byte
DictionaryId *uint32
DictCallback DictCallbackT
BlockSizeIdx BlockIdxT
Handler ProgressFuncT
WorkerPool WorkerPool
SkipCallback SkipCallbackT
}
type WorkerPool interface {
Submit(task func())
}
func (o OptsT) NewCompressorFactory() compress.CompressorFactory {
var dict *compress.DictT
if o.Dictionary != nil {
dict = compress.NewDictT(o.Dictionary, o.BlockLinked)
}
return compress.NewCompressorFactory(
o.Level,
!o.BlockLinked,
dict,
)
}
type stubWorkerPool struct {
}
func (s *stubWorkerPool) Submit(task func()) {
go task()
}
var StubWorkerPool = &stubWorkerPool{}
package rdr
import (
"io"
"os"
"github.com/prequel-dev/plz4/internal/pkg/async"
"github.com/prequel-dev/plz4/internal/pkg/blk"
"github.com/prequel-dev/plz4/internal/pkg/header"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/sync"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type Reader struct {
rdr io.Reader
dstOff int
dstBlk *blk.BlkT
blkReader blk.BlkRdrI
opts *opts.OptsT
srcPos int64
dstPos int64
contentSz uint64
state error
}
// Construct a Reader to decompress the LZ4 frame stream 'rdr'.
//
// Specify optional parameters in 'opts'.
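//
// Minimal decode sketch (illustrative; the exported plz4 package builds
// the opts and wraps this type):
//
//	r := NewReader(src, &o)
//	defer r.Close()
//	n, err := io.Copy(dst, r) // io.Copy uses WriteTo when available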
func NewReader(rdr io.Reader, o *opts.OptsT) *Reader {
// Will escape
return &Reader{
rdr: rdr,
opts: o,
}
}
// Read decompressed data into 'dst'. Implements the io.Reader interface.
func (r *Reader) Read(dst []byte) (int, error) {
var nRead int
switch {
case r.state != nil:
case r.blkReader != nil:
nRead, r.state = r.modeBody(dst)
default:
nRead, r.state = r.modeHeader(dst)
}
if r.state == zerr.EndMark {
r.state = r.handleEndMark()
}
return nRead, r.state
}
// Reset state; next Read() will read the header and likely EOF
// This supports transparent reads of consecutive LZ4 frames.
func (r *Reader) handleEndMark() (err error) {
if r.opts.ContentSz != nil && !r.opts.SkipContentSz && *r.opts.ContentSz != r.contentSz {
err = zerr.WrapCorrupted(zerr.ErrContentSize)
}
r.contentSz = 0
r.blkReader.Close()
r.blkReader = nil
return
}
// Close the reader to release underlying resources.
//
// Close() *MUST* be called on completion whether or not the Reader
// is in an error state.
func (r *Reader) Close() error {
// r.opts is a sentinel for Close
if r.opts == nil {
return r.state
}
// Close blk reader if still allocated
if r.blkReader != nil {
r.blkReader.Close()
r.blkReader = nil
}
// Clear dstBlk if still around.
if r.dstBlk != nil {
blk.ReturnBlk(r.dstBlk)
r.dstBlk = nil
r.dstOff = 0
}
// Set state to zerr.ErrClosed if undefined;
// otherwise subsequent calls should return original error
if r.state == nil {
r.state = zerr.ErrClosed
}
// Use r.opts as a sentinel for Close
r.opts = nil
return nil
}
// Decompress to 'wr'.
// Implements the io.WriteTo interface.
func (r *Reader) WriteTo(wr io.Writer) (int64, error) {
var sum int64
LOOP:
for r.state == nil {
if r.blkReader == nil {
var nBytes int
nBytes, r.state = r._readHeader()
// Update srcPos to account for header.
r.srcPos += int64(nBytes)
if r.state != nil {
if r.state == io.EOF {
// Eat io.EOF on readHeader per WriterTo interface
// io.EOF on readHeader simply means read stream is done.
r.state = nil
}
break LOOP
}
}
var n int64
n, r.state = r._writeTo(wr)
sum += n
if r.state == zerr.EndMark {
r.state = r.handleEndMark()
}
}
return sum, r.state
}
func (r *Reader) _writeTo(w io.Writer) (n int64, err error) {
LOOP:
for {
if r.dstOff < r.dstBlk.Len() {
// Write out any pending data
var nWritten int
nWritten, err = w.Write(r.dstBlk.Suffix(r.dstOff))
// Update our internal offset pointer that
// tracks where we are in copying from dstBuffer
r.dstOff += nWritten
// Update return cnt
n += int64(nWritten)
if err != nil {
break LOOP
}
}
// Retrieve next lz4 block, and on success copy at top of loop
if err = r.nextBlock(); err != nil {
break LOOP
}
}
return
}
func (r *Reader) nextBlock() (err error) {
var nRead int
// Reset dstOff; we are retrieving a new dstBlk.
r.dstOff = 0
// Grab the next uncompressed output dstBlk.
// nRead is the amount of data read from the src.
r.dstBlk, nRead, err = r.blkReader.NextBlock(r.dstBlk)
r.opts.Handler(r.srcPos, r.dstPos)
// Update srcPos to account for compressed block size.
r.srcPos += int64(nRead)
// Update dstPos to account for decompressed block size
r.dstPos += int64(r.dstBlk.Len())
// Content size tracks the cumulative decompressed size of this frame
r.contentSz += uint64(r.dstBlk.Len())
return
}
func (r *Reader) modeHeader(dst []byte) (int, error) {
n, err := r._readHeader()
if err != nil {
return n, err
}
r.dstOff += n
// And read body immediately
return r.modeBody(dst)
}
func (r *Reader) _readHeader() (int, error) {
nRead, hdr, err := header.ReadHeader(r.rdr, r.opts.SkipCallback)
// If we hit an EndMark on ReadHeader, it means we processed a
// skip frame. Keep skipping until we hit a different error.
for err == zerr.EndMark {
var n int
n, hdr, err = header.ReadHeader(r.rdr, r.opts.SkipCallback)
nRead += n
}
if err == nil && hdr.Flags.DictId() && r.opts.DictCallback != nil {
var ndict []byte
if ndict, err = r.opts.DictCallback(hdr.DictId); err == nil && ndict != nil {
r.opts.Dictionary = ndict
}
}
switch {
case err != nil:
return nRead, err
case r.opts.ReadOffset == 0 || r.opts.ReadOffset == hdr.Sz: // NOOP
case r.opts.ReadOffset < hdr.Sz:
return nRead, zerr.ErrReadOffset
case !hdr.Flags.BlockIndependence():
return nRead, zerr.ErrReadOffsetLinked
default:
var n int64
n, err := skipOffset(r.rdr, r.opts.ReadOffset-hdr.Sz)
nRead += int(n)
if err != nil {
return nRead, err
}
// Read offset only applies to first of possible consecutive frames
r.opts.ReadOffset = 0
// Do not validate content checksum if we skipped; will fail.
hdr.Flags.ClrContentChecksum()
// Do not check the content size if we skipped; will fail.
r.opts.SkipContentSz = true
}
// Cache the content size in the opts ptr to save RAM;
// Will be validated on end of frame
if hdr.Flags.ContentSize() {
v := hdr.ContentSz
r.opts.ContentSz = &v
}
r.blkReader = r.makeBlockReader(hdr)
return nRead, nil
}
func (r *Reader) modeBody(dst []byte) (nRead int, err error) {
LOOP:
for {
if r.dstOff < r.dstBlk.Len() {
// We have some extra data in dstBuffer
// Copy into 'dst' up to dst capacity
n := copy(dst, r.dstBlk.Suffix(r.dstOff))
// Update our internal offset pointer that
// tracks where we are in copying from dstBuffer
r.dstOff += n
// Update output nRead, used in return.
nRead += n
// If we have completely filled 'dst', return.
if n == len(dst) {
return
}
// Slide dst buffer over by N for next spin
// after we try to grab the next block.
dst = dst[n:]
}
// Retrieve next lz4 block, and on success copy at top of loop
if err = r.nextBlock(); err != nil {
break LOOP
}
}
return
}
func (r *Reader) makeBlockReader(hdr header.HeaderT) blk.BlkRdrI {
if r.opts.NParallel <= 0 {
return sync.NewSyncReader(r.rdr, hdr, r.opts)
}
// If dependent blocks, must run in serial
if !hdr.Flags.BlockIndependence() {
r.opts.NParallel = 1
}
return async.NewAsyncReader(r.rdr, hdr, r.opts)
}
func skipOffset(rdr io.Reader, offset int64) (int64, error) {
// Attempt to seek ahead
if rdr != os.Stdin && rdr != os.Stderr {
if seeker, ok := rdr.(io.Seeker); ok {
if _, err := seeker.Seek(offset, io.SeekCurrent); err != nil {
return 0, err
}
return offset, nil
}
}
// Reader is not an io.Seeker; must read and discard
return io.CopyN(io.Discard, rdr, offset)
}
package sync
import (
"io"
"github.com/prequel-dev/plz4/internal/pkg/blk"
"github.com/prequel-dev/plz4/internal/pkg/compress"
"github.com/prequel-dev/plz4/internal/pkg/header"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/xxh32"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type BlkT = blk.BlkT
type syncReaderT struct {
dc compress.Decompressor
frameRdr blk.FrameReader
hasher *xxh32.XXHZero
}
func NewSyncReader(rdr io.Reader, hdr header.HeaderT, opts *opts.OptsT) *syncReaderT {
var hasher *xxh32.XXHZero
if hdr.Flags.ContentChecksum() && (opts.ReadOffset == 0) && opts.ContentChecksum {
hasher = &xxh32.XXHZero{}
}
var dict *compress.DictT
if opts.Dictionary != nil || !hdr.Flags.BlockIndependence() {
dict = compress.NewDictT(opts.Dictionary, !hdr.Flags.BlockIndependence())
}
return &syncReaderT{
dc: compress.NewDecompressor(
hdr.Flags.BlockIndependence(),
dict,
),
frameRdr: *blk.NewFrameReader(
rdr,
hdr.BlockDesc.Idx().Size(),
hdr.Flags.ContentChecksum(),
hdr.Flags.BlockChecksum(),
),
hasher: hasher,
}
}
func (r *syncReaderT) NextBlock(prevBlk *BlkT) (*BlkT, int, error) {
switch {
case prevBlk == nil:
case r.hasher == nil:
blk.ReturnBlk(prevBlk)
default:
r.hasher.Write(prevBlk.Data())
blk.ReturnBlk(prevBlk)
}
frame, err := r.frameRdr.Read()
switch err {
case nil:
case zerr.EndMark:
if r.hasher != nil {
if r.hasher.Sum32() != r.frameRdr.ContentChecksum() {
err = zerr.ErrContentHash
}
}
fallthrough
default:
return nil, frame.ReadCnt, err
}
// If not compressed, directly return the block from reader
if frame.Uncompressed {
return frame.Blk, frame.ReadCnt, nil
}
// Decompress the frame block
dstBlk, err := frame.Blk.Decompress(r.dc)
// We are done with the srcBlk, can return for reuse.
blk.ReturnBlk(frame.Blk)
return dstBlk, frame.ReadCnt, err
}
func (r *syncReaderT) Close() {
}
package sync
import (
"io"
"github.com/prequel-dev/plz4/internal/pkg/blk"
"github.com/prequel-dev/plz4/internal/pkg/compress"
"github.com/prequel-dev/plz4/internal/pkg/header"
"github.com/prequel-dev/plz4/internal/pkg/opts"
"github.com/prequel-dev/plz4/internal/pkg/trailer"
"github.com/prequel-dev/plz4/internal/pkg/xxh32"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
type OptsT = opts.OptsT
type syncWriterT struct {
wr io.Writer
cmp compress.Compressor
bsz int
srcOff int
srcBlk *BlkT
srcMark int64
dstMark int64
state error
opts *OptsT
srcHash *xxh32.XXHZero
}
func NewSyncWriter(wr io.Writer, opts *OptsT) *syncWriterT {
var (
bsz = opts.BlockSizeIdx.Size()
srcBlk = blk.BorrowBlk(bsz)
factory = opts.NewCompressorFactory()
)
// Scope it down to our block size
srcBlk.Trim(bsz)
w := &syncWriterT{
wr: wr,
cmp: factory.NewCompressor(),
bsz: bsz,
srcBlk: srcBlk,
srcOff: -1,
opts: opts,
}
if opts.ContentChecksum {
w.srcHash = &xxh32.XXHZero{}
}
return w
}
func (w *syncWriterT) Write(src []byte) (int, error) {
if w.state != nil {
return 0, w.state
}
var n int
n, w.state = w._write(src)
return n, w.state
}
func (w *syncWriterT) _write(src []byte) (nConsumed int, err error) {
if w.srcOff < 0 {
if err = w._writeHeader(); err != nil {
return
}
}
if w.srcOff > 0 {
// Append to current srcBlk
n := copy(w.srcBlk.Suffix(w.srcOff), src)
// Update our internal offset pointer that
// tracks how much of srcBlk has been filled
w.srcOff += n
// Update output size
nConsumed += n
if w.srcOff == w.bsz {
if err = w._writeFrame(w.srcBlk.Data()); err != nil {
return
}
w.srcOff = 0
}
// Slide the src buffer over by N for next spin
src = src[n:]
}
for len(src) > 0 {
// Write src directly if large enough to avoid a copy.
if len(src) >= w.bsz {
nConsumed += w.bsz
if err = w._writeFrame(src[:w.bsz]); err != nil {
return
}
src = src[w.bsz:]
} else {
// Cache the data in w.srcBlk for the next spin
n := copy(w.srcBlk.Data(), src)
nConsumed += n
w.srcOff = n
src = src[n:]
}
}
return
}
// Close flushes any pending data, writes the frame trailer
// (unless in an error state), and releases the source buffer.
func (w *syncWriterT) Close() error {
// If s.srcBlk is nil, close has already been called
if w.srcBlk == nil {
return w.state
}
var err error
switch {
case w.srcOff < 0:
// It is possible header was not written if Write not called.
err = w._writeHeader()
default:
// Flush out any bytes that might be in srcBlk
err = w._flush()
}
// Release the buffer whether or not there was an error on Flush
blk.ReturnBlk(w.srcBlk)
w.srcBlk = nil
w.srcOff = 0
switch {
case w.state != nil:
// Close should succeed even if we are in an error state,
// but should not try to write the trailer.
return nil
case err != nil:
// Return error if flush failed. Close effectively fails.
w.state = err
return err
}
w.opts.Handler(w.srcMark, w.dstMark)
if w.srcHash != nil {
_, err = trailer.WriteTrailerWithHash(w.wr, w.srcHash.Sum32())
} else {
_, err = trailer.WriteTrailer(w.wr)
}
// Cache error for future closes
switch {
case err == nil:
w.state = zerr.ErrClosed
default:
w.state = err
}
return err
}
func (w *syncWriterT) Flush() error {
if w.state != nil {
return w.state
}
w.state = w._flush()
return w.state
}
func (w *syncWriterT) _flush() error {
if w.srcOff <= 0 {
return nil
}
if err := w._writeFrame(w.srcBlk.Prefix(w.srcOff)); err != nil {
return err
}
w.srcOff = 0
return nil
}
func (w *syncWriterT) ReadFrom(r io.Reader) (int64, error) {
if w.state != nil {
return 0, w.state
}
var n int64
n, w.state = w._readFrom(r)
return n, w.state
}
func (w *syncWriterT) _readFrom(r io.Reader) (nConsumed int64, err error) {
// Check if header has been written yet
if w.srcOff < 0 {
if err = w._writeHeader(); err != nil {
return
}
}
LOOP:
for {
n, rerr := io.ReadFull(r, w.srcBlk.Suffix(w.srcOff))
nConsumed += int64(n)
switch rerr {
case nil:
case io.EOF, io.ErrUnexpectedEOF:
w.srcOff += n
break LOOP
default:
err = rerr
break LOOP
}
if err = w._writeFrame(w.srcBlk.Data()); err != nil {
break LOOP
}
w.srcOff = 0
}
return
}
func (w *syncWriterT) _writeHeader() error {
w.srcOff = 0
hdrSz, err := header.WriteHeader(w.wr, w.opts)
if err != nil {
return err
}
w.dstMark = int64(hdrSz)
return nil
}
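// _writeFrame compresses a single block of 'src', writes it to the underlying
// writer, updates the running content hash, invokes the progress callback,
// and advances the src/dst marks.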
func (w *syncWriterT) _writeFrame(src []byte) error {
if w.srcHash != nil {
if _, err := w.srcHash.Write(src); err != nil {
return err
}
}
dstBlk, err := blk.CompressToBlk(src, w.cmp, w.bsz, w.opts.BlockChecksum, nil)
if err != nil {
return err
}
defer blk.ReturnBlk(dstBlk)
wsz, err := w.wr.Write(dstBlk.Data())
if err != nil {
return err
}
w.opts.Handler(w.srcMark, w.dstMark)
w.srcMark += int64(len(src))
w.dstMark += int64(wsz)
return err
}
package trailer
import (
"encoding/binary"
"io"
)
var emptyTrailer [4]byte
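// WriteTrailer writes the 4-byte end mark (all zeros) that terminates an LZ4 frame.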
func WriteTrailer(wr io.Writer) (int, error) {
// Use a package-level buffer to avoid an escaping allocation on wr.Write
return wr.Write(emptyTrailer[:])
}
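// WriteTrailerWithHash writes the end mark followed by the little-endian
// xxh32 checksum of the frame content.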
func WriteTrailerWithHash(wr io.Writer, xxh uint32) (int, error) {
var buf [8]byte
binary.LittleEndian.PutUint32(buf[4:], xxh)
return wr.Write(buf[:])
}
// Package xxh32 implements the very fast XXH hashing algorithm (32-bit version).
// (ported from the reference implementation https://github.com/Cyan4973/xxHash/)
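//
// Typical streaming use is a zero-value XXHZero fed through Write and finished
// with Sum32; ChecksumZero hashes a complete buffer in one shot. A minimal
// sketch (data is any byte slice, n any split point):
//
//	var h XXHZero
//	h.Write(data[:n]) // Write never returns an error
//	h.Write(data[n:])
//	streamed := h.Sum32()
//	oneShot := ChecksumZero(data) // same value as streamed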
package xxh32
import (
"encoding/binary"
)
const (
prime1 uint32 = 2654435761
prime2 uint32 = 2246822519
prime3 uint32 = 3266489917
prime4 uint32 = 668265263
prime5 uint32 = 374761393
primeMask = 0xFFFFFFFF
prime1plus2 = uint32((uint64(prime1) + uint64(prime2)) & primeMask) // 606290984
prime1minus = uint32((-int64(prime1)) & primeMask) // 1640531535
)
// XXHZero represents an xxhash32 object with seed 0.
type XXHZero struct {
v [4]uint32
totalLen uint64
buf [16]byte
bufused int
}
// Sum appends the current hash to b and returns the resulting slice.
// It does not change the underlying hash state.
func (xxh XXHZero) Sum(b []byte) []byte {
h32 := xxh.Sum32()
return append(b, byte(h32), byte(h32>>8), byte(h32>>16), byte(h32>>24))
}
// Reset resets the Hash to its initial state.
func (xxh *XXHZero) Reset() {
xxh.v[0] = prime1plus2
xxh.v[1] = prime2
xxh.v[2] = 0
xxh.v[3] = prime1minus
xxh.totalLen = 0
xxh.bufused = 0
}
// Size returns the number of bytes returned by Sum().
func (xxh *XXHZero) Size() int {
return 4
}
// BlockSize gives the minimum number of bytes accepted by Write().
func (xxh *XXHZero) BlockSize() int {
return 1
}
// Write adds input bytes to the Hash.
// It never returns an error.
func (xxh *XXHZero) Write(input []byte) (int, error) {
if xxh.totalLen == 0 {
xxh.Reset()
}
n := len(input)
m := xxh.bufused
xxh.totalLen += uint64(n)
r := len(xxh.buf) - m
if n < r {
copy(xxh.buf[m:], input)
xxh.bufused += len(input)
return n, nil
}
var buf *[16]byte
if m != 0 {
// some data left from previous update
buf = &xxh.buf
c := copy(buf[m:], input)
n -= c
input = input[c:]
}
update(&xxh.v, buf, input)
xxh.bufused = copy(xxh.buf[:], input[n-n%16:])
return n, nil
}
// Portable version of update. This updates v by processing all of buf
// (if not nil) and all full 16-byte blocks of input.
func updateGo(v *[4]uint32, buf *[16]byte, input []byte) {
// Causes compiler to work directly from registers instead of stack:
v1, v2, v3, v4 := v[0], v[1], v[2], v[3]
if buf != nil {
v1 = rol13(v1+binary.LittleEndian.Uint32(buf[:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(buf[4:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(buf[8:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(buf[12:])*prime2) * prime1
}
// SGC: Roughly 30% speed-up from unrolling the loop,
// albeit at the cost of duplicative assembly code.
// The improvement is not equal on all processors,
// but it is generally no worse.
if len(input) > 256 {
for ; len(input) >= 256; input = input[256:] {
sub := input[:256] //BCE hint for compiler
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[4:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[8:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[12:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[16:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[20:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[24:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[28:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[32:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[36:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[40:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[44:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[48:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[52:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[56:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[60:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[64:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[68:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[72:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[76:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[80:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[84:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[88:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[92:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[96:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[100:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[104:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[108:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[112:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[116:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[120:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[124:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[128:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[132:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[136:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[140:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[144:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[148:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[152:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[156:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[160:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[164:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[168:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[172:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[176:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[180:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[184:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[188:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[192:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[196:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[200:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[204:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[208:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[212:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[216:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[220:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[224:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[228:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[232:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[236:])*prime2) * prime1
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[240:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[244:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[248:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[252:])*prime2) * prime1
}
}
for ; len(input) >= 16; input = input[16:] {
sub := input[:16] //BCE hint for compiler
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[4:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[8:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[12:])*prime2) * prime1
}
v[0], v[1], v[2], v[3] = v1, v2, v3, v4
}
// Sum32 returns the 32-bit hash value.
func (xxh *XXHZero) Sum32() uint32 {
// SGC: Casting to uint32 will strip high bits
h32 := uint32(xxh.totalLen)
// SGC: Handle case where totalLen is on a uint32 boundary
if h32 >= 16 || xxh.totalLen >= 0x100000000 {
h32 += rol1(xxh.v[0]) + rol7(xxh.v[1]) + rol12(xxh.v[2]) + rol18(xxh.v[3])
} else {
h32 += prime5
}
p := 0
n := xxh.bufused
buf := xxh.buf
for n := n - 4; p <= n; p += 4 {
h32 += binary.LittleEndian.Uint32(buf[p:p+4]) * prime3
h32 = rol17(h32) * prime4
}
for ; p < n; p++ {
h32 += uint32(buf[p]) * prime5
h32 = rol11(h32) * prime1
}
h32 ^= h32 >> 15
h32 *= prime2
h32 ^= h32 >> 13
h32 *= prime3
h32 ^= h32 >> 16
return h32
}
// Portable version of ChecksumZero.
func checksumZeroGo(input []byte) uint32 {
n := len(input)
h32 := uint32(n)
if n < 16 {
h32 += prime5
} else {
v1 := prime1plus2
v2 := prime2
v3 := uint32(0)
v4 := prime1minus
p := 0
for n := n - 16; p <= n; p += 16 {
sub := input[p:][:16] //BCE hint for compiler
v1 = rol13(v1+binary.LittleEndian.Uint32(sub[:])*prime2) * prime1
v2 = rol13(v2+binary.LittleEndian.Uint32(sub[4:])*prime2) * prime1
v3 = rol13(v3+binary.LittleEndian.Uint32(sub[8:])*prime2) * prime1
v4 = rol13(v4+binary.LittleEndian.Uint32(sub[12:])*prime2) * prime1
}
input = input[p:]
n -= p
h32 += rol1(v1) + rol7(v2) + rol12(v3) + rol18(v4)
}
p := 0
for n := n - 4; p <= n; p += 4 {
h32 += binary.LittleEndian.Uint32(input[p:p+4]) * prime3
h32 = rol17(h32) * prime4
}
for p < n {
h32 += uint32(input[p]) * prime5
h32 = rol11(h32) * prime1
p++
}
h32 ^= h32 >> 15
h32 *= prime2
h32 ^= h32 >> 13
h32 *= prime3
h32 ^= h32 >> 16
return h32
}
func rol1(u uint32) uint32 {
return u<<1 | u>>31
}
func rol7(u uint32) uint32 {
return u<<7 | u>>25
}
func rol11(u uint32) uint32 {
return u<<11 | u>>21
}
func rol12(u uint32) uint32 {
return u<<12 | u>>20
}
func rol13(u uint32) uint32 {
return u<<13 | u>>19
}
func rol17(u uint32) uint32 {
return u<<17 | u>>15
}
func rol18(u uint32) uint32 {
return u<<18 | u>>14
}
//go:build !arm || noasm
// +build !arm noasm
package xxh32
// ChecksumZero returns the 32-bit hash of input.
func ChecksumZero(input []byte) uint32 { return checksumZeroGo(input) }
func update(v *[4]uint32, buf *[16]byte, input []byte) {
updateGo(v, buf, input)
}
package zerr
import "fmt"
type constError string
func (err constError) Error() string {
return string(err)
}
const (
EndMark constError = "lz4 end mark"
ErrClosed constError = "lz4 closed"
ErrCorrupted constError = "lz4 corrupted"
ErrHeaderHash constError = "lz4 header hash mismatch"
ErrBlockHash constError = "lz4 block hash mismatch"
ErrContentHash constError = "lz4 content hash mismatch"
ErrHeaderRead constError = "lz4 fail read header"
ErrHeaderWrite constError = "lz4 fail write header"
ErrMagic constError = "lz4 bad magic"
ErrVersion constError = "lz4 unsupported version"
ErrDescriptorRead constError = "lz4 fail read descriptor"
ErrBlockSizeRead constError = "lz4 fail read block size"
ErrBlockRead constError = "lz4 fail read block"
ErrBlockSizeOverflow constError = "lz4 block size overflow"
ErrCompress constError = "lz4 fail compress"
ErrDecompress constError = "lz4 fail decompress"
ErrReserveBitSet constError = "lz4 reserved bit set"
ErrBlockDescriptor constError = "lz4 invalid BD byte"
ErrContentHashRead constError = "lz4 fail read content hash"
ErrContentSize constError = "lz4 content size mismatch"
ErrReadOffset constError = "lz4 bad read offset"
ErrReadOffsetLinked constError = "lz4 read offset unsupported in block linked mode"
ErrSkip constError = "lz4 fail skip"
ErrNibble constError = "lz4 bad nibble"
)
func WrapCorrupted(err error) error {
return fmt.Errorf("%w: %w", ErrCorrupted, err)
}
package test
import (
"bytes"
"compress/bzip2"
"crypto/rand"
"crypto/sha256"
"embed"
"encoding/hex"
"io"
"testing"
)
//go:embed samples/*
var content embed.FS
const (
LargeUncompressed = iota
Uncompressable
Monster
Dict
Lz4_4MB
Lz4_BlockCRC
Lz4_ContentCRC
Lz4_64KB
Lz4_NoContentCRC
Lz4_NoBlockCRC
Lz4_IndieWithDict
Lz4_Linked
Lz4_LinkedWithDict
)
var (
cacheLargeBinary = bunzip("samples/webster.bz2")
cacheLargeSha2 = "6a68f69b26daf09f9dd84f7470368553194a0b294fcfa80f1604efb11143a383"
cacheLz4_4MB_BX_B7 = readAll("samples/dickens.lz4")
cacheLz4_4MB_BX_B7_Sha2 = "b24c37886142e11d0ee687db6ab06f936207aa7f2ea1fd1d9a36763c7a507e6a"
cacheLz4_64KB_NOCRC = readAll("samples/mr.lz4")
cacheLz4_64KB_NOCRC_Sha2 = "68637ed52e3e4860174ed2dc0840ac77d5f1a60abbcb13770d5754e3774d53e6"
cache_dict = bunzip("samples/dict.bin.bz2")
cache_dict_Sha2 = "fb0f084fe0e2fceaa7443efeb8f260857dda231199c419d8f8de2946f91c539c"
cacheLz4_IndieWithDict = readAll("samples/dickens_dict.lz4")
cacheLz4_IndieWithDict_Sha2 = "b24c37886142e11d0ee687db6ab06f936207aa7f2ea1fd1d9a36763c7a507e6a"
cacheLz4_Linked = readAll("samples/dickens_linked.lz4")
cacheLz4_Linked_Sha2 = "b24c37886142e11d0ee687db6ab06f936207aa7f2ea1fd1d9a36763c7a507e6a"
cacheLz4_LinkedWithDict = readAll("samples/dickens_linked_dict.lz4")
cacheLz4_LinkedWithDict_Sha2 = "b24c37886142e11d0ee687db6ab06f936207aa7f2ea1fd1d9a36763c7a507e6a"
)
// Various samples for testing different use cases
func LoadSample(t testing.TB, ty int) ([]byte, string) {
switch ty {
case LargeUncompressed:
return cacheLargeBinary, cacheLargeSha2
case Uncompressable:
return genUncompressable()
case Monster:
return genMonster()
case Lz4_4MB, Lz4_BlockCRC, Lz4_ContentCRC:
return cacheLz4_4MB_BX_B7, cacheLz4_4MB_BX_B7_Sha2
case Lz4_64KB, Lz4_NoContentCRC, Lz4_NoBlockCRC:
return cacheLz4_64KB_NOCRC, cacheLz4_64KB_NOCRC_Sha2
case Dict:
return cache_dict, cache_dict_Sha2
case Lz4_IndieWithDict:
return cacheLz4_IndieWithDict, cacheLz4_IndieWithDict_Sha2
case Lz4_Linked:
return cacheLz4_Linked, cacheLz4_Linked_Sha2
case Lz4_LinkedWithDict:
return cacheLz4_LinkedWithDict, cacheLz4_LinkedWithDict_Sha2
}
t.Fatalf("Cannot find sample")
return nil, ""
}
// Return a copy of the sample to allow manipulation without corrupting the cached original.
func DupeSample(t testing.TB, ty int) ([]byte, string) {
data, sha2 := LoadSample(t, ty)
nData := make([]byte, len(data))
copy(nData, data)
return nData, sha2
}
func readAll(name string) []byte {
fh, err := content.Open(name)
if err != nil {
panic(err)
}
defer fh.Close()
data, err := io.ReadAll(fh)
if err != nil {
panic(err)
}
return data
}
func bunzip(name string) []byte {
data := readAll(name)
rd := bzip2.NewReader(bytes.NewReader(data))
data, err := io.ReadAll(rd)
if err != nil {
panic(err)
}
return data
}
func genUncompressable() ([]byte, string) {
data := make([]byte, 10<<20)
_, err := rand.Read(data)
if err != nil {
panic(err)
}
return data, Sha2sum(data)
}
func genMonster() ([]byte, string) {
targetSize := 2 << 30
data := make([]byte, 0, targetSize)
for len(data) < targetSize {
sz := len(cacheLargeBinary)
if len(data)+sz > targetSize {
sz = targetSize - len(data)
}
data = append(data, cacheLargeBinary[:sz]...)
}
return data, Sha2sum(data)
}
func Sha2sum(data []byte) string {
sum := sha256.Sum256(data)
return hex.EncodeToString(sum[:])
}
package sparse
import (
"io"
"unsafe"
)
const (
blkSz = 4 << 10
readSz = 4 << 10
)
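// Writer produces sparse output by eliding runs of zero bytes.
// When the underlying writer implements io.Seeker (typically an *os.File),
// zero-filled blocks are replaced with forward seeks instead of writes;
// otherwise data is passed through unmodified.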
type Writer struct {
wr io.Writer
skip int64
}
func NewWriter(wr io.Writer) *Writer {
return &Writer{wr: wr}
}
func (w *Writer) Write(data []byte) (int, error) {
seeker, ok := w.wr.(io.Seeker)
if !ok {
return w.wr.Write(data)
}
var (
n int
nBlks = len(data) / blkSz
loopLimit = nBlks * blkSz
)
scanFunc := func(i, sz int) error {
skip := skipZeros(data[i : i+sz])
w.skip += int64(skip)
// Report skipped bytes in the value returned from Write even if the seek is deferred.
// io.Writer requires a non-nil error if n < len(p).
n += skip
if skip < sz {
// Skip ahead
if w.skip > 0 {
if _, err := seeker.Seek(w.skip, io.SeekCurrent); err != nil {
return err
}
w.skip = 0
}
// Write the remainder of the block
wn, err := w.wr.Write(data[i+skip : i+sz])
n += wn
if err != nil {
return err
}
}
return nil
}
// Scan at predefined block size
for i := 0; i < loopLimit; i += blkSz {
if err := scanFunc(i, blkSz); err != nil {
return n, err
}
}
var err error
if nLeft := len(data) % blkSz; nLeft > 0 {
err = scanFunc(nBlks*blkSz, nLeft)
}
return n, err
}
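// ReadFrom copies from 'rd' in fixed-size chunks through Write so that
// zero runs can be detected and elided.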
func (w *Writer) ReadFrom(rd io.Reader) (n int64, err error) {
buf := make([]byte, readSz)
LOOP:
for {
var nRead int
nRead, err = io.ReadFull(rd, buf)
n += int64(nRead)
switch err {
case nil:
// buf filled, continue
case io.ErrUnexpectedEOF:
// buf partially filled, continue
// expect next io.ReadFull returns io.EOF
case io.EOF:
// io.EOF is expected, not an error
err = nil
break LOOP
default:
break LOOP
}
if _, err = w.Write(buf[:nRead]); err != nil {
break LOOP
}
}
return
}
type flusherI interface {
Flush() error
}
func (w *Writer) Flush() error {
if w.skip > 0 {
// Skip ahead
seeker := w.wr.(io.Seeker)
if _, err := seeker.Seek(w.skip, io.SeekCurrent); err != nil {
return err
}
w.skip = 0
}
if flusher, ok := w.wr.(flusherI); ok {
return flusher.Flush()
}
return nil
}
// A locally declared buffer passed to wr.Write escapes to the heap on every call.
// Avoid the repeated allocation by preallocating a package-level slice.
var avoidEscape = []byte{0}
func (w *Writer) Close() error {
if w.skip > 0 {
// Seek ahead by skip-1 bytes, then write a single final byte.
// Writing the last byte forces the underlying writer, usually a file, to commit the seek.
if w.skip > 1 {
seeker := w.wr.(io.Seeker)
if _, err := seeker.Seek(w.skip-1, io.SeekCurrent); err != nil {
return err
}
w.skip = 1
}
if _, err := w.wr.Write(avoidEscape); err != nil {
return err
}
w.skip = 0
}
// Should we do this?
if closer, ok := w.wr.(io.Closer); ok {
return closer.Close()
}
return nil
}
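// skipZeros returns the number of leading zero bytes in 'data'.
// The hot loop ORs 256 bytes at a time as 32 uint64 words (via unsafe)
// so the zero test branches only once per 256-byte chunk; the tail is
// scanned byte by byte.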
func skipZeros(data []byte) int {
var (
i = 0
n = len(data)
cnt = (n / 256) * 256
)
// Unrolling the loop a bit increases performance.
for ; i < cnt; i += 256 {
v := (*[32]uint64)(unsafe.Pointer(&data[i]))
var b uint64
b |= v[0]
b |= v[1]
b |= v[2]
b |= v[3]
b |= v[4]
b |= v[5]
b |= v[6]
b |= v[7]
b |= v[8]
b |= v[9]
b |= v[10]
b |= v[11]
b |= v[12]
b |= v[13]
b |= v[14]
b |= v[15]
b |= v[16]
b |= v[17]
b |= v[18]
b |= v[19]
b |= v[20]
b |= v[21]
b |= v[22]
b |= v[23]
b |= v[24]
b |= v[25]
b |= v[26]
b |= v[27]
b |= v[28]
b |= v[29]
b |= v[30]
b |= v[31]
if b != 0 {
return i
}
}
for ; i < n; i++ {
if data[i] != 0 {
return i
}
}
return i
}
package plz4
import (
"errors"
"github.com/prequel-dev/plz4/internal/pkg/zerr"
)
// Re-export internal errors as public constants
const (
ErrClosed = zerr.ErrClosed
ErrCorrupted = zerr.ErrCorrupted
ErrMagic = zerr.ErrMagic
ErrVersion = zerr.ErrVersion
ErrHeaderHash = zerr.ErrHeaderHash
ErrBlockHash = zerr.ErrBlockHash
ErrContentHash = zerr.ErrContentHash
ErrHeaderRead = zerr.ErrHeaderRead
ErrHeaderWrite = zerr.ErrHeaderWrite
ErrDescriptorRead = zerr.ErrDescriptorRead
ErrBlockSizeRead = zerr.ErrBlockSizeRead
ErrBlockRead = zerr.ErrBlockRead
ErrBlockSizeOverflow = zerr.ErrBlockSizeOverflow
ErrCompress = zerr.ErrCompress
ErrDecompress = zerr.ErrDecompress
ErrReserveBitSet = zerr.ErrReserveBitSet
ErrBlockDescriptor = zerr.ErrBlockDescriptor
ErrContentHashRead = zerr.ErrContentHashRead
ErrContentSize = zerr.ErrContentSize
ErrReadOffset = zerr.ErrReadOffset
ErrReadOffsetLinked = zerr.ErrReadOffsetLinked
ErrSkip = zerr.ErrSkip
ErrNibble = zerr.ErrNibble
)
// Returns true if 'err' indicates that the read input is corrupted.
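//
// For example, a caller can distinguish damaged input from other failures.
// A minimal sketch (rd is a Reader from NewReader, dst any io.Writer):
//
//	if _, err := rd.WriteTo(dst); err != nil && Lz4Corrupted(err) {
//		// the LZ4 stream itself is corrupt
//	}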
func Lz4Corrupted(err error) bool {
return errors.Is(err, ErrCorrupted)
}
package plz4
import (
"runtime"
"github.com/prequel-dev/plz4/internal/pkg/compress"
"github.com/prequel-dev/plz4/internal/pkg/descriptor"
"github.com/prequel-dev/plz4/internal/pkg/opts"
)
type OptT func(*opts.OptsT)
type WorkerPool = opts.WorkerPool
type BlockIdxT = descriptor.BlockIdxT
type LevelT = compress.LevelT
type CbProgressT = opts.ProgressFuncT
type CbSkipT = opts.SkipCallbackT
type CbDictT = opts.DictCallbackT
const (
// 64 KiB block size
BlockIdx64KB = descriptor.BlockIdx64KB
// 256 KiB block size
BlockIdx256KB = descriptor.BlockIdx256KB
// 1 MiB block size
BlockIdx1MB = descriptor.BlockIdx1MB
// 4 MiB block size
BlockIdx4MB = descriptor.BlockIdx4MB
)
const (
Level1 LevelT = iota + 1
Level2
Level3
Level4
Level5
Level6
Level7
Level8
Level9
Level10
Level11
Level12
)
/////////////////
// Global options
/////////////////
// Specify the number of goroutines to run in parallel. Defaults to 1.
//
//  0 Process synchronously
// 1+ Process asynchronously; values above the CPU count are clamped to the CPU count
// <0 Process asynchronously with one goroutine per CPU
func WithParallel(n int) OptT {
return func(o *opts.OptsT) {
numCPU := runtime.NumCPU()
if n < 0 || n > numCPU {
o.NParallel = numCPU
} else {
o.NParallel = n
}
}
}
// Enable full content checksum. Defaults to enabled.
//
// WriteMode: Calculate and append the content checksum if enabled.
// ReadMode: Validate the content checksum if present; ignore if disabled.
func WithContentChecksum(enable bool) OptT {
return func(o *opts.OptsT) {
o.ContentChecksum = enable
}
}
// Optional worker pool for both compress and decompress mode.
func WithWorkerPool(wp WorkerPool) OptT {
return func(o *opts.OptsT) {
o.WorkerPool = wp
}
}
// The processor will emit the tuple (src_block_offset, dst_block_offset) on each
// block boundary. Applies to both compress and decompress mode.
//
// Offsets are relative to the start of the frame.
//
// Note: Callback may be called from a secondary goroutine.
func WithProgress(cb CbProgressT) OptT {
return func(o *opts.OptsT) {
o.Handler = cb
}
}
// Provide a dictionary for compress or decompress mode.
// Only the last 64 KiB is used.
func WithDictionary(data []byte) OptT {
return func(o *opts.OptsT) {
o.Dictionary = data
}
}
/////////////////////////////////
// Write Options: ignored on read
/////////////////////////////////
// Specify write compression level [1-12]. Defaults to Level1.
func WithLevel(lvl LevelT) OptT {
return func(o *opts.OptsT) {
switch {
case lvl < Level1:
lvl = Level1
case lvl > Level12:
lvl = Level12
}
o.Level = compress.LevelT(lvl)
}
}
// Enable block checksums on write. Defaults to disabled.
func WithBlockChecksum(enable bool) OptT {
return func(o *opts.OptsT) {
o.BlockChecksum = enable
}
}
// Specify write block size. Defaults to BlockIdx4MB.
func WithBlockSize(idx BlockIdxT) OptT {
return func(o *opts.OptsT) {
if !idx.Valid() {
// Use default on invalid input
idx = BlockIdx4MB
}
o.BlockSizeIdx = idx
}
}
// Enable linked blocks on write. Defaults to disabled.
func WithBlockLinked(enable bool) OptT {
return func(o *opts.OptsT) {
o.BlockLinked = enable
}
}
// Specify write content size to embed in header.
func WithContentSize(sz uint64) OptT {
return func(o *opts.OptsT) {
o.ContentSz = &sz
}
}
// Specify the dictionary identifier to embed in the header on write.
func WithDictionaryId(id uint32) OptT {
return func(o *opts.OptsT) {
o.DictionaryId = &id
}
}
////////////////////////////////
// Read Options; ignored on write
////////////////////////////////
// Read block starting at byte 'offset'.
//
// The offset is the first byte of the block relative to the start of the frame.
func WithReadOffset(offset int64) OptT {
return func(o *opts.OptsT) {
o.ReadOffset = offset
}
}
// Enable content size check. Defaults to enabled.
//
// According to the spec, the content size is informational, so in some cases it
// may be desirable to skip the check.
func WithContentSizeCheck(enabled bool) OptT {
return func(o *opts.OptsT) {
o.SkipContentSz = !enabled
}
}
// Specify a skip-frame callback function.
//
// The callback is invoked when a skip frame is encountered and
// must consume exactly 'sz' bytes from the reader.
func WithSkipCallback(cb CbSkipT) OptT {
return func(o *opts.OptsT) {
o.SkipCallback = cb
}
}
// Specify an optional dictionary callback.
//
// The engine invokes the callback when a dictionary identifier
// is read in the frame header. The callback may return an optional
// dictionary, which overrides any dictionary previously specified
// with the WithDictionary() option.
func WithDictCallback(cb CbDictT) OptT {
return func(o *opts.OptsT) {
o.DictCallback = cb
}
}
func defaultHandler(int64, int64) {}
func parseOpts(optFuncs ...OptT) opts.OptsT {
o := opts.OptsT{
Level: Level1, // Fast by default
NParallel: 1, // Run async by default
Handler: defaultHandler, // NOOP
BlockSizeIdx: BlockIdx4MB, // 4MB is default size
WorkerPool: opts.StubWorkerPool, // Stub worker pool with simple go dispatch
ContentChecksum: true, // Spec recommends default true; but does slow things down.
ContentSz: nil, // Default to unset
SkipContentSz: false, // Check content size enabled by default
}
for _, oFunc := range optFuncs {
oFunc(&o)
}
return o
}
package plz4
import (
"io"
"github.com/prequel-dev/plz4/internal/pkg/rdr"
)
type Reader interface {
// Read decompressed data into 'dst'. Returns the number of bytes read.
Read(dst []byte) (n int, err error)
// Decompress to 'wr'. Returns the number of bytes written.
WriteTo(wr io.Writer) (int64, error)
// Close the Reader to release underlying resources.
// Close() *MUST* be called on completion whether or not
// the Reader is in an error state.
Close() error
}
// Construct a Reader to decompress the LZ4 frame from 'rd'.
//
// Specify optional parameters in 'opts'.
func NewReader(rd io.Reader, opts ...OptT) Reader {
// 'o' escapes because the Reader holds a pointer to it.
// Passing the pointer avoids copying the options into the underlying implementation.
o := parseOpts(opts...)
return rdr.NewReader(rd, &o)
}
package plz4
import (
"io"
"github.com/prequel-dev/plz4/internal/pkg/async"
"github.com/prequel-dev/plz4/internal/pkg/header"
"github.com/prequel-dev/plz4/internal/pkg/sync"
)
type Writer interface {
// Compress 'src' data; returns the number of bytes written.
// May be used in sequence with ReadFrom.
Write(src []byte) (n int, err error)
// Compress data from 'rd'; returns the number of bytes read.
// May be used in sequence with Write.
ReadFrom(rd io.Reader) (n int64, err error)
// Flush pending data immediately to 'wr', generating
// a new LZ4 Frame block. If no data is pending, no
// block is generated.
//
// This is a synchronous call; it will completely flush an
// asynchronous pipeline.
Flush() error
// Close the Writer to release underlying resources.
// Close() *MUST* be called on completion whether or not
// the Writer is in an error state.
Close() error
}
// Construct a Writer to compress an LZ4 frame into 'wr'.
//
// Specify optional parameters in 'opts'.
func NewWriter(wr io.Writer, opts ...OptT) Writer {
o := parseOpts(opts...)
// Linked blocks are only implemented in async mode.
if o.BlockLinked && o.NParallel <= 0 {
o.NParallel = 1
}
if o.NParallel <= 0 {
return sync.NewSyncWriter(wr, &o)
}
return async.NewAsyncWriter(wr, &o)
}
// Write a skip frame header to 'wr'.
// A skip frame of exactly size 'sz' must follow the header.
//
// 'sz'     32-bit unsigned size of the frame payload that follows.
// 'nibble' 4-bit value encoded into the skip-frame magic number.
func WriteSkipFrameHeader(wr io.Writer, nibble uint8, sz uint32) (int, error) {
return header.WriteSkip(wr, nibble, sz)
}
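// The example below is an illustrative sketch, not part of the library.
// It round-trips a buffer through NewWriter and NewReader, and shows a
// hypothetical helper that emits a skip frame with WriteSkipFrameHeader.
package plz4_test

import (
    "bytes"
    "io"

    "github.com/prequel-dev/plz4"
)

func ExampleNewWriter() {
    var compressed bytes.Buffer

    // Compress with two parallel workers and 64 KiB blocks.
    wr := plz4.NewWriter(&compressed,
        plz4.WithParallel(2),
        plz4.WithBlockSize(plz4.BlockIdx64KB),
    )
    if _, err := wr.Write([]byte("hello, plz4")); err != nil {
        panic(err)
    }
    // Close MUST be called to flush pending blocks and write the trailer.
    if err := wr.Close(); err != nil {
        panic(err)
    }

    // Decompress the frame back out.
    rd := plz4.NewReader(&compressed)
    defer rd.Close()
    var out bytes.Buffer
    if _, err := io.Copy(&out, rd); err != nil {
        panic(err)
    }
}

// writeSkipFrame is a hypothetical helper: the skip frame header must be
// followed by exactly len(payload) bytes, which the decoder passes over.
func writeSkipFrame(wr io.Writer, nibble uint8, payload []byte) error {
    if _, err := plz4.WriteSkipFrameHeader(wr, nibble, uint32(len(payload))); err != nil {
        return err
    }
    _, err := wr.Write(payload)
    return err
}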