-rw-r--r--   repo.go        83
-rw-r--r--   repo_test.go   12
2 files changed, 54 insertions, 41 deletions
@@ -25,6 +25,7 @@ repo/
 package main
 
 import (
+	"bufio"
 	"encoding/gob"
 	"fmt"
 	"hash"
@@ -61,14 +62,12 @@ func (r *Repo) Commit(source string) {
 	os.Mkdir(newPath, 0775)
 	os.Mkdir(newChunkPath, 0775)
 	reader, writer := io.Pipe()
-	newChunks := make(chan []byte, 16)
 	oldChunks := make(chan Chunk, 16)
 	files := listFiles(source)
 	go loadChunks(versions, oldChunks)
 	go concatFiles(files, writer)
-	go chunkStream(reader, newChunks)
 	hashes := hashChunks(oldChunks)
-	chunks := r.matchChunks(newChunks, hashes)
+	chunks := r.matchStream(reader, hashes)
 	extractNewChunks(chunks)
 	// storeChunks(newChunkPath, newChunks)
 	// storeFiles(newFilesPath, files)
@@ -223,46 +222,56 @@ func hashChunks(chunks <-chan Chunk) map[uint64]ChunkId {
 	return hashes
 }
 
-func (r *Repo) matchChunks(chunks <-chan []byte, hashes map[uint64]ChunkId) []Chunk {
-	hasher := rabinkarp64.New()
-	hasher.Write(<-chunks)
-	recipe := make([]Chunk, 0)
+func readChunk(stream io.Reader) ([]byte, error) {
+	buff := make([]byte, chunkSize)
+	_, err := io.ReadFull(stream, buff)
+	return buff, err
+}
 
-	var i uint64
-	var offset, prefill, postfill int
-	var exists bool
-	var chunkId ChunkId
-	for c := range chunks {
-		buff := make([]byte, 0)
-		// Pre fill the window with the rest of the previous chunk
-		for prefill = 0; prefill < offset; prefill++ {
-			hasher.Roll(c[prefill])
-		}
-		// Fill the window with the current chunk and match hash byte by byte
-		for ; offset < len(c); offset++ {
-			h := hasher.Sum64()
-			chunkId, exists = hashes[h]
-			if exists {
-				// log.Printf("Found existing chunk: New{id:%d, offset:%d} Old%d\n", i, offset, chunkId)
-				break
+func (r *Repo) matchStream(stream io.Reader, hashes map[uint64]ChunkId) []Chunk {
+	var b byte
+	chunks := make([]Chunk, 0)
+	buff, err := readChunk(stream)
+	if err == io.EOF {
+		chunks = append(chunks, Chunk{Value: buff})
+		return chunks
+	}
+	bufStream := bufio.NewReader(stream)
+	hasher := rabinkarp64.New()
+	hasher.Write(buff)
+	buff = make([]byte, 0, chunkSize)
+	for err != io.EOF {
+		h := hasher.Sum64()
+		chunkId, exists := hashes[h]
+		if exists {
+			if len(buff) > 0 {
+				log.Printf("Add new partial chunk of size: %d\n", len(buff))
+				chunks = append(chunks, Chunk{Value: buff})
+			}
+			log.Printf("Found existing chunk with offset %d: %d\n", len(buff), chunkId)
+			chunks = append(chunks, Chunk{Id: &chunkId})
+			buff = make([]byte, 0, chunkSize)
+			for i := 0; i < chunkSize && err == nil; i++ {
+				b, err = bufStream.ReadByte()
+				hasher.Roll(b)
 			}
-			hasher.Roll(c[offset])
-			buff = append(buff, c[offset])
+			continue
 		}
-		// Fill the window with the rest of the current chunk if it matched early
-		for postfill = offset; postfill < len(c); postfill++ {
-			hasher.Roll(c[postfill])
+		if len(buff) == chunkSize {
+			log.Println("Add new chunk")
+			chunks = append(chunks, Chunk{Value: buff})
+			buff = make([]byte, 0, chunkSize)
 		}
-		if len(buff) > 0 {
-			recipe = append(recipe, Chunk{Value: buff})
+		b, err = bufStream.ReadByte()
+		if err != io.EOF {
+			hasher.Roll(b)
+			buff = append(buff, b)
 		}
-		if exists {
-			recipe = append(recipe, Chunk{Id: &chunkId})
-		}
-		offset %= chunkSize
-		i++
 	}
-	return recipe
+	if len(buff) > 0 {
+		chunks = append(chunks, Chunk{Value: buff})
+	}
+	return chunks
 }
 
 func extractNewChunks(chunks []Chunk) (ret [][]Chunk) {
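The new matchStream boils down to keeping a chunkSize-wide Rabin-Karp window rolling over the input stream and emitting a Chunk{Id} reference whenever the window's fingerprint matches an already-stored chunk, while unmatched bytes are collected into Chunk{Value} entries. Below is a minimal, self-contained sketch of just that matching loop, using the same rollinghash/rabinkarp64 package; the window size, sample data and fingerprint map are invented for illustration and nothing from this repository's code is reused.

package main

import (
	"bufio"
	"bytes"
	"fmt"
	"io"

	"github.com/chmduquesne/rollinghash/rabinkarp64"
)

const windowSize = 8 // illustrative only; the repo rolls a chunkSize-wide window

func main() {
	// Pretend this chunk is already stored and fingerprinted (made-up data).
	known := []byte("ABCDEFGH")
	fingerprints := make(map[uint64]int)
	h := rabinkarp64.New()
	h.Write(known)
	fingerprints[h.Sum64()] = 0 // id of the stored chunk

	// Scan a new stream for windows whose fingerprint matches a stored chunk.
	stream := bufio.NewReader(bytes.NewReader([]byte("xxABCDEFGHyy")))
	window := make([]byte, windowSize)
	if _, err := io.ReadFull(stream, window); err != nil {
		return // stream shorter than one window
	}
	h = rabinkarp64.New()
	h.Write(window) // prime the rolling window with the first windowSize bytes

	for offset := 0; ; offset++ {
		if id, ok := fingerprints[h.Sum64()]; ok {
			fmt.Printf("window at offset %d matches stored chunk %d\n", offset, id)
		}
		b, err := stream.ReadByte()
		if err != nil {
			break // end of stream
		}
		h.Roll(b) // slide the window one byte to the right
	}
}

Unlike this sketch, matchStream also skips past a matched region by refilling the window with the next chunkSize bytes and flushes full or trailing buffers as new chunks, as the hunk above shows.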
content", i) - t.Log("Expected: ", c[:10], "...") + // for i, b := range c { + // fmt.Printf("E: %d, A: %d\n", b, content[i]) + // } + t.Log("Expected: ", c[:10], "...", c[chunkSize-10:]) t.Log("Actual:", content) } i++ @@ -139,6 +142,9 @@ func TestStoreLoadFiles(t *testing.T) { files1 := listFiles(dataDir) storeFileList(resultFiles, files1) files2 := loadFileList(resultFiles) + if len(files1) != 4 { + t.Errorf("Incorrect number of files: %d, should be %d\n", len(files1), 4) + } for i, f := range files1 { if f != files2[i] { t.Errorf("Loaded file data %d does not match stored one", i) @@ -166,16 +172,14 @@ func TestBsdiff(t *testing.T) { ioutil.WriteFile(addedFile, input, 0664) reader, writer = io.Pipe() - newChunks := make(chan []byte, 16) oldChunks := make(chan Chunk, 16) files = listFiles(dataDir) repo := NewRepo(resultDir) versions := repo.loadVersions() go loadChunks(versions, oldChunks) go concatFiles(files, writer) - go chunkStream(reader, newChunks) hashes := hashChunks(oldChunks) - recipe := repo.matchChunks(newChunks, hashes) + recipe := repo.matchStream(reader, hashes) buff := new(bytes.Buffer) r2, _ := recipe[2].Reader(repo.path) r0, _ := recipe[0].Reader(repo.path) |