author    n-peugnet <n.peugnet@free.fr>  2021-09-02 17:48:23 +0200
committer n-peugnet <n.peugnet@free.fr>  2021-09-02 17:48:23 +0200
commit    d9703a9daa05f30e77bb99393eab8a0d40e788e4 (patch)
tree      813f08947ad0958dd3c42f7db62bd39fe6430bbf
parent    2a048d5d8b2b25326685ba53fc390781ba96deed (diff)
download  dna-backup-d9703a9daa05f30e77bb99393eab8a0d40e788e4.tar.gz
          dna-backup-d9703a9daa05f30e77bb99393eab8a0d40e788e4.zip
hash and store chunks left by matchStream
-rw-r--r--  TODO.md         | 18
-rw-r--r--  chunk.go        | 23
-rw-r--r--  repo.go         | 91
-rw-r--r--  repo_test.go    |  7
-rw-r--r--  sketch_test.go  |  2
5 files changed, 86 insertions(+), 55 deletions(-)
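The gist of the commit: when `matchStream` accumulates a full-size chunk that cannot be delta-encoded, it is now hashed and registered in the fingerprint and sketch maps right away, so chunks appearing later in the same stream can be deduplicated against it. A condensed illustration of that path, reusing identifiers from the diff below (`promoteFullChunk` is a hypothetical name, not the committed code):

```
// Hypothetical condensation of encodeTempChunk's full-chunk branch (see
// repo.go below). Relies on the repo's own types: BufferedChunk, ChunkId,
// NewLoadedChunk, hashAndStoreChunk, and chmduquesne/rollinghash/rabinkarp64.
func (r *Repo) promoteFullChunk(temp BufferedChunk, version int, last *uint64) Chunk {
	*last++                                    // next free chunk index in this version
	id := &ChunkId{Ver: version, Idx: *last}   // identity the recipe can reference
	ic := NewLoadedChunk(id, temp.Bytes())     // keep the bytes for hashing
	r.hashAndStoreChunk(ic, rabinkarp64.New()) // register in fingerprint + sketch maps
	return ic
}
```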
diff --git a/TODO.md b/TODO.md
index d4873aa..e7e1976 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,7 +1,7 @@
priority 1
----------
-- add deltaEncode chunks function
-  - do not merge consecutive smaller chunks, as these could be stored as chunks if no similar chunk is found. Thus a chunk must be of `chunkSize` or less, otherwise it could not possibly be used for deduplication.
+- [x] add deltaEncode chunks function
+  - [x] do not merge consecutive smaller chunks, as these could be stored as chunks if no similar chunk is found. Thus a chunk must be of `chunkSize` or less, otherwise it could not possibly be used for deduplication.
```
for each new chunk:
find similar in sketchMap
@@ -12,13 +12,13 @@ priority 1
store in fingerprintMap
store in sketchMap
```
-- read from repo
-  - store recipe
-  - load recipe
-  - read chunks in-order into a stream
-- properly store information to be DNA encoded
+- [ ] read from repo
+  - [ ] store recipe
+  - [ ] load recipe
+  - [ ] read chunks in-order into a stream
+- [ ] properly store information to be DNA encoded
priority 2
----------
-- make more use of the `Reader` API (which is analogous to `IOStream` in Java)
-- refactor matchStream as it is currently quite complex
+- [ ] make more use of the `Reader` API (which is analogous to `IOStream` in Java)
+- [ ] refactor matchStream as it is currently quite complex
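The pseudocode above is partially elided by the diff. Read together with repo.go below, the intended flow appears to be: an exact hit in the fingerprint map yields a reference to an existing chunk, a hit in the sketch map yields a delta-encoded chunk, and anything else is stored verbatim and indexed in both maps. A minimal sketch of that decision, with hypothetical map shapes and hash helpers standing in for the repo's `r.fingerprints`/`r.sketches` and `SketchChunk`:

```
// Minimal sketch of the TODO pseudocode above; the map shapes and hash
// helpers are hypothetical stand-ins for the repo's own structures.
func classify(
	chunk []byte,
	fingerprints map[uint64]bool, // exact-content index
	sketches map[uint64]bool, // resemblance index
	fingerprint func([]byte) uint64,
	sketch func([]byte) []uint64,
) string {
	if fingerprints[fingerprint(chunk)] {
		return "existing" // store only a reference to the known chunk
	}
	for _, s := range sketch(chunk) {
		if sketches[s] {
			return "delta" // delta-encode against the similar chunk
		}
	}
	// otherwise store verbatim, then index it for future matches
	fingerprints[fingerprint(chunk)] = true
	for _, s := range sketch(chunk) {
		sketches[s] = true
	}
	return "new"
}
```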
diff --git a/chunk.go b/chunk.go
index abcdf1c..6c76d5e 100644
--- a/chunk.go
+++ b/chunk.go
@@ -20,11 +20,16 @@ type Chunk interface {
Len() int
}
-type StoredChunk interface {
+type IdentifiedChunk interface {
Chunk
Id() *ChunkId
}
+type BufferedChunk interface {
+ Chunk
+ Bytes() []byte
+}
+
type ChunkId struct {
Ver int
Idx uint64
@@ -65,25 +70,29 @@ func (c *LoadedChunk) Len() int {
return len(c.value)
}
-func NewChunkFile(repo *Repo, id *ChunkId) *ChunkFile {
- return &ChunkFile{repo: repo, id: id}
+func (c *LoadedChunk) Bytes() []byte {
+ return c.value
+}
+
+func NewStoredFile(repo *Repo, id *ChunkId) *StoredChunk {
+ return &StoredChunk{repo: repo, id: id}
}
-type ChunkFile struct {
+type StoredChunk struct {
repo *Repo
id *ChunkId
}
-func (c *ChunkFile) Id() *ChunkId {
+func (c *StoredChunk) Id() *ChunkId {
return c.id
}
-func (c *ChunkFile) Reader() ChunkReader {
+func (c *StoredChunk) Reader() ChunkReader {
// log.Printf("Chunk %d: Reading from file\n", c.id)
return c.id.Reader(c.repo)
}
-func (c *ChunkFile) Len() int {
+func (c *StoredChunk) Len() int {
path := c.id.Path(c.repo.path)
info, err := os.Stat(path)
if err != nil {
diff --git a/repo.go b/repo.go
index 7fa4625..29f683c 100644
--- a/repo.go
+++ b/repo.go
@@ -92,12 +92,12 @@ func (r *Repo) Commit(source string) {
os.Mkdir(newPath, 0775)
os.Mkdir(newChunkPath, 0775)
reader, writer := io.Pipe()
- oldChunks := make(chan StoredChunk, 16)
+ oldChunks := make(chan IdentifiedChunk, 16)
files := listFiles(source)
go r.loadChunks(versions, oldChunks)
go concatFiles(files, writer)
r.hashChunks(oldChunks)
- chunks := r.matchStream(reader)
+ chunks := r.matchStream(reader, newVersion)
extractTempChunks(chunks)
// storeChunks(newChunkPath, newChunks)
// storeFiles(newFilesPath, files)
@@ -209,7 +209,7 @@ func storeChunks(dest string, chunks <-chan []byte) {
}
}
-func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
+func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
for i, v := range versions {
p := path.Join(v, chunksName)
entries, err := os.ReadDir(p)
@@ -243,23 +243,30 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
// For each chunk, both a fingerprint (hash over the full content) and a sketch
// (resemblance hash based on maximal values of regions) are calculated and
// stored in a hashmap.
-func (r *Repo) hashChunks(chunks <-chan StoredChunk) {
- hasher := hash.Hash64(rabinkarp64.New())
+func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
+ hasher := rabinkarp64.New()
for c := range chunks {
- hasher.Reset()
- io.Copy(hasher, c.Reader())
- h := hasher.Sum64()
- r.fingerprints[h] = c.Id()
- sketch, _ := SketchChunk(c, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
- for _, s := range sketch {
- prev := r.sketches[s]
- if contains(prev, c.Id()) {
- continue
- }
- r.sketches[s] = append(prev, c.Id())
+ r.hashAndStoreChunk(c, hasher)
+ }
+}
+
+func (r *Repo) hashAndStoreChunk(chunk IdentifiedChunk, hasher hash.Hash64) {
+ hasher.Reset()
+ io.Copy(hasher, chunk.Reader())
+ fingerprint := hasher.Sum64()
+ sketch, _ := SketchChunk(chunk, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
+ r.storeChunkId(chunk.Id(), fingerprint, sketch)
+}
+
+func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
+ r.fingerprints[fingerprint] = id
+ for _, s := range sketch {
+ prev := r.sketches[s]
+ if contains(prev, id) {
+ continue
}
+ r.sketches[s] = append(prev, id)
}
- return
}
func contains(s []*ChunkId, id *ChunkId) bool {
@@ -294,7 +301,7 @@ func (r *Repo) findSimilarChunk(chunk Chunk) (*ChunkId, bool) {
return similarChunk, similarChunk != nil
}
-func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) {
+func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) {
id, found := r.findSimilarChunk(temp)
if found {
var buff bytes.Buffer
@@ -309,32 +316,51 @@ func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) {
}, true
}
}
- // TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps)
return temp, false
}
-func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk) []Chunk {
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) {
+ chunk, isDelta = r.tryDeltaEncodeChunk(temp)
+ if isDelta {
+ log.Println("Add new delta chunk")
+ return
+ }
+ if chunk.Len() == r.chunkSize {
+ *last++
+ id := &ChunkId{Ver: version, Idx: *last}
+ ic := NewLoadedChunk(id, temp.Bytes())
+ hasher := rabinkarp64.New()
+ r.hashAndStoreChunk(ic, hasher)
+ log.Println("Add new chunk", id)
+ return ic, false
+ }
+ log.Println("Add new partial chunk of size:", chunk.Len())
+ return
+}
+
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk {
if prev == nil {
- c, _ := r.tryDeltaEncodeChunk(curr)
+ c, _ := r.encodeTempChunk(curr, version, last)
return []Chunk{c}
} else if curr.Len() < r.chunkMinLen() {
- c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)))
+ c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last)
if success {
return []Chunk{c}
} else {
return []Chunk{prev, curr}
}
} else {
- prevD, _ := r.tryDeltaEncodeChunk(prev)
- currD, _ := r.tryDeltaEncodeChunk(curr)
+ prevD, _ := r.encodeTempChunk(prev, version, last)
+ currD, _ := r.encodeTempChunk(curr, version, last)
return []Chunk{prevD, currD}
}
}
-func (r *Repo) matchStream(stream io.Reader) []Chunk {
+func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
var b byte
var chunks []Chunk
var prev *TempChunk
+ var last uint64
bufStream := bufio.NewReaderSize(stream, r.chunkSize)
buff := make([]byte, 0, r.chunkSize*2)
n, err := io.ReadFull(stream, buff[:r.chunkSize])
@@ -350,17 +376,16 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
if exists {
if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
size := len(buff) - r.chunkSize
- log.Println("Add new partial chunk of size:", size)
temp := NewTempChunk(buff[:size])
- chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
prev = nil
} else if prev != nil {
- c, _ := r.tryDeltaEncodeChunk(prev)
+ c, _ := r.encodeTempChunk(prev, version, &last)
chunks = append(chunks, c)
prev = nil
}
log.Printf("Add existing chunk: %d\n", chunkId)
- chunks = append(chunks, NewChunkFile(r, chunkId))
+ chunks = append(chunks, NewStoredFile(r, chunkId))
buff = make([]byte, 0, r.chunkSize*2)
for i := 0; i < r.chunkSize && err == nil; i++ {
b, err = bufStream.ReadByte()
@@ -370,9 +395,8 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
continue
}
if len(buff) == r.chunkSize*2 {
- log.Println("Add new chunk")
if prev != nil {
- chunk, _ := r.tryDeltaEncodeChunk(prev)
+ chunk, _ := r.encodeTempChunk(prev, version, &last)
chunks = append(chunks, chunk)
}
prev = NewTempChunk(buff[:r.chunkSize])
@@ -389,15 +413,12 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
if len(buff) > 0 {
var temp *TempChunk
if len(buff) > r.chunkSize {
- log.Println("Add new chunk")
prev = NewTempChunk(buff[:r.chunkSize])
- log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
temp = NewTempChunk(buff[r.chunkSize:])
} else {
- log.Println("Add new partial chunk of size:", len(buff))
temp = NewTempChunk(buff)
}
- chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
}
return chunks
}
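To make the `hashChunks` doc comment above concrete: the fingerprint is a RabinKarp64 hash over the chunk's entire content, and an exact-duplicate lookup is a plain map hit. A self-contained example using the same `chmduquesne/rollinghash` package the diff imports (the sketch half relies on the repo-specific `SketchChunk` and is omitted):

```
// Self-contained sketch of the fingerprinting half of hashChunks: hash
// each chunk's full content with RabinKarp64 and detect exact duplicates.
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/chmduquesne/rollinghash/rabinkarp64"
)

func main() {
	fingerprints := map[uint64]int{} // fingerprint -> first chunk index
	chunks := [][]byte{
		[]byte("first chunk"),
		[]byte("second chunk"),
		[]byte("first chunk"), // exact duplicate of chunk 0
	}
	hasher := rabinkarp64.New()
	for i, c := range chunks {
		hasher.Reset()
		io.Copy(hasher, bytes.NewReader(c))
		h := hasher.Sum64()
		if prev, ok := fingerprints[h]; ok {
			fmt.Printf("chunk %d duplicates chunk %d\n", i, prev)
			continue
		}
		fingerprints[h] = i
	}
}
```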
diff --git a/repo_test.go b/repo_test.go
index 694c413..21eccac 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -94,7 +94,7 @@ func TestLoadChunks(t *testing.T) {
reader2, writer2 := io.Pipe()
chunks1 := make(chan []byte, 16)
chunks2 := make(chan []byte, 16)
- chunks3 := make(chan StoredChunk, 16)
+ chunks3 := make(chan IdentifiedChunk, 16)
files := listFiles(dataDir)
go concatFiles(files, writer1)
go concatFiles(files, writer2)
@@ -185,14 +185,15 @@ func TestBsdiff(t *testing.T) {
defer os.Remove(addedFile2)
// Load previously stored chunks
- oldChunks := make(chan StoredChunk, 16)
+ oldChunks := make(chan IdentifiedChunk, 16)
versions := repo.loadVersions()
+ newVersion := len(versions)
go repo.loadChunks(versions, oldChunks)
repo.hashChunks(oldChunks)
// Read new data
reader := getDataStream(dataDir, concatFiles)
- recipe := repo.matchStream(reader)
+ recipe := repo.matchStream(reader, newVersion)
newChunks := extractDeltaChunks(repo.mergeTempChunks(recipe))
assertLen(t, 2, newChunks, "New delta chunks:")
for _, c := range newChunks {
diff --git a/sketch_test.go b/sketch_test.go
index 962dce0..d08e2e3 100644
--- a/sketch_test.go
+++ b/sketch_test.go
@@ -8,7 +8,7 @@ import (
func TestSketchChunk(t *testing.T) {
dataDir := path.Join("test", "data", "repo_8k")
- chunks := make(chan StoredChunk, 16)
+ chunks := make(chan IdentifiedChunk, 16)
repo := NewRepo(dataDir)
versions := repo.loadVersions()
go repo.loadChunks(versions, chunks)