-rw-r--r--  TODO.md        19
-rw-r--r--  chunk.go        8
-rw-r--r--  const.go       16
-rw-r--r--  repo.go        72
-rw-r--r--  repo_test.go   54
-rw-r--r--  sketch.go       2
-rw-r--r--  sketch_test.go  4
7 files changed, 99 insertions, 76 deletions
diff --git a/TODO.md b/TODO.md
index 23454f7..496bf10 100644
--- a/TODO.md
+++ b/TODO.md
@@ -1,9 +1,24 @@
priority 1
----------
-- delta encode chunks
-- match stream against chunks from itself
+- add deltaEncode chunks function
+ - do not merge consecutive smaller chunks, as these could be stored as chunks if no similar chunk is found. Each must therefore be of `chunkSize` or less, otherwise it could not possibly be used for deduplication.
+ ```
+ for each new chunk:
+ find similar in sketchMap
+ if exists:
+ delta encode
+ else:
+ calculate fingerprint
+ store in fingerprintMap
+ store in sketchMap
+ ```
- read from repo
+ - store recipe
+ - load recipe
+ - read chunks in-order into a stream
+- properly store information to be DNA encoded
priority 2
----------
- make more use of the `Reader` API (which is analogous to the `IOStream` in Java)
+- refactor matchStream as right now it is quite complex
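
The pseudocode added to the TODO above maps fairly directly onto the existing Repo types. Below is a minimal, hypothetical Go sketch of that loop: `deltaEncodeChunks` and `deltaEncodeChunk` do not exist yet and are only named for illustration, while `FingerprintMap`, `SketchMap`, `StoredChunk`, `findSimilarChunk` and `SketchChunk` are taken from repo.go as changed in this diff.

```go
// Hypothetical sketch of the "deltaEncode chunks" pass described in the TODO.
// Only the map/lookup plumbing shown here exists in repo.go today.
func (r *Repo) deltaEncodeChunks(chunks []Chunk, fingerprints FingerprintMap, sketches SketchMap) []Chunk {
	hasher := hash.Hash64(rabinkarp64.New())
	recipe := make([]Chunk, 0, len(chunks))
	for _, c := range chunks {
		// find similar in sketchMap
		if id, exists := r.findSimilarChunk(c, sketches); exists {
			// delta encode against the stored chunk (hypothetical helper, e.g. bsdiff-based)
			recipe = append(recipe, r.deltaEncodeChunk(id, c))
			continue
		}
		// else: calculate fingerprint and index the chunk for later matches
		hasher.Reset()
		io.Copy(hasher, c.Reader())
		if stored, ok := c.(StoredChunk); ok {
			fingerprints[hasher.Sum64()] = stored.Id()
			sketch, _ := SketchChunk(c, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
			for _, s := range sketch {
				sketches[s] = append(sketches[s], stored.Id())
			}
		}
		recipe = append(recipe, c)
	}
	return recipe
}
```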
diff --git a/chunk.go b/chunk.go
index 092b758..9927c94 100644
--- a/chunk.go
+++ b/chunk.go
@@ -34,13 +34,13 @@ func (i *ChunkId) Path(repo string) string {
return path.Join(repo, fmt.Sprintf(versionFmt, i.Ver), chunksName, fmt.Sprintf(chunkIdFmt, i.Idx))
}
-func (i *ChunkId) Reader(repo string) ChunkReader {
- path := i.Path(repo)
+func (i *ChunkId) Reader(repo *Repo) ChunkReader {
+ path := i.Path(repo.path)
f, err := os.Open(path)
if err != nil {
log.Println("Cannot open chunk: ", path)
}
- return bufio.NewReaderSize(f, chunkSize)
+ return bufio.NewReaderSize(f, repo.chunkSize)
}
func NewLoadedChunk(id *ChunkId, value []byte) *LoadedChunk {
@@ -80,7 +80,7 @@ func (c *ChunkFile) Id() *ChunkId {
func (c *ChunkFile) Reader() ChunkReader {
// log.Printf("Chunk %d: Reading from file\n", c.id)
- return c.id.Reader(c.repo.path)
+ return c.id.Reader(c.repo)
}
func (c *ChunkFile) Len() int {
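
Passing the whole `*Repo` instead of just its path means the buffered reader is sized from `repo.chunkSize` rather than the former global. A small usage sketch, with a made-up repository path and chunk id:

```go
repo := NewRepo("test/result")  // path is a placeholder
id := &ChunkId{Ver: 0, Idx: 42} // made-up id
reader := id.Reader(repo)       // bufio.Reader sized to repo.chunkSize
buff := make([]byte, repo.chunkSize)
n, _ := io.ReadFull(reader, buff)
log.Printf("read %d bytes of chunk %d", n, id.Idx)
```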
diff --git a/const.go b/const.go
index c97578c..71762db 100644
--- a/const.go
+++ b/const.go
@@ -1,14 +1,8 @@
package main
-// Defined as var to prevent from using them as const as I want to keep
-// beeing able to change tkem at runtime.
-var (
- chunkSize = 8 << 10
- chunksName = "chunks"
- chunkIdFmt = "%015d"
- versionFmt = "%05d"
- filesName = "files"
- sketchWSize = 32
- sketchSfCount = 3
- sketchFCount = 4
+const (
+ chunksName = "chunks"
+ chunkIdFmt = "%015d"
+ versionFmt = "%05d"
+ filesName = "files"
)
diff --git a/repo.go b/repo.go
index ea27fb4..6f177c2 100644
--- a/repo.go
+++ b/repo.go
@@ -43,7 +43,11 @@ type FingerprintMap map[uint64]*ChunkId
type SketchMap map[uint64][]*ChunkId
type Repo struct {
- path string
+ path string
+ chunkSize int
+ sketchWSize int
+ sketchSfCount int
+ sketchFCount int
}
type File struct {
@@ -53,7 +57,13 @@ type File struct {
func NewRepo(path string) *Repo {
os.MkdirAll(path, 0775)
- return &Repo{path}
+ return &Repo{
+ path: path,
+ chunkSize: 8 << 10,
+ sketchWSize: 32,
+ sketchSfCount: 3,
+ sketchFCount: 4,
+ }
}
func (r *Repo) Commit(source string) {
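
The tunables removed from const.go now travel with the repository and are set by `NewRepo`; a quick check of the defaults wired in above:

```go
repo := NewRepo("test/result")  // placeholder path, created if missing
log.Println(repo.chunkSize)     // 8192 (8 << 10)
log.Println(repo.sketchWSize)   // 32
log.Println(repo.sketchSfCount) // 3
log.Println(repo.sketchFCount)  // 4
```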
@@ -69,7 +79,7 @@ func (r *Repo) Commit(source string) {
files := listFiles(source)
go r.loadChunks(versions, oldChunks)
go concatFiles(files, writer)
- fingerprints, _ := hashChunks(oldChunks)
+ fingerprints, _ := r.hashChunks(oldChunks)
chunks := r.matchStream(reader, fingerprints)
extractTempChunks(chunks)
// storeChunks(newChunkPath, newChunks)
@@ -124,14 +134,14 @@ func concatFiles(files []File, stream io.WriteCloser) {
stream.Close()
}
-func chunkStream(stream io.Reader, chunks chan<- []byte) {
+func (r *Repo) chunkStream(stream io.Reader, chunks chan<- []byte) {
var buff []byte
- var prev, read = chunkSize, 0
+ var prev, read = r.chunkSize, 0
var err error
for err != io.EOF {
- if prev == chunkSize {
- buff = make([]byte, chunkSize)
+ if prev == r.chunkSize {
+ buff = make([]byte, r.chunkSize)
prev, err = stream.Read(buff)
} else {
read, err = stream.Read(buff[prev:])
@@ -140,11 +150,11 @@ func chunkStream(stream io.Reader, chunks chan<- []byte) {
if err != nil && err != io.EOF {
log.Println(err)
}
- if prev == chunkSize {
+ if prev == r.chunkSize {
chunks <- buff
}
}
- if prev != chunkSize {
+ if prev != r.chunkSize {
chunks <- buff[:prev]
}
close(chunks)
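
Since `chunkStream` is now a method, the producer/consumer pattern used in the tests becomes the canonical way to drive it; a condensed sketch, where `dataDir` stands in for any directory of input files:

```go
repo := NewRepo("") // chunkSize defaults to 8 KiB
reader, writer := io.Pipe()
chunks := make(chan []byte, 16)
go concatFiles(listFiles(dataDir), writer) // dataDir is a placeholder
go repo.chunkStream(reader, chunks)
for c := range chunks {
	// every chunk is repo.chunkSize bytes, except possibly the last one
	log.Println("chunk of", len(c), "bytes")
}
```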
@@ -212,7 +222,7 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
// For each chunk, both a fingerprint (hash over the full content) and a sketch
// (resemblance hash based on maximal values of regions) are calculated and
// stored in a hashmap; both maps are then returned.
-func hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) {
+func (r *Repo) hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) {
fingerprints := make(FingerprintMap)
sketches := make(SketchMap)
hasher := hash.Hash64(rabinkarp64.New())
@@ -221,7 +231,7 @@ func hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) {
io.Copy(hasher, c.Reader())
h := hasher.Sum64()
fingerprints[h] = c.Id()
- sketch, _ := SketchChunk(c, sketchWSize, sketchSfCount, sketchFCount)
+ sketch, _ := SketchChunk(c, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
for _, s := range sketch {
prev := sketches[s]
if contains(prev, c.Id()) {
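
The fingerprint is computed with the same rabinkarp64 hash that `matchStream` later rolls over the incoming stream, so an exact `chunkSize` window in the stream hashes to the same key as a stored chunk. A standalone illustration of that rolling property (assuming the github.com/chmduquesne/rollinghash/rabinkarp64 package, which the `Roll` call suggests):

```go
package main

import (
	"fmt"

	"github.com/chmduquesne/rollinghash/rabinkarp64"
)

func main() {
	data := []byte("0123456789abcdef")
	window := 8

	h := rabinkarp64.New()
	h.Write(data[:window]) // hash over data[0:8]
	h.Roll(data[window])   // now covers data[1:9], updated one byte at a time

	ref := rabinkarp64.New()
	ref.Write(data[1 : window+1])

	fmt.Println(h.Sum64() == ref.Sum64()) // true: rolled hash equals recomputed hash
}
```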
@@ -242,11 +252,11 @@ func contains(s []*ChunkId, id *ChunkId) bool {
return false
}
-func findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool) {
+func (r *Repo) findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool) {
var similarChunks = make(map[ChunkId]int)
var max int
var similarChunk *ChunkId
- sketch, _ := SketchChunk(chunk, sketchWSize, sketchSfCount, sketchFCount)
+ sketch, _ := SketchChunk(chunk, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
for _, s := range sketch {
chunkIds, exists := sketches[s]
if !exists {
@@ -268,10 +278,10 @@ func findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool) {
func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk {
var b byte
var chunks []Chunk
- bufStream := bufio.NewReaderSize(stream, chunkSize)
- buff := make([]byte, 0, chunkSize*2)
- n, err := io.ReadFull(stream, buff[:chunkSize])
- if n < chunkSize {
+ bufStream := bufio.NewReaderSize(stream, r.chunkSize)
+ buff := make([]byte, 0, r.chunkSize*2)
+ n, err := io.ReadFull(stream, buff[:r.chunkSize])
+ if n < r.chunkSize {
chunks = append(chunks, NewTempChunk(buff[:n]))
return chunks
}
@@ -281,26 +291,26 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
h := hasher.Sum64()
chunkId, exists := fingerprints[h]
if exists {
- if len(buff) > chunkSize && len(buff) < chunkSize*2 {
- size := len(buff) - chunkSize
+ if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
+ size := len(buff) - r.chunkSize
log.Println("Add new partial chunk of size:", size)
chunks = append(chunks, NewTempChunk(buff[:size]))
}
log.Printf("Add existing chunk: %d\n", chunkId)
chunks = append(chunks, NewChunkFile(r, chunkId))
- buff = make([]byte, 0, chunkSize*2)
- for i := 0; i < chunkSize && err == nil; i++ {
+ buff = make([]byte, 0, r.chunkSize*2)
+ for i := 0; i < r.chunkSize && err == nil; i++ {
b, err = bufStream.ReadByte()
hasher.Roll(b)
buff = append(buff, b)
}
continue
}
- if len(buff) == chunkSize*2 {
+ if len(buff) == r.chunkSize*2 {
log.Println("Add new chunk")
- chunks = append(chunks, NewTempChunk(buff[:chunkSize]))
- tmp := buff[chunkSize:]
- buff = make([]byte, 0, chunkSize*2)
+ chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+ tmp := buff[r.chunkSize:]
+ buff = make([]byte, 0, r.chunkSize*2)
buff = append(buff, tmp...)
}
b, err = bufStream.ReadByte()
@@ -309,11 +319,11 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
buff = append(buff, b)
}
}
- if len(buff) > chunkSize {
+ if len(buff) > r.chunkSize {
log.Println("Add new chunk")
- chunks = append(chunks, NewTempChunk(buff[:chunkSize]))
- log.Println("Add new partial chunk of size:", len(buff)-chunkSize)
- chunks = append(chunks, NewTempChunk(buff[chunkSize:]))
+ chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+ log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
+ chunks = append(chunks, NewTempChunk(buff[r.chunkSize:]))
} else if len(buff) > 0 {
log.Println("Add new partial chunk of size:", len(buff))
chunks = append(chunks, NewTempChunk(buff))
@@ -324,13 +334,13 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
// mergeTempChunks joins temporary partial chunks from an array of chunks if possible.
// If a chunk is smaller than the size required to calculate a super-feature,
// it is then appended to the previous consecutive temporary chunk if it exists.
-func mergeTempChunks(chunks []Chunk) (ret []Chunk) {
+func (r *Repo) mergeTempChunks(chunks []Chunk) (ret []Chunk) {
var prev *TempChunk
var curr *TempChunk
for _, c := range chunks {
tmp, isTmp := c.(*TempChunk)
if !isTmp {
- if prev != nil && curr.Len() <= SuperFeatureSize(chunkSize, sketchSfCount, sketchFCount) {
+ if prev != nil && curr.Len() <= SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount) {
prev.AppendFrom(curr.Reader())
} else if curr != nil {
ret = append(ret, curr)
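
With every step now hanging off `Repo`, the whole match pipeline reads in one piece. The flow below is condensed from `Commit` and `TestBsdiff`, with `dataDir` and the repository path as placeholders and `getDataStream` borrowed from the tests:

```go
repo := NewRepo("test/result")
oldChunks := make(chan StoredChunk, 16)
versions := repo.loadVersions()
go repo.loadChunks(versions, oldChunks)
fingerprints, sketches := repo.hashChunks(oldChunks)

reader := getDataStream(dataDir, concatFiles) // test helper, dataDir is a placeholder
recipe := repo.matchStream(reader, fingerprints)
newChunks := extractTempChunks(repo.mergeTempChunks(recipe))
for _, c := range newChunks {
	if id, exists := repo.findSimilarChunk(c, sketches); exists {
		log.Println("chunk is similar to", id) // candidate for delta encoding
	}
}
```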
diff --git a/repo_test.go b/repo_test.go
index 19e1677..d3aa0d9 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -13,15 +13,15 @@ import (
"github.com/gabstv/go-bsdiff/pkg/bsdiff"
)
-func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount int) {
+func chunkCompare(t *testing.T, dataDir string, repo *Repo, testFiles []string, chunkCount int) {
reader, writer := io.Pipe()
chunks := make(chan []byte)
files := listFiles(dataDir)
go concatFiles(files, writer)
- go chunkStream(reader, chunks)
+ go repo.chunkStream(reader, chunks)
offset := 0
- buff := make([]byte, chunkSize*chunkCount)
+ buff := make([]byte, repo.chunkSize*chunkCount)
for _, f := range testFiles {
content, err := os.ReadFile(path.Join(dataDir, f))
if err != nil {
@@ -35,8 +35,8 @@ func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount i
i := 0
for c := range chunks {
- start := i * chunkSize
- end := (i + 1) * chunkSize
+ start := i * repo.chunkSize
+ end := (i + 1) * repo.chunkSize
if end > offset {
end = offset
}
@@ -46,7 +46,7 @@ func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount i
// for i, b := range c {
// fmt.Printf("E: %d, A: %d\n", b, content[i])
// }
- t.Log("Expected: ", c[:10], "...", c[end%chunkSize-10:])
+ t.Log("Expected: ", c[:10], "...", c[end%repo.chunkSize-10:])
t.Log("Actual:", content)
}
i++
@@ -57,21 +57,24 @@ func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount i
}
func TestReadFiles1(t *testing.T) {
- chunkCount := 590/chunkSize + 1
+ repo := NewRepo("")
+ chunkCount := 590/repo.chunkSize + 1
dataDir := path.Join("test", "data", "logs", "1")
files := []string{"logTest.log"}
- chunkCompare(t, dataDir, files, chunkCount)
+ chunkCompare(t, dataDir, repo, files, chunkCount)
}
func TestReadFiles2(t *testing.T) {
- chunkCount := 22899/chunkSize + 1
+ repo := NewRepo("")
+ chunkCount := 22899/repo.chunkSize + 1
dataDir := path.Join("test", "data", "logs", "2")
files := []string{"csvParserTest.log", "slipdb.log"}
- chunkCompare(t, dataDir, files, chunkCount)
+ chunkCompare(t, dataDir, repo, files, chunkCount)
}
func TestReadFiles3(t *testing.T) {
- chunkCount := 119398/chunkSize + 1
+ repo := NewRepo("")
+ chunkCount := 119398/repo.chunkSize + 1
dataDir := path.Join("test", "data", "logs")
files := []string{
path.Join("1", "logTest.log"),
@@ -79,7 +82,7 @@ func TestReadFiles3(t *testing.T) {
path.Join("2", "slipdb.log"),
path.Join("3", "indexingTreeTest.log"),
}
- chunkCompare(t, dataDir, files, chunkCount)
+ chunkCompare(t, dataDir, repo, files, chunkCount)
}
func TestLoadChunks(t *testing.T) {
@@ -97,8 +100,8 @@ func TestLoadChunks(t *testing.T) {
files := listFiles(dataDir)
go concatFiles(files, writer1)
go concatFiles(files, writer2)
- go chunkStream(reader1, chunks1)
- go chunkStream(reader2, chunks2)
+ go repo.chunkStream(reader1, chunks1)
+ go repo.chunkStream(reader2, chunks2)
storeChunks(resultChunks, chunks1)
versions := []string{resultVersion}
go repo.loadChunks(versions, chunks3)
@@ -120,6 +123,7 @@ func TestLoadChunks(t *testing.T) {
}
func TestExtractNewChunks(t *testing.T) {
+ repo := NewRepo("")
chunks := []Chunk{
&TempChunk{value: []byte{'a'}},
&LoadedChunk{id: &ChunkId{0, 0}},
@@ -127,7 +131,7 @@ func TestExtractNewChunks(t *testing.T) {
&TempChunk{value: []byte{'c'}},
&LoadedChunk{id: &ChunkId{0, 1}},
}
- newChunks := extractTempChunks(mergeTempChunks(chunks))
+ newChunks := extractTempChunks(repo.mergeTempChunks(chunks))
assertLen(t, 2, newChunks, "New chunks:")
assertChunkContent(t, []byte{'a'}, newChunks[0], "First new:")
assertChunkContent(t, []byte{'b', 'c'}, newChunks[1], "Second New:")
@@ -150,13 +154,13 @@ func TestStoreLoadFiles(t *testing.T) {
}
}
-func prepareChunks(dataDir string, resultDir string, streamFunc func([]File, io.WriteCloser)) {
- resultVersion := path.Join(resultDir, "00000")
+func prepareChunks(dataDir string, repo *Repo, streamFunc func([]File, io.WriteCloser)) {
+ resultVersion := path.Join(repo.path, "00000")
resultChunks := path.Join(resultVersion, chunksName)
os.MkdirAll(resultChunks, 0775)
reader := getDataStream(dataDir, streamFunc)
chunks := make(chan []byte, 16)
- go chunkStream(reader, chunks)
+ go repo.chunkStream(reader, chunks)
storeChunks(resultChunks, chunks)
}
@@ -169,10 +173,11 @@ func getDataStream(dataDir string, streamFunc func([]File, io.WriteCloser)) io.R
func TestBsdiff(t *testing.T) {
resultDir := t.TempDir()
+ repo := NewRepo(resultDir)
dataDir := path.Join("test", "data", "logs")
addedFile := path.Join(dataDir, "2", "slogTest.log")
// Store initial chunks
- prepareChunks(dataDir, resultDir, concatFiles)
+ prepareChunks(dataDir, repo, concatFiles)
// Modify data
input := []byte("hello")
@@ -181,26 +186,25 @@ func TestBsdiff(t *testing.T) {
// Load previously stored chunks
oldChunks := make(chan StoredChunk, 16)
- repo := NewRepo(resultDir)
versions := repo.loadVersions()
go repo.loadChunks(versions, oldChunks)
- fingerprints, sketches := hashChunks(oldChunks)
+ fingerprints, sketches := repo.hashChunks(oldChunks)
// Read new data
reader := getDataStream(dataDir, concatFiles)
recipe := repo.matchStream(reader, fingerprints)
- newChunks := extractTempChunks(mergeTempChunks(recipe))
+ newChunks := extractTempChunks(repo.mergeTempChunks(recipe))
assertLen(t, 2, newChunks, "New chunks:")
for _, c := range newChunks {
- id, exists := findSimilarChunk(c, sketches)
+ id, exists := repo.findSimilarChunk(c, sketches)
log.Println(id, exists)
if exists {
patch := new(bytes.Buffer)
- stored := id.Reader(repo.path)
+ stored := id.Reader(repo)
new := c.Reader()
bsdiff.Reader(stored, new, patch)
log.Println("Patch size:", patch.Len())
- if patch.Len() >= chunkSize/10 {
+ if patch.Len() >= repo.chunkSize/10 {
t.Errorf("Bsdiff of chunk is too large: %d", patch.Len())
}
}
diff --git a/sketch.go b/sketch.go
index c88f043..295a11b 100644
--- a/sketch.go
+++ b/sketch.go
@@ -15,7 +15,7 @@ const fBytes = 8
// SketchChunk produces a sketch for a chunk based on wSize: the window size,
// sfCount: the number of super-features, and fCount: the number of feature
// per super-feature
-func SketchChunk(chunk Chunk, wSize int, sfCount int, fCount int) (Sketch, error) {
+func SketchChunk(chunk Chunk, chunkSize int, wSize int, sfCount int, fCount int) (Sketch, error) {
var fSize = FeatureSize(chunkSize, sfCount, fCount)
superfeatures := make([]uint64, 0, sfCount)
features := make([]uint64, 0, fCount*sfCount)
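
Since the chunk size is now an explicit parameter instead of the removed global, callers pass it alongside the window size and feature counts. A minimal call sketch using the Repo defaults (the input bytes are made up):

```go
repo := NewRepo("")
chunk := NewTempChunk(bytes.Repeat([]byte{'a'}, repo.chunkSize))
sketch, err := SketchChunk(chunk, repo.chunkSize, repo.sketchWSize, repo.sketchSfCount, repo.sketchFCount)
if err != nil {
	log.Fatal(err)
}
log.Println("super-features:", sketch) // one value per super-feature, 3 by default
```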
diff --git a/sketch_test.go b/sketch_test.go
index 074bffd..962dce0 100644
--- a/sketch_test.go
+++ b/sketch_test.go
@@ -15,7 +15,7 @@ func TestSketchChunk(t *testing.T) {
var i int
for c := range chunks {
if i < 1 {
- sketch, err := SketchChunk(c, 32, 3, 4)
+ sketch, err := SketchChunk(c, 8<<10, 32, 3, 4)
if err != nil {
t.Error(err)
}
@@ -25,7 +25,7 @@ func TestSketchChunk(t *testing.T) {
}
}
if i == 14 {
- sketch, err := SketchChunk(c, 32, 3, 4)
+ sketch, err := SketchChunk(c, 8<<10, 32, 3, 4)
if err != nil {
t.Error(err)
}