aboutsummaryrefslogtreecommitdiff
path: root/repo.go
diff options
context:
space:
mode:
authorn-peugnet <n.peugnet@free.fr>2021-09-22 22:18:29 +0200
committern-peugnet <n.peugnet@free.fr>2021-09-22 22:18:29 +0200
commitcac8795d33cd52d362e02590b4df4ccd9b96ff71 (patch)
treec18169ed9e3e6ed059c481afa6149783d1b9585a /repo.go
parentbded863524b701332bbc6e20ae72468301015c36 (diff)
downloaddna-backup-cac8795d33cd52d362e02590b4df4ccd9b96ff71.tar.gz
dna-backup-cac8795d33cd52d362e02590b4df4ccd9b96ff71.zip
fix mystical bug by making multiple passes
We make ne passes while new blocks have been added. This way we are assured that the result will be the same on following runs. result : 161 /tmp/test-1/00001/recipe 161 /tmp/test-1/00001/files 0 /tmp/test-1/00001/hashes 40 /tmp/test-1/00001/chunks 482 /tmp/test-1/00001 9904052 /tmp/test-1/00000/recipe 5377304 /tmp/test-1/00000/files 2061331 /tmp/test-1/00000/hashes 207958371 /tmp/test-1/00000/chunks 225301178 /tmp/test-1/00000 225301740 /tmp/test-1
Diffstat (limited to 'repo.go')
-rw-r--r--repo.go35
1 files changed, 20 insertions, 15 deletions
diff --git a/repo.go b/repo.go
index d58eebb..ec5935d 100644
--- a/repo.go
+++ b/repo.go
@@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) {
newChunkPath := filepath.Join(newPath, chunksName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
- reader, writer := io.Pipe()
files := listFiles(source)
r.loadHashes(versions)
r.loadFileLists(versions)
r.loadRecipes(versions)
- go concatFiles(&files, writer)
- recipe := r.matchStream(reader, newVersion)
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go r.storageWorker(newVersion, storeQueue, storeEnd)
+ var last, nlast, pass uint64
+ var recipe []Chunk
+ for ; nlast > last || pass == 0; pass++ {
+ logger.Infof("pass number %d", pass+1)
+ last = nlast
+ reader, writer := io.Pipe()
+ go concatFiles(&files, writer)
+ recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last)
+ }
+ close(storeQueue)
+ <-storeEnd
r.storeFileList(newVersion, unprefixFiles(files, source))
r.storeRecipe(newVersion, recipe)
}
@@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<
if err != nil {
logger.Error(err)
}
- logger.Debug("stored", data.id)
+ // logger.Debug("stored ", data.id)
}
if err = file.Close(); err != nil {
logger.Panic(err)
@@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st
id: id,
}
r.chunkCache.Set(id, temp.Bytes())
- logger.Debug("add new chunk", id)
+ logger.Debug("add new chunk ", id)
return NewStoredChunk(r, id), false
}
- logger.Debug("add new partial chunk of size:", chunk.Len())
+ logger.Debug("add new partial chunk of size: ", chunk.Len())
return
}
@@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version
return []Chunk{prevD, currD}
}
-func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
+func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) {
var b byte
var chunks []Chunk
var prev *TempChunk
- var last uint64
var err error
bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
buff := make([]byte, r.chunkSize, r.chunkSize*2)
- storeQueue := make(chan chunkData, 10)
- storeEnd := make(chan bool)
- go r.storageWorker(version, storeQueue, storeEnd)
if n, err := io.ReadFull(stream, buff); n < r.chunkSize {
if err == io.ErrUnexpectedEOF {
c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue)
chunks = append(chunks, c)
- return chunks
+ return chunks, last
} else {
logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err)
}
@@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
}
- close(storeQueue)
- <-storeEnd
- return chunks
+ return chunks, last
}
func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {