-rw-r--r--   TODO.md      |  3
-rw-r--r--   chunk.go     |  5
-rw-r--r--   repo.go      | 95
-rw-r--r--   repo_test.go | 32
-rw-r--r--   tar.go       | 10
5 files changed, 101 insertions(+), 44 deletions(-)
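In short, this commit threads the SketchMap through the matching pipeline and delta-encodes unmatched data against similar stored chunks. As a hypothetical driver (commitSketch does not exist in the repo; it only uses names that appear in the repo.go hunks below), the new shape is roughly:

package main // same package as repo.go

import "io"

// commitSketch sketches the pipeline after this commit: hashChunks now
// also returns the SketchMap, and matchStream needs it so that chunks
// with no exact fingerprint match can be delta-encoded against similar
// stored chunks.
func commitSketch(r *Repo, reader io.Reader, oldChunks chan StoredChunk) []Chunk {
	fingerprints, sketches := r.hashChunks(oldChunks)
	return r.matchStream(reader, fingerprints, sketches)
}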
diff --git a/TODO.md b/TODO.md
--- a/TODO.md
+++ b/TODO.md
@@ -21,4 +21,5 @@ priority 1
 priority 2
 ----------
 - use more the `Reader` API (which is analoguous to the `IOStream` in Java)
-- refactor matchStream as right now it is quite
+- refactor matchStream as right now it is quite complex
+- store sketches and fingerprint in Repo
diff --git a/chunk.go b/chunk.go
--- a/chunk.go
+++ b/chunk.go
@@ -108,6 +108,10 @@ func (c *TempChunk) Len() int {
 	return len(c.value)
 }
 
+func (c *TempChunk) Bytes() []byte {
+	return c.value
+}
+
 func (c *TempChunk) AppendFrom(r io.Reader) {
 	buff, err := io.ReadAll(r)
 	if err != nil {
@@ -129,6 +133,7 @@ func (c *DeltaChunk) Reader() ChunkReader {
 	return &buff
 }
 
+// TODO: Maybe return the size of the patch instead ?
 func (c *DeltaChunk) Len() int {
 	return c.size
 }
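The TODO added above DeltaChunk.Len() suggests that size holds the reconstructed chunk's length, not the patch length. As a hypothetical illustration (not part of this commit), restoring such a chunk with the bspatch counterpart of the go-bsdiff package used in the tests could look like the following, assuming bspatch.Reader takes (old io.Reader, new io.Writer, patch io.Reader):

package main

import (
	"bytes"
	"io"

	"github.com/gabstv/go-bsdiff/pkg/bspatch"
)

// restoreDelta is a hypothetical helper (not in the repo) that rebuilds a
// delta-encoded chunk: it applies the stored bsdiff patch to the source
// chunk's bytes and returns the reconstructed chunk.
func restoreDelta(source io.Reader, patch []byte) ([]byte, error) {
	var target bytes.Buffer
	if err := bspatch.Reader(source, &target, bytes.NewReader(patch)); err != nil {
		return nil, err
	}
	return target.Bytes(), nil
}

Under that reading, Len() returning size matches the length restoreDelta would produce, while the patch length is only available as len(patch).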
diff --git a/repo.go b/repo.go
--- a/repo.go
+++ b/repo.go
@@ -26,6 +26,7 @@ package main
 
 import (
 	"bufio"
+	"bytes"
 	"encoding/gob"
 	"fmt"
 	"hash"
@@ -91,8 +92,8 @@ func (r *Repo) Commit(source string) {
 	files := listFiles(source)
 	go r.loadChunks(versions, oldChunks)
 	go concatFiles(files, writer)
-	fingerprints, _ := r.hashChunks(oldChunks)
-	chunks := r.matchStream(reader, fingerprints)
+	fingerprints, sketches := r.hashChunks(oldChunks)
+	chunks := r.matchStream(reader, fingerprints, sketches)
 	extractTempChunks(chunks)
 	// storeChunks(newChunkPath, newChunks)
 	// storeFiles(newFilesPath, files)
@@ -146,6 +147,10 @@ func concatFiles(files []File, stream io.WriteCloser) {
 	stream.Close()
 }
 
+func (r *Repo) chunkMinLen() int {
+	return SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
+}
+
 func (r *Repo) chunkStream(stream io.Reader, chunks chan<- []byte) {
 	var buff []byte
 	var prev, read = r.chunkSize, 0
@@ -287,15 +292,47 @@ func (r *Repo) findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool
 	return similarChunk, similarChunk != nil
 }
 
-// TODO: encode stream
-func (r *Repo) encodeStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
-	chunks := r.matchStream(stream, fingerprints)
-	return chunks
+func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk, sketches SketchMap) (Chunk, bool) {
+	id, found := r.findSimilarChunk(temp, sketches)
+	if found {
+		var buff bytes.Buffer
+		if err := r.differ.Diff(id.Reader(r), temp.Reader(), &buff); err != nil {
+			log.Println("Error trying delta encode chunk:", temp, "with source:", id, ":", err)
+		} else {
+			return &DeltaChunk{
+				repo:   r,
+				source: id,
+				patch:  buff.Bytes(),
+				size:   temp.Len(),
+			}, true
+		}
+	}
+	// TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps)
+	return temp, false
 }
 
-func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk {
+func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk, sketches SketchMap) []Chunk {
+	if prev == nil {
+		c, _ := r.tryDeltaEncodeChunk(curr, sketches)
+		return []Chunk{c}
+	} else if curr.Len() < r.chunkMinLen() {
+		c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), sketches)
+		if success {
+			return []Chunk{c}
+		} else {
+			return []Chunk{prev, curr}
+		}
+	} else {
+		prevD, _ := r.tryDeltaEncodeChunk(prev, sketches)
+		currD, _ := r.tryDeltaEncodeChunk(curr, sketches)
+		return []Chunk{prevD, currD}
+	}
+}
+
+func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
 	var b byte
 	var chunks []Chunk
+	var prev *TempChunk
 	bufStream := bufio.NewReaderSize(stream, r.chunkSize)
 	buff := make([]byte, 0, r.chunkSize*2)
 	n, err := io.ReadFull(stream, buff[:r.chunkSize])
@@ -312,7 +349,13 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 			if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
 				size := len(buff) - r.chunkSize
 				log.Println("Add new partial chunk of size:", size)
-				chunks = append(chunks, NewTempChunk(buff[:size]))
+				temp := NewTempChunk(buff[:size])
+				chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
+				prev = nil
+			} else if prev != nil {
+				c, _ := r.tryDeltaEncodeChunk(prev, sketches)
+				chunks = append(chunks, c)
+				prev = nil
 			}
 			log.Printf("Add existing chunk: %d\n", chunkId)
 			chunks = append(chunks, NewChunkFile(r, chunkId))
@@ -326,7 +369,11 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 		}
 		if len(buff) == r.chunkSize*2 {
 			log.Println("Add new chunk")
-			chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+			if prev != nil {
+				chunk, _ := r.tryDeltaEncodeChunk(prev, sketches)
+				chunks = append(chunks, chunk)
+			}
+			prev = NewTempChunk(buff[:r.chunkSize])
 			tmp := buff[r.chunkSize:]
 			buff = make([]byte, 0, r.chunkSize*2)
 			buff = append(buff, tmp...)
@@ -337,14 +384,18 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 			buff = append(buff, b)
 		}
 	}
-	if len(buff) > r.chunkSize {
-		log.Println("Add new chunk")
-		chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
-		log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
-		chunks = append(chunks, NewTempChunk(buff[r.chunkSize:]))
-	} else if len(buff) > 0 {
-		log.Println("Add new partial chunk of size:", len(buff))
-		chunks = append(chunks, NewTempChunk(buff))
+	if len(buff) > 0 {
+		var temp *TempChunk
+		if len(buff) > r.chunkSize {
+			log.Println("Add new chunk")
+			prev = NewTempChunk(buff[:r.chunkSize])
+			log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
+			temp = NewTempChunk(buff[r.chunkSize:])
+		} else {
+			log.Println("Add new partial chunk of size:", len(buff))
+			temp = NewTempChunk(buff)
+		}
+		chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
 	}
 	return chunks
 }
@@ -390,6 +441,16 @@ func extractTempChunks(chunks []Chunk) (ret []*TempChunk) {
 	return
 }
 
+func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
+	for _, c := range chunks {
+		tmp, isDelta := c.(*DeltaChunk)
+		if isDelta {
+			ret = append(ret, tmp)
+		}
+	}
+	return
+}
+
 func writeFile(filePath string, object interface{}) error {
 	file, err := os.Create(filePath)
 	if err == nil {
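The pairing policy of tryDeltaEncodeChunks is easier to see stripped of the chunk types. Below is a minimal, self-contained sketch of just that decision logic over plain byte slices; pair and minLen are made-up names, and the real method additionally falls back to the two raw pieces when the merged delta attempt fails:

package main

import "fmt"

// pair mirrors the branching of tryDeltaEncodeChunks: a trailing piece
// too small to produce a super-feature sketch on its own is merged with
// its predecessor into a single candidate; otherwise the two pieces are
// handled independently. minLen stands in for Repo.chunkMinLen().
func pair(prev, curr []byte, minLen int) [][]byte {
	switch {
	case prev == nil:
		return [][]byte{curr}
	case len(curr) < minLen:
		// curr alone cannot be sketched, so try the concatenation
		// as one delta-encoding candidate.
		merged := append(append([]byte{}, prev...), curr...)
		return [][]byte{merged}
	default:
		return [][]byte{prev, curr}
	}
}

func main() {
	fmt.Println(len(pair([]byte("abcd"), []byte("xy"), 3)))  // 1: merged
	fmt.Println(len(pair([]byte("abcd"), []byte("xyz"), 3))) // 2: kept separate
}

This explains why matchStream now carries a prev *TempChunk across iterations: a full-size chunk is held back until the next piece is seen, so the two can be delta-encoded together when the second one is too small to match on its own.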
diff --git a/repo_test.go b/repo_test.go
index d3aa0d9..b09f6ed 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -9,8 +9,6 @@ import (
 	"path"
 	"reflect"
 	"testing"
-
-	"github.com/gabstv/go-bsdiff/pkg/bsdiff"
 )
@@ -175,14 +173,16 @@ func TestBsdiff(t *testing.T) {
 	resultDir := t.TempDir()
 	repo := NewRepo(resultDir)
 	dataDir := path.Join("test", "data", "logs")
-	addedFile := path.Join(dataDir, "2", "slogTest.log")
+	addedFile1 := path.Join(dataDir, "2", "slogTest.log")
+	addedFile2 := path.Join(dataDir, "3", "slogTest.log")
 	// Store initial chunks
 	prepareChunks(dataDir, repo, concatFiles)
 
 	// Modify data
-	input := []byte("hello")
-	ioutil.WriteFile(addedFile, input, 0664)
-	defer os.Remove(addedFile)
+	ioutil.WriteFile(addedFile1, []byte("hello"), 0664)
+	defer os.Remove(addedFile1)
+	ioutil.WriteFile(addedFile2, make([]byte, 4000), 0664)
+	defer os.Remove(addedFile2)
 
 	// Load previously stored chunks
 	oldChunks := make(chan StoredChunk, 16)
@@ -192,21 +192,13 @@ func TestBsdiff(t *testing.T) {
 
 	// Read new data
 	reader := getDataStream(dataDir, concatFiles)
-	recipe := repo.matchStream(reader, fingerprints)
-	newChunks := extractTempChunks(repo.mergeTempChunks(recipe))
-	assertLen(t, 2, newChunks, "New chunks:")
+	recipe := repo.matchStream(reader, fingerprints, sketches)
+	newChunks := extractDeltaChunks(repo.mergeTempChunks(recipe))
+	assertLen(t, 2, newChunks, "New delta chunks:")
 	for _, c := range newChunks {
-		id, exists := repo.findSimilarChunk(c, sketches)
-		log.Println(id, exists)
-		if exists {
-			patch := new(bytes.Buffer)
-			stored := id.Reader(repo)
-			new := c.Reader()
-			bsdiff.Reader(stored, new, patch)
-			log.Println("Patch size:", patch.Len())
-			if patch.Len() >= repo.chunkSize/10 {
-				t.Errorf("Bsdiff of chunk is too large: %d", patch.Len())
-			}
+		log.Println("Patch size:", len(c.patch))
+		if len(c.patch) >= repo.chunkSize/10 {
+			t.Errorf("Bsdiff of chunk is too large: %d", len(c.patch))
 		}
 	}
 }
diff --git a/tar.go b/tar.go
--- a/tar.go
+++ b/tar.go
@@ -26,18 +26,16 @@ func streamFilesTar(files []File, stream io.WriteCloser) {
 			continue
 		}
 		if err := tarStream.WriteHeader(hdr); err != nil {
-			log.Printf("Error writing tar header to stream for file '%s': %s\n", f.Path, err)
-			continue
+			log.Panicf("Error writing tar header to stream for file '%s': %s\n", f.Path, err)
 		}
 		if _, err := io.Copy(tarStream, file); err != nil {
-			log.Printf("Error writing file to stream '%s': %s\n", f.Path, err)
-			continue
+			log.Panicf("Error writing file to stream '%s': %s\n", f.Path, err)
 		}
 	}
 	if err := tarStream.Close(); err != nil {
-		log.Fatal("Error closing tar stream:", err)
+		log.Panic("Error closing tar stream:", err)
 	}
 	if err := stream.Close(); err != nil {
-		log.Fatal("Error closing stream:", err)
+		log.Panic("Error closing stream:", err)
 	}
 }
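The tar.go hunk replaces log.Printf plus continue (and log.Fatal) with log.Panicf and log.Panic: once a tar header has been written, skipping the file body would desynchronize the archive for every downstream reader, so aborting the stream is the safer failure mode, and unlike Fatal a panic can still be contained by the caller. A hypothetical sketch of such containment (safeStream is not in the repo):

package main

import "log"

// safeStream shows that a log.Panicf in the tar goroutine need not crash
// the process: log.Panicf logs the message and then panics, and a
// deferred recover at the goroutine boundary turns that into a logged
// abort of the stream.
func safeStream(run func()) {
	defer func() {
		if r := recover(); r != nil {
			log.Println("tar stream aborted:", r)
		}
	}()
	run()
}

func main() {
	safeStream(func() {
		log.Panicf("Error writing tar header to stream for file '%s': %s", "some.log", "disk full")
	})
	log.Println("process still alive after contained panic")
}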