diff options
-rw-r--r-- | TODO.md | 3 | ||||
-rw-r--r-- | repo.go | 35 | ||||
-rw-r--r-- | repo_test.go | 7 |
3 files changed, 29 insertions, 16 deletions
@@ -89,3 +89,6 @@ The first solution would have an advantage if we were directly streaming the output of the program into DNA, as it could start DNA-encode it from the first chunk. The second solution will probably have better space-saving performance as waiting for better matches will probably lower the size of the patches. + +This has been fixed by making multiple passes until no more blocks are added, +this way we are assured that the result will be the same on the following run. @@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) { newChunkPath := filepath.Join(newPath, chunksName) os.Mkdir(newPath, 0775) // TODO: handle errors os.Mkdir(newChunkPath, 0775) // TODO: handle errors - reader, writer := io.Pipe() files := listFiles(source) r.loadHashes(versions) r.loadFileLists(versions) r.loadRecipes(versions) - go concatFiles(&files, writer) - recipe := r.matchStream(reader, newVersion) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go r.storageWorker(newVersion, storeQueue, storeEnd) + var last, nlast, pass uint64 + var recipe []Chunk + for ; nlast > last || pass == 0; pass++ { + logger.Infof("pass number %d", pass+1) + last = nlast + reader, writer := io.Pipe() + go concatFiles(&files, writer) + recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last) + } + close(storeQueue) + <-storeEnd r.storeFileList(newVersion, unprefixFiles(files, source)) r.storeRecipe(newVersion, recipe) } @@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan< if err != nil { logger.Error(err) } - logger.Debug("stored", data.id) + // logger.Debug("stored ", data.id) } if err = file.Close(); err != nil { logger.Panic(err) @@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st id: id, } r.chunkCache.Set(id, temp.Bytes()) - logger.Debug("add new chunk", id) + logger.Debug("add new chunk ", id) return NewStoredChunk(r, id), false } - logger.Debug("add new partial chunk of size:", chunk.Len()) + logger.Debug("add new partial chunk of size: ", chunk.Len()) return } @@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version return []Chunk{prevD, currD} } -func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { +func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) { var b byte var chunks []Chunk var prev *TempChunk - var last uint64 var err error bufStream := bufio.NewReaderSize(stream, r.chunkSize*2) buff := make([]byte, r.chunkSize, r.chunkSize*2) - storeQueue := make(chan chunkData, 10) - storeEnd := make(chan bool) - go r.storageWorker(version, storeQueue, storeEnd) if n, err := io.ReadFull(stream, buff); n < r.chunkSize { if err == io.ErrUnexpectedEOF { c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue) chunks = append(chunks, c) - return chunks + return chunks, last } else { logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err) } @@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { } chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...) } - close(storeQueue) - <-storeEnd - return chunks + return chunks, last } func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) { diff --git a/repo_test.go b/repo_test.go index 0ee7bef..52da9ae 100644 --- a/repo_test.go +++ b/repo_test.go @@ -224,7 +224,12 @@ func TestBsdiff(t *testing.T) { newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion)) os.MkdirAll(newPath, 0775) reader := getDataStream(dataDir, concatFiles) - recipe := repo.matchStream(reader, newVersion) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go repo.storageWorker(newVersion, storeQueue, storeEnd) + recipe, _ := repo.matchStream(reader, storeQueue, newVersion, 0) + close(storeQueue) + <-storeEnd newChunks := extractDeltaChunks(recipe) testutils.AssertLen(t, 2, newChunks, "New delta chunks:") for _, c := range newChunks { |