From 2f20511f442cecc764c817709c4358a149984766 Mon Sep 17 00:00:00 2001
From: n-peugnet
Date: Wed, 15 Sep 2021 15:06:30 +0200
Subject: add storeWorker and use it to store chunk content and hashes

---
 repo.go | 151 +++++++++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 96 insertions(+), 55 deletions(-)

diff --git a/repo.go b/repo.go
index 4aa8cfc..126ce41 100644
--- a/repo.go
+++ b/repo.go
@@ -51,6 +51,16 @@ import (
 type FingerprintMap map[uint64]*ChunkId
 type SketchMap map[uint64][]*ChunkId
 
+func (m SketchMap) Set(key []uint64, value *ChunkId) {
+	for _, s := range key {
+		prev := m[s]
+		if contains(prev, value) {
+			continue
+		}
+		m[s] = append(prev, value)
+	}
+}
+
 type Repo struct {
 	path              string
 	chunkSize         int
@@ -67,6 +77,17 @@ type Repo struct {
 	chunkWriteWrapper func(w io.Writer) io.WriteCloser
 }
 
+type chunkHashes struct {
+	Fp uint64
+	Sk []uint64
+}
+
+type chunkData struct {
+	hashes  chunkHashes
+	content []byte
+	id      *ChunkId
+}
+
 type File struct {
 	Path string
 	Size int64
@@ -114,22 +135,16 @@ func (r *Repo) Commit(source string) {
 	newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
 	newChunkPath := filepath.Join(newPath, chunksName)
 	newFilesPath := filepath.Join(newPath, filesName)
-	newFingerprintsPath := filepath.Join(newPath, fingerprintsName)
 	newRecipePath := filepath.Join(newPath, recipeName)
-	newSketchesPath := filepath.Join(newPath, sketchesName)
 	os.Mkdir(newPath, 0775)      // TODO: handle errors
 	os.Mkdir(newChunkPath, 0775) // TODO: handle errors
 	reader, writer := io.Pipe()
-	oldChunks := make(chan IdentifiedChunk, 16)
 	files := listFiles(source)
-	go r.loadChunks(versions, oldChunks)
+	r.loadHashes(versions)
 	go concatFiles(files, writer)
-	r.hashChunks(oldChunks)
 	recipe := r.matchStream(reader, newVersion)
 	storeFileList(newFilesPath, unprefixFiles(files, source))
-	storeFingerprints(newFingerprintsPath, r.fingerprints)
 	storeRecipe(newRecipePath, recipe)
-	storeSketches(newSketchesPath, r.sketches)
 	logger.Info(files)
 }
 
@@ -263,6 +278,27 @@ func loadFileList(path string) []File {
 	return files
 }
 
+func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<- bool) {
+	hashesFile := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), hashesName)
+	file, err := os.Create(hashesFile)
+	if err != nil {
+		logger.Panic(err)
+	}
+	encoder := gob.NewEncoder(file)
+	for data := range storeQueue {
+		err = encoder.Encode(data.hashes)
+		err := r.StoreChunkContent(data.id, bytes.NewReader(data.content))
+		if err != nil {
+			logger.Error(err)
+		}
+		logger.Info("stored", data.id)
+	}
+	if err = file.Close(); err != nil {
+		logger.Panic(err)
+	}
+	end <- true
+}
+
 func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) error {
 	path := id.Path(r.path)
 	file, err := os.Create(path)
@@ -309,7 +345,7 @@ func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
 	return bytes.NewReader(value)
 }
 
-// TODO: use atoi for chunkid
+// TODO: use atoi for chunkid ?
 func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
 	for i, v := range versions {
 		p := filepath.Join(v, chunksName)
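
One caveat in the storageWorker hunk above: the result of encoder.Encode(data.hashes) is assigned to err, but the err := on the next line shadows it, so a failed gob encode is never reported. Below is a minimal, self-contained sketch of the same single-worker pattern with both errors checked; the hashes type and stdlib log calls are simplified stand-ins for the repo's own types and logger, not the actual implementation.

package main

import (
	"encoding/gob"
	"log"
	"os"
)

type hashes struct {
	Fp uint64
	Sk []uint64
}

// worker drains the queue, gob-encodes every record to a single file,
// and signals completion once the file is flushed and closed.
func worker(queue <-chan hashes, path string, end chan<- bool) {
	file, err := os.Create(path)
	if err != nil {
		log.Panic(err)
	}
	encoder := gob.NewEncoder(file)
	for h := range queue {
		// Check the encode error directly; in the patch's storageWorker
		// it is assigned to err and then shadowed by the next statement.
		if err := encoder.Encode(h); err != nil {
			log.Println("encode:", err)
		}
	}
	if err := file.Close(); err != nil {
		log.Panic(err)
	}
	end <- true
}

func main() {
	queue := make(chan hashes, 10) // buffered, like storeQueue
	end := make(chan bool)
	go worker(queue, "hashes.gob", end)
	queue <- hashes{Fp: 42, Sk: []uint64{1, 2, 3}}
	close(queue)
	<-end
}
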
@@ -329,41 +365,58 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
 	close(chunks)
 }
 
+func (r *Repo) loadHashes(versions []string) {
+	for i, v := range versions {
+		path := filepath.Join(v, hashesName)
+		file, err := os.Open(path)
+		if err == nil {
+			decoder := gob.NewDecoder(file)
+			for j := 0; err == nil; j++ {
+				var h chunkHashes
+				if err = decoder.Decode(&h); err == nil {
+					id := &ChunkId{i, uint64(j)}
+					r.fingerprints[h.Fp] = id
+					r.sketches.Set(h.Sk, id)
+				}
+			}
+		}
+		if err != nil && err != io.EOF {
+			logger.Panic(err)
+		}
+		if err = file.Close(); err != nil {
+			logger.Panic(err)
+		}
+	}
+}
+
 func (r *Repo) chunkMinLen() int {
 	return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
 }
 
 // hashChunks calculates the hashes for a channel of chunks.
-//
 // For each chunk, both a fingerprint (hash over the full content) and a sketch
 // (resemblance hash based on maximal values of regions) are calculated and
 // stored in an hashmap.
 func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
 	for c := range chunks {
-		r.hashAndStoreChunk(c.GetId(), c.Reader())
+		r.hashChunk(c.GetId(), c.Reader())
 	}
 }
 
-func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader) {
+// hashChunk calculates the hashes for a chunk and stores them in the repo hashmaps.
+func (r *Repo) hashChunk(id *ChunkId, reader io.Reader) (fp uint64, sk []uint64) {
 	var buffSk bytes.Buffer
 	var buffFp bytes.Buffer
 	var wg sync.WaitGroup
 	reader = io.TeeReader(reader, &buffSk)
 	io.Copy(&buffFp, reader)
-	var fp uint64
-	var sk []uint64
 	wg.Add(2)
 	go r.makeFingerprint(id, &buffFp, &wg, &fp)
 	go r.makeSketch(id, &buffSk, &wg, &sk)
 	wg.Wait()
 	r.fingerprints[fp] = id
-	for _, s := range sk {
-		prev := r.sketches[s]
-		if contains(prev, id) {
-			continue
-		}
-		r.sketches[s] = append(prev, id)
-	}
+	r.sketches.Set(sk, id)
+	return
 }
 
 func (r *Repo) makeFingerprint(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *uint64) {
@@ -429,7 +482,7 @@ func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) {
 
 // encodeTempChunk first tries to delta-encode the given chunk before attributing
 // it an Id and saving it into the fingerprints and sketches maps.
-func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) {
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) (chunk Chunk, isDelta bool) {
 	chunk, isDelta = r.tryDeltaEncodeChunk(temp)
 	if isDelta {
 		logger.Info("add new delta chunk")
@@ -438,11 +491,13 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
 	if chunk.Len() == r.chunkSize {
 		id := &ChunkId{Ver: version, Idx: *last}
 		*last++
-		r.hashAndStoreChunk(id, temp.Reader())
-		err := r.StoreChunkContent(id, temp.Reader())
-		if err != nil {
-			logger.Error(err)
+		fp, sk := r.hashChunk(id, temp.Reader())
+		storeQueue <- chunkData{
+			hashes:  chunkHashes{fp, sk},
+			content: temp.Bytes(),
+			id:      id,
 		}
+		r.chunkCache.Set(id, temp.Bytes())
 		logger.Info("add new chunk", id)
 		return NewStoredChunk(r, id), false
 	}
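
The rewritten hashChunk above reads each chunk only once and fans the bytes out to two hash computations via io.TeeReader, running the fingerprint and the sketch concurrently under a WaitGroup. A runnable sketch of that fan-out follows, with hash/fnv standing in for the actual rabinkarp64 fingerprint and the resemblance sketch.

package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"io"
	"strings"
	"sync"
)

func main() {
	var buffSk, buffFp bytes.Buffer
	// The tee duplicates every byte read into buffSk while io.Copy
	// fills buffFp, so the source is only traversed once.
	reader := io.TeeReader(strings.NewReader("some chunk content"), &buffSk)
	io.Copy(&buffFp, reader)

	var fp, sk uint64
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // fingerprint over the full content
		defer wg.Done()
		h := fnv.New64a()
		io.Copy(h, &buffFp)
		fp = h.Sum64()
	}()
	go func() { // stand-in for the resemblance sketch
		defer wg.Done()
		h := fnv.New64()
		io.Copy(h, &buffSk)
		sk = h.Sum64()
	}()
	wg.Wait()
	fmt.Printf("fp=%x sk=%x\n", fp, sk)
}
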
@@ -454,20 +509,21 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
 // Temporary chunks can be partial. If the current chunk is smaller than the size of a
 // super-feature and there exists a previous chunk, then both are merged before attempting
 // to delta-encode them.
-func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk {
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) []Chunk {
 	if reflect.ValueOf(prev).IsNil() {
-		c, _ := r.encodeTempChunk(curr, version, last)
+		c, _ := r.encodeTempChunk(curr, version, last, storeQueue)
 		return []Chunk{c}
 	} else if curr.Len() < r.chunkMinLen() {
-		c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last)
+		tmp := NewTempChunk(append(prev.Bytes(), curr.Bytes()...))
+		c, success := r.encodeTempChunk(tmp, version, last, storeQueue)
 		if success {
 			return []Chunk{c}
 		} else {
 			return []Chunk{prev, curr}
 		}
 	} else {
-		prevD, _ := r.encodeTempChunk(prev, version, last)
-		currD, _ := r.encodeTempChunk(curr, version, last)
+		prevD, _ := r.encodeTempChunk(prev, version, last, storeQueue)
+		currD, _ := r.encodeTempChunk(curr, version, last, storeQueue)
		return []Chunk{prevD, currD}
 	}
 }
@@ -490,6 +546,9 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
 	}
 	hasher := rabinkarp64.NewFromPol(r.pol)
 	hasher.Write(buff)
+	storeQueue := make(chan chunkData, 10)
+	storeEnd := make(chan bool)
+	go r.storageWorker(version, storeQueue, storeEnd)
 	for err != io.EOF {
 		h := hasher.Sum64()
 		chunkId, exists := r.fingerprints[h]
@@ -497,10 +556,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
 			if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
 				size := len(buff) - r.chunkSize
 				temp := NewTempChunk(buff[:size])
-				chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
+				chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
 				prev = nil
 			} else if prev != nil {
-				c, _ := r.encodeTempChunk(prev, version, &last)
+				c, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
 				chunks = append(chunks, c)
 				prev = nil
 			}
@@ -518,7 +577,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
 		}
 		if len(buff) == r.chunkSize*2 {
 			if prev != nil {
-				chunk, _ := r.encodeTempChunk(prev, version, &last)
+				chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
 				chunks = append(chunks, chunk)
 			}
 			prev = NewTempChunk(buff[:r.chunkSize])
@@ -536,7 +595,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
 		var temp *TempChunk
 		if len(buff) > r.chunkSize {
 			if prev != nil {
-				chunk, _ := r.encodeTempChunk(prev, version, &last)
+				chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
 				chunks = append(chunks, chunk)
 			}
 			prev = NewTempChunk(buff[:r.chunkSize])
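
matchStream now owns the worker's lifecycle: it creates the buffered storeQueue, starts storageWorker, and, in the final hunk further down, closes the queue and blocks on storeEnd. A minimal sketch of that shutdown handshake, assuming nothing beyond the channel pattern itself: closing the queue lets the worker's range loop drain and exit, and the final receive guarantees every queued store has finished before the caller returns.

package main

import "fmt"

func main() {
	queue := make(chan int, 10) // buffered, like storeQueue
	end := make(chan bool)
	go func() { // stands in for storageWorker
		for v := range queue { // drains until close(queue)
			fmt.Println("stored", v)
		}
		end <- true // signals only once the queue is empty
	}()
	for i := 0; i < 3; i++ {
		queue <- i
	}
	close(queue) // no more work; the range loop will terminate
	<-end        // wait for the worker to finish flushing
}
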
@@ -544,8 +603,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
 		} else {
 			temp = NewTempChunk(buff)
 		}
-		chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
+		chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
 	}
+	close(storeQueue)
+	<-storeEnd
 	return chunks
 }
 
@@ -606,26 +667,6 @@ func loadRecipe(path string) []Chunk {
 	return recipe
 }
 
-func storeFingerprints(dest string, fingerprints FingerprintMap) {
-	storeBasicStruct(dest, fingerprints)
-}
-
-func loadFingerprints(path string) FingerprintMap {
-	var fingerprints FingerprintMap
-	loadBasicStruct(path, &fingerprints)
-	return fingerprints
-}
-
-func storeSketches(dest string, sketches SketchMap) {
-	storeBasicStruct(dest, sketches)
-}
-
-func loadSketches(path string) SketchMap {
-	var sketches SketchMap
-	loadBasicStruct(path, &sketches)
-	return sketches
-}
-
 func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
 	for _, c := range chunks {
 		tmp, isDelta := c.(*DeltaChunk)
-- 
cgit v1.2.3
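
The hashes file this commit introduces is a plain gob stream of chunkHashes records, and loadHashes recovers each chunk's index from the record's position in that stream. A self-contained round-trip sketch of the format, with a bytes.Buffer standing in for the on-disk file:

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"
	"log"
)

type chunkHashes struct { // mirrors the patch's exported-field struct
	Fp uint64
	Sk []uint64
}

func main() {
	var file bytes.Buffer // stands in for the on-disk hashes file
	enc := gob.NewEncoder(&file)
	for i := 0; i < 3; i++ {
		if err := enc.Encode(chunkHashes{Fp: uint64(i), Sk: []uint64{uint64(i)}}); err != nil {
			log.Panic(err)
		}
	}
	dec := gob.NewDecoder(&file)
	var err error
	for j := 0; ; j++ { // decode until EOF; j recovers the chunk index
		var h chunkHashes
		if err = dec.Decode(&h); err != nil {
			break
		}
		fmt.Printf("chunk %d: fp=%d sk=%v\n", j, h.Fp, h.Sk)
	}
	if err != io.EOF {
		log.Panic(err)
	}
}
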