| field | value | date |
|---|---|---|
| author | n-peugnet <n.peugnet@free.fr> | 2021-09-02 17:48:23 +0200 |
| committer | n-peugnet <n.peugnet@free.fr> | 2021-09-02 17:48:23 +0200 |
| commit | d9703a9daa05f30e77bb99393eab8a0d40e788e4 | |
| tree | 813f08947ad0958dd3c42f7db62bd39fe6430bbf | |
| parent | 2a048d5d8b2b25326685ba53fc390781ba96deed | |
hash and store chunks left by matchStream
| file | mode | lines changed |
|---|---|---|
| TODO.md | -rw-r--r-- | 18 |
| chunk.go | -rw-r--r-- | 23 |
| repo.go | -rw-r--r-- | 91 |
| repo_test.go | -rw-r--r-- | 7 |
| sketch_test.go | -rw-r--r-- | 2 |
5 files changed, 86 insertions, 55 deletions
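
In short: `matchStream` used to leave its unmatched ("temp") chunks to delta encoding only and never registered them, which is exactly what the removed TODO comment in `tryDeltaEncodeChunk` pointed out. The new `encodeTempChunk` closes that gap: a leftover chunk is delta-encoded when a similar stored chunk is known, and when it is exactly `chunkSize` long it is given a fresh `ChunkId`, hashed, and stored in the fingerprint and sketch maps so later data can deduplicate against it. The sketch below condenses that decision into a self-contained example; `findSimilar`, `encodeTemp`, and the string results are hypothetical stand-ins for the real methods shown in the repo.go diff further down.

```go
package main

import "fmt"

const chunkSize = 8 // stand-in for r.chunkSize

// findSimilar stands in for Repo.findSimilarChunk, which consults the
// sketch map; in this self-contained example it never finds a match.
func findSimilar(data []byte) (id string, found bool) { return "", false }

// encodeTemp mirrors the three-way choice made by encodeTempChunk.
func encodeTemp(temp []byte, version int, last *uint64) string {
	if id, found := findSimilar(temp); found {
		// A similar chunk is already stored: only a delta needs keeping.
		return "delta against " + id
	}
	if len(temp) == chunkSize {
		// Full-size chunk: mint an id, then hash and store it so later
		// chunks can be deduplicated or delta-encoded against it.
		*last++
		return fmt.Sprintf("new chunk {Ver: %d, Idx: %d}", version, *last)
	}
	// Smaller than chunkSize: kept as raw bytes, since it could never
	// match a whole stored chunk anyway.
	return fmt.Sprintf("partial chunk of size %d", len(temp))
}

func main() {
	var last uint64
	fmt.Println(encodeTemp([]byte("abcdefgh"), 1, &last)) // new chunk {Ver: 1, Idx: 1}
	fmt.Println(encodeTemp([]byte("abc"), 1, &last))      // partial chunk of size 3
}
```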
diff --git a/TODO.md b/TODO.md
--- a/TODO.md
+++ b/TODO.md
@@ -1,7 +1,7 @@
 priority 1
 ----------
-- add deltaEncode chunks function
-    - do not merge consecutive smaller chunks, as these could be stored as chunks if no similar chunk is found. Thus each will need to be of `chunkSize` or less; otherwise it could not possibly be used for deduplication.
+- [x] add deltaEncode chunks function
+    - [x] do not merge consecutive smaller chunks, as these could be stored as chunks if no similar chunk is found. Thus each will need to be of `chunkSize` or less; otherwise it could not possibly be used for deduplication.
 ```
 for each new chunk:
     find similar in sketchMap
@@ -12,13 +12,13 @@ priority 1
     store in fingerprintMap
     store in sketchMap
 ```
-- read from repo
-    - store recipe
-    - load recipe
-    - read chunks in-order into a stream
-- properly store information to be DNA encoded
+- [ ] read from repo
+    - [ ] store recipe
+    - [ ] load recipe
+    - [ ] read chunks in-order into a stream
+- [ ] properly store information to be DNA encoded
 
 priority 2
 ----------
-- make more use of the `Reader` API (which is analogous to `IOStream` in Java)
-- refactor matchStream, as right now it is quite complex
+- [ ] make more use of the `Reader` API (which is analogous to `IOStream` in Java)
+- [ ] refactor matchStream, as right now it is quite complex
diff --git a/chunk.go b/chunk.go
--- a/chunk.go
+++ b/chunk.go
@@ -20,11 +20,16 @@ type Chunk interface {
 	Len() int
 }
 
-type StoredChunk interface {
+type IdentifiedChunk interface {
 	Chunk
 	Id() *ChunkId
 }
 
+type BufferedChunk interface {
+	Chunk
+	Bytes() []byte
+}
+
 type ChunkId struct {
 	Ver int
 	Idx uint64
@@ -65,25 +70,29 @@ func (c *LoadedChunk) Len() int {
 	return len(c.value)
 }
 
-func NewChunkFile(repo *Repo, id *ChunkId) *ChunkFile {
-	return &ChunkFile{repo: repo, id: id}
+func (c *LoadedChunk) Bytes() []byte {
+	return c.value
+}
+
+func NewStoredFile(repo *Repo, id *ChunkId) *StoredChunk {
+	return &StoredChunk{repo: repo, id: id}
 }
 
-type ChunkFile struct {
+type StoredChunk struct {
 	repo *Repo
 	id   *ChunkId
 }
 
-func (c *ChunkFile) Id() *ChunkId {
+func (c *StoredChunk) Id() *ChunkId {
 	return c.id
 }
 
-func (c *ChunkFile) Reader() ChunkReader {
+func (c *StoredChunk) Reader() ChunkReader {
 	// log.Printf("Chunk %d: Reading from file\n", c.id)
 	return c.id.Reader(c.repo)
 }
 
-func (c *ChunkFile) Len() int {
+func (c *StoredChunk) Len() int {
 	path := c.id.Path(c.repo.path)
 	info, err := os.Stat(path)
 	if err != nil {
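
The chunk.go rename tightens the naming: an `IdentifiedChunk` is anything that carries a `ChunkId`, while a `BufferedChunk` is anything whose bytes sit in memory. Below is a minimal sketch of how `LoadedChunk` now satisfies both, which is what lets `matchStream` hand freshly minted chunks to `hashAndStoreChunk`. Names follow the diff above; the `Reader` method and `ChunkReader` type are elided to keep the example self-contained.

```go
package main

type ChunkId struct {
	Ver int
	Idx uint64
}

type Chunk interface {
	Len() int
}

type IdentifiedChunk interface { // formerly the StoredChunk interface
	Chunk
	Id() *ChunkId
}

type BufferedChunk interface { // new in this commit
	Chunk
	Bytes() []byte
}

type LoadedChunk struct {
	id    *ChunkId
	value []byte
}

func (c *LoadedChunk) Id() *ChunkId  { return c.id }
func (c *LoadedChunk) Len() int      { return len(c.value) }
func (c *LoadedChunk) Bytes() []byte { return c.value } // the method this commit adds

// Compile-time assertions that LoadedChunk implements both interfaces.
var (
	_ IdentifiedChunk = (*LoadedChunk)(nil)
	_ BufferedChunk   = (*LoadedChunk)(nil)
)

func main() {}
```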
diff --git a/repo.go b/repo.go
--- a/repo.go
+++ b/repo.go
@@ -92,12 +92,12 @@ func (r *Repo) Commit(source string) {
 	os.Mkdir(newPath, 0775)
 	os.Mkdir(newChunkPath, 0775)
 	reader, writer := io.Pipe()
-	oldChunks := make(chan StoredChunk, 16)
+	oldChunks := make(chan IdentifiedChunk, 16)
 	files := listFiles(source)
 	go r.loadChunks(versions, oldChunks)
 	go concatFiles(files, writer)
 	r.hashChunks(oldChunks)
-	chunks := r.matchStream(reader)
+	chunks := r.matchStream(reader, newVersion)
 	extractTempChunks(chunks)
 	// storeChunks(newChunkPath, newChunks)
 	// storeFiles(newFilesPath, files)
@@ -209,7 +209,7 @@ func storeChunks(dest string, chunks <-chan []byte) {
 	}
 }
 
-func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
+func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
 	for i, v := range versions {
 		p := path.Join(v, chunksName)
 		entries, err := os.ReadDir(p)
@@ -243,23 +243,30 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
 // For each chunk, both a fingerprint (hash over the full content) and a sketch
 // (resemblance hash based on maximal values of regions) are calculated and
 // stored in a hashmap.
-func (r *Repo) hashChunks(chunks <-chan StoredChunk) {
-	hasher := hash.Hash64(rabinkarp64.New())
+func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
+	hasher := rabinkarp64.New()
 	for c := range chunks {
-		hasher.Reset()
-		io.Copy(hasher, c.Reader())
-		h := hasher.Sum64()
-		r.fingerprints[h] = c.Id()
-		sketch, _ := SketchChunk(c, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
-		for _, s := range sketch {
-			prev := r.sketches[s]
-			if contains(prev, c.Id()) {
-				continue
-			}
-			r.sketches[s] = append(prev, c.Id())
+		r.hashAndStoreChunk(c, hasher)
+	}
+}
+
+func (r *Repo) hashAndStoreChunk(chunk IdentifiedChunk, hasher hash.Hash64) {
+	hasher.Reset()
+	io.Copy(hasher, chunk.Reader())
+	fingerprint := hasher.Sum64()
+	sketch, _ := SketchChunk(chunk, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
+	r.storeChunkId(chunk.Id(), fingerprint, sketch)
+}
+
+func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
+	r.fingerprints[fingerprint] = id
+	for _, s := range sketch {
+		prev := r.sketches[s]
+		if contains(prev, id) {
+			continue
 		}
+		r.sketches[s] = append(prev, id)
 	}
-	return
 }
 
 func contains(s []*ChunkId, id *ChunkId) bool {
@@ -294,7 +301,7 @@ func (r *Repo) findSimilarChunk(chunk Chunk) (*ChunkId, bool) {
 	return similarChunk, similarChunk != nil
 }
 
-func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) {
+func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) {
 	id, found := r.findSimilarChunk(temp)
 	if found {
 		var buff bytes.Buffer
@@ -309,32 +316,51 @@ func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) {
 			}, true
 		}
 	}
-	// TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps)
 	return temp, false
 }
 
-func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk) []Chunk {
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) {
+	chunk, isDelta = r.tryDeltaEncodeChunk(temp)
+	if isDelta {
+		log.Println("Add new delta chunk")
+		return
+	}
+	if chunk.Len() == r.chunkSize {
+		*last++
+		id := &ChunkId{Ver: version, Idx: *last}
+		ic := NewLoadedChunk(id, temp.Bytes())
+		hasher := rabinkarp64.New()
+		r.hashAndStoreChunk(ic, hasher)
+		log.Println("Add new chunk", id)
+		return ic, false
+	}
+	log.Println("Add new partial chunk of size:", chunk.Len())
+	return
+}
+
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk {
 	if prev == nil {
-		c, _ := r.tryDeltaEncodeChunk(curr)
+		c, _ := r.encodeTempChunk(curr, version, last)
 		return []Chunk{c}
 	} else if curr.Len() < r.chunkMinLen() {
-		c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)))
+		c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last)
 		if success {
 			return []Chunk{c}
 		} else {
 			return []Chunk{prev, curr}
 		}
 	} else {
-		prevD, _ := r.tryDeltaEncodeChunk(prev)
-		currD, _ := r.tryDeltaEncodeChunk(curr)
+		prevD, _ := r.encodeTempChunk(prev, version, last)
+		currD, _ := r.encodeTempChunk(curr, version, last)
		return []Chunk{prevD, currD}
 	}
 }
 
-func (r *Repo) matchStream(stream io.Reader) []Chunk {
+func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
 	var b byte
 	var chunks []Chunk
 	var prev *TempChunk
+	var last uint64
 	bufStream := bufio.NewReaderSize(stream, r.chunkSize)
 	buff := make([]byte, 0, r.chunkSize*2)
 	n, err := io.ReadFull(stream, buff[:r.chunkSize])
@@ -350,17 +376,16 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
 		if exists {
 			if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
 				size := len(buff) - r.chunkSize
-				log.Println("Add new partial chunk of size:", size)
 				temp := NewTempChunk(buff[:size])
-				chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...)
+				chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
 				prev = nil
 			} else if prev != nil {
-				c, _ := r.tryDeltaEncodeChunk(prev)
+				c, _ := r.encodeTempChunk(prev, version, &last)
 				chunks = append(chunks, c)
 				prev = nil
 			}
 			log.Printf("Add existing chunk: %d\n", chunkId)
-			chunks = append(chunks, NewChunkFile(r, chunkId))
+			chunks = append(chunks, NewStoredFile(r, chunkId))
 			buff = make([]byte, 0, r.chunkSize*2)
 			for i := 0; i < r.chunkSize && err == nil; i++ {
 				b, err = bufStream.ReadByte()
@@ -370,9 +395,8 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
 			continue
 		}
 		if len(buff) == r.chunkSize*2 {
-			log.Println("Add new chunk")
 			if prev != nil {
-				chunk, _ := r.tryDeltaEncodeChunk(prev)
+				chunk, _ := r.encodeTempChunk(prev, version, &last)
 				chunks = append(chunks, chunk)
 			}
 			prev = NewTempChunk(buff[:r.chunkSize])
@@ -389,15 +413,12 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk {
 	if len(buff) > 0 {
 		var temp *TempChunk
 		if len(buff) > r.chunkSize {
-			log.Println("Add new chunk")
 			prev = NewTempChunk(buff[:r.chunkSize])
-			log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
 			temp = NewTempChunk(buff[r.chunkSize:])
 		} else {
-			log.Println("Add new partial chunk of size:", len(buff))
 			temp = NewTempChunk(buff)
 		}
-		chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...)
+		chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
 	}
 	return chunks
 }
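
The extracted helpers are the commit's namesake: `hashAndStoreChunk` fingerprints a chunk with the Rabin-Karp rolling hash, and `storeChunkId` files the id under the fingerprint and under each sketch feature. Below is a self-contained approximation of that path. It assumes the `github.com/chmduquesne/rollinghash/rabinkarp64` package that the `rabinkarp64.New()` call suggests, takes sketch features precomputed instead of calling `SketchChunk`, and omits the duplicate-id check the real `storeChunkId` performs with `contains`.

```go
package main

import (
	"fmt"
	"hash"

	"github.com/chmduquesne/rollinghash/rabinkarp64"
)

type ChunkId struct {
	Ver int
	Idx uint64
}

// index simplifies Repo down to the two maps this commit populates.
type index struct {
	fingerprints map[uint64]*ChunkId   // full-content hash -> chunk (exact duplicates)
	sketches     map[uint64][]*ChunkId // sketch feature -> chunks (similar chunks)
}

// register is a minimal stand-in for hashAndStoreChunk + storeChunkId.
func (ix *index) register(id *ChunkId, data []byte, sketch []uint64, hasher hash.Hash64) {
	hasher.Reset()
	hasher.Write(data) // the real code streams with io.Copy(hasher, chunk.Reader())
	ix.fingerprints[hasher.Sum64()] = id
	for _, s := range sketch {
		ix.sketches[s] = append(ix.sketches[s], id)
	}
}

func main() {
	ix := &index{
		fingerprints: make(map[uint64]*ChunkId),
		sketches:     make(map[uint64][]*ChunkId),
	}
	hasher := rabinkarp64.New()
	ix.register(&ChunkId{Ver: 1, Idx: 1}, []byte("chunk bytes"), []uint64{42, 7}, hasher)
	fmt.Println(len(ix.fingerprints), len(ix.sketches)) // 1 2
}
```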
diff --git a/repo_test.go b/repo_test.go
index 694c413..21eccac 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -94,7 +94,7 @@ func TestLoadChunks(t *testing.T) {
 	reader2, writer2 := io.Pipe()
 	chunks1 := make(chan []byte, 16)
 	chunks2 := make(chan []byte, 16)
-	chunks3 := make(chan StoredChunk, 16)
+	chunks3 := make(chan IdentifiedChunk, 16)
 	files := listFiles(dataDir)
 	go concatFiles(files, writer1)
 	go concatFiles(files, writer2)
@@ -185,14 +185,15 @@ func TestBsdiff(t *testing.T) {
 	defer os.Remove(addedFile2)
 
 	// Load previously stored chunks
-	oldChunks := make(chan StoredChunk, 16)
+	oldChunks := make(chan IdentifiedChunk, 16)
 	versions := repo.loadVersions()
+	newVersion := len(versions)
 	go repo.loadChunks(versions, oldChunks)
 	repo.hashChunks(oldChunks)
 
 	// Read new data
 	reader := getDataStream(dataDir, concatFiles)
-	recipe := repo.matchStream(reader)
+	recipe := repo.matchStream(reader, newVersion)
 	newChunks := extractDeltaChunks(repo.mergeTempChunks(recipe))
 	assertLen(t, 2, newChunks, "New delta chunks:")
 	for _, c := range newChunks {
diff --git a/sketch_test.go b/sketch_test.go
index 962dce0..d08e2e3 100644
--- a/sketch_test.go
+++ b/sketch_test.go
@@ -8,7 +8,7 @@ import (
 
 func TestSketchChunk(t *testing.T) {
 	dataDir := path.Join("test", "data", "repo_8k")
-	chunks := make(chan StoredChunk, 16)
+	chunks := make(chan IdentifiedChunk, 16)
 	repo := NewRepo(dataDir)
 	versions := repo.loadVersions()
 	go repo.loadChunks(versions, chunks)
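
One last detail worth noting: the ids minted during matching carry the version being committed, and `TestBsdiff` derives that index as `newVersion := len(versions)`, relying on versions being numbered densely from zero. A trivial illustration of that invariant; the version directory names here are hypothetical.

```go
package main

import "fmt"

// nextVersion mirrors newVersion := len(versions) in TestBsdiff: since
// versions are numbered densely from zero, the count of existing versions
// is the next free index.
func nextVersion(versions []string) int {
	return len(versions)
}

func main() {
	existing := []string{"00000", "00001"}
	fmt.Println(nextVersion(existing)) // 2: new chunks minted as {Ver: 2, Idx: n}
}
```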