author    n-peugnet <n.peugnet@free.fr>  2021-09-06 16:37:50 +0200
committer n-peugnet <n.peugnet@free.fr>  2021-09-06 16:37:50 +0200
commit    b31f22ff61b69a0cb3e0a9d7955133b5304f4ff7 (patch)
tree      f188b6f15f54457ecc03089bb74385c27fcc8593
parent    d9703a9daa05f30e77bb99393eab8a0d40e788e4 (diff)
download  dna-backup-b31f22ff61b69a0cb3e0a9d7955133b5304f4ff7.tar.gz
          dna-backup-b31f22ff61b69a0cb3e0a9d7955133b5304f4ff7.zip
calculate features concurrently
-rw-r--r--  chunk.go   22
-rw-r--r--  sketch.go  53
2 files changed, 45 insertions, 30 deletions
diff --git a/chunk.go b/chunk.go
index 6c76d5e..d9833ea 100644
--- a/chunk.go
+++ b/chunk.go
@@ -1,7 +1,6 @@
package main
import (
- "bufio"
"bytes"
"fmt"
"io"
@@ -10,13 +9,8 @@ import (
"path"
)
-type ChunkReader interface {
- io.Reader
- io.ByteReader
-}
-
type Chunk interface {
- Reader() ChunkReader
+ Reader() io.ReadSeeker
Len() int
}
@@ -39,13 +33,13 @@ func (i *ChunkId) Path(repo string) string {
return path.Join(repo, fmt.Sprintf(versionFmt, i.Ver), chunksName, fmt.Sprintf(chunkIdFmt, i.Idx))
}
-func (i *ChunkId) Reader(repo *Repo) ChunkReader {
+func (i *ChunkId) Reader(repo *Repo) io.ReadSeeker {
path := i.Path(repo.path)
f, err := os.Open(path)
if err != nil {
log.Println("Cannot open chunk: ", path)
}
- return bufio.NewReaderSize(f, repo.chunkSize)
+ return f
}
func NewLoadedChunk(id *ChunkId, value []byte) *LoadedChunk {
@@ -61,7 +55,7 @@ func (c *LoadedChunk) Id() *ChunkId {
return c.id
}
-func (c *LoadedChunk) Reader() ChunkReader {
+func (c *LoadedChunk) Reader() io.ReadSeeker {
// log.Printf("Chunk %d: Reading from in-memory value\n", c.id)
return bytes.NewReader(c.value)
}
@@ -87,7 +81,7 @@ func (c *StoredChunk) Id() *ChunkId {
return c.id
}
-func (c *StoredChunk) Reader() ChunkReader {
+func (c *StoredChunk) Reader() io.ReadSeeker {
// log.Printf("Chunk %d: Reading from file\n", c.id)
return c.id.Reader(c.repo)
}
@@ -109,7 +103,7 @@ type TempChunk struct {
value []byte
}
-func (c *TempChunk) Reader() ChunkReader {
+func (c *TempChunk) Reader() io.ReadSeeker {
return bytes.NewReader(c.value)
}
@@ -136,10 +130,10 @@ type DeltaChunk struct {
size int
}
-func (c *DeltaChunk) Reader() ChunkReader {
+func (c *DeltaChunk) Reader() io.ReadSeeker {
var buff bytes.Buffer
c.repo.Patcher().Patch(c.source.Reader(c.repo), &buff, bytes.NewReader(c.patch))
- return &buff
+ return bytes.NewReader(buff.Bytes())
}
// TODO: Maybe return the size of the patch instead ?
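
The chunk.go side of this commit replaces the custom ChunkReader interface (io.Reader + io.ByteReader) with the standard io.ReadSeeker, so a chunk's contents can be rewound and re-read instead of being consumed once through a bufio wrapper. A minimal sketch of what the new contract allows, using only the standard library (readTwice is a hypothetical helper, not part of the repository):

package main

import (
	"bytes"
	"fmt"
	"io"
)

// readTwice consumes a chunk's io.ReadSeeker, rewinds it, and
// consumes it again; the old bufio-based ChunkReader could only
// be read once, front to back.
func readTwice(r io.ReadSeeker) error {
	first, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	if _, err := r.Seek(0, io.SeekStart); err != nil { // rewind
		return err
	}
	second, err := io.ReadAll(r)
	if err != nil {
		return err
	}
	fmt.Println(bytes.Equal(first, second)) // prints: true
	return nil
}

func main() {
	if err := readTwice(bytes.NewReader([]byte("chunk data"))); err != nil {
		fmt.Println(err)
	}
}

This is also why DeltaChunk.Reader now patches into a bytes.Buffer and returns bytes.NewReader(buff.Bytes()): a bare *bytes.Buffer satisfies io.Reader but not io.Seeker.
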
diff --git a/sketch.go b/sketch.go
index 295a11b..693cacf 100644
--- a/sketch.go
+++ b/sketch.go
@@ -1,55 +1,76 @@
package main
import (
+ "bytes"
"encoding/binary"
"io"
"log"
+ "sync"
"github.com/chmduquesne/rollinghash/rabinkarp64"
)
type Sketch []uint64
+type ReadByteReader interface {
+ io.Reader
+ io.ByteReader
+}
+
const fBytes = 8
// SketchChunk produces a sketch for a chunk based on wSize: the window size,
// sfCount: the number of super-features, and fCount: the number of feature
// per super-feature
func SketchChunk(chunk Chunk, chunkSize int, wSize int, sfCount int, fCount int) (Sketch, error) {
+ var wg sync.WaitGroup
var fSize = FeatureSize(chunkSize, sfCount, fCount)
superfeatures := make([]uint64, 0, sfCount)
features := make([]uint64, 0, fCount*sfCount)
- buff := make([]byte, fBytes*fCount)
+ sfBuff := make([]byte, fBytes*fCount)
r := chunk.Reader()
- hasher := rabinkarp64.New()
for f := 0; f < chunk.Len()/fSize; f++ {
- hasher.Reset()
- n, err := io.CopyN(hasher, r, int64(wSize))
+ var fBuff bytes.Buffer
+ n, err := io.CopyN(&fBuff, r, int64(fSize))
if err != nil {
log.Println(n, err)
}
- max := hasher.Sum64()
- for w := 0; w < fSize-wSize; w++ {
- b, _ := r.ReadByte()
- hasher.Roll(b)
- h := hasher.Sum64()
- if h > max {
- max = h
- }
- }
- features = append(features, max)
+ features = append(features, 0)
+ wg.Add(1)
+ go calcFeature(&wg, &fBuff, wSize, fSize, &features[f])
}
+ hasher := rabinkarp64.New()
+ wg.Wait()
for sf := 0; sf < len(features)/fCount; sf++ {
for i := 0; i < fCount; i++ {
- binary.LittleEndian.PutUint64(buff[i*fBytes:(i+1)*fBytes], features[i+sf*fCount])
+ binary.LittleEndian.PutUint64(sfBuff[i*fBytes:(i+1)*fBytes], features[i+sf*fCount])
}
hasher.Reset()
- hasher.Write(buff)
+ hasher.Write(sfBuff)
superfeatures = append(superfeatures, hasher.Sum64())
}
return superfeatures, nil
}
+func calcFeature(wg *sync.WaitGroup, r ReadByteReader, wSize int, fSize int, result *uint64) {
+ defer wg.Done()
+ hasher := rabinkarp64.New()
+ n, err := io.CopyN(hasher, r, int64(wSize))
+ if err != nil {
+ log.Println(n, err)
+ }
+ max := hasher.Sum64()
+ for w := 0; w < fSize-wSize; w++ {
+ b, _ := r.ReadByte()
+ hasher.Roll(b)
+ h := hasher.Sum64()
+ if h > max {
+ max = h
+ }
+ }
+ *result = max
+}
+
func SuperFeatureSize(chunkSize int, sfCount int, fCount int) int {
return FeatureSize(chunkSize, sfCount, fCount) * sfCount
}
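
The sketch.go side parallelizes feature calculation with a simple fan-out/join: the main loop copies each fSize-byte region into its own buffer, reserves a slot in the features slice, and hands the work to a goroutine that writes its result into that slot; a sync.WaitGroup joins all workers before the super-features are hashed. Because every goroutine writes to a distinct, pre-reserved index, no mutex is needed. A standalone sketch of the same pattern under simplified assumptions (maxByte is a hypothetical stand-in for the rolling-hash maximum computed by calcFeature):

package main

import (
	"fmt"
	"sync"
)

// maxByte stands in for calcFeature: it reduces one fixed-size
// region of input to a single feature value.
func maxByte(region []byte) uint64 {
	var max uint64
	for _, b := range region {
		if uint64(b) > max {
			max = uint64(b)
		}
	}
	return max
}

func main() {
	data := []byte("some chunk content, split into fixed-size regions")
	const fSize = 8
	n := len(data) / fSize

	// One pre-reserved slot per worker: each goroutine writes
	// only to its own index, so no locking is required.
	features := make([]uint64, n)
	var wg sync.WaitGroup
	for f := 0; f < n; f++ {
		wg.Add(1)
		go func(f int) {
			defer wg.Done()
			features[f] = maxByte(data[f*fSize : (f+1)*fSize])
		}(f)
	}
	wg.Wait() // join all workers before consuming the features
	fmt.Println(features)
}

One detail worth noting: SketchChunk appends placeholder zeros and passes &features[f] to each goroutine; those pointers stay valid because the slice is created with capacity fCount*sfCount, so the appends never reallocate the backing array as long as the feature count stays within that capacity.
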