diff options
author | n-peugnet <n.peugnet@free.fr> | 2021-09-22 22:18:29 +0200 |
---|---|---|
committer | n-peugnet <n.peugnet@free.fr> | 2021-09-22 22:18:29 +0200 |
commit | cac8795d33cd52d362e02590b4df4ccd9b96ff71 (patch) | |
tree | c18169ed9e3e6ed059c481afa6149783d1b9585a | |
parent | bded863524b701332bbc6e20ae72468301015c36 (diff) | |
download | dna-backup-cac8795d33cd52d362e02590b4df4ccd9b96ff71.tar.gz dna-backup-cac8795d33cd52d362e02590b4df4ccd9b96ff71.zip |
fix mystical bug by making multiple passes
We make new passes while new blocks have been added.
This way we are assured that the result will be the same on following runs.
result:
161 /tmp/test-1/00001/recipe
161 /tmp/test-1/00001/files
0 /tmp/test-1/00001/hashes
40 /tmp/test-1/00001/chunks
482 /tmp/test-1/00001
9904052 /tmp/test-1/00000/recipe
5377304 /tmp/test-1/00000/files
2061331 /tmp/test-1/00000/hashes
207958371 /tmp/test-1/00000/chunks
225301178 /tmp/test-1/00000
225301740 /tmp/test-1
-rw-r--r-- | TODO.md | 3 | ||||
-rw-r--r-- | repo.go | 35 | ||||
-rw-r--r-- | repo_test.go | 7 |
3 files changed, 29 insertions, 16 deletions
@@ -89,3 +89,6 @@ The first solution would have an advantage if we were directly streaming the output of the program into DNA, as it could start DNA-encode it from the first chunk. The second solution will probably have better space-saving performance as waiting for better matches will probably lower the size of the patches. + +This has been fixed by making multiple passes until no more blocks are added, +this way we are assured that the result will be the same on the following run. @@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) { newChunkPath := filepath.Join(newPath, chunksName) os.Mkdir(newPath, 0775) // TODO: handle errors os.Mkdir(newChunkPath, 0775) // TODO: handle errors - reader, writer := io.Pipe() files := listFiles(source) r.loadHashes(versions) r.loadFileLists(versions) r.loadRecipes(versions) - go concatFiles(&files, writer) - recipe := r.matchStream(reader, newVersion) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go r.storageWorker(newVersion, storeQueue, storeEnd) + var last, nlast, pass uint64 + var recipe []Chunk + for ; nlast > last || pass == 0; pass++ { + logger.Infof("pass number %d", pass+1) + last = nlast + reader, writer := io.Pipe() + go concatFiles(&files, writer) + recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last) + } + close(storeQueue) + <-storeEnd r.storeFileList(newVersion, unprefixFiles(files, source)) r.storeRecipe(newVersion, recipe) } @@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan< if err != nil { logger.Error(err) } - logger.Debug("stored", data.id) + // logger.Debug("stored ", data.id) } if err = file.Close(); err != nil { logger.Panic(err) @@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st id: id, } r.chunkCache.Set(id, temp.Bytes()) - logger.Debug("add new chunk", id) + logger.Debug("add new chunk ", id) return NewStoredChunk(r, id), false } - logger.Debug("add new 
partial chunk of size:", chunk.Len()) + logger.Debug("add new partial chunk of size: ", chunk.Len()) return } @@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version return []Chunk{prevD, currD} } -func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { +func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) { var b byte var chunks []Chunk var prev *TempChunk - var last uint64 var err error bufStream := bufio.NewReaderSize(stream, r.chunkSize*2) buff := make([]byte, r.chunkSize, r.chunkSize*2) - storeQueue := make(chan chunkData, 10) - storeEnd := make(chan bool) - go r.storageWorker(version, storeQueue, storeEnd) if n, err := io.ReadFull(stream, buff); n < r.chunkSize { if err == io.ErrUnexpectedEOF { c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue) chunks = append(chunks, c) - return chunks + return chunks, last } else { logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err) } @@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { } chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...) 
} - close(storeQueue) - <-storeEnd - return chunks + return chunks, last } func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) { diff --git a/repo_test.go b/repo_test.go index 0ee7bef..52da9ae 100644 --- a/repo_test.go +++ b/repo_test.go @@ -224,7 +224,12 @@ func TestBsdiff(t *testing.T) { newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion)) os.MkdirAll(newPath, 0775) reader := getDataStream(dataDir, concatFiles) - recipe := repo.matchStream(reader, newVersion) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go repo.storageWorker(newVersion, storeQueue, storeEnd) + recipe, _ := repo.matchStream(reader, storeQueue, newVersion, 0) + close(storeQueue) + <-storeEnd newChunks := extractDeltaChunks(recipe) testutils.AssertLen(t, 2, newChunks, "New delta chunks:") for _, c := range newChunks { |