-rw-r--r--  TODO.md                                   |   5
-rw-r--r--  const.go                                  |  13
-rw-r--r--  docs/note-2021-09-15.md                   |   6
-rw-r--r--  repo.go                                   | 151
-rw-r--r--  repo_test.go                              |  10
-rw-r--r--  testdata/repo_8k/00000/fingerprints       | bin 208 -> 0 bytes
-rw-r--r--  testdata/repo_8k/00000/hashes             | bin 0 -> 570 bytes
-rw-r--r--  testdata/repo_8k/00000/sketches           | bin 538 -> 0 bytes
-rw-r--r--  testdata/repo_8k_zlib/00000/fingerprints  | bin 208 -> 0 bytes
-rw-r--r--  testdata/repo_8k_zlib/00000/hashes        | bin 0 -> 570 bytes
-rw-r--r--  testdata/repo_8k_zlib/00000/sketches      | bin 538 -> 0 bytes
11 files changed, 117 insertions(+), 68 deletions(-)
diff --git a/TODO.md b/TODO.md
index dc400a2..adf90a3 100644
--- a/TODO.md
+++ b/TODO.md
@@ -26,7 +26,10 @@ priority 2
- [ ] option to commit without deltas to save new base chunks
- [ ] custom binary marshal and unmarshal for chunks
- [x] use `loadChunkContent` in `loadChunks`
-- [ ] TODO: store hashes for faster maps rebuild
+- [x] save hashes for faster maps rebuild
+ - [x] store hashes for current version's chunks
+ - [x] load hashes for each version
+- [x] use store queue to asynchronously store `chunkData`
- [ ] try [Fdelta](https://github.com/amlwwalker/fdelta) and
[Xdelta](https://github.com/nine-lives-later/go-xdelta) instead of Bsdiff
- [ ] maybe use an LRU cache instead of the current FIFO one.
diff --git a/const.go b/const.go
index d5ec7f0..c40cb97 100644
--- a/const.go
+++ b/const.go
@@ -1,11 +1,10 @@
package main
const (
- chunksName = "chunks"
- chunkIdFmt = "%015d"
- versionFmt = "%05d"
- filesName = "files"
- fingerprintsName = "fingerprints"
- recipeName = "recipe"
- sketchesName = "sketches"
+ chunksName = "chunks"
+ chunkIdFmt = "%015d"
+ versionFmt = "%05d"
+ filesName = "files"
+ hashesName = "hashes"
+ recipeName = "recipe"
)
diff --git a/docs/note-2021-09-15.md b/docs/note-2021-09-15.md
new file mode 100644
index 0000000..8caf1cc
--- /dev/null
+++ b/docs/note-2021-09-15.md
@@ -0,0 +1,6 @@
+Added storage worker
+====================
+
+## Time
+
+`14:49:00.221` -> `14:51:34.009` = `00:02:33.788`
diff --git a/repo.go b/repo.go
index 4aa8cfc..126ce41 100644
--- a/repo.go
+++ b/repo.go
@@ -51,6 +51,16 @@ import (
type FingerprintMap map[uint64]*ChunkId
type SketchMap map[uint64][]*ChunkId
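+// Set registers value under every super-feature in key, skipping
+// sketches that already contain this chunk id.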
+func (m SketchMap) Set(key []uint64, value *ChunkId) {
+ for _, s := range key {
+ prev := m[s]
+ if contains(prev, value) {
+ continue
+ }
+ m[s] = append(prev, value)
+ }
+}
+
type Repo struct {
path string
chunkSize int
@@ -67,6 +77,17 @@ type Repo struct {
chunkWriteWrapper func(w io.Writer) io.WriteCloser
}
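+// chunkHashes holds a chunk's fingerprint (Fp) and sketch (Sk);
+// fields are exported so gob can encode them.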
+type chunkHashes struct {
+ Fp uint64
+ Sk []uint64
+}
+
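+// chunkData bundles a chunk's id, hashes and raw content for the storage worker.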
+type chunkData struct {
+ hashes chunkHashes
+ content []byte
+ id *ChunkId
+}
+
type File struct {
Path string
Size int64
@@ -114,22 +135,16 @@ func (r *Repo) Commit(source string) {
newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
newChunkPath := filepath.Join(newPath, chunksName)
newFilesPath := filepath.Join(newPath, filesName)
- newFingerprintsPath := filepath.Join(newPath, fingerprintsName)
newRecipePath := filepath.Join(newPath, recipeName)
- newSketchesPath := filepath.Join(newPath, sketchesName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
reader, writer := io.Pipe()
- oldChunks := make(chan IdentifiedChunk, 16)
files := listFiles(source)
- go r.loadChunks(versions, oldChunks)
+ r.loadHashes(versions)
go concatFiles(files, writer)
- r.hashChunks(oldChunks)
recipe := r.matchStream(reader, newVersion)
storeFileList(newFilesPath, unprefixFiles(files, source))
- storeFingerprints(newFingerprintsPath, r.fingerprints)
storeRecipe(newRecipePath, recipe)
- storeSketches(newSketchesPath, r.sketches)
logger.Info(files)
}
@@ -263,6 +278,27 @@ func loadFileList(path string) []File {
return files
}
+func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<- bool) {
+ hashesFile := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), hashesName)
+ file, err := os.Create(hashesFile)
+ if err != nil {
+ logger.Panic(err)
+ }
+ encoder := gob.NewEncoder(file)
+ for data := range storeQueue {
+ if err = encoder.Encode(data.hashes); err != nil {
+ logger.Error(err)
+ }
+ if err := r.StoreChunkContent(data.id, bytes.NewReader(data.content)); err != nil {
+ logger.Error(err)
+ }
+ logger.Info("stored", data.id)
+ }
+ if err = file.Close(); err != nil {
+ logger.Panic(err)
+ }
+ end <- true
+}
+
func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) error {
path := id.Path(r.path)
file, err := os.Create(path)
@@ -309,7 +345,7 @@ func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
return bytes.NewReader(value)
}
-// TODO: use atoi for chunkid
+// TODO: use atoi for chunkid?
func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
for i, v := range versions {
p := filepath.Join(v, chunksName)
@@ -329,41 +365,58 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
close(chunks)
}
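+// loadHashes rebuilds the fingerprint and sketch maps by decoding every
+// version's hashes file, without re-reading any chunk content.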
+func (r *Repo) loadHashes(versions []string) {
+ for i, v := range versions {
+ path := filepath.Join(v, hashesName)
+ file, err := os.Open(path)
+ if err == nil {
+ decoder := gob.NewDecoder(file)
+ for j := 0; err == nil; j++ {
+ var h chunkHashes
+ if err = decoder.Decode(&h); err == nil {
+ id := &ChunkId{i, uint64(j)}
+ r.fingerprints[h.Fp] = id
+ r.sketches.Set(h.Sk, id)
+ }
+ }
+ }
+ if err != nil && err != io.EOF {
+ logger.Panic(err)
+ }
+ if err = file.Close(); err != nil {
+ logger.Panic(err)
+ }
+ }
+}
+
func (r *Repo) chunkMinLen() int {
return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
}
// hashChunks calculates the hashes for a channel of chunks.
-//
// For each chunk, both a fingerprint (hash over the full content) and a sketch
// (resemblance hash based on maximal values of regions) are calculated and
// stored in a hashmap.
func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
for c := range chunks {
- r.hashAndStoreChunk(c.GetId(), c.Reader())
+ r.hashChunk(c.GetId(), c.Reader())
}
}
-func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader) {
+// hashChunk calculates the hashes for a chunk and stores them in the repo hashmaps.
+func (r *Repo) hashChunk(id *ChunkId, reader io.Reader) (fp uint64, sk []uint64) {
var buffSk bytes.Buffer
var buffFp bytes.Buffer
var wg sync.WaitGroup
reader = io.TeeReader(reader, &buffSk)
io.Copy(&buffFp, reader)
- var fp uint64
- var sk []uint64
wg.Add(2)
go r.makeFingerprint(id, &buffFp, &wg, &fp)
go r.makeSketch(id, &buffSk, &wg, &sk)
wg.Wait()
r.fingerprints[fp] = id
- for _, s := range sk {
- prev := r.sketches[s]
- if contains(prev, id) {
- continue
- }
- r.sketches[s] = append(prev, id)
- }
+ r.sketches.Set(sk, id)
+ return
}
func (r *Repo) makeFingerprint(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *uint64) {
@@ -429,7 +482,7 @@ func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) {
// encodeTempChunk first tries to delta-encode the given chunk before attributing
// it an Id and saving it into the fingerprints and sketches maps.
-func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) {
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) (chunk Chunk, isDelta bool) {
chunk, isDelta = r.tryDeltaEncodeChunk(temp)
if isDelta {
logger.Info("add new delta chunk")
@@ -438,11 +491,13 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
if chunk.Len() == r.chunkSize {
id := &ChunkId{Ver: version, Idx: *last}
*last++
- r.hashAndStoreChunk(id, temp.Reader())
- err := r.StoreChunkContent(id, temp.Reader())
- if err != nil {
- logger.Error(err)
+ fp, sk := r.hashChunk(id, temp.Reader())
+ storeQueue <- chunkData{
+ hashes: chunkHashes{fp, sk},
+ content: temp.Bytes(),
+ id: id,
}
+ r.chunkCache.Set(id, temp.Bytes())
logger.Info("add new chunk", id)
return NewStoredChunk(r, id), false
}
@@ -454,20 +509,21 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
// Temporary chunks can be partial. If the current chunk is smaller than the size of a
// super-feature and there exists a previous chunk, then both are merged before attempting
// to delta-encode them.
-func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk {
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) []Chunk {
if reflect.ValueOf(prev).IsNil() {
- c, _ := r.encodeTempChunk(curr, version, last)
+ c, _ := r.encodeTempChunk(curr, version, last, storeQueue)
return []Chunk{c}
} else if curr.Len() < r.chunkMinLen() {
- c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last)
+ tmp := NewTempChunk(append(prev.Bytes(), curr.Bytes()...))
+ c, success := r.encodeTempChunk(tmp, version, last, storeQueue)
if success {
return []Chunk{c}
} else {
return []Chunk{prev, curr}
}
} else {
- prevD, _ := r.encodeTempChunk(prev, version, last)
- currD, _ := r.encodeTempChunk(curr, version, last)
+ prevD, _ := r.encodeTempChunk(prev, version, last, storeQueue)
+ currD, _ := r.encodeTempChunk(curr, version, last, storeQueue)
return []Chunk{prevD, currD}
}
}
@@ -490,6 +546,9 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
hasher := rabinkarp64.NewFromPol(r.pol)
hasher.Write(buff)
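+ // persist new chunks in a background worker so matching is not blocked by disk writes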
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go r.storageWorker(version, storeQueue, storeEnd)
for err != io.EOF {
h := hasher.Sum64()
chunkId, exists := r.fingerprints[h]
@@ -497,10 +556,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
size := len(buff) - r.chunkSize
temp := NewTempChunk(buff[:size])
- chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
prev = nil
} else if prev != nil {
- c, _ := r.encodeTempChunk(prev, version, &last)
+ c, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
chunks = append(chunks, c)
prev = nil
}
@@ -518,7 +577,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
if len(buff) == r.chunkSize*2 {
if prev != nil {
- chunk, _ := r.encodeTempChunk(prev, version, &last)
+ chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
chunks = append(chunks, chunk)
}
prev = NewTempChunk(buff[:r.chunkSize])
@@ -536,7 +595,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
var temp *TempChunk
if len(buff) > r.chunkSize {
if prev != nil {
- chunk, _ := r.encodeTempChunk(prev, version, &last)
+ chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
chunks = append(chunks, chunk)
}
prev = NewTempChunk(buff[:r.chunkSize])
@@ -544,8 +603,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
} else {
temp = NewTempChunk(buff)
}
- chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
}
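+ // no more chunks will be produced: close the queue and wait for the worker to flush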
+ close(storeQueue)
+ <-storeEnd
return chunks
}
@@ -606,26 +667,6 @@ func loadRecipe(path string) []Chunk {
return recipe
}
-func storeFingerprints(dest string, fingerprints FingerprintMap) {
- storeBasicStruct(dest, fingerprints)
-}
-
-func loadFingerprints(path string) FingerprintMap {
- var fingerprints FingerprintMap
- loadBasicStruct(path, &fingerprints)
- return fingerprints
-}
-
-func storeSketches(dest string, sketches SketchMap) {
- storeBasicStruct(dest, sketches)
-}
-
-func loadSketches(path string) SketchMap {
- var sketches SketchMap
- loadBasicStruct(path, &sketches)
- return sketches
-}
-
func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
for _, c := range chunks {
tmp, isDelta := c.(*DeltaChunk)
diff --git a/repo_test.go b/repo_test.go
index f6f6edd..cf88224 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -230,11 +230,13 @@ func TestBsdiff(t *testing.T) {
// Load previously stored chunks
oldChunks := make(chan IdentifiedChunk, 16)
versions := repo.loadVersions()
- newVersion := len(versions)
go repo.loadChunks(versions, oldChunks)
repo.hashChunks(oldChunks)
// Read new data
+ newVersion := len(versions)
+ newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion))
+ os.MkdirAll(newPath, 0775)
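+ // the storage worker writes the hashes file into the new version directory, which must exist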
reader := getDataStream(dataDir, concatFiles)
recipe := repo.matchStream(reader, newVersion)
newChunks := extractDeltaChunks(recipe)
@@ -341,10 +343,8 @@ func assertCompatibleRepoFile(t *testing.T, expected string, actual string, pref
t.Fatal(prefix, "chunk do not match:", aRecipe[i], ", expected", eChunk)
}
}
- } else if filepath.Base(expected) == sketchesName {
- // TODO: check Sketches file
- } else if filepath.Base(expected) == fingerprintsName {
- // TODO: check Fingerprints file
+ } else if filepath.Base(expected) == hashesName {
+ // TODO: check Hashes file
} else {
// Chunk content file
assertSameFile(t, expected, actual, prefix)
diff --git a/testdata/repo_8k/00000/fingerprints b/testdata/repo_8k/00000/fingerprints
deleted file mode 100644
index d015949..0000000
--- a/testdata/repo_8k/00000/fingerprints
+++ /dev/null
Binary files differ
diff --git a/testdata/repo_8k/00000/hashes b/testdata/repo_8k/00000/hashes
new file mode 100644
index 0000000..ec622b2
--- /dev/null
+++ b/testdata/repo_8k/00000/hashes
Binary files differ
diff --git a/testdata/repo_8k/00000/sketches b/testdata/repo_8k/00000/sketches
deleted file mode 100644
index 83572df..0000000
--- a/testdata/repo_8k/00000/sketches
+++ /dev/null
Binary files differ
diff --git a/testdata/repo_8k_zlib/00000/fingerprints b/testdata/repo_8k_zlib/00000/fingerprints
deleted file mode 100644
index d015949..0000000
--- a/testdata/repo_8k_zlib/00000/fingerprints
+++ /dev/null
Binary files differ
diff --git a/testdata/repo_8k_zlib/00000/hashes b/testdata/repo_8k_zlib/00000/hashes
new file mode 100644
index 0000000..ec622b2
--- /dev/null
+++ b/testdata/repo_8k_zlib/00000/hashes
Binary files differ
diff --git a/testdata/repo_8k_zlib/00000/sketches b/testdata/repo_8k_zlib/00000/sketches
deleted file mode 100644
index 83572df..0000000
--- a/testdata/repo_8k_zlib/00000/sketches
+++ /dev/null
Binary files differ