Diffstat (limited to 'repo.go')
-rw-r--r--  repo.go  35
1 file changed, 20 insertions(+), 15 deletions(-)
diff --git a/repo.go b/repo.go
index d58eebb..ec5935d 100644
--- a/repo.go
+++ b/repo.go
@@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) {
newChunkPath := filepath.Join(newPath, chunksName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
- reader, writer := io.Pipe()
files := listFiles(source)
r.loadHashes(versions)
r.loadFileLists(versions)
r.loadRecipes(versions)
- go concatFiles(&files, writer)
- recipe := r.matchStream(reader, newVersion)
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go r.storageWorker(newVersion, storeQueue, storeEnd)
+ var last, nlast, pass uint64
+ var recipe []Chunk
+ for ; nlast > last || pass == 0; pass++ {
+ logger.Infof("pass number %d", pass+1)
+ last = nlast
+ reader, writer := io.Pipe()
+ go concatFiles(&files, writer)
+ recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last)
+ }
+ close(storeQueue)
+ <-storeEnd
r.storeFileList(newVersion, unprefixFiles(files, source))
r.storeRecipe(newVersion, recipe)
}
@@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<
if err != nil {
logger.Error(err)
}
- logger.Debug("stored", data.id)
+ // logger.Debug("stored ", data.id)
}
if err = file.Close(); err != nil {
logger.Panic(err)
@@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st
id: id,
}
r.chunkCache.Set(id, temp.Bytes())
- logger.Debug("add new chunk", id)
+ logger.Debug("add new chunk ", id)
return NewStoredChunk(r, id), false
}
- logger.Debug("add new partial chunk of size:", chunk.Len())
+ logger.Debug("add new partial chunk of size: ", chunk.Len())
return
}

@@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version
return []Chunk{prevD, currD}
}

-func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
+func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) {
var b byte
var chunks []Chunk
var prev *TempChunk
- var last uint64
var err error
bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
buff := make([]byte, r.chunkSize, r.chunkSize*2)
- storeQueue := make(chan chunkData, 10)
- storeEnd := make(chan bool)
- go r.storageWorker(version, storeQueue, storeEnd)
if n, err := io.ReadFull(stream, buff); n < r.chunkSize {
if err == io.ErrUnexpectedEOF {
c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue)
chunks = append(chunks, c)
- return chunks
+ return chunks, last
} else {
logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err)
}
@@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
}
- close(storeQueue)
- <-storeEnd
- return chunks
+ return chunks, last
}

func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {
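
For illustration only, a minimal self-contained sketch of the producer/worker lifecycle that this patch centralizes in Commit: the queue and completion channel are created once, a single worker drains the queue across all matching passes, and the producer closes the queue and waits only after the last pass. The types and worker body below are simplified stand-ins, not the repository's actual implementation.

    package main

    import "fmt"

    // chunkData is a simplified stand-in for the repo's chunk payload type.
    type chunkData struct {
        id   int
        data []byte
    }

    // storageWorker drains the queue until it is closed, then signals completion.
    func storageWorker(queue <-chan chunkData, end chan<- bool) {
        for d := range queue {
            fmt.Printf("stored chunk %d (%d bytes)\n", d.id, len(d.data))
        }
        end <- true
    }

    func main() {
        queue := make(chan chunkData, 10)
        end := make(chan bool)
        go storageWorker(queue, end)

        // Several matching passes can feed the same queue...
        for pass := 0; pass < 3; pass++ {
            queue <- chunkData{id: pass, data: []byte("payload")}
        }

        // ...and the producer closes it exactly once, after the last pass,
        // then waits for the worker to finish.
        close(queue)
        <-end
    }

Sharing one worker across passes also means close(storeQueue) must happen in Commit, after the loop, which is why matchStream now returns its chunks and cursor instead of closing the queue itself.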