diff options
author | n-peugnet <n.peugnet@free.fr> | 2021-09-14 19:08:56 +0200 |
---|---|---|
committer | n-peugnet <n.peugnet@free.fr> | 2021-09-14 19:44:42 +0200 |
commit | f21468b03329a3751a50eb829e07271d23ce4720 (patch) | |
tree | daca8e0a5d5bc983ccda602665735141cfa10399 /repo.go | |
parent | 20732336becb322729838a0283a4d1391f23de28 (diff) | |
download | dna-backup-f21468b03329a3751a50eb829e07271d23ce4720.tar.gz dna-backup-f21468b03329a3751a50eb829e07271d23ce4720.zip |
perf: move hash parallelism from sketch to repo
Diffstat (limited to 'repo.go')
-rw-r--r-- | repo.go | 54 |
1 files changed, 32 insertions, 22 deletions
@@ -33,13 +33,13 @@ import ( "bytes" "encoding/gob" "fmt" - "hash" "io" "io/fs" "os" "path/filepath" "reflect" "strings" + "sync" "github.com/chmduquesne/rollinghash/rabinkarp64" "github.com/n-peugnet/dna-backup/cache" @@ -93,7 +93,7 @@ func NewRepo(path string) *Repo { patcher: &Bsdiff{}, fingerprints: make(FingerprintMap), sketches: make(SketchMap), - chunkCache: cache.NewFifoCache(1000), + chunkCache: cache.NewFifoCache(10000), chunkReadWrapper: utils.ZlibReader, chunkWriteWrapper: utils.ZlibWriter, } @@ -339,25 +339,25 @@ func (r *Repo) chunkMinLen() int { // (resemblance hash based on maximal values of regions) are calculated and // stored in an hashmap. func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) { - hasher := rabinkarp64.NewFromPol(r.pol) for c := range chunks { - r.hashAndStoreChunk(c.GetId(), c.Reader(), hasher) - } -} - -func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader, hasher hash.Hash64) { - var chunk bytes.Buffer - hasher.Reset() - reader = io.TeeReader(reader, &chunk) - io.Copy(hasher, reader) - fingerprint := hasher.Sum64() - sketch, _ := sketch.SketchChunk(&chunk, r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount) - r.storeChunkId(id, fingerprint, sketch) -} - -func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) { - r.fingerprints[fingerprint] = id - for _, s := range sketch { + r.hashAndStoreChunk(c.GetId(), c.Reader()) + } +} + +func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader) { + var buffSk bytes.Buffer + var buffFp bytes.Buffer + var wg sync.WaitGroup + reader = io.TeeReader(reader, &buffSk) + io.Copy(&buffFp, reader) + var fp uint64 + var sk []uint64 + wg.Add(2) + go r.makeFingerprint(id, &buffFp, &wg, &fp) + go r.makeSketch(id, &buffSk, &wg, &sk) + wg.Wait() + r.fingerprints[fp] = id + for _, s := range sk { prev := r.sketches[s] if contains(prev, id) { continue @@ -366,6 +366,17 @@ func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) { } } +func (r *Repo) makeFingerprint(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *uint64) { + defer wg.Done() + hasher := rabinkarp64.NewFromPol(r.pol) + io.Copy(hasher, reader) + *ret = hasher.Sum64() +} + +func (r *Repo) makeSketch(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *[]uint64) { + defer wg.Done() + *ret, _ = sketch.SketchChunk(reader, r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount) +} func contains(s []*ChunkId, id *ChunkId) bool { for _, v := range s { if v == id { @@ -427,8 +438,7 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c if chunk.Len() == r.chunkSize { id := &ChunkId{Ver: version, Idx: *last} *last++ - hasher := rabinkarp64.NewFromPol(r.pol) - r.hashAndStoreChunk(id, temp.Reader(), hasher) + r.hashAndStoreChunk(id, temp.Reader()) err := r.StoreChunkContent(id, temp.Reader()) if err != nil { logger.Error(err) |