From cac8795d33cd52d362e02590b4df4ccd9b96ff71 Mon Sep 17 00:00:00 2001 From: n-peugnet Date: Wed, 22 Sep 2021 22:18:29 +0200 Subject: fix mystical bug by making multiple passes We make new passes while new blocks have been added. This way we are assured that the result will be the same on following runs. result : 161 /tmp/test-1/00001/recipe 161 /tmp/test-1/00001/files 0 /tmp/test-1/00001/hashes 40 /tmp/test-1/00001/chunks 482 /tmp/test-1/00001 9904052 /tmp/test-1/00000/recipe 5377304 /tmp/test-1/00000/files 2061331 /tmp/test-1/00000/hashes 207958371 /tmp/test-1/00000/chunks 225301178 /tmp/test-1/00000 225301740 /tmp/test-1 --- TODO.md | 3 +++ repo.go | 35 ++++++++++++++++++++--------------- repo_test.go | 7 ++++++- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/TODO.md b/TODO.md index f3c2d3f..b154718 100644 --- a/TODO.md +++ b/TODO.md @@ -89,3 +89,6 @@ The first solution would have an advantage if we were directly streaming the output of the program into DNA, as it could start DNA-encode it from the first chunk. The second solution will probably have better space-saving performance as waiting for better matches will probably lower the size of the patches. + +This has been fixed by making multiple passes until no more blocks are added, +this way we are assured that the result will be the same on the following run. 
diff --git a/repo.go b/repo.go index d58eebb..ec5935d 100644 --- a/repo.go +++ b/repo.go @@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) { newChunkPath := filepath.Join(newPath, chunksName) os.Mkdir(newPath, 0775) // TODO: handle errors os.Mkdir(newChunkPath, 0775) // TODO: handle errors - reader, writer := io.Pipe() files := listFiles(source) r.loadHashes(versions) r.loadFileLists(versions) r.loadRecipes(versions) - go concatFiles(&files, writer) - recipe := r.matchStream(reader, newVersion) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go r.storageWorker(newVersion, storeQueue, storeEnd) + var last, nlast, pass uint64 + var recipe []Chunk + for ; nlast > last || pass == 0; pass++ { + logger.Infof("pass number %d", pass+1) + last = nlast + reader, writer := io.Pipe() + go concatFiles(&files, writer) + recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last) + } + close(storeQueue) + <-storeEnd r.storeFileList(newVersion, unprefixFiles(files, source)) r.storeRecipe(newVersion, recipe) } @@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan< if err != nil { logger.Error(err) } - logger.Debug("stored", data.id) + // logger.Debug("stored ", data.id) } if err = file.Close(); err != nil { logger.Panic(err) @@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st id: id, } r.chunkCache.Set(id, temp.Bytes()) - logger.Debug("add new chunk", id) + logger.Debug("add new chunk ", id) return NewStoredChunk(r, id), false } - logger.Debug("add new partial chunk of size:", chunk.Len()) + logger.Debug("add new partial chunk of size: ", chunk.Len()) return } @@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version return []Chunk{prevD, currD} } -func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { +func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, 
last uint64) ([]Chunk, uint64) { var b byte var chunks []Chunk var prev *TempChunk - var last uint64 var err error bufStream := bufio.NewReaderSize(stream, r.chunkSize*2) buff := make([]byte, r.chunkSize, r.chunkSize*2) - storeQueue := make(chan chunkData, 10) - storeEnd := make(chan bool) - go r.storageWorker(version, storeQueue, storeEnd) if n, err := io.ReadFull(stream, buff); n < r.chunkSize { if err == io.ErrUnexpectedEOF { c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue) chunks = append(chunks, c) - return chunks + return chunks, last } else { logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err) } @@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { } chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...) } - close(storeQueue) - <-storeEnd - return chunks + return chunks, last } func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) { diff --git a/repo_test.go b/repo_test.go index 0ee7bef..52da9ae 100644 --- a/repo_test.go +++ b/repo_test.go @@ -224,7 +224,12 @@ func TestBsdiff(t *testing.T) { newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion)) os.MkdirAll(newPath, 0775) reader := getDataStream(dataDir, concatFiles) - recipe := repo.matchStream(reader, newVersion) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go repo.storageWorker(newVersion, storeQueue, storeEnd) + recipe, _ := repo.matchStream(reader, storeQueue, newVersion, 0) + close(storeQueue) + <-storeEnd newChunks := extractDeltaChunks(recipe) testutils.AssertLen(t, 2, newChunks, "New delta chunks:") for _, c := range newChunks { -- cgit v1.2.3