Diffstat (limited to 'repo.go')
-rw-r--r--  repo.go  151
1 file changed, 96 insertions(+), 55 deletions(-)
diff --git a/repo.go b/repo.go
index 4aa8cfc..126ce41 100644
--- a/repo.go
+++ b/repo.go
@@ -51,6 +51,16 @@ import (
type FingerprintMap map[uint64]*ChunkId
type SketchMap map[uint64][]*ChunkId
+func (m SketchMap) Set(key []uint64, value *ChunkId) {
+ for _, s := range key {
+ prev := m[s]
+ if contains(prev, value) {
+ continue
+ }
+ m[s] = append(prev, value)
+ }
+}
+
type Repo struct {
path string
chunkSize int
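
Note: the new SketchMap.Set helper centralizes the insertion logic that hashChunk previously inlined; each super-feature key maps to the list of chunk ids exhibiting it, without duplicates. A minimal standalone sketch of that behavior (ChunkId and contains are simplified stand-ins here; the repo's own contains may compare values rather than pointers):

package main

import "fmt"

type ChunkId struct {
	Ver int
	Idx uint64
}

type SketchMap map[uint64][]*ChunkId

// contains reports whether id is already recorded for a key.
func contains(list []*ChunkId, id *ChunkId) bool {
	for _, c := range list {
		if c == id {
			return true
		}
	}
	return false
}

// Set records id under every super-feature of the sketch, skipping duplicates.
func (m SketchMap) Set(key []uint64, value *ChunkId) {
	for _, s := range key {
		if contains(m[s], value) {
			continue
		}
		m[s] = append(m[s], value)
	}
}

func main() {
	m := make(SketchMap)
	id := &ChunkId{Ver: 1, Idx: 0}
	m.Set([]uint64{0xAAAA, 0xBBBB}, id)
	m.Set([]uint64{0xAAAA}, id)                 // repeated super-feature: no duplicate entry
	fmt.Println(len(m[0xAAAA]), len(m[0xBBBB])) // 1 1
}
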
@@ -67,6 +77,17 @@ type Repo struct {
chunkWriteWrapper func(w io.Writer) io.WriteCloser
}
+type chunkHashes struct {
+ Fp uint64
+ Sk []uint64
+}
+
+type chunkData struct {
+ hashes chunkHashes
+ content []byte
+ id *ChunkId
+}
+
type File struct {
Path string
Size int64
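
Note: only chunkHashes is persisted, which is why its fields are exported (encoding/gob serializes exported fields only); chunkData exists purely in memory, carrying a chunk through the store queue. A rough sketch of the per-version hashes file this implies: one gob stream with a record per chunk, read back in index order until io.EOF, mirroring storageWorker and loadHashes below (values illustrative):

package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
	"io"
	"log"
)

type chunkHashes struct {
	Fp uint64   // fingerprint: identity hash over the full chunk content
	Sk []uint64 // sketch: super-features used for resemblance matching
}

func main() {
	// Write side: one Encode call per chunk, appended to the same stream.
	var file bytes.Buffer // stand-in for the per-version hashes file
	enc := gob.NewEncoder(&file)
	for _, h := range []chunkHashes{{Fp: 42, Sk: []uint64{7, 8}}, {Fp: 43, Sk: []uint64{9}}} {
		if err := enc.Encode(h); err != nil {
			log.Fatal(err)
		}
	}
	// Read side: the decode index j doubles as the chunk's Idx.
	dec := gob.NewDecoder(&file)
	for j := 0; ; j++ {
		var h chunkHashes
		if err := dec.Decode(&h); err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("chunk %d: fp=%d sk=%v\n", j, h.Fp, h.Sk)
	}
}
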
@@ -114,22 +135,16 @@ func (r *Repo) Commit(source string) {
newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
newChunkPath := filepath.Join(newPath, chunksName)
newFilesPath := filepath.Join(newPath, filesName)
- newFingerprintsPath := filepath.Join(newPath, fingerprintsName)
newRecipePath := filepath.Join(newPath, recipeName)
- newSketchesPath := filepath.Join(newPath, sketchesName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
reader, writer := io.Pipe()
- oldChunks := make(chan IdentifiedChunk, 16)
files := listFiles(source)
- go r.loadChunks(versions, oldChunks)
+ r.loadHashes(versions)
go concatFiles(files, writer)
- r.hashChunks(oldChunks)
recipe := r.matchStream(reader, newVersion)
storeFileList(newFilesPath, unprefixFiles(files, source))
- storeFingerprints(newFingerprintsPath, r.fingerprints)
storeRecipe(newRecipePath, recipe)
- storeSketches(newSketchesPath, r.sketches)
logger.Info(files)
}
@@ -263,6 +278,27 @@ func loadFileList(path string) []File {
return files
}
+func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<- bool) {
+ hashesFile := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), hashesName)
+ file, err := os.Create(hashesFile)
+ if err != nil {
+ logger.Panic(err)
+ }
+ encoder := gob.NewEncoder(file)
+ for data := range storeQueue {
+ if err := encoder.Encode(data.hashes); err != nil {
+ logger.Error(err)
+ }
+ if err := r.StoreChunkContent(data.id, bytes.NewReader(data.content)); err != nil {
+ logger.Error(err)
+ }
+ logger.Info("stored", data.id)
+ }
+ if err := file.Close(); err != nil {
+ logger.Panic(err)
+ }
+ end <- true
+}
+
func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) error {
path := id.Path(r.path)
file, err := os.Create(path)
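
Note: storageWorker is a single-consumer queue: matchStream produces chunkData, one goroutine owns the hashes file and the content writes, and the end channel lets the producer block until everything is flushed. A stripped-down sketch of that handshake (job and the log lines are illustrative, not the repo's API):

package main

import "fmt"

type job struct{ id int }

func worker(queue <-chan job, end chan<- bool) {
	for j := range queue {
		fmt.Println("stored", j.id) // stand-in for gob.Encode + StoreChunkContent
	}
	end <- true // only reached once queue is closed and drained
}

func main() {
	queue := make(chan job, 10) // buffered like storeQueue, so hashing rarely blocks on I/O
	end := make(chan bool)
	go worker(queue, end)
	for i := 0; i < 3; i++ {
		queue <- job{id: i}
	}
	close(queue) // signal no more chunks
	<-end        // wait for the last write before returning
}
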
@@ -309,7 +345,7 @@ func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
return bytes.NewReader(value)
}
-// TODO: use atoi for chunkid
+// TODO: use atoi for chunkid ?
func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
for i, v := range versions {
p := filepath.Join(v, chunksName)
@@ -329,41 +365,58 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
close(chunks)
}
+func (r *Repo) loadHashes(versions []string) {
+ for i, v := range versions {
+ path := filepath.Join(v, hashesName)
+ file, err := os.Open(path)
+ if err != nil {
+ logger.Panic(err)
+ }
+ decoder := gob.NewDecoder(file)
+ for j := 0; err == nil; j++ {
+ var h chunkHashes
+ if err = decoder.Decode(&h); err == nil {
+ id := &ChunkId{i, uint64(j)}
+ r.fingerprints[h.Fp] = id
+ r.sketches.Set(h.Sk, id)
+ }
+ }
+ if err != io.EOF {
+ logger.Panic(err)
+ }
+ if err = file.Close(); err != nil {
+ logger.Panic(err)
+ }
+ }
+}
+
func (r *Repo) chunkMinLen() int {
return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
}
// hashChunks calculates the hashes for a channel of chunks.
-//
// For each chunk, both a fingerprint (hash over the full content) and a sketch
// (resemblance hash based on maximal values of regions) are calculated and
stored in a hashmap.
func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
for c := range chunks {
- r.hashAndStoreChunk(c.GetId(), c.Reader())
+ r.hashChunk(c.GetId(), c.Reader())
}
}
-func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader) {
+// hashChunk calculates the hashes for a chunk and stores them in the repo hashmaps.
+func (r *Repo) hashChunk(id *ChunkId, reader io.Reader) (fp uint64, sk []uint64) {
var buffSk bytes.Buffer
var buffFp bytes.Buffer
var wg sync.WaitGroup
reader = io.TeeReader(reader, &buffSk)
io.Copy(&buffFp, reader)
- var fp uint64
- var sk []uint64
wg.Add(2)
go r.makeFingerprint(id, &buffFp, &wg, &fp)
go r.makeSketch(id, &buffSk, &wg, &sk)
wg.Wait()
r.fingerprints[fp] = id
- for _, s := range sk {
- prev := r.sketches[s]
- if contains(prev, id) {
- continue
- }
- r.sketches[s] = append(prev, id)
- }
+ r.sketches.Set(sk, id)
+ return
}
func (r *Repo) makeFingerprint(id *ChunkId, reader io.Reader, wg *sync.WaitGroup, ret *uint64) {
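
Note: hashChunk reads each chunk once but hashes it twice: io.TeeReader mirrors every byte consumed by the fingerprint copy into a second buffer, and a WaitGroup joins the two hashing goroutines. A self-contained sketch of the same fan-out, with FNV standing in for the repo's rabinkarp64 fingerprint and the sketch computation:

package main

import (
	"bytes"
	"fmt"
	"hash/fnv"
	"io"
	"strings"
	"sync"
)

func main() {
	src := strings.NewReader("some chunk content")
	var buffFp, buffSk bytes.Buffer
	// One pass over src fills both buffers.
	io.Copy(&buffFp, io.TeeReader(src, &buffSk))

	var wg sync.WaitGroup
	var fp, sk uint64
	wg.Add(2)
	go func() { // fingerprint stand-in
		defer wg.Done()
		h := fnv.New64()
		io.Copy(h, &buffFp)
		fp = h.Sum64()
	}()
	go func() { // sketch stand-in
		defer wg.Done()
		h := fnv.New64()
		io.Copy(h, &buffSk)
		sk = h.Sum64()
	}()
	wg.Wait()
	fmt.Println(fp == sk) // true: both goroutines saw identical bytes
}
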
@@ -429,7 +482,7 @@ func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) {
// encodeTempChunk first tries to delta-encode the given chunk before assigning
// it an Id and saving it into the fingerprints and sketches maps.
-func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) {
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) (chunk Chunk, isDelta bool) {
chunk, isDelta = r.tryDeltaEncodeChunk(temp)
if isDelta {
logger.Info("add new delta chunk")
@@ -438,11 +491,13 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
if chunk.Len() == r.chunkSize {
id := &ChunkId{Ver: version, Idx: *last}
*last++
- r.hashAndStoreChunk(id, temp.Reader())
- err := r.StoreChunkContent(id, temp.Reader())
- if err != nil {
- logger.Error(err)
+ fp, sk := r.hashChunk(id, temp.Reader())
+ storeQueue <- chunkData{
+ hashes: chunkHashes{fp, sk},
+ content: temp.Bytes(),
+ id: id,
}
+ r.chunkCache.Set(id, temp.Bytes())
logger.Info("add new chunk", id)
return NewStoredChunk(r, id), false
}
@@ -454,20 +509,21 @@ func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (c
// Temporary chunks can be partial. If the current chunk is smaller than the size of a
// super-feature and there exists a previous chunk, then both are merged before attempting
// to delta-encode them.
-func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk {
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) []Chunk {
if reflect.ValueOf(prev).IsNil() {
- c, _ := r.encodeTempChunk(curr, version, last)
+ c, _ := r.encodeTempChunk(curr, version, last, storeQueue)
return []Chunk{c}
} else if curr.Len() < r.chunkMinLen() {
- c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last)
+ tmp := NewTempChunk(append(prev.Bytes(), curr.Bytes()...))
+ c, success := r.encodeTempChunk(tmp, version, last, storeQueue)
if success {
return []Chunk{c}
} else {
return []Chunk{prev, curr}
}
} else {
- prevD, _ := r.encodeTempChunk(prev, version, last)
- currD, _ := r.encodeTempChunk(curr, version, last)
+ prevD, _ := r.encodeTempChunk(prev, version, last, storeQueue)
+ currD, _ := r.encodeTempChunk(curr, version, last, storeQueue)
return []Chunk{prevD, currD}
}
}
@@ -490,6 +546,9 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
hasher := rabinkarp64.NewFromPol(r.pol)
hasher.Write(buff)
+ storeQueue := make(chan chunkData, 10)
+ storeEnd := make(chan bool)
+ go r.storageWorker(version, storeQueue, storeEnd)
for err != io.EOF {
h := hasher.Sum64()
chunkId, exists := r.fingerprints[h]
@@ -497,10 +556,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
size := len(buff) - r.chunkSize
temp := NewTempChunk(buff[:size])
- chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
prev = nil
} else if prev != nil {
- c, _ := r.encodeTempChunk(prev, version, &last)
+ c, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
chunks = append(chunks, c)
prev = nil
}
@@ -518,7 +577,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
}
if len(buff) == r.chunkSize*2 {
if prev != nil {
- chunk, _ := r.encodeTempChunk(prev, version, &last)
+ chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
chunks = append(chunks, chunk)
}
prev = NewTempChunk(buff[:r.chunkSize])
@@ -536,7 +595,7 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
var temp *TempChunk
if len(buff) > r.chunkSize {
if prev != nil {
- chunk, _ := r.encodeTempChunk(prev, version, &last)
+ chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
chunks = append(chunks, chunk)
}
prev = NewTempChunk(buff[:r.chunkSize])
@@ -544,8 +603,10 @@ func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
} else {
temp = NewTempChunk(buff)
}
- chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
}
+ close(storeQueue)
+ <-storeEnd
return chunks
}
@@ -606,26 +667,6 @@ func loadRecipe(path string) []Chunk {
return recipe
}
-func storeFingerprints(dest string, fingerprints FingerprintMap) {
- storeBasicStruct(dest, fingerprints)
-}
-
-func loadFingerprints(path string) FingerprintMap {
- var fingerprints FingerprintMap
- loadBasicStruct(path, &fingerprints)
- return fingerprints
-}
-
-func storeSketches(dest string, sketches SketchMap) {
- storeBasicStruct(dest, sketches)
-}
-
-func loadSketches(path string) SketchMap {
- var sketches SketchMap
- loadBasicStruct(path, &sketches)
- return sketches
-}
-
func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
for _, c := range chunks {
tmp, isDelta := c.(*DeltaChunk)