aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO.md3
-rw-r--r--repo.go35
-rw-r--r--repo_test.go7
3 files changed, 29 insertions, 16 deletions
diff --git a/TODO.md b/TODO.md
index f3c2d3f..b154718 100644
--- a/TODO.md
+++ b/TODO.md
@@ -89,3 +89,6 @@ The first solution would have an advantage if we were directly streaming the
output of the program into DNA, as it could start DNA-encode it from the first
chunk. The second solution will probably have better space-saving performance as
waiting for better matches will probably lower the size of the patches.
+
+This has been fixed by making multiple passes until no more blocks are added,
+this way we are assured that the result will be the same on the following run.
diff --git a/repo.go b/repo.go
index d58eebb..ec5935d 100644
--- a/repo.go
+++ b/repo.go
@@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) {
newChunkPath := filepath.Join(newPath, chunksName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
- reader, writer := io.Pipe()
files := listFiles(source)
r.loadHashes(versions)
r.loadFileLists(versions)
r.loadRecipes(versions)
- go concatFiles(&files, writer)
- recipe := r.matchStream(reader, newVersion)
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go r.storageWorker(newVersion, storeQueue, storeEnd)
+ var last, nlast, pass uint64
+ var recipe []Chunk
+ for ; nlast > last || pass == 0; pass++ {
+ logger.Infof("pass number %d", pass+1)
+ last = nlast
+ reader, writer := io.Pipe()
+ go concatFiles(&files, writer)
+ recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last)
+ }
+ close(storeQueue)
+ <-storeEnd
r.storeFileList(newVersion, unprefixFiles(files, source))
r.storeRecipe(newVersion, recipe)
}
@@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<
if err != nil {
logger.Error(err)
}
- logger.Debug("stored", data.id)
+ // logger.Debug("stored ", data.id)
}
if err = file.Close(); err != nil {
logger.Panic(err)
@@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st
id: id,
}
r.chunkCache.Set(id, temp.Bytes())
- logger.Debug("add new chunk", id)
+ logger.Debug("add new chunk ", id)
return NewStoredChunk(r, id), false
}
- logger.Debug("add new partial chunk of size:", chunk.Len())
+ logger.Debug("add new partial chunk of size: ", chunk.Len())
return
}
@@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version
return []Chunk{prevD, currD}
}
-func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
+func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) {
var b byte
var chunks []Chunk
var prev *TempChunk
- var last uint64
var err error
bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
buff := make([]byte, r.chunkSize, r.chunkSize*2)
- storeQueue := make(chan chunkData, 10)
- storeEnd := make(chan bool)
- go r.storageWorker(version, storeQueue, storeEnd)
if n, err := io.ReadFull(stream, buff); n < r.chunkSize {
if err == io.ErrUnexpectedEOF {
c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue)
chunks = append(chunks, c)
- return chunks
+ return chunks, last
} else {
logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err)
}
@@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
}
- close(storeQueue)
- <-storeEnd
- return chunks
+ return chunks, last
}
func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {
diff --git a/repo_test.go b/repo_test.go
index 0ee7bef..52da9ae 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -224,7 +224,12 @@ func TestBsdiff(t *testing.T) {
newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion))
os.MkdirAll(newPath, 0775)
reader := getDataStream(dataDir, concatFiles)
- recipe := repo.matchStream(reader, newVersion)
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go repo.storageWorker(newVersion, storeQueue, storeEnd)
+ recipe, _ := repo.matchStream(reader, storeQueue, newVersion, 0)
+ close(storeQueue)
+ <-storeEnd
newChunks := extractDeltaChunks(recipe)
testutils.AssertLen(t, 2, newChunks, "New delta chunks:")
for _, c := range newChunks {