aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorn-peugnet <n.peugnet@free.fr>2021-09-22 22:18:29 +0200
committern-peugnet <n.peugnet@free.fr>2021-09-22 22:18:29 +0200
commitcac8795d33cd52d362e02590b4df4ccd9b96ff71 (patch)
treec18169ed9e3e6ed059c481afa6149783d1b9585a
parentbded863524b701332bbc6e20ae72468301015c36 (diff)
downloaddna-backup-cac8795d33cd52d362e02590b4df4ccd9b96ff71.tar.gz
dna-backup-cac8795d33cd52d362e02590b4df4ccd9b96ff71.zip
fix mystical bug by making multiple passes
We make new passes while new blocks have been added. This way we are assured that the result will be the same on following runs. result : 161 /tmp/test-1/00001/recipe 161 /tmp/test-1/00001/files 0 /tmp/test-1/00001/hashes 40 /tmp/test-1/00001/chunks 482 /tmp/test-1/00001 9904052 /tmp/test-1/00000/recipe 5377304 /tmp/test-1/00000/files 2061331 /tmp/test-1/00000/hashes 207958371 /tmp/test-1/00000/chunks 225301178 /tmp/test-1/00000 225301740 /tmp/test-1
-rw-r--r--TODO.md3
-rw-r--r--repo.go35
-rw-r--r--repo_test.go7
3 files changed, 29 insertions, 16 deletions
diff --git a/TODO.md b/TODO.md
index f3c2d3f..b154718 100644
--- a/TODO.md
+++ b/TODO.md
@@ -89,3 +89,6 @@ The first solution would have an advantage if we were directly streaming the
output of the program into DNA, as it could start DNA-encode it from the first
chunk. The second solution will probably have better space-saving performance as
waiting for better matches will probably lower the size of the patches.
+
+This has been fixed by making multiple passes until no more blocks are added,
+this way we are assured that the result will be the same on the following run.
diff --git a/repo.go b/repo.go
index d58eebb..ec5935d 100644
--- a/repo.go
+++ b/repo.go
@@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) {
newChunkPath := filepath.Join(newPath, chunksName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
- reader, writer := io.Pipe()
files := listFiles(source)
r.loadHashes(versions)
r.loadFileLists(versions)
r.loadRecipes(versions)
- go concatFiles(&files, writer)
- recipe := r.matchStream(reader, newVersion)
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go r.storageWorker(newVersion, storeQueue, storeEnd)
+ var last, nlast, pass uint64
+ var recipe []Chunk
+ for ; nlast > last || pass == 0; pass++ {
+ logger.Infof("pass number %d", pass+1)
+ last = nlast
+ reader, writer := io.Pipe()
+ go concatFiles(&files, writer)
+ recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last)
+ }
+ close(storeQueue)
+ <-storeEnd
r.storeFileList(newVersion, unprefixFiles(files, source))
r.storeRecipe(newVersion, recipe)
}
@@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<
if err != nil {
logger.Error(err)
}
- logger.Debug("stored", data.id)
+ // logger.Debug("stored ", data.id)
}
if err = file.Close(); err != nil {
logger.Panic(err)
@@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st
id: id,
}
r.chunkCache.Set(id, temp.Bytes())
- logger.Debug("add new chunk", id)
+ logger.Debug("add new chunk ", id)
return NewStoredChunk(r, id), false
}
- logger.Debug("add new partial chunk of size:", chunk.Len())
+ logger.Debug("add new partial chunk of size: ", chunk.Len())
return
}
@@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version
return []Chunk{prevD, currD}
}
-func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
+func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) {
var b byte
var chunks []Chunk
var prev *TempChunk
- var last uint64
var err error
bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
buff := make([]byte, r.chunkSize, r.chunkSize*2)
- storeQueue := make(chan chunkData, 10)
- storeEnd := make(chan bool)
- go r.storageWorker(version, storeQueue, storeEnd)
if n, err := io.ReadFull(stream, buff); n < r.chunkSize {
if err == io.ErrUnexpectedEOF {
c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue)
chunks = append(chunks, c)
- return chunks
+ return chunks, last
} else {
logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err)
}
@@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
}
- close(storeQueue)
- <-storeEnd
- return chunks
+ return chunks, last
}
func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {
diff --git a/repo_test.go b/repo_test.go
index 0ee7bef..52da9ae 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -224,7 +224,12 @@ func TestBsdiff(t *testing.T) {
newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion))
os.MkdirAll(newPath, 0775)
reader := getDataStream(dataDir, concatFiles)
- recipe := repo.matchStream(reader, newVersion)
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go repo.storageWorker(newVersion, storeQueue, storeEnd)
+ recipe, _ := repo.matchStream(reader, storeQueue, newVersion, 0)
+ close(storeQueue)
+ <-storeEnd
newChunks := extractDeltaChunks(recipe)
testutils.AssertLen(t, 2, newChunks, "New delta chunks:")
for _, c := range newChunks {