-rw-r--r--	TODO.md                 |  2 ++
-rw-r--r--	docs/note-2021-09-14.md | 17 +++++++++++++++++
-rw-r--r--	repo.go                 | 54 ++++++++++++++++++++++++++++++++----------------------
-rw-r--r--	sketch/sketch.go        |  9 ++-------
4 files changed, 53 insertions(+), 29 deletions(-)
@@ -33,6 +33,8 @@ priority 2
 - [x] remove `LoadedChunk` and only use `StoredChunk` instead now that the
   cache is implemented
 - [ ] store file list compressed
+- [ ] keep hash workers so that they reuse the same hasher and reset it instead
+  of creating a new one each time. This could save some processing time.
 
 reunion 7/09
 ------------
diff --git a/docs/note-2021-09-14.md b/docs/note-2021-09-14.md
new file mode 100644
index 0000000..14fb973
--- /dev/null
+++ b/docs/note-2021-09-14.md
@@ -0,0 +1,17 @@
+Perf improvements with concurrent hash calculation
+==================================================
+
+Using the source code dataset, here are the new times:
+
+`19:38:46.745` -> `19:41:56.652` = `00:03:09,907`
+
+But this time I also had the good idea to close all my processes and to use
+a tmp directory for writing.
+
+With the same setup, the previous times were:
+
+`19:26:05.954` -> `19:29:20.805` = `00:03:14,851`
+
+So not that big of an improvement, but it seems that CPU usage has decreased
+a bit at the same time, maybe because fewer synchronisation calls were made?
+
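The note above refers to the change in the repo.go diff below: the chunk is read only once, `io.TeeReader` mirrors the bytes into a second buffer, and the fingerprint and the sketch are then computed by two goroutines in parallel. Here is a minimal, self-contained sketch of that pattern; `hashBoth` is a hypothetical name and `fnv.New64a` is only a stand-in for `sketch.SketchChunk`, not the project's API.

```go
// Sketch of the fan-out pattern used by hashAndStoreChunk below.
package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"io"
	"strings"
	"sync"

	"github.com/chmduquesne/rollinghash/rabinkarp64"
)

func hashBoth(r io.Reader) (fp, sk uint64) {
	var buffFp, buffSk bytes.Buffer
	// Filling buffFp also fills buffSk: TeeReader writes every byte it
	// reads into buffSk, so the underlying reader is consumed only once.
	io.Copy(&buffFp, io.TeeReader(r, &buffSk))

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // fingerprint, as in makeFingerprint below
		defer wg.Done()
		h := rabinkarp64.New()
		io.Copy(h, &buffFp)
		fp = h.Sum64()
	}()
	go func() { // stand-in for the sketch computation (makeSketch below)
		defer wg.Done()
		h := fnv.New64a()
		io.Copy(h, &buffSk)
		sk = h.Sum64()
	}()
	wg.Wait() // both results are ready once Wait returns
	return fp, sk
}

func main() {
	fp, sk := hashBoth(strings.NewReader("some chunk content"))
	fmt.Printf("fp=%016x sk=%016x\n", fp, sk)
}
```

Note that both goroutines only start once the whole chunk has been buffered; the gain reported in the note comes from the fingerprint and the sketch no longer being computed sequentially.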
diff --git a/repo.go b/repo.go
--- a/repo.go
+++ b/repo.go
@@ -33,13 +33,13 @@ import (
 	"bytes"
 	"encoding/gob"
 	"fmt"
-	"hash"
 	"io"
 	"io/fs"
 	"os"
 	"path/filepath"
 	"reflect"
 	"strings"
+	"sync"
 
 	"github.com/chmduquesne/rollinghash/rabinkarp64"
 	"github.com/n-peugnet/dna-backup/cache"
@@ -93,7 +93,7 @@ func NewRepo(path string) *Repo {
 		patcher:           &Bsdiff{},
 		fingerprints:      make(FingerprintMap),
 		sketches:          make(SketchMap),
-		chunkCache:        cache.NewFifoCache(1000),
+		chunkCache:        cache.NewFifoCache(10000),
 		chunkReadWrapper:  utils.ZlibReader,
 		chunkWriteWrapper: utils.ZlibWriter,
 	}
@@ -339,25 +339,25 @@ func (r *Repo) chunkMinLen() int {
 // (resemblance hash based on maximal values of regions) are calculated and
 // stored in an hashmap.
 func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
-	hasher := rabinkarp64.NewFromPol(r.pol)
 	for c := range chunks {
-		r.hashAndStoreChunk(c.GetId(), c.Reader(), hasher)
-	}
-}
-
-func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader, hasher hash.Hash64) {
-	var chunk bytes.Buffer
-	hasher.Reset()
-	reader = io.TeeReader(reader, &chunk)
-	io.Copy(hasher, reader)
-	fingerprint := hasher.Sum64()
-	sketch, _ := sketch.SketchChunk(&chunk, r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
-	r.storeChunkId(id, fingerprint, sketch)
-}
-
-func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
-	r.fingerprints[fingerprint] = id
-	for _, s := range sketch {
+		r.hashAndStoreChunk(c.GetId(), c.Reader())
+	}
+}
+
+func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader) {
+	var buffSk bytes.Buffer
+	var buffFp bytes.Buffer
+	var wg sync.WaitGroup
+	reader = io.TeeReader(reader, &buffSk)
+	io.Copy(&buffFp, reader)
+	var fp uint64
+	var sk []uint64
+	wg.Add(2)
+	go r.makeFingerprint(id, &buffFp, &wg, &fp)
+	go r.makeSketch(id, &buffSk, &wg, &sk)
+	wg.Wait()
+	r.fingerprints[fp] = id
+	for _, s := range sk {
 		prev := r.sketches[s]
 		if contains(prev, id) {
 			continue
@@ -366,6 +377,17 @@ func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
 	}
 }
 
+func (r *Repo) makeFingerprint(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *uint64) {
+	defer wg.Done()
+	hasher := rabinkarp64.NewFromPol(r.pol)
+	io.Copy(hasher, reader)
+	*ret = hasher.Sum64()
+}
+
+func (r *Repo) makeSketch(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *[]uint64) {
+	defer wg.Done()
+	*ret, _ = sketch.SketchChunk(reader, r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
+}
 func contains(s []*ChunkId, id *ChunkId) bool {
 	for _, v := range s {
 		if v == id {
@@ -427,8 +438,7 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
 	if chunk.Len() == r.chunkSize {
 		id := &ChunkId{Ver: version, Idx: *last}
 		*last++
-		hasher := rabinkarp64.NewFromPol(r.pol)
-		r.hashAndStoreChunk(id, temp.Reader(), hasher)
+		r.hashAndStoreChunk(id, temp.Reader())
 		err := r.StoreChunkContent(id, temp.Reader())
 		if err != nil {
 			logger.Error(err)
diff --git a/sketch/sketch.go b/sketch/sketch.go
index ca8c238..7870c3d 100644
--- a/sketch/sketch.go
+++ b/sketch/sketch.go
@@ -4,7 +4,6 @@ import (
 	"bytes"
 	"encoding/binary"
 	"io"
-	"sync"
 
 	"github.com/chmduquesne/rollinghash/rabinkarp64"
 	"github.com/n-peugnet/dna-backup/logger"
@@ -23,7 +22,6 @@ const fBytes = 8
 // sfCount: the number of super-features, and fCount: the number of feature
 // per super-feature
 func SketchChunk(r io.Reader, pol rabinkarp64.Pol, chunkSize int, wSize int, sfCount int, fCount int) (Sketch, error) {
-	var wg sync.WaitGroup
 	var fSize = FeatureSize(chunkSize, sfCount, fCount)
 	var chunk bytes.Buffer
 	superfeatures := make([]uint64, 0, sfCount)
@@ -41,11 +39,9 @@ func SketchChunk(r io.Reader, pol rabinkarp64.Pol, chunkSize int, wSize int, sfC
 			continue
 		}
 		features = append(features, 0)
-		wg.Add(1)
-		go calcFeature(&wg, pol, &fBuff, wSize, fSize, &features[f])
+		calcFeature(pol, &fBuff, wSize, fSize, &features[f])
 	}
 	hasher := rabinkarp64.NewFromPol(pol)
-	wg.Wait()
 	for sf := 0; sf < len(features)/fCount; sf++ {
 		for i := 0; i < fCount; i++ {
 			binary.LittleEndian.PutUint64(sfBuff[i*fBytes:(i+1)*fBytes], features[i+sf*fCount])
@@ -57,8 +53,7 @@ func SketchChunk(r io.Reader, pol rabinkarp64.Pol, chunkSize int, wSize int, sfC
 	return superfeatures, nil
 }
 
-func calcFeature(wg *sync.WaitGroup, p rabinkarp64.Pol, r ReadByteReader, wSize int, fSize int, result *uint64) {
-	defer wg.Done()
+func calcFeature(p rabinkarp64.Pol, r ReadByteReader, wSize int, fSize int, result *uint64) {
 	hasher := rabinkarp64.NewFromPol(p)
 	n, err := io.CopyN(hasher, r, int64(wSize))
 	if err != nil {
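The TODO item added above ("keep hash workers so that they reuse the same hasher and reset it") could take the following shape. This is only a sketch: `hashJob`, `startHashWorkers` and `newHasher` are hypothetical names, and in the real repo the constructor would close over `r.pol` and return `rabinkarp64.NewFromPol(r.pol)`.

```go
// Each worker allocates its hasher once and Reset()s it between chunks,
// instead of calling NewFromPol for every chunk.
package main

import (
	"fmt"
	"hash"
	"io"
	"strings"
	"sync"

	"github.com/chmduquesne/rollinghash/rabinkarp64"
)

type hashJob struct {
	data io.Reader
	out  chan<- uint64
}

// startHashWorkers spawns n workers draining one shared jobs channel.
func startHashWorkers(n int, newHasher func() hash.Hash64, jobs <-chan hashJob, wg *sync.WaitGroup) {
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			hasher := newHasher() // one allocation for the worker's lifetime
			for j := range jobs {
				hasher.Reset() // reuse the hasher instead of reallocating
				io.Copy(hasher, j.data)
				j.out <- hasher.Sum64()
			}
		}()
	}
}

func main() {
	jobs := make(chan hashJob)
	out := make(chan uint64, 2)
	var wg sync.WaitGroup
	startHashWorkers(2, func() hash.Hash64 { return rabinkarp64.New() }, jobs, &wg)
	jobs <- hashJob{strings.NewReader("chunk a"), out}
	jobs <- hashJob{strings.NewReader("chunk b"), out}
	close(jobs)
	wg.Wait()
	fmt.Println(<-out, <-out)
}
```

Compared to `hashAndStoreChunk` above, this would restore the one-`NewFromPol`-per-worker behaviour that the old `hashChunks` loop had, while keeping the hashing itself parallel.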