about summary refs log tree commit diff
path: root/repo.go
diff options
context:
space:
mode:
author n-peugnet <n.peugnet@free.fr> 2021-09-14 19:08:56 +0200
committer n-peugnet <n.peugnet@free.fr> 2021-09-14 19:44:42 +0200
commit f21468b03329a3751a50eb829e07271d23ce4720 (patch)
tree daca8e0a5d5bc983ccda602665735141cfa10399 /repo.go
parent 20732336becb322729838a0283a4d1391f23de28 (diff)
download dna-backup-f21468b03329a3751a50eb829e07271d23ce4720.tar.gz
dna-backup-f21468b03329a3751a50eb829e07271d23ce4720.zip
perf: move hash parallelism from sketch to repo
Diffstat (limited to 'repo.go')
-rw-r--r-- repo.go 54
1 files changed, 32 insertions, 22 deletions
diff --git a/repo.go b/repo.go
index cd13588..4aa8cfc 100644
--- a/repo.go
+++ b/repo.go
@@ -33,13 +33,13 @@ import (
"bytes"
"encoding/gob"
"fmt"
- "hash"
"io"
"io/fs"
"os"
"path/filepath"
"reflect"
"strings"
+ "sync"
"github.com/chmduquesne/rollinghash/rabinkarp64"
"github.com/n-peugnet/dna-backup/cache"
@@ -93,7 +93,7 @@ func NewRepo(path string) *Repo {
patcher: &Bsdiff{},
fingerprints: make(FingerprintMap),
sketches: make(SketchMap),
- chunkCache: cache.NewFifoCache(1000),
+ chunkCache: cache.NewFifoCache(10000),
chunkReadWrapper: utils.ZlibReader,
chunkWriteWrapper: utils.ZlibWriter,
}
@@ -339,25 +339,25 @@ func (r *Repo) chunkMinLen() int {
// (resemblance hash based on maximal values of regions) are calculated and
// stored in a hashmap.
func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
- hasher := rabinkarp64.NewFromPol(r.pol)
for c := range chunks {
- r.hashAndStoreChunk(c.GetId(), c.Reader(), hasher)
- }
-}
-
-func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader, hasher hash.Hash64) {
- var chunk bytes.Buffer
- hasher.Reset()
- reader = io.TeeReader(reader, &chunk)
- io.Copy(hasher, reader)
- fingerprint := hasher.Sum64()
- sketch, _ := sketch.SketchChunk(&chunk, r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
- r.storeChunkId(id, fingerprint, sketch)
-}
-
-func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
- r.fingerprints[fingerprint] = id
- for _, s := range sketch {
+ r.hashAndStoreChunk(c.GetId(), c.Reader())
+ }
+}
+
+func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader) {
+ var buffSk bytes.Buffer
+ var buffFp bytes.Buffer
+ var wg sync.WaitGroup
+ reader = io.TeeReader(reader, &buffSk)
+ io.Copy(&buffFp, reader)
+ var fp uint64
+ var sk []uint64
+ wg.Add(2)
+ go r.makeFingerprint(id, &buffFp, &wg, &fp)
+ go r.makeSketch(id, &buffSk, &wg, &sk)
+ wg.Wait()
+ r.fingerprints[fp] = id
+ for _, s := range sk {
prev := r.sketches[s]
if contains(prev, id) {
continue
@@ -366,6 +366,17 @@ func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
}
}
+func (r *Repo) makeFingerprint(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *uint64) {
+ defer wg.Done()
+ hasher := rabinkarp64.NewFromPol(r.pol)
+ io.Copy(hasher, reader)
+ *ret = hasher.Sum64()
+}
+
+func (r *Repo) makeSketch(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *[]uint64) {
+ defer wg.Done()
+ *ret, _ = sketch.SketchChunk(reader, r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
+}
func contains(s []*ChunkId, id *ChunkId) bool {
for _, v := range s {
if v == id {
@@ -427,8 +438,7 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
if chunk.Len() == r.chunkSize {
id := &ChunkId{Ver: version, Idx: *last}
*last++
- hasher := rabinkarp64.NewFromPol(r.pol)
- r.hashAndStoreChunk(id, temp.Reader(), hasher)
+ r.hashAndStoreChunk(id, temp.Reader())
err := r.StoreChunkContent(id, temp.Reader())
if err != nil {
logger.Error(err)