author    | n-peugnet <n.peugnet@free.fr> | 2021-09-06 16:37:50 +0200
committer | n-peugnet <n.peugnet@free.fr> | 2021-09-06 16:37:50 +0200
commit    | b31f22ff61b69a0cb3e0a9d7955133b5304f4ff7 (patch)
tree      | f188b6f15f54457ecc03089bb74385c27fcc8593
parent    | d9703a9daa05f30e77bb99393eab8a0d40e788e4 (diff)
calculate features concurrently
-rw-r--r-- | chunk.go  | 22
-rw-r--r-- | sketch.go | 53

2 files changed, 45 insertions(+), 30 deletions(-)
```diff
--- a/chunk.go
+++ b/chunk.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"bufio"
 	"bytes"
 	"fmt"
 	"io"
@@ -10,13 +9,8 @@ import (
 	"path"
 )
 
-type ChunkReader interface {
-	io.Reader
-	io.ByteReader
-}
-
 type Chunk interface {
-	Reader() ChunkReader
+	Reader() io.ReadSeeker
 	Len() int
 }
 
@@ -39,13 +33,13 @@ func (i *ChunkId) Path(repo string) string {
 	return path.Join(repo, fmt.Sprintf(versionFmt, i.Ver), chunksName, fmt.Sprintf(chunkIdFmt, i.Idx))
 }
 
-func (i *ChunkId) Reader(repo *Repo) ChunkReader {
+func (i *ChunkId) Reader(repo *Repo) io.ReadSeeker {
 	path := i.Path(repo.path)
 	f, err := os.Open(path)
 	if err != nil {
 		log.Println("Cannot open chunk: ", path)
 	}
-	return bufio.NewReaderSize(f, repo.chunkSize)
+	return f
 }
 
 func NewLoadedChunk(id *ChunkId, value []byte) *LoadedChunk {
@@ -61,7 +55,7 @@ func (c *LoadedChunk) Id() *ChunkId {
 	return c.id
 }
 
-func (c *LoadedChunk) Reader() ChunkReader {
+func (c *LoadedChunk) Reader() io.ReadSeeker {
 	// log.Printf("Chunk %d: Reading from in-memory value\n", c.id)
 	return bytes.NewReader(c.value)
 }
@@ -87,7 +81,7 @@ func (c *StoredChunk) Id() *ChunkId {
 	return c.id
 }
 
-func (c *StoredChunk) Reader() ChunkReader {
+func (c *StoredChunk) Reader() io.ReadSeeker {
 	// log.Printf("Chunk %d: Reading from file\n", c.id)
 	return c.id.Reader(c.repo)
 }
@@ -109,7 +103,7 @@ type TempChunk struct {
 	value []byte
 }
 
-func (c *TempChunk) Reader() ChunkReader {
+func (c *TempChunk) Reader() io.ReadSeeker {
 	return bytes.NewReader(c.value)
 }
 
@@ -136,10 +130,10 @@ type DeltaChunk struct {
 	size int
 }
 
-func (c *DeltaChunk) Reader() ChunkReader {
+func (c *DeltaChunk) Reader() io.ReadSeeker {
 	var buff bytes.Buffer
 	c.repo.Patcher().Patch(c.source.Reader(c.repo), &buff, bytes.NewReader(c.patch))
-	return &buff
+	return bytes.NewReader(buff.Bytes())
 }
 
 // TODO: Maybe return the size of the patch instead ?
```
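Most of the hunks above follow from a single interface change: `Chunk.Reader()` now returns the standard `io.ReadSeeker` instead of the repo-local `ChunkReader`. This is also why `DeltaChunk.Reader()` wraps the patched output in `bytes.NewReader` instead of returning `&buff`: a `*bytes.Buffer` satisfies `io.Reader` but not `io.Seeker`, so it can be consumed only once and never rewound. A minimal standalone sketch of the distinction (illustration only, not code from this repository):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	var buff bytes.Buffer
	buff.WriteString("patched chunk data")

	// A *bytes.Buffer is a one-shot io.Reader; it has no Seek method.
	var _ io.Reader = &buff
	// var _ io.ReadSeeker = &buff // compile error: no Seek method

	// Wrapping the accumulated bytes in a *bytes.Reader yields a value
	// that satisfies io.ReadSeeker and can be read repeatedly.
	r := bytes.NewReader(buff.Bytes())
	var _ io.ReadSeeker = r

	head := make([]byte, 7)
	n, _ := r.Read(head)
	r.Seek(0, io.SeekStart) // rewind: impossible with the bare buffer
	fmt.Printf("read %d bytes: %q\n", n, head[:n])
}
```

Handing out a seekable reader lets callers rewind and re-read chunk contents without re-running the patcher or re-buffering the data.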
```diff
--- a/sketch.go
+++ b/sketch.go
@@ -1,55 +1,76 @@
 package main
 
 import (
+	"bytes"
 	"encoding/binary"
 	"io"
 	"log"
+	"sync"
 
 	"github.com/chmduquesne/rollinghash/rabinkarp64"
 )
 
 type Sketch []uint64
 
+type ReadByteReader interface {
+	io.Reader
+	io.ByteReader
+}
+
 const fBytes = 8
 
 // SketchChunk produces a sketch for a chunk based on wSize: the window size,
 // sfCount: the number of super-features, and fCount: the number of feature
 // per super-feature
 func SketchChunk(chunk Chunk, chunkSize int, wSize int, sfCount int, fCount int) (Sketch, error) {
+	var wg sync.WaitGroup
 	var fSize = FeatureSize(chunkSize, sfCount, fCount)
 	superfeatures := make([]uint64, 0, sfCount)
 	features := make([]uint64, 0, fCount*sfCount)
-	buff := make([]byte, fBytes*fCount)
+	sfBuff := make([]byte, fBytes*fCount)
 	r := chunk.Reader()
-	hasher := rabinkarp64.New()
 	for f := 0; f < chunk.Len()/fSize; f++ {
-		hasher.Reset()
-		n, err := io.CopyN(hasher, r, int64(wSize))
+		var fBuff bytes.Buffer
+		n, err := io.CopyN(&fBuff, r, int64(fSize))
 		if err != nil {
 			log.Println(n, err)
 		}
-		max := hasher.Sum64()
-		for w := 0; w < fSize-wSize; w++ {
-			b, _ := r.ReadByte()
-			hasher.Roll(b)
-			h := hasher.Sum64()
-			if h > max {
-				max = h
-			}
-		}
-		features = append(features, max)
+		features = append(features, 0)
+		wg.Add(1)
+		go calcFeature(&wg, &fBuff, wSize, fSize, &features[f])
 	}
+	hasher := rabinkarp64.New()
+	wg.Wait()
 	for sf := 0; sf < len(features)/fCount; sf++ {
 		for i := 0; i < fCount; i++ {
-			binary.LittleEndian.PutUint64(buff[i*fBytes:(i+1)*fBytes], features[i+sf*fCount])
+			binary.LittleEndian.PutUint64(sfBuff[i*fBytes:(i+1)*fBytes], features[i+sf*fCount])
 		}
 		hasher.Reset()
-		hasher.Write(buff)
+		hasher.Write(sfBuff)
 		superfeatures = append(superfeatures, hasher.Sum64())
 	}
 	return superfeatures, nil
 }
 
+func calcFeature(wg *sync.WaitGroup, r ReadByteReader, wSize int, fSize int, result *uint64) {
+	defer wg.Done()
+	hasher := rabinkarp64.New()
+	n, err := io.CopyN(hasher, r, int64(wSize))
+	if err != nil {
+		log.Println(n, err)
+	}
+	max := hasher.Sum64()
+	for w := 0; w < fSize-wSize; w++ {
+		b, _ := r.ReadByte()
+		hasher.Roll(b)
+		h := hasher.Sum64()
+		if h > max {
+			max = h
+		}
+	}
+	*result = max
+}
+
 func SuperFeatureSize(chunkSize int, sfCount int, fCount int) int {
 	return FeatureSize(chunkSize, sfCount, fCount) * sfCount
 }
```
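The sketch.go rewrite moves the per-feature rolling-hash scan into goroutines: each feature-sized window is copied into its own buffer, `calcFeature` writes its maximum into a pre-allocated slot of `features`, and a `sync.WaitGroup` joins all workers before the super-features are assembled. The sketch below isolates that fan-out/join pattern; the hypothetical `maxByte` stands in for the Rabin-Karp feature computation, and none of this is code from dna-backup:

```go
package main

import (
	"fmt"
	"sync"
)

// maxByte stands in for the rolling-hash feature computation: each
// goroutine scans its own input and writes the result into a slot
// that no other goroutine touches, so no mutex is needed.
func maxByte(data []byte) uint64 {
	var max uint64
	for _, b := range data {
		if uint64(b) > max {
			max = uint64(b)
		}
	}
	return max
}

func main() {
	windows := [][]byte{[]byte("abc"), []byte("xyz"), []byte("mno")}

	var wg sync.WaitGroup
	// Size the result slice up front so every goroutine gets a stable
	// address to write to; appending while workers run could move the
	// backing array out from under them.
	results := make([]uint64, len(windows))
	for i, w := range windows {
		wg.Add(1)
		go func(out *uint64, data []byte) {
			defer wg.Done()
			*out = maxByte(data)
		}(&results[i], w)
	}
	wg.Wait() // join all workers before reading any result
	fmt.Println(results)
}
```

The commit leans on the same property: `features` is created with capacity `fCount*sfCount`, so as long as the loop count stays within that capacity the `append(features, 0)` calls never reallocate and the `&features[f]` pointers handed to the goroutines remain valid.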