aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--repo.go83
-rw-r--r--repo_test.go12
2 files changed, 54 insertions, 41 deletions
diff --git a/repo.go b/repo.go
index 68371c2..f893b47 100644
--- a/repo.go
+++ b/repo.go
@@ -25,6 +25,7 @@ repo/
package main
import (
+ "bufio"
"encoding/gob"
"fmt"
"hash"
@@ -61,14 +62,12 @@ func (r *Repo) Commit(source string) {
os.Mkdir(newPath, 0775)
os.Mkdir(newChunkPath, 0775)
reader, writer := io.Pipe()
- newChunks := make(chan []byte, 16)
oldChunks := make(chan Chunk, 16)
files := listFiles(source)
go loadChunks(versions, oldChunks)
go concatFiles(files, writer)
- go chunkStream(reader, newChunks)
hashes := hashChunks(oldChunks)
- chunks := r.matchChunks(newChunks, hashes)
+ chunks := r.matchStream(reader, hashes)
extractNewChunks(chunks)
// storeChunks(newChunkPath, newChunks)
// storeFiles(newFilesPath, files)
@@ -223,46 +222,56 @@ func hashChunks(chunks <-chan Chunk) map[uint64]ChunkId {
return hashes
}
-func (r *Repo) matchChunks(chunks <-chan []byte, hashes map[uint64]ChunkId) []Chunk {
- hasher := rabinkarp64.New()
- hasher.Write(<-chunks)
- recipe := make([]Chunk, 0)
+func readChunk(stream io.Reader) ([]byte, error) {
+ buff := make([]byte, chunkSize)
+ _, err := io.ReadFull(stream, buff)
+ return buff, err
+}
- var i uint64
- var offset, prefill, postfill int
- var exists bool
- var chunkId ChunkId
- for c := range chunks {
- buff := make([]byte, 0)
- // Pre fill the window with the rest of the previous chunk
- for prefill = 0; prefill < offset; prefill++ {
- hasher.Roll(c[prefill])
- }
- // Fill the window with the current chunk and match hash byte by byte
- for ; offset < len(c); offset++ {
- h := hasher.Sum64()
- chunkId, exists = hashes[h]
- if exists {
- // log.Printf("Found existing chunk: New{id:%d, offset:%d} Old%d\n", i, offset, chunkId)
- break
+func (r *Repo) matchStream(stream io.Reader, hashes map[uint64]ChunkId) []Chunk {
+ var b byte
+ chunks := make([]Chunk, 0)
+ buff, err := readChunk(stream)
+ if err == io.EOF {
+ chunks = append(chunks, Chunk{Value: buff})
+ return chunks
+ }
+ bufStream := bufio.NewReader(stream)
+ hasher := rabinkarp64.New()
+ hasher.Write(buff)
+ buff = make([]byte, 0, chunkSize)
+ for err != io.EOF {
+ h := hasher.Sum64()
+ chunkId, exists := hashes[h]
+ if exists {
+ if len(buff) > 0 {
+ log.Printf("Add new partial chunk of size: %d\n", len(buff))
+ chunks = append(chunks, Chunk{Value: buff})
+ }
+ log.Printf("Found existing chunk with offset %d: %d\n", len(buff), chunkId)
+ chunks = append(chunks, Chunk{Id: &chunkId})
+ buff = make([]byte, 0, chunkSize)
+ for i := 0; i < chunkSize && err == nil; i++ {
+ b, err = bufStream.ReadByte()
+ hasher.Roll(b)
}
- hasher.Roll(c[offset])
- buff = append(buff, c[offset])
+ continue
}
- // Fill the window with the rest of the current chunk if it matched early
- for postfill = offset; postfill < len(c); postfill++ {
- hasher.Roll(c[postfill])
+ if len(buff) == chunkSize {
+ log.Println("Add new chunk")
+ chunks = append(chunks, Chunk{Value: buff})
+ buff = make([]byte, 0, chunkSize)
}
- if len(buff) > 0 {
- recipe = append(recipe, Chunk{Value: buff})
+ b, err = bufStream.ReadByte()
+ if err != io.EOF {
+ hasher.Roll(b)
+ buff = append(buff, b)
}
- if exists {
- recipe = append(recipe, Chunk{Id: &chunkId})
- }
- offset %= chunkSize
- i++
}
- return recipe
+ if len(buff) > 0 {
+ chunks = append(chunks, Chunk{Value: buff})
+ }
+ return chunks
}
func extractNewChunks(chunks []Chunk) (ret [][]Chunk) {
diff --git a/repo_test.go b/repo_test.go
index ba45e0e..b26d73a 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -40,7 +40,10 @@ func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount i
}
if bytes.Compare(c, content) != 0 {
t.Errorf("Chunk %d does not match file content", i)
- t.Log("Expected: ", c[:10], "...")
+ // for i, b := range c {
+ // fmt.Printf("E: %d, A: %d\n", b, content[i])
+ // }
+ t.Log("Expected: ", c[:10], "...", c[chunkSize-10:])
t.Log("Actual:", content)
}
i++
@@ -139,6 +142,9 @@ func TestStoreLoadFiles(t *testing.T) {
files1 := listFiles(dataDir)
storeFileList(resultFiles, files1)
files2 := loadFileList(resultFiles)
+ if len(files1) != 4 {
+ t.Errorf("Incorrect number of files: %d, should be %d\n", len(files1), 4)
+ }
for i, f := range files1 {
if f != files2[i] {
t.Errorf("Loaded file data %d does not match stored one", i)
@@ -166,16 +172,14 @@ func TestBsdiff(t *testing.T) {
ioutil.WriteFile(addedFile, input, 0664)
reader, writer = io.Pipe()
- newChunks := make(chan []byte, 16)
oldChunks := make(chan Chunk, 16)
files = listFiles(dataDir)
repo := NewRepo(resultDir)
versions := repo.loadVersions()
go loadChunks(versions, oldChunks)
go concatFiles(files, writer)
- go chunkStream(reader, newChunks)
hashes := hashChunks(oldChunks)
- recipe := repo.matchChunks(newChunks, hashes)
+ recipe := repo.matchStream(reader, hashes)
buff := new(bytes.Buffer)
r2, _ := recipe[2].Reader(repo.path)
r0, _ := recipe[0].Reader(repo.path)