author    | n-peugnet <n.peugnet@free.fr> | 2021-08-26 12:41:43 +0200
committer | n-peugnet <n.peugnet@free.fr> | 2021-08-26 12:41:43 +0200
commit    | d25d38b8989787551c8b29500f580ea331afc4e8 (patch)
tree      | b4d5e96f0ba930ee835594b32b569ff7b9127ae6 /repo.go
parent    | fc5151c54a551b5f4f13aed6f4cf67098c7ed595 (diff)
download  | dna-backup-d25d38b8989787551c8b29500f580ea331afc4e8.tar.gz, dna-backup-d25d38b8989787551c8b29500f580ea331afc4e8.zip
try to use more streams (part 2)
Diffstat (limited to 'repo.go')
-rw-r--r-- | repo.go | 83
1 file changed, 46 insertions, 37 deletions
```diff
@@ -25,6 +25,7 @@ repo/
 package main
 
 import (
+	"bufio"
 	"encoding/gob"
 	"fmt"
 	"hash"
@@ -61,14 +62,12 @@ func (r *Repo) Commit(source string) {
 	os.Mkdir(newPath, 0775)
 	os.Mkdir(newChunkPath, 0775)
 	reader, writer := io.Pipe()
-	newChunks := make(chan []byte, 16)
 	oldChunks := make(chan Chunk, 16)
 	files := listFiles(source)
 	go loadChunks(versions, oldChunks)
 	go concatFiles(files, writer)
-	go chunkStream(reader, newChunks)
 	hashes := hashChunks(oldChunks)
-	chunks := r.matchChunks(newChunks, hashes)
+	chunks := r.matchStream(reader, hashes)
 	extractNewChunks(chunks)
 	// storeChunks(newChunkPath, newChunks)
 	// storeFiles(newFilesPath, files)
@@ -223,46 +222,56 @@ func hashChunks(chunks <-chan Chunk) map[uint64]ChunkId {
 	return hashes
 }
 
-func (r *Repo) matchChunks(chunks <-chan []byte, hashes map[uint64]ChunkId) []Chunk {
-	hasher := rabinkarp64.New()
-	hasher.Write(<-chunks)
-	recipe := make([]Chunk, 0)
+func readChunk(stream io.Reader) ([]byte, error) {
+	buff := make([]byte, chunkSize)
+	_, err := io.ReadFull(stream, buff)
+	return buff, err
+}
 
-	var i uint64
-	var offset, prefill, postfill int
-	var exists bool
-	var chunkId ChunkId
-	for c := range chunks {
-		buff := make([]byte, 0)
-		// Pre fill the window with the rest of the previous chunk
-		for prefill = 0; prefill < offset; prefill++ {
-			hasher.Roll(c[prefill])
-		}
-		// Fill the window with the current chunk and match hash byte by byte
-		for ; offset < len(c); offset++ {
-			h := hasher.Sum64()
-			chunkId, exists = hashes[h]
-			if exists {
-				// log.Printf("Found existing chunk: New{id:%d, offset:%d} Old%d\n", i, offset, chunkId)
-				break
+func (r *Repo) matchStream(stream io.Reader, hashes map[uint64]ChunkId) []Chunk {
+	var b byte
+	chunks := make([]Chunk, 0)
+	buff, err := readChunk(stream)
+	if err == io.EOF {
+		chunks = append(chunks, Chunk{Value: buff})
+		return chunks
+	}
+	bufStream := bufio.NewReader(stream)
+	hasher := rabinkarp64.New()
+	hasher.Write(buff)
+	buff = make([]byte, 0, chunkSize)
+	for err != io.EOF {
+		h := hasher.Sum64()
+		chunkId, exists := hashes[h]
+		if exists {
+			if len(buff) > 0 {
+				log.Printf("Add new partial chunk of size: %d\n", len(buff))
+				chunks = append(chunks, Chunk{Value: buff})
+			}
+			log.Printf("Found existing chunk with offset %d: %d\n", len(buff), chunkId)
+			chunks = append(chunks, Chunk{Id: &chunkId})
+			buff = make([]byte, 0, chunkSize)
+			for i := 0; i < chunkSize && err == nil; i++ {
+				b, err = bufStream.ReadByte()
+				hasher.Roll(b)
 			}
-			hasher.Roll(c[offset])
-			buff = append(buff, c[offset])
+			continue
 		}
-		// Fill the window with the rest of the current chunk if it matched early
-		for postfill = offset; postfill < len(c); postfill++ {
-			hasher.Roll(c[postfill])
+		if len(buff) == chunkSize {
+			log.Println("Add new chunk")
+			chunks = append(chunks, Chunk{Value: buff})
+			buff = make([]byte, 0, chunkSize)
 		}
-		if len(buff) > 0 {
-			recipe = append(recipe, Chunk{Value: buff})
+		b, err = bufStream.ReadByte()
+		if err != io.EOF {
+			hasher.Roll(b)
+			buff = append(buff, b)
 		}
-		if exists {
-			recipe = append(recipe, Chunk{Id: &chunkId})
-		}
-		offset %= chunkSize
-		i++
 	}
-	return recipe
+	if len(buff) > 0 {
+		chunks = append(chunks, Chunk{Value: buff})
+	}
+	return chunks
 }
 
 func extractNewChunks(chunks []Chunk) (ret [][]Chunk) {
```
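For context on the pattern the new `matchStream` relies on, here is a standalone, runnable sketch. It is an illustration only, not part of the commit: it assumes the `rabinkarp64` package already imported by repo.go is github.com/chmduquesne/rollinghash/rabinkarp64, and `windowSize` stands in for the repo's `chunkSize`. It primes the rolling window with one full read, as `readChunk` does, then `Roll`s one byte at a time so the fingerprint can be probed at every byte offset of the stream:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"

	"github.com/chmduquesne/rollinghash/rabinkarp64"
)

const windowSize = 16 // stands in for chunkSize in repo.go

func main() {
	// Sample input that repeats with the window's period, so identical
	// windows (and thus identical fingerprints) appear every 16 bytes.
	data := bytes.Repeat([]byte("0123456789abcdef"), 4)
	stream := bufio.NewReader(bytes.NewReader(data))

	// Prime the window with one full read, as readChunk does. Note that
	// io.ReadFull reports io.ErrUnexpectedEOF (not io.EOF) on a short
	// read, so a caller must handle both sentinels on tiny inputs.
	window := make([]byte, windowSize)
	if _, err := io.ReadFull(stream, window); err != nil {
		fmt.Println("stream shorter than one window:", err)
		return
	}
	hasher := rabinkarp64.New()
	hasher.Write(window)

	// Slide the window one byte at a time: each Roll drops the oldest
	// byte and adds the new one in O(1), so the fingerprint could be
	// probed against a map of known-chunk hashes at every offset.
	for offset := 0; ; offset++ {
		fmt.Printf("offset %2d: %016x\n", offset, hasher.Sum64())
		b, err := stream.ReadByte()
		if err == io.EOF {
			break
		}
		hasher.Roll(b)
	}
}
```

Because the sample data repeats with the same period as the window, the printed fingerprints recur every `windowSize` offsets; that recurrence is what lets `matchStream` recognize previously stored chunks at arbitrary byte offsets without re-hashing a full window per position.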