| author | n-peugnet <n.peugnet@free.fr> | 2021-09-02 15:04:05 +0200 |
| --- | --- | --- |
| committer | n-peugnet <n.peugnet@free.fr> | 2021-09-02 15:04:05 +0200 |
| commit | 9387c79a1862e7572ffc59919c05cf1dc9aeeae5 (patch) | |
| tree | d3e037378edacff40a242434ed1d3fe971051be3 /repo.go | |
| parent | 1dec2cdc84a6497c893d84485c6f94589f997215 (diff) | |
| download | dna-backup-9387c79a1862e7572ffc59919c05cf1dc9aeeae5.tar.gz dna-backup-9387c79a1862e7572ffc59919c05cf1dc9aeeae5.zip | |
find similar chunks while matching stream
Diffstat (limited to 'repo.go')
| mode | file | lines changed |
| --- | --- | --- |
| -rw-r--r-- | repo.go | 95 |

1 file changed, 78 insertions, 17 deletions
@@ -26,6 +26,7 @@ package main
 
 import (
 	"bufio"
+	"bytes"
 	"encoding/gob"
 	"fmt"
 	"hash"
@@ -91,8 +92,8 @@ func (r *Repo) Commit(source string) {
 	files := listFiles(source)
 	go r.loadChunks(versions, oldChunks)
 	go concatFiles(files, writer)
-	fingerprints, _ := r.hashChunks(oldChunks)
-	chunks := r.matchStream(reader, fingerprints)
+	fingerprints, sketches := r.hashChunks(oldChunks)
+	chunks := r.matchStream(reader, fingerprints, sketches)
 	extractTempChunks(chunks)
 	// storeChunks(newChunkPath, newChunks)
 	// storeFiles(newFilesPath, files)
@@ -146,6 +147,10 @@ func concatFiles(files []File, stream io.WriteCloser) {
 	stream.Close()
 }
 
+func (r *Repo) chunkMinLen() int {
+	return SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
+}
+
 func (r *Repo) chunkStream(stream io.Reader, chunks chan<- []byte) {
 	var buff []byte
 	var prev, read = r.chunkSize, 0
@@ -287,15 +292,47 @@ func (r *Repo) findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool
 	return similarChunk, similarChunk != nil
 }
 
-// TODO: encode stream
-func (r *Repo) encodeStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
-	chunks := r.matchStream(stream, fingerprints)
-	return chunks
+func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk, sketches SketchMap) (Chunk, bool) {
+	id, found := r.findSimilarChunk(temp, sketches)
+	if found {
+		var buff bytes.Buffer
+		if err := r.differ.Diff(id.Reader(r), temp.Reader(), &buff); err != nil {
+			log.Println("Error trying delta encode chunk:", temp, "with source:", id, ":", err)
+		} else {
+			return &DeltaChunk{
+				repo:   r,
+				source: id,
+				patch:  buff.Bytes(),
+				size:   temp.Len(),
+			}, true
+		}
+	}
+	// TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps)
+	return temp, false
 }
 
-func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk {
+func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk, sketches SketchMap) []Chunk {
+	if prev == nil {
+		c, _ := r.tryDeltaEncodeChunk(curr, sketches)
+		return []Chunk{c}
+	} else if curr.Len() < r.chunkMinLen() {
+		c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), sketches)
+		if success {
+			return []Chunk{c}
+		} else {
+			return []Chunk{prev, curr}
+		}
+	} else {
+		prevD, _ := r.tryDeltaEncodeChunk(prev, sketches)
+		currD, _ := r.tryDeltaEncodeChunk(curr, sketches)
+		return []Chunk{prevD, currD}
+	}
+}
+
+func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
 	var b byte
 	var chunks []Chunk
+	var prev *TempChunk
 	bufStream := bufio.NewReaderSize(stream, r.chunkSize)
 	buff := make([]byte, 0, r.chunkSize*2)
 	n, err := io.ReadFull(stream, buff[:r.chunkSize])
@@ -312,7 +349,13 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 			if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
 				size := len(buff) - r.chunkSize
 				log.Println("Add new partial chunk of size:", size)
-				chunks = append(chunks, NewTempChunk(buff[:size]))
+				temp := NewTempChunk(buff[:size])
+				chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
+				prev = nil
+			} else if prev != nil {
+				c, _ := r.tryDeltaEncodeChunk(prev, sketches)
+				chunks = append(chunks, c)
+				prev = nil
 			}
 			log.Printf("Add existing chunk: %d\n", chunkId)
 			chunks = append(chunks, NewChunkFile(r, chunkId))
@@ -326,7 +369,11 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 		}
 		if len(buff) == r.chunkSize*2 {
 			log.Println("Add new chunk")
-			chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+			if prev != nil {
+				chunk, _ := r.tryDeltaEncodeChunk(prev, sketches)
+				chunks = append(chunks, chunk)
+			}
+			prev = NewTempChunk(buff[:r.chunkSize])
 			tmp := buff[r.chunkSize:]
 			buff = make([]byte, 0, r.chunkSize*2)
 			buff = append(buff, tmp...)
@@ -337,14 +384,18 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 			buff = append(buff, b)
 		}
 	}
-	if len(buff) > r.chunkSize {
-		log.Println("Add new chunk")
-		chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
-		log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
-		chunks = append(chunks, NewTempChunk(buff[r.chunkSize:]))
-	} else if len(buff) > 0 {
-		log.Println("Add new partial chunk of size:", len(buff))
-		chunks = append(chunks, NewTempChunk(buff))
+	if len(buff) > 0 {
+		var temp *TempChunk
+		if len(buff) > r.chunkSize {
+			log.Println("Add new chunk")
+			prev = NewTempChunk(buff[:r.chunkSize])
+			log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
+			temp = NewTempChunk(buff[r.chunkSize:])
+		} else {
+			log.Println("Add new partial chunk of size:", len(buff))
+			temp = NewTempChunk(buff)
+		}
+		chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
 	}
 	return chunks
 }
@@ -390,6 +441,16 @@ func extractTempChunks(chunks []Chunk) (ret []*TempChunk) {
 	return
 }
 
+func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
+	for _, c := range chunks {
+		tmp, isDelta := c.(*DeltaChunk)
+		if isDelta {
+			ret = append(ret, tmp)
+		}
+	}
+	return
+}
+
 func writeFile(filePath string, object interface{}) error {
 	file, err := os.Create(filePath)
 	if err == nil {
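
For readers skimming the diff, here is a minimal, self-contained sketch of the idea the new `tryDeltaEncodeChunk` path implements: when a chunk read from the stream has no exact fingerprint match, it is looked up by sketch, and if a similar stored chunk is found, only a patch against that chunk is kept instead of the full data. Everything below (`toySketch`, `toyDiff`, `repoIndex`, `tryDelta`) is an illustrative stand-in and not the repository's actual types, sketch function, or differ.

```go
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
)

// Illustrative stand-ins; none of these names come from the repository.
type chunkID int

type repoIndex struct {
	bySketch map[uint64]chunkID // sketch -> id of a similar stored chunk
	data     map[chunkID][]byte // stored chunk contents
}

// toySketch is a placeholder for the real super-feature sketch: it hashes the
// first half of the chunk so that chunks sharing a long prefix collide.
func toySketch(b []byte) uint64 {
	h := fnv.New64a()
	h.Write(b[:len(b)/2])
	return h.Sum64()
}

// toyDiff stands in for the differ: it returns the suffix of target that
// differs from source, which is enough to show when a delta is worth keeping.
func toyDiff(source, target []byte) []byte {
	i := 0
	for i < len(source) && i < len(target) && source[i] == target[i] {
		i++
	}
	return target[i:]
}

// tryDelta mirrors the shape of tryDeltaEncodeChunk: look up a similar chunk
// by sketch and keep a patch only if one is found and the patch is smaller.
func tryDelta(idx repoIndex, temp []byte) (chunkID, []byte, bool) {
	id, found := idx.bySketch[toySketch(temp)]
	if !found {
		return 0, nil, false // no similar chunk: keep temp as a new raw chunk
	}
	patch := toyDiff(idx.data[id], temp)
	if len(patch) >= len(temp) {
		return 0, nil, false // delta not smaller: keep temp as a raw chunk
	}
	return id, patch, true
}

func main() {
	idx := repoIndex{
		bySketch: map[uint64]chunkID{},
		data:     map[chunkID][]byte{},
	}
	// One stored chunk, indexed by its sketch.
	old := bytes.Repeat([]byte("abcd"), 8)
	idx.data[1] = old
	idx.bySketch[toySketch(old)] = 1

	// A new chunk that shares a long prefix with the stored one.
	cur := append(bytes.Repeat([]byte("abcd"), 6), []byte("XYZWXYZW")...)
	if id, patch, ok := tryDelta(idx, cur); ok {
		fmt.Printf("delta against chunk %d: %d-byte patch instead of %d bytes\n", id, len(patch), len(cur))
	} else {
		fmt.Println("no similar chunk found: keep it as a new raw chunk")
	}
}
```

The committed code goes one step further than this sketch: `tryDeltaEncodeChunks` pairs a buffered previous chunk with the current partial chunk, merging them before the lookup when the partial piece is smaller than `chunkMinLen()`, so very small trailing fragments are not delta-encoded on their own.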