diff options
Diffstat (limited to 'repo.go')
-rw-r--r-- | repo.go | 35 |
1 files changed, 20 insertions, 15 deletions
@@ -154,13 +154,24 @@ func (r *Repo) Commit(source string) { newChunkPath := filepath.Join(newPath, chunksName) os.Mkdir(newPath, 0775) // TODO: handle errors os.Mkdir(newChunkPath, 0775) // TODO: handle errors - reader, writer := io.Pipe() files := listFiles(source) r.loadHashes(versions) r.loadFileLists(versions) r.loadRecipes(versions) - go concatFiles(&files, writer) - recipe := r.matchStream(reader, newVersion) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go r.storageWorker(newVersion, storeQueue, storeEnd) + var last, nlast, pass uint64 + var recipe []Chunk + for ; nlast > last || pass == 0; pass++ { + logger.Infof("pass number %d", pass+1) + last = nlast + reader, writer := io.Pipe() + go concatFiles(&files, writer) + recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last) + } + close(storeQueue) + <-storeEnd r.storeFileList(newVersion, unprefixFiles(files, source)) r.storeRecipe(newVersion, recipe) } @@ -355,7 +366,7 @@ func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan< if err != nil { logger.Error(err) } - logger.Debug("stored", data.id) + // logger.Debug("stored ", data.id) } if err = file.Close(); err != nil { logger.Panic(err) @@ -566,10 +577,10 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, st id: id, } r.chunkCache.Set(id, temp.Bytes()) - logger.Debug("add new chunk", id) + logger.Debug("add new chunk ", id) return NewStoredChunk(r, id), false } - logger.Debug("add new partial chunk of size:", chunk.Len()) + logger.Debug("add new partial chunk of size: ", chunk.Len()) return } @@ -593,22 +604,18 @@ func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version return []Chunk{prevD, currD} } -func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { +func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) { var b byte var chunks []Chunk var prev *TempChunk - var last uint64 var err error bufStream := bufio.NewReaderSize(stream, r.chunkSize*2) buff := make([]byte, r.chunkSize, r.chunkSize*2) - storeQueue := make(chan chunkData, 10) - storeEnd := make(chan bool) - go r.storageWorker(version, storeQueue, storeEnd) if n, err := io.ReadFull(stream, buff); n < r.chunkSize { if err == io.ErrUnexpectedEOF { c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue) chunks = append(chunks, c) - return chunks + return chunks, last } else { logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err) } @@ -671,9 +678,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { } chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...) } - close(storeQueue) - <-storeEnd - return chunks + return chunks, last } func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) { |