diff options
-rw-r--r-- | TODO.md | 5 | ||||
-rw-r--r-- | const.go | 13 | ||||
-rw-r--r-- | docs/note-2021-09-15.md | 6 | ||||
-rw-r--r-- | repo.go | 151 | ||||
-rw-r--r-- | repo_test.go | 10 | ||||
-rw-r--r-- | testdata/repo_8k/00000/fingerprints | bin | 208 -> 0 bytes | |||
-rw-r--r-- | testdata/repo_8k/00000/hashes | bin | 0 -> 570 bytes | |||
-rw-r--r-- | testdata/repo_8k/00000/sketches | bin | 538 -> 0 bytes | |||
-rw-r--r-- | testdata/repo_8k_zlib/00000/fingerprints | bin | 208 -> 0 bytes | |||
-rw-r--r-- | testdata/repo_8k_zlib/00000/hashes | bin | 0 -> 570 bytes | |||
-rw-r--r-- | testdata/repo_8k_zlib/00000/sketches | bin | 538 -> 0 bytes |
11 files changed, 117 insertions, 68 deletions
@@ -26,7 +26,10 @@ priority 2 - [ ] option to commit without deltas to save new base chunks - [ ] custom binary marshal and unmarshal for chunks - [x] use `loadChunkContent` in `loadChunks` -- [ ] TODO: store hashes for faster maps rebuild +- [x] save hashes for faster maps rebuild + - [x] store hashes for current version's chunks + - [x] load hashes for each version +- [x] use store queue to asynchronously store `chunkData` - [ ] try [Fdelta](https://github.com/amlwwalker/fdelta) and [Xdelta](https://github.com/nine-lives-later/go-xdelta) instead of Bsdiff - [ ] maybe use an LRU cache instead of the current FIFO one. @@ -1,11 +1,10 @@ package main const ( - chunksName = "chunks" - chunkIdFmt = "%015d" - versionFmt = "%05d" - filesName = "files" - fingerprintsName = "fingerprints" - recipeName = "recipe" - sketchesName = "sketches" + chunksName = "chunks" + chunkIdFmt = "%015d" + versionFmt = "%05d" + filesName = "files" + hashesName = "hashes" + recipeName = "recipe" ) diff --git a/docs/note-2021-09-15.md b/docs/note-2021-09-15.md new file mode 100644 index 0000000..8caf1cc --- /dev/null +++ b/docs/note-2021-09-15.md @@ -0,0 +1,6 @@ +Added storage worker +==================== + +## Time + +`14:49:00.221` -> `14:51:34.009` = `00:02:33.788` @@ -51,6 +51,16 @@ import ( type FingerprintMap map[uint64]*ChunkId type SketchMap map[uint64][]*ChunkId +func (m SketchMap) Set(key []uint64, value *ChunkId) { + for _, s := range key { + prev := m[s] + if contains(prev, value) { + continue + } + m[s] = append(prev, value) + } +} + type Repo struct { path string chunkSize int @@ -67,6 +77,17 @@ type Repo struct { chunkWriteWrapper func(w io.Writer) io.WriteCloser } +type chunkHashes struct { + Fp uint64 + Sk []uint64 +} + +type chunkData struct { + hashes chunkHashes + content []byte + id *ChunkId +} + type File struct { Path string Size int64 @@ -114,22 +135,16 @@ func (r *Repo) Commit(source string) { newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion)) newChunkPath := filepath.Join(newPath, chunksName) newFilesPath := filepath.Join(newPath, filesName) - newFingerprintsPath := filepath.Join(newPath, fingerprintsName) newRecipePath := filepath.Join(newPath, recipeName) - newSketchesPath := filepath.Join(newPath, sketchesName) os.Mkdir(newPath, 0775) // TODO: handle errors os.Mkdir(newChunkPath, 0775) // TODO: handle errors reader, writer := io.Pipe() - oldChunks := make(chan IdentifiedChunk, 16) files := listFiles(source) - go r.loadChunks(versions, oldChunks) + r.loadHashes(versions) go concatFiles(files, writer) - r.hashChunks(oldChunks) recipe := r.matchStream(reader, newVersion) storeFileList(newFilesPath, unprefixFiles(files, source)) - storeFingerprints(newFingerprintsPath, r.fingerprints) storeRecipe(newRecipePath, recipe) - storeSketches(newSketchesPath, r.sketches) logger.Info(files) } @@ -263,6 +278,27 @@ func loadFileList(path string) []File { return files } +func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<- bool) { + hashesFile := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), hashesName) + file, err := os.Create(hashesFile) + if err != nil { + logger.Panic(err) + } + encoder := gob.NewEncoder(file) + for data := range storeQueue { + err = encoder.Encode(data.hashes) + err := r.StoreChunkContent(data.id, bytes.NewReader(data.content)) + if err != nil { + logger.Error(err) + } + logger.Info("stored", data.id) + } + if err = file.Close(); err != nil { + logger.Panic(err) + } + end <- true +} + func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) error { path := id.Path(r.path) file, err := os.Create(path) @@ -309,7 +345,7 @@ func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader { return bytes.NewReader(value) } -// TODO: use atoi for chunkid +// TODO: use atoi for chunkid ? func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) { for i, v := range versions { p := filepath.Join(v, chunksName) @@ -329,41 +365,58 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) { close(chunks) } +func (r *Repo) loadHashes(versions []string) { + for i, v := range versions { + path := filepath.Join(v, hashesName) + file, err := os.Open(path) + if err == nil { + decoder := gob.NewDecoder(file) + for j := 0; err == nil; j++ { + var h chunkHashes + if err = decoder.Decode(&h); err == nil { + id := &ChunkId{i, uint64(j)} + r.fingerprints[h.Fp] = id + r.sketches.Set(h.Sk, id) + } + } + } + if err != nil && err != io.EOF { + logger.Panic(err) + } + if err = file.Close(); err != nil { + logger.Panic(err) + } + } +} + func (r *Repo) chunkMinLen() int { return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount) } // hashChunks calculates the hashes for a channel of chunks. -// // For each chunk, both a fingerprint (hash over the full content) and a sketch // (resemblance hash based on maximal values of regions) are calculated and // stored in an hashmap. func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) { for c := range chunks { - r.hashAndStoreChunk(c.GetId(), c.Reader()) + r.hashChunk(c.GetId(), c.Reader()) } } -func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader) { +// hashChunk calculates the hashes for a chunk and store them in th repo hashmaps. +func (r *Repo) hashChunk(id *ChunkId, reader io.Reader) (fp uint64, sk []uint64) { var buffSk bytes.Buffer var buffFp bytes.Buffer var wg sync.WaitGroup reader = io.TeeReader(reader, &buffSk) io.Copy(&buffFp, reader) - var fp uint64 - var sk []uint64 wg.Add(2) go r.makeFingerprint(id, &buffFp, &wg, &fp) go r.makeSketch(id, &buffSk, &wg, &sk) wg.Wait() r.fingerprints[fp] = id - for _, s := range sk { - prev := r.sketches[s] - if contains(prev, id) { - continue - } - r.sketches[s] = append(prev, id) - } + r.sketches.Set(sk, id) + return } func (r *Repo) makeFingerprint(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *uint64) { @@ -429,7 +482,7 @@ func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) { // encodeTempChunk first tries to delta-encode the given chunk before attributing // it an Id and saving it into the fingerprints and sketches maps. -func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) { +func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) (chunk Chunk, isDelta bool) { chunk, isDelta = r.tryDeltaEncodeChunk(temp) if isDelta { logger.Info("add new delta chunk") @@ -438,11 +491,13 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c if chunk.Len() == r.chunkSize { id := &ChunkId{Ver: version, Idx: *last} *last++ - r.hashAndStoreChunk(id, temp.Reader()) - err := r.StoreChunkContent(id, temp.Reader()) - if err != nil { - logger.Error(err) + fp, sk := r.hashChunk(id, temp.Reader()) + storeQueue <- chunkData{ + hashes: chunkHashes{fp, sk}, + content: temp.Bytes(), + id: id, } + r.chunkCache.Set(id, temp.Bytes()) logger.Info("add new chunk", id) return NewStoredChunk(r, id), false } @@ -454,20 +509,21 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c // Temporary chunks can be partial. If the current chunk is smaller than the size of a // super-feature and there exists a previous chunk, then both are merged before attempting // to delta-encode them. -func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk { +func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) []Chunk { if reflect.ValueOf(prev).IsNil() { - c, _ := r.encodeTempChunk(curr, version, last) + c, _ := r.encodeTempChunk(curr, version, last, storeQueue) return []Chunk{c} } else if curr.Len() < r.chunkMinLen() { - c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last) + tmp := NewTempChunk(append(prev.Bytes(), curr.Bytes()...)) + c, success := r.encodeTempChunk(tmp, version, last, storeQueue) if success { return []Chunk{c} } else { return []Chunk{prev, curr} } } else { - prevD, _ := r.encodeTempChunk(prev, version, last) - currD, _ := r.encodeTempChunk(curr, version, last) + prevD, _ := r.encodeTempChunk(prev, version, last, storeQueue) + currD, _ := r.encodeTempChunk(curr, version, last, storeQueue) return []Chunk{prevD, currD} } } @@ -490,6 +546,9 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { } hasher := rabinkarp64.NewFromPol(r.pol) hasher.Write(buff) + storeQueue := make(chan chunkData, 10) + storeEnd := make(chan bool) + go r.storageWorker(version, storeQueue, storeEnd) for err != io.EOF { h := hasher.Sum64() chunkId, exists := r.fingerprints[h] @@ -497,10 +556,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 { size := len(buff) - r.chunkSize temp := NewTempChunk(buff[:size]) - chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...) + chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...) prev = nil } else if prev != nil { - c, _ := r.encodeTempChunk(prev, version, &last) + c, _ := r.encodeTempChunk(prev, version, &last, storeQueue) chunks = append(chunks, c) prev = nil } @@ -518,7 +577,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { } if len(buff) == r.chunkSize*2 { if prev != nil { - chunk, _ := r.encodeTempChunk(prev, version, &last) + chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue) chunks = append(chunks, chunk) } prev = NewTempChunk(buff[:r.chunkSize]) @@ -536,7 +595,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { var temp *TempChunk if len(buff) > r.chunkSize { if prev != nil { - chunk, _ := r.encodeTempChunk(prev, version, &last) + chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue) chunks = append(chunks, chunk) } prev = NewTempChunk(buff[:r.chunkSize]) @@ -544,8 +603,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk { } else { temp = NewTempChunk(buff) } - chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...) + chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...) } + close(storeQueue) + <-storeEnd return chunks } @@ -606,26 +667,6 @@ func loadRecipe(path string) []Chunk { return recipe } -func storeFingerprints(dest string, fingerprints FingerprintMap) { - storeBasicStruct(dest, fingerprints) -} - -func loadFingerprints(path string) FingerprintMap { - var fingerprints FingerprintMap - loadBasicStruct(path, &fingerprints) - return fingerprints -} - -func storeSketches(dest string, sketches SketchMap) { - storeBasicStruct(dest, sketches) -} - -func loadSketches(path string) SketchMap { - var sketches SketchMap - loadBasicStruct(path, &sketches) - return sketches -} - func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) { for _, c := range chunks { tmp, isDelta := c.(*DeltaChunk) diff --git a/repo_test.go b/repo_test.go index f6f6edd..cf88224 100644 --- a/repo_test.go +++ b/repo_test.go @@ -230,11 +230,13 @@ func TestBsdiff(t *testing.T) { // Load previously stored chunks oldChunks := make(chan IdentifiedChunk, 16) versions := repo.loadVersions() - newVersion := len(versions) go repo.loadChunks(versions, oldChunks) repo.hashChunks(oldChunks) // Read new data + newVersion := len(versions) + newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion)) + os.MkdirAll(newPath, 0775) reader := getDataStream(dataDir, concatFiles) recipe := repo.matchStream(reader, newVersion) newChunks := extractDeltaChunks(recipe) @@ -341,10 +343,8 @@ func assertCompatibleRepoFile(t *testing.T, expected string, actual string, pref t.Fatal(prefix, "chunk do not match:", aRecipe[i], ", expected", eChunk) } } - } else if filepath.Base(expected) == sketchesName { - // TODO: check Sketches file - } else if filepath.Base(expected) == fingerprintsName { - // TODO: check Fingerprints file + } else if filepath.Base(expected) == hashesName { + // TODO: check Hashes file } else { // Chunk content file assertSameFile(t, expected, actual, prefix) diff --git a/testdata/repo_8k/00000/fingerprints b/testdata/repo_8k/00000/fingerprints Binary files differdeleted file mode 100644 index d015949..0000000 --- a/testdata/repo_8k/00000/fingerprints +++ /dev/null diff --git a/testdata/repo_8k/00000/hashes b/testdata/repo_8k/00000/hashes Binary files differnew file mode 100644 index 0000000..ec622b2 --- /dev/null +++ b/testdata/repo_8k/00000/hashes diff --git a/testdata/repo_8k/00000/sketches b/testdata/repo_8k/00000/sketches Binary files differdeleted file mode 100644 index 83572df..0000000 --- a/testdata/repo_8k/00000/sketches +++ /dev/null diff --git a/testdata/repo_8k_zlib/00000/fingerprints b/testdata/repo_8k_zlib/00000/fingerprints Binary files differdeleted file mode 100644 index d015949..0000000 --- a/testdata/repo_8k_zlib/00000/fingerprints +++ /dev/null diff --git a/testdata/repo_8k_zlib/00000/hashes b/testdata/repo_8k_zlib/00000/hashes Binary files differnew file mode 100644 index 0000000..ec622b2 --- /dev/null +++ b/testdata/repo_8k_zlib/00000/hashes diff --git a/testdata/repo_8k_zlib/00000/sketches b/testdata/repo_8k_zlib/00000/sketches Binary files differdeleted file mode 100644 index 83572df..0000000 --- a/testdata/repo_8k_zlib/00000/sketches +++ /dev/null |