author     n-peugnet <n.peugnet@free.fr>  2021-09-02 15:04:05 +0200
committer  n-peugnet <n.peugnet@free.fr>  2021-09-02 15:04:05 +0200
commit     9387c79a1862e7572ffc59919c05cf1dc9aeeae5 (patch)
tree       d3e037378edacff40a242434ed1d3fe971051be3
parent     1dec2cdc84a6497c893d84485c6f94589f997215 (diff)
download   dna-backup-9387c79a1862e7572ffc59919c05cf1dc9aeeae5.tar.gz
           dna-backup-9387c79a1862e7572ffc59919c05cf1dc9aeeae5.zip
find similar chunks while matching stream
-rw-r--r--  TODO.md        3
-rw-r--r--  chunk.go       5
-rw-r--r--  repo.go       95
-rw-r--r--  repo_test.go  32
-rw-r--r--  tar.go        10
5 files changed, 101 insertions, 44 deletions
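In outline, the change threads the sketch map (now returned by hashChunks alongside the fingerprint map) through matchStream, so that data with no exact fingerprint match can still be delta-encoded against a similar stored chunk. A minimal sketch of that decision flow, simplified from the real matchStream below (the hashWindow helper is hypothetical; the actual code uses a rolling hash over a sliding buffer):

// Simplified, illustrative flow only; the real matchStream also
// buffers partial chunks and keeps a one-chunk lookbehind ("prev").
func (r *Repo) classifyWindow(window []byte, fingerprints FingerprintMap, sketches SketchMap) Chunk {
	if id, exists := fingerprints[r.hashWindow(window)]; exists {
		return NewChunkFile(r, id) // exact duplicate: store a reference
	}
	// No exact match: look for a similar chunk and keep only a patch.
	c, _ := r.tryDeltaEncodeChunk(NewTempChunk(window), sketches)
	return c // a DeltaChunk on success, the TempChunk itself otherwise
}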
diff --git a/TODO.md b/TODO.md
index 496bf10..0f5e88d 100644
--- a/TODO.md
+++ b/TODO.md
@@ -21,4 +21,5 @@ priority 1
priority 2
----------
- make more use of the `Reader` API (which is analogous to the `IOStream` in Java)
-- refactor matchStream as right now it is quite
+- refactor matchStream as right now it is quite complex
+- store sketches and fingerprint in Repo
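The second new entry hints at keeping both indexes on the Repo itself rather than recomputing them from the stored chunks on every Commit. A hypothetical shape for that refactor (not part of this commit; field names are illustrative):

// Illustrative only: caches the two indexes built by hashChunks.
type Repo struct {
	// ... existing fields (path, chunkSize, differ, sketch parameters) ...
	fingerprints FingerprintMap // exact-match index over stored chunks
	sketches     SketchMap      // similarity index (super-features)
}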
diff --git a/chunk.go b/chunk.go
index 3fc9057..abcdf1c 100644
--- a/chunk.go
+++ b/chunk.go
@@ -108,6 +108,10 @@ func (c *TempChunk) Len() int {
return len(c.value)
}
+func (c *TempChunk) Bytes() []byte {
+ return c.value
+}
+
func (c *TempChunk) AppendFrom(r io.Reader) {
buff, err := io.ReadAll(r)
if err != nil {
@@ -129,6 +133,7 @@ func (c *DeltaChunk) Reader() ChunkReader {
return &buff
}
+// TODO: Maybe return the size of the patch instead?
func (c *DeltaChunk) Len() int {
return c.size
}
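The new Bytes accessor lets matchStream join a pending chunk with its small successor before attempting delta encoding (see tryDeltaEncodeChunks in repo.go below). One caveat with the append-based join used there: append(prev.Bytes(), curr.Bytes()...) may write into prev's backing array when it has spare capacity. A defensive alternative (not in this commit) copies into a fresh buffer:

// joinTempChunks concatenates two TempChunks without aliasing
// either source buffer.
func joinTempChunks(prev, curr *TempChunk) *TempChunk {
	joined := make([]byte, 0, prev.Len()+curr.Len())
	joined = append(joined, prev.Bytes()...)
	joined = append(joined, curr.Bytes()...)
	return NewTempChunk(joined)
}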
diff --git a/repo.go b/repo.go
index 32666f4..c077dff 100644
--- a/repo.go
+++ b/repo.go
@@ -26,6 +26,7 @@ package main
import (
"bufio"
+ "bytes"
"encoding/gob"
"fmt"
"hash"
@@ -91,8 +92,8 @@ func (r *Repo) Commit(source string) {
files := listFiles(source)
go r.loadChunks(versions, oldChunks)
go concatFiles(files, writer)
- fingerprints, _ := r.hashChunks(oldChunks)
- chunks := r.matchStream(reader, fingerprints)
+ fingerprints, sketches := r.hashChunks(oldChunks)
+ chunks := r.matchStream(reader, fingerprints, sketches)
extractTempChunks(chunks)
// storeChunks(newChunkPath, newChunks)
// storeFiles(newFilesPath, files)
@@ -146,6 +147,10 @@ func concatFiles(files []File, stream io.WriteCloser) {
stream.Close()
}
+func (r *Repo) chunkMinLen() int {
+ return SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
+}
+
func (r *Repo) chunkStream(stream io.Reader, chunks chan<- []byte) {
var buff []byte
var prev, read = r.chunkSize, 0
@@ -287,15 +292,47 @@ func (r *Repo) findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool
return similarChunk, similarChunk != nil
}
-// TODO: encode stream
-func (r *Repo) encodeStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
- chunks := r.matchStream(stream, fingerprints)
- return chunks
+func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk, sketches SketchMap) (Chunk, bool) {
+ id, found := r.findSimilarChunk(temp, sketches)
+ if found {
+ var buff bytes.Buffer
+ if err := r.differ.Diff(id.Reader(r), temp.Reader(), &buff); err != nil {
+ log.Println("Error trying delta encode chunk:", temp, "with source:", id, ":", err)
+ } else {
+ return &DeltaChunk{
+ repo: r,
+ source: id,
+ patch: buff.Bytes(),
+ size: temp.Len(),
+ }, true
+ }
+ }
+ // TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps)
+ return temp, false
}
-func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk {
+func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk, sketches SketchMap) []Chunk {
+ if prev == nil {
+ c, _ := r.tryDeltaEncodeChunk(curr, sketches)
+ return []Chunk{c}
+ } else if curr.Len() < r.chunkMinLen() {
+ c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), sketches)
+ if success {
+ return []Chunk{c}
+ } else {
+ return []Chunk{prev, curr}
+ }
+ } else {
+ prevD, _ := r.tryDeltaEncodeChunk(prev, sketches)
+ currD, _ := r.tryDeltaEncodeChunk(curr, sketches)
+ return []Chunk{prevD, currD}
+ }
+}
+
+func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
var b byte
var chunks []Chunk
+ var prev *TempChunk
bufStream := bufio.NewReaderSize(stream, r.chunkSize)
buff := make([]byte, 0, r.chunkSize*2)
n, err := io.ReadFull(stream, buff[:r.chunkSize])
@@ -312,7 +349,13 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk
if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
size := len(buff) - r.chunkSize
log.Println("Add new partial chunk of size:", size)
- chunks = append(chunks, NewTempChunk(buff[:size]))
+ temp := NewTempChunk(buff[:size])
+ chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
+ prev = nil
+ } else if prev != nil {
+ c, _ := r.tryDeltaEncodeChunk(prev, sketches)
+ chunks = append(chunks, c)
+ prev = nil
}
log.Printf("Add existing chunk: %d\n", chunkId)
chunks = append(chunks, NewChunkFile(r, chunkId))
@@ -326,7 +369,11 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk
}
if len(buff) == r.chunkSize*2 {
log.Println("Add new chunk")
- chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+ if prev != nil {
+ chunk, _ := r.tryDeltaEncodeChunk(prev, sketches)
+ chunks = append(chunks, chunk)
+ }
+ prev = NewTempChunk(buff[:r.chunkSize])
tmp := buff[r.chunkSize:]
buff = make([]byte, 0, r.chunkSize*2)
buff = append(buff, tmp...)
@@ -337,14 +384,18 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk
buff = append(buff, b)
}
}
- if len(buff) > r.chunkSize {
- log.Println("Add new chunk")
- chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
- log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
- chunks = append(chunks, NewTempChunk(buff[r.chunkSize:]))
- } else if len(buff) > 0 {
- log.Println("Add new partial chunk of size:", len(buff))
- chunks = append(chunks, NewTempChunk(buff))
+ if len(buff) > 0 {
+ var temp *TempChunk
+ if len(buff) > r.chunkSize {
+ log.Println("Add new chunk")
+ prev = NewTempChunk(buff[:r.chunkSize])
+ log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
+ temp = NewTempChunk(buff[r.chunkSize:])
+ } else {
+ log.Println("Add new partial chunk of size:", len(buff))
+ temp = NewTempChunk(buff)
+ }
+ chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
}
return chunks
}
@@ -390,6 +441,16 @@ func extractTempChunks(chunks []Chunk) (ret []*TempChunk) {
return
}
+func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
+ for _, c := range chunks {
+ tmp, isDelta := c.(*DeltaChunk)
+ if isDelta {
+ ret = append(ret, tmp)
+ }
+ }
+ return
+}
+
func writeFile(filePath string, object interface{}) error {
file, err := os.Create(filePath)
if err == nil {
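extractDeltaChunks mirrors the existing extractTempChunks helper and is what the reworked TestBsdiff below uses to inspect only the delta-encoded chunks of a recipe. For example, to measure how much of a recipe ended up delta-encoded (illustrative helper, not in this commit):

// totalPatchSize sums the bsdiff patch sizes over all delta chunks
// of a recipe produced by matchStream.
func totalPatchSize(recipe []Chunk) (n int) {
	for _, d := range extractDeltaChunks(recipe) {
		n += len(d.patch)
	}
	return
}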
diff --git a/repo_test.go b/repo_test.go
index d3aa0d9..b09f6ed 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -9,8 +9,6 @@ import (
"path"
"reflect"
"testing"
-
- "github.com/gabstv/go-bsdiff/pkg/bsdiff"
)
func chunkCompare(t *testing.T, dataDir string, repo *Repo, testFiles []string, chunkCount int) {
@@ -175,14 +173,16 @@ func TestBsdiff(t *testing.T) {
resultDir := t.TempDir()
repo := NewRepo(resultDir)
dataDir := path.Join("test", "data", "logs")
- addedFile := path.Join(dataDir, "2", "slogTest.log")
+ addedFile1 := path.Join(dataDir, "2", "slogTest.log")
+ addedFile2 := path.Join(dataDir, "3", "slogTest.log")
// Store initial chunks
prepareChunks(dataDir, repo, concatFiles)
// Modify data
- input := []byte("hello")
- ioutil.WriteFile(addedFile, input, 0664)
- defer os.Remove(addedFile)
+ ioutil.WriteFile(addedFile1, []byte("hello"), 0664)
+ defer os.Remove(addedFile1)
+ ioutil.WriteFile(addedFile2, make([]byte, 4000), 0664)
+ defer os.Remove(addedFile2)
// Load previously stored chunks
oldChunks := make(chan StoredChunk, 16)
@@ -192,21 +192,13 @@ func TestBsdiff(t *testing.T) {
// Read new data
reader := getDataStream(dataDir, concatFiles)
- recipe := repo.matchStream(reader, fingerprints)
- newChunks := extractTempChunks(repo.mergeTempChunks(recipe))
- assertLen(t, 2, newChunks, "New chunks:")
+ recipe := repo.matchStream(reader, fingerprints, sketches)
+ newChunks := extractDeltaChunks(repo.mergeTempChunks(recipe))
+ assertLen(t, 2, newChunks, "New delta chunks:")
for _, c := range newChunks {
- id, exists := repo.findSimilarChunk(c, sketches)
- log.Println(id, exists)
- if exists {
- patch := new(bytes.Buffer)
- stored := id.Reader(repo)
- new := c.Reader()
- bsdiff.Reader(stored, new, patch)
- log.Println("Patch size:", patch.Len())
- if patch.Len() >= repo.chunkSize/10 {
- t.Errorf("Bsdiff of chunk is too large: %d", patch.Len())
- }
+ log.Println("Patch size:", len(c.patch))
+ if len(c.patch) >= repo.chunkSize/10 {
+ t.Errorf("Bsdiff of chunk is too large: %d", len(c.patch))
}
}
}
diff --git a/tar.go b/tar.go
index e811840..e2703c8 100644
--- a/tar.go
+++ b/tar.go
@@ -26,18 +26,16 @@ func streamFilesTar(files []File, stream io.WriteCloser) {
continue
}
if err := tarStream.WriteHeader(hdr); err != nil {
- log.Printf("Error writing tar header to stream for file '%s': %s\n", f.Path, err)
- continue
+ log.Panicf("Error writing tar header to stream for file '%s': %s\n", f.Path, err)
}
if _, err := io.Copy(tarStream, file); err != nil {
- log.Printf("Error writing file to stream '%s': %s\n", f.Path, err)
- continue
+ log.Panicf("Error writing file to stream '%s': %s\n", f.Path, err)
}
}
if err := tarStream.Close(); err != nil {
- log.Fatal("Error closing tar stream:", err)
+ log.Panic("Error closing tar stream:", err)
}
if err := stream.Close(); err != nil {
- log.Fatal("Error closing stream:", err)
+ log.Panic("Error closing stream:", err)
}
}
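The move from log.Printf/log.Fatal to log.Panicf/log.Panic is more than cosmetic: log.Fatal calls os.Exit(1), which skips deferred functions and cannot be intercepted, whereas log.Panic unwinds the stack, runs defers (such as Close calls elsewhere in the pipeline and test cleanups), and is recoverable. A standalone illustration of the difference:

package main

import "log"

func main() {
	defer log.Println("deferred cleanup runs") // skipped by log.Fatal
	defer func() {
		if r := recover(); r != nil {
			log.Println("recovered:", r) // only possible with log.Panic
		}
	}()
	log.Panic("stream error") // log.Fatal here would os.Exit(1) instead
}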