path: root/repo.go
author    n-peugnet <n.peugnet@free.fr>  2021-09-02 17:48:23 +0200
committer n-peugnet <n.peugnet@free.fr>  2021-09-02 17:48:23 +0200
commit    d9703a9daa05f30e77bb99393eab8a0d40e788e4 (patch)
tree      813f08947ad0958dd3c42f7db62bd39fe6430bbf /repo.go
parent    2a048d5d8b2b25326685ba53fc390781ba96deed (diff)
hash and store chunks left by matchStream
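
This splits the hashing logic of hashChunks into reusable pieces (hashAndStoreChunk and storeChunkId) so that matchStream can call them too: a temporary chunk that reaches the full chunk size is now hashed and registered as a real chunk on the spot, resolving the TODO previously left in tryDeltaEncodeChunk.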
Diffstat (limited to 'repo.go')
-rw-r--r--  repo.go  91
1 file changed, 56 insertions(+), 35 deletions(-)
diff --git a/repo.go b/repo.go
index 7fa4625..29f683c 100644
--- a/repo.go
+++ b/repo.go
@@ -92,12 +92,12 @@ func (r *Repo) Commit(source string) {
os.Mkdir(newPath, 0775)
os.Mkdir(newChunkPath, 0775)
reader, writer := io.Pipe()
- oldChunks := make(chan StoredChunk, 16)
+ oldChunks := make(chan IdentifiedChunk, 16)
files := listFiles(source)
go r.loadChunks(versions, oldChunks)
go concatFiles(files, writer)
r.hashChunks(oldChunks)
- chunks := r.matchStream(reader)
+ chunks := r.matchStream(reader, newVersion)
extractTempChunks(chunks)
// storeChunks(newChunkPath, newChunks)
// storeFiles(newFilesPath, files)
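
The Commit hunk above wires the pipeline together: concatFiles feeds the write end of an io.Pipe from its own goroutine while matchStream consumes the read end as one continuous byte stream. A minimal, self-contained sketch of that producer/consumer pattern (the strings here are invented for illustration; the real producer reads files):

    package main

    import (
        "fmt"
        "io"
        "strings"
    )

    func main() {
        reader, writer := io.Pipe()
        // Producer goroutine: concatenates several sources, like concatFiles.
        go func() {
            defer writer.Close() // Close signals EOF to the reader side.
            for _, s := range []string{"alpha ", "beta ", "gamma"} {
                io.Copy(writer, strings.NewReader(s))
            }
        }()
        // Consumer: sees a single stream, like matchStream does.
        data, err := io.ReadAll(reader)
        fmt.Println(string(data), err) // alpha beta gamma <nil>
    }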
@@ -209,7 +209,7 @@ func storeChunks(dest string, chunks <-chan []byte) {
}
}
-func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
+func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
for i, v := range versions {
p := path.Join(v, chunksName)
entries, err := os.ReadDir(p)
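
StoredChunk becomes IdentifiedChunk in the channel types. Judging only from the calls made elsewhere in this diff (Id() in storeChunkId, Reader() in hashAndStoreChunk), the interface plausibly looks like the sketch below; the real definition lives elsewhere in the repository and may differ:

    // Hypothetical reconstruction, not the repository's actual source.
    type IdentifiedChunk interface {
        Id() *ChunkId
        Reader() io.Reader
    }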
@@ -243,23 +243,30 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
// For each chunk, both a fingerprint (hash over the full content) and a sketch
// (resemblance hash based on maximal values of regions) are calculated and
stored in a hashmap.
-func (r *Repo) hashChunks(chunks <-chan StoredChunk) {
- hasher := hash.Hash64(rabinkarp64.New())
+func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
+ hasher := rabinkarp64.New()
for c := range chunks {
- hasher.Reset()
- io.Copy(hasher, c.Reader())
- h := hasher.Sum64()
- r.fingerprints[h] = c.Id()
- sketch, _ := SketchChunk(c, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
- for _, s := range sketch {
- prev := r.sketches[s]
- if contains(prev, c.Id()) {
- continue
- }
- r.sketches[s] = append(prev, c.Id())
+ r.hashAndStoreChunk(c, hasher)
+ }
+}
+
+func (r *Repo) hashAndStoreChunk(chunk IdentifiedChunk, hasher hash.Hash64) {
+ hasher.Reset()
+ io.Copy(hasher, chunk.Reader())
+ fingerprint := hasher.Sum64()
+ sketch, _ := SketchChunk(chunk, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
+ r.storeChunkId(chunk.Id(), fingerprint, sketch)
+}
+
+func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
+ r.fingerprints[fingerprint] = id
+ for _, s := range sketch {
+ prev := r.sketches[s]
+ if contains(prev, id) {
+ continue
}
+ r.sketches[s] = append(prev, id)
}
- return
}
func contains(s []*ChunkId, id *ChunkId) bool {
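
storeChunkId maintains two indexes: fingerprints maps an exact hash to a single chunk id, while sketches maps each resemblance feature to a list of candidate ids, skipping duplicates via contains. A reduced, runnable sketch of that bookkeeping (ChunkId and the maps are simplified stand-ins for Repo's fields; identity is pointer equality here):

    package main

    import "fmt"

    type ChunkId struct{ Ver, Idx int }

    type index struct {
        fingerprints map[uint64]*ChunkId   // exact-match lookup
        sketches     map[uint64][]*ChunkId // resemblance lookup
    }

    func contains(s []*ChunkId, id *ChunkId) bool {
        for _, v := range s {
            if v == id {
                return true
            }
        }
        return false
    }

    func (ix *index) store(id *ChunkId, fingerprint uint64, sketch []uint64) {
        ix.fingerprints[fingerprint] = id
        for _, s := range sketch {
            if contains(ix.sketches[s], id) {
                continue // one entry per chunk per feature
            }
            ix.sketches[s] = append(ix.sketches[s], id)
        }
    }

    func main() {
        ix := &index{map[uint64]*ChunkId{}, map[uint64][]*ChunkId{}}
        id := &ChunkId{Ver: 1, Idx: 0}
        ix.store(id, 0xdeadbeef, []uint64{42, 42, 7})
        fmt.Println(len(ix.sketches[42]), len(ix.sketches[7])) // 1 1
    }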
@@ -294,7 +301,7 @@ func (r *Repo) findSimilarChunk(chunk Chunk) (*ChunkId, bool) {
return similarChunk, similarChunk != nil
}
-func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) {
+func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) {
id, found := r.findSimilarChunk(temp)
if found {
var buff bytes.Buffer
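
Widening tryDeltaEncodeChunk's parameter from the concrete *TempChunk to a BufferedChunk interface lets any in-memory chunk take the delta-encoding path. From the methods this diff uses (Len() and Bytes()), and since encodeTempChunk returns one as a Chunk, a plausible shape is the following; this is an assumption, not the repository's definition:

    // Hypothetical reconstruction based on the calls visible in this diff.
    type BufferedChunk interface {
        Chunk // assumed embedded, since a BufferedChunk is returned as a Chunk
        Len() int
        Bytes() []byte
    }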
@@ -309,32 +316,51 @@ func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) {
}, true
}
}
- // TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps)
return temp, false
}
-func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk) []Chunk {
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) {
+ chunk, isDelta = r.tryDeltaEncodeChunk(temp)
+ if isDelta {
+ log.Println("Add new delta chunk")
+ return
+ }
+ if chunk.Len() == r.chunkSize {
+ *last++
+ id := &ChunkId{Ver: version, Idx: *last}
+ ic := NewLoadedChunk(id, temp.Bytes())
+ hasher := rabinkarp64.New()
+ r.hashAndStoreChunk(ic, hasher)
+ log.Println("Add new chunk", id)
+ return ic, false
+ }
+ log.Println("Add new partial chunk of size:", chunk.Len())
+ return
+}
+
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk {
if prev == nil {
- c, _ := r.tryDeltaEncodeChunk(curr)
+ c, _ := r.encodeTempChunk(curr, version, last)
return []Chunk{c}
} else if curr.Len() < r.chunkMinLen() {
- c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)))
+ c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last)
if success {
return []Chunk{c}
} else {
return []Chunk{prev, curr}
}
} else {
- prevD, _ := r.tryDeltaEncodeChunk(prev)
- currD, _ := r.tryDeltaEncodeChunk(curr)
+ prevD, _ := r.encodeTempChunk(prev, version, last)
+ currD, _ := r.encodeTempChunk(curr, version, last)
return []Chunk{prevD, currD}
}
}
-func (r *Repo) matchStream(stream io.Reader) []Chunk {
+func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
var b byte
var chunks []Chunk
var prev *TempChunk
+ var last uint64
bufStream := bufio.NewReaderSize(stream, r.chunkSize)
buff := make([]byte, 0, r.chunkSize*2)
n, err := io.ReadFull(stream, buff[:r.chunkSize])
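
encodeTempChunk resolves the removed TODO: when a temporary chunk cannot be delta-encoded but is exactly chunkSize long, it is promoted to a LoadedChunk, hashed, and indexed immediately. New ids come from a counter shared through a *uint64, so every promotion site increments the same sequence. A tiny sketch of that allocation pattern (types simplified):

    package main

    import "fmt"

    type ChunkId struct {
        Ver int
        Idx uint64
    }

    // newId mimics the `*last++` bookkeeping in encodeTempChunk.
    func newId(version int, last *uint64) *ChunkId {
        *last++
        return &ChunkId{Ver: version, Idx: *last}
    }

    func main() {
        var last uint64
        fmt.Println(newId(3, &last), newId(3, &last)) // &{3 1} &{3 2}
    }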
@@ -350,17 +376,16 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
if exists {
if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
size := len(buff) - r.chunkSize
- log.Println("Add new partial chunk of size:", size)
temp := NewTempChunk(buff[:size])
- chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
prev = nil
} else if prev != nil {
- c, _ := r.tryDeltaEncodeChunk(prev)
+ c, _ := r.encodeTempChunk(prev, version, &last)
chunks = append(chunks, c)
prev = nil
}
log.Printf("Add existing chunk: %d\n", chunkId)
- chunks = append(chunks, NewChunkFile(r, chunkId))
+ chunks = append(chunks, NewStoredFile(r, chunkId))
buff = make([]byte, 0, r.chunkSize*2)
for i := 0; i < r.chunkSize && err == nil; i++ {
b, err = bufStream.ReadByte()
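
When an existing chunk matches, pending temporary data is flushed first, the matched id is appended as a stored-chunk reference, and the buffer is refilled one byte at a time up to chunkSize, stopping early at end of stream. A self-contained sketch of that refill loop (with a plain bufio.Reader over a short string):

    package main

    import (
        "bufio"
        "fmt"
        "strings"
    )

    func main() {
        chunkSize := 8
        bufStream := bufio.NewReader(strings.NewReader("only-5")) // 6 bytes, less than chunkSize
        buff := make([]byte, 0, chunkSize)
        var err error
        for i := 0; i < chunkSize && err == nil; i++ {
            var b byte
            b, err = bufStream.ReadByte()
            if err == nil {
                buff = append(buff, b)
            }
        }
        fmt.Printf("%q (%d bytes, err=%v)\n", buff, len(buff), err) // "only-5" (6 bytes, err=EOF)
    }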
@@ -370,9 +395,8 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
continue
}
if len(buff) == r.chunkSize*2 {
- log.Println("Add new chunk")
if prev != nil {
- chunk, _ := r.tryDeltaEncodeChunk(prev)
+ chunk, _ := r.encodeTempChunk(prev, version, &last)
chunks = append(chunks, chunk)
}
prev = NewTempChunk(buff[:r.chunkSize])
@@ -389,15 +413,12 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
if len(buff) > 0 {
var temp *TempChunk
if len(buff) > r.chunkSize {
- log.Println("Add new chunk")
prev = NewTempChunk(buff[:r.chunkSize])
- log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
temp = NewTempChunk(buff[r.chunkSize:])
} else {
- log.Println("Add new partial chunk of size:", len(buff))
temp = NewTempChunk(buff)
}
- chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
}
return chunks
}
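
The last hunk deals with whatever remains once the stream is exhausted: more than chunkSize leftover bytes are split into one full-size chunk plus a partial tail, and anything shorter becomes a single partial chunk handed to encodeTempChunks. A worked example of that split, assuming a chunk size of 8 bytes:

    package main

    import "fmt"

    func main() {
        chunkSize := 8
        buff := []byte("thirteen byte") // 13 bytes left at end of stream
        if len(buff) > chunkSize {
            prev, temp := buff[:chunkSize], buff[chunkSize:]
            fmt.Printf("full: %q, partial: %q\n", prev, temp) // full: "thirteen", partial: " byte"
        }
    }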