From d9703a9daa05f30e77bb99393eab8a0d40e788e4 Mon Sep 17 00:00:00 2001 From: n-peugnet Date: Thu, 2 Sep 2021 17:48:23 +0200 Subject: hash and store chunks left by matchStream --- repo.go | 91 ++++++++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 56 insertions(+), 35 deletions(-) (limited to 'repo.go') diff --git a/repo.go b/repo.go index 7fa4625..29f683c 100644 --- a/repo.go +++ b/repo.go @@ -92,12 +92,12 @@ func (r *Repo) Commit(source string) { os.Mkdir(newPath, 0775) os.Mkdir(newChunkPath, 0775) reader, writer := io.Pipe() - oldChunks := make(chan StoredChunk, 16) + oldChunks := make(chan IdentifiedChunk, 16) files := listFiles(source) go r.loadChunks(versions, oldChunks) go concatFiles(files, writer) r.hashChunks(oldChunks) - chunks := r.matchStream(reader) + chunks := r.matchStream(reader, newVersion) extractTempChunks(chunks) // storeChunks(newChunkPath, newChunks) // storeFiles(newFilesPath, files) @@ -209,7 +209,7 @@ func storeChunks(dest string, chunks <-chan []byte) { } } -func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) { +func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) { for i, v := range versions { p := path.Join(v, chunksName) entries, err := os.ReadDir(p) @@ -243,23 +243,30 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) { // For each chunk, both a fingerprint (hash over the full content) and a sketch // (resemblance hash based on maximal values of regions) are calculated and // stored in an hashmap. -func (r *Repo) hashChunks(chunks <-chan StoredChunk) { - hasher := hash.Hash64(rabinkarp64.New()) +func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) { + hasher := rabinkarp64.New() for c := range chunks { - hasher.Reset() - io.Copy(hasher, c.Reader()) - h := hasher.Sum64() - r.fingerprints[h] = c.Id() - sketch, _ := SketchChunk(c, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount) - for _, s := range sketch { - prev := r.sketches[s] - if contains(prev, c.Id()) { - continue - } - r.sketches[s] = append(prev, c.Id()) + r.hashAndStoreChunk(c, hasher) + } +} + +func (r *Repo) hashAndStoreChunk(chunk IdentifiedChunk, hasher hash.Hash64) { + hasher.Reset() + io.Copy(hasher, chunk.Reader()) + fingerprint := hasher.Sum64() + sketch, _ := SketchChunk(chunk, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount) + r.storeChunkId(chunk.Id(), fingerprint, sketch) +} + +func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) { + r.fingerprints[fingerprint] = id + for _, s := range sketch { + prev := r.sketches[s] + if contains(prev, id) { + continue } + r.sketches[s] = append(prev, id) } - return } func contains(s []*ChunkId, id *ChunkId) bool { @@ -294,7 +301,7 @@ func (r *Repo) findSimilarChunk(chunk Chunk) (*ChunkId, bool) { return similarChunk, similarChunk != nil } -func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) { +func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) { id, found := r.findSimilarChunk(temp) if found { var buff bytes.Buffer @@ -309,32 +316,51 @@ func (r *Repo) tryDeltaEncodeChunk(temp *TempChunk) (Chunk, bool) { }, true } } - // TODO: if temp is of chunkSize, save it as a real new Chunk (add it to maps) return temp, false } -func (r *Repo) tryDeltaEncodeChunks(prev *TempChunk, curr *TempChunk) []Chunk { +func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) { + chunk, isDelta = r.tryDeltaEncodeChunk(temp) + if isDelta { + log.Println("Add new delta chunk") + return + } + if chunk.Len() == r.chunkSize { + *last++ + id := &ChunkId{Ver: version, Idx: *last} + ic := NewLoadedChunk(id, temp.Bytes()) + hasher := rabinkarp64.New() + r.hashAndStoreChunk(ic, hasher) + log.Println("Add new chunk", id) + return ic, false + } + log.Println("Add new partial chunk of size:", chunk.Len()) + return +} + +func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk { if prev == nil { - c, _ := r.tryDeltaEncodeChunk(curr) + c, _ := r.encodeTempChunk(curr, version, last) return []Chunk{c} } else if curr.Len() < r.chunkMinLen() { - c, success := r.tryDeltaEncodeChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...))) + c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last) if success { return []Chunk{c} } else { return []Chunk{prev, curr} } } else { - prevD, _ := r.tryDeltaEncodeChunk(prev) - currD, _ := r.tryDeltaEncodeChunk(curr) + prevD, _ := r.encodeTempChunk(prev, version, last) + currD, _ := r.encodeTempChunk(curr, version, last) return []Chunk{prevD, currD} } } -func (r *Repo) matchStream(stream io.Reader) []Chunk { +func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { var b byte var chunks []Chunk var prev *TempChunk + var last uint64 bufStream := bufio.NewReaderSize(stream, r.chunkSize) buff := make([]byte, 0, r.chunkSize*2) n, err := io.ReadFull(stream, buff[:r.chunkSize]) @@ -350,17 +376,16 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk { if exists { if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 { size := len(buff) - r.chunkSize - log.Println("Add new partial chunk of size:", size) temp := NewTempChunk(buff[:size]) - chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...) + chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...) prev = nil } else if prev != nil { - c, _ := r.tryDeltaEncodeChunk(prev) + c, _ := r.encodeTempChunk(prev, version, &last) chunks = append(chunks, c) prev = nil } log.Printf("Add existing chunk: %d\n", chunkId) - chunks = append(chunks, NewChunkFile(r, chunkId)) + chunks = append(chunks, NewStoredFile(r, chunkId)) buff = make([]byte, 0, r.chunkSize*2) for i := 0; i < r.chunkSize && err == nil; i++ { b, err = bufStream.ReadByte() @@ -370,9 +395,8 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk { continue } if len(buff) == r.chunkSize*2 { - log.Println("Add new chunk") if prev != nil { - chunk, _ := r.tryDeltaEncodeChunk(prev) + chunk, _ := r.encodeTempChunk(prev, version, &last) chunks = append(chunks, chunk) } prev = NewTempChunk(buff[:r.chunkSize]) @@ -389,15 +413,12 @@ func (r *Repo) matchStream(stream io.Reader) []Chunk { if len(buff) > 0 { var temp *TempChunk if len(buff) > r.chunkSize { - log.Println("Add new chunk") prev = NewTempChunk(buff[:r.chunkSize]) - log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize) temp = NewTempChunk(buff[r.chunkSize:]) } else { - log.Println("Add new partial chunk of size:", len(buff)) temp = NewTempChunk(buff) } - chunks = append(chunks, r.tryDeltaEncodeChunks(prev, temp)...) + chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...) } return chunks } -- cgit v1.2.3