aboutsummaryrefslogtreecommitdiff
path: root/repo.go
diff options
context:
space:
mode:
Diffstat (limited to 'repo.go')
-rw-r--r--repo.go95
1 files changed, 78 insertions, 17 deletions
diff --git a/repo.go b/repo.go
index 32666f4..c077dff 100644
--- a/repo.go
+++ b/repo.go
@@ -26,6 +26,7 @@ package main
import (
"bufio"
+ "bytes"
"encoding/gob"
"fmt"
"hash"
@@ -91,8 +92,8 @@ func (r *Repo) Commit(source string) {
files := listFiles(source)
go r.loadChunks(versions, oldChunks)
go concatFiles(files, writer)
- fingerprints, _ := r.hashChunks(oldChunks)
- chunks := r.matchStream(reader, fingerprints)
+ fingerprints, sketches := r.hashChunks(oldChunks)
+ chunks := r.matchStream(reader, fingerprints, sketches)
extractTempChunks(chunks)
// storeChunks(newChunkPath, newChunks)
// storeFiles(newFilesPath, files)
@@ -146,6 +147,10 @@ func concatFiles(files []File, stream io.WriteCloser) {
stream.Close()
}
+func (r *Repo) chunkMinLen() int {
+ return SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
+}
+
func (r *Repo) chunkStream(stream io.Reader, chunks chan<- []byte) {
var buff []byte
var prev, read = r.chunkSize, 0
@@ -287,15 +292,47 @@ func (r *Repo) findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool
return similarChunk, similarChunk != nil
}
-// TODO: encode stream
-func (r *Repo) encodeStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
- chunks := r.matchStream(stream, fingerprints)
- return chunks
+func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk, sketches SketchMap) (Chunk, bool) {
+ id, found := r.findSimilarChunk(temp, sketches)
+ if found {
+ var buff bytes.Buffer
+ if err := r.differ.Diff(id.Reader(r), temp.Reader(), &buff); err != nil {
+ log.Println("Error trying delta encode chunk:", temp, "with source:", id, ":", err)
+ } else {
+ return &DeltaChunk{
+ repo: r,
+ source: id,
+ patch: buff.Bytes(),
+ size: temp.Len(),
+ }, true
+ }
+ }
+ // TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps)
+ return temp, false
}
-func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk {
+func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk, sketches SketchMap) []Chunk {
+ if prev == nil {
+ c, _ := r.tryDeltaEncodeChunk(curr, sketches)
+ return []Chunk{c}
+ } else if curr.Len() < r.chunkMinLen() {
+ c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), sketches)
+ if success {
+ return []Chunk{c}
+ } else {
+ return []Chunk{prev, curr}
+ }
+ } else {
+ prevD, _ := r.tryDeltaEncodeChunk(prev, sketches)
+ currD, _ := r.tryDeltaEncodeChunk(curr, sketches)
+ return []Chunk{prevD, currD}
+ }
+}
+
+func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
var b byte
var chunks []Chunk
+ var prev *TempChunk
bufStream := bufio.NewReaderSize(stream, r.chunkSize)
buff := make([]byte, 0, r.chunkSize*2)
n, err := io.ReadFull(stream, buff[:r.chunkSize])
@@ -312,7 +349,13 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
size := len(buff) - r.chunkSize
log.Println("Add new partial chunk of size:", size)
- chunks = append(chunks, NewTempChunk(buff[:size]))
+ temp := NewTempChunk(buff[:size])
+ chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
+ prev = nil
+ } else if prev != nil {
+ c, _ := r.tryDeltaEncodeChunk(prev, sketches)
+ chunks = append(chunks, c)
+ prev = nil
}
log.Printf("Add existing chunk: %d\n", chunkId)
chunks = append(chunks, NewChunkFile(r, chunkId))
@@ -326,7 +369,11 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
}
if len(buff) == r.chunkSize*2 {
log.Println("Add new chunk")
- chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+ if prev != nil {
+ chunk, _ := r.tryDeltaEncodeChunk(prev, sketches)
+ chunks = append(chunks, chunk)
+ }
+ prev = NewTempChunk(buff[:r.chunkSize])
tmp := buff[r.chunkSize:]
buff = make([]byte, 0, r.chunkSize*2)
buff = append(buff, tmp...)
@@ -337,14 +384,18 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
buff = append(buff, b)
}
}
- if len(buff) > r.chunkSize {
- log.Println("Add new chunk")
- chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
- log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
- chunks = append(chunks, NewTempChunk(buff[r.chunkSize:]))
- } else if len(buff) > 0 {
- log.Println("Add new partial chunk of size:", len(buff))
- chunks = append(chunks, NewTempChunk(buff))
+ if len(buff) > 0 {
+ var temp *TempChunk
+ if len(buff) > r.chunkSize {
+ log.Println("Add new chunk")
+ prev = NewTempChunk(buff[:r.chunkSize])
+ log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
+ temp = NewTempChunk(buff[r.chunkSize:])
+ } else {
+ log.Println("Add new partial chunk of size:", len(buff))
+ temp = NewTempChunk(buff)
+ }
+ chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp, sketches)...)
}
return chunks
}
@@ -390,6 +441,16 @@ func extractTempChunks(chunks []Chunk) (ret []*TempChunk) {
return
}
+func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
+ for _, c := range chunks {
+ tmp, isDelta := c.(*DeltaChunk)
+ if isDelta {
+ ret = append(ret, tmp)
+ }
+ }
+ return
+}
+
func writeFile(filePath string, object interface{}) error {
file, err := os.Create(filePath)
if err == nil {