| author | n-peugnet <n.peugnet@free.fr> | 2021-09-01 19:07:35 +0200 |
|---|---|---|
| committer | n-peugnet <n.peugnet@free.fr> | 2021-09-01 19:07:35 +0200 |
| commit | db40818ef79ccb3f5f9232623f57ad284a4af7d0 (patch) | |
| tree | 6b1b1a7b6169eb19f6ca17ff87f1075b41bd513f | |
| parent | 34a84a44b4dfa513d8ceb1cfeec50ac78fb311e0 (diff) | |
| download | dna-backup-db40818ef79ccb3f5f9232623f57ad284a4af7d0.tar.gz, dna-backup-db40818ef79ccb3f5f9232623f57ad284a4af7d0.zip | |
move some consts into repo
| mode | file | changes |
|---|---|---|
| -rw-r--r-- | TODO.md | 19 |
| -rw-r--r-- | chunk.go | 8 |
| -rw-r--r-- | const.go | 16 |
| -rw-r--r-- | repo.go | 72 |
| -rw-r--r-- | repo_test.go | 54 |
| -rw-r--r-- | sketch.go | 2 |
| -rw-r--r-- | sketch_test.go | 4 |

7 files changed, 99 insertions, 76 deletions
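In short, the commit replaces the package-level tuning variables from `const.go` with fields on `Repo`, so the chunk size and sketch parameters now travel with each repository instance. A minimal sketch of the resulting shape, assembled from the `+` lines of the diff below (illustrative only, not the complete `repo.go`):

```go
package main

import (
	"fmt"
	"os"
)

// Repo now carries the values that used to be package-level vars in const.go
// (sketch assembled from the diff below; not the complete file).
type Repo struct {
	path          string
	chunkSize     int
	sketchWSize   int
	sketchSfCount int
	sketchFCount  int
}

// NewRepo seeds the former defaults, so tests and callers can vary them per repo.
func NewRepo(path string) *Repo {
	os.MkdirAll(path, 0775) // error ignored, as in the original code
	return &Repo{
		path:          path,
		chunkSize:     8 << 10, // 8 KiB, formerly the chunkSize var
		sketchWSize:   32,
		sketchSfCount: 3,
		sketchFCount:  4,
	}
}

func main() {
	repo := NewRepo(os.TempDir())
	fmt.Println("chunk size:", repo.chunkSize)
}
```

In the tests this shows up as `repo := NewRepo("")` followed by method calls such as `repo.chunkStream(...)`, where the former free functions read the package-level values.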
````diff
diff --git a/TODO.md b/TODO.md
--- a/TODO.md
+++ b/TODO.md
@@ -1,9 +1,24 @@
 priority 1
 ----------
-- delta encode chunks
-- match stream against chunks from itself
+- add deltaEncode chunks function
+    - do not merge consecutive smaller chunks as these could be stored as chunks if no similar chunk is found. Thus it will need to be of `chunkSize` or less. Otherwise it could not be possibly used for deduplication.
+    ```
+    for each new chunk:
+        find similar in sketchMap
+        if exists:
+            delta encode
+        else:
+            calculate fingerprint
+            store in fingerprintMap
+            store in sketchMap
+    ```
 - read from repo
+    - store recipe
+    - load recipe
+    - read chunks in-order into a stream
+- properly store informations to be DNA encoded
 priority 2
 ----------
 - use more the `Reader` API (which is analoguous to the `IOStream` in Java)
+- refactor matchStream as right now it is quite
diff --git a/chunk.go b/chunk.go
--- a/chunk.go
+++ b/chunk.go
@@ -34,13 +34,13 @@ func (i *ChunkId) Path(repo string) string {
 	return path.Join(repo, fmt.Sprintf(versionFmt, i.Ver), chunksName, fmt.Sprintf(chunkIdFmt, i.Idx))
 }
 
-func (i *ChunkId) Reader(repo string) ChunkReader {
-	path := i.Path(repo)
+func (i *ChunkId) Reader(repo *Repo) ChunkReader {
+	path := i.Path(repo.path)
 	f, err := os.Open(path)
 	if err != nil {
 		log.Println("Cannot open chunk: ", path)
 	}
-	return bufio.NewReaderSize(f, chunkSize)
+	return bufio.NewReaderSize(f, repo.chunkSize)
 }
 
 func NewLoadedChunk(id *ChunkId, value []byte) *LoadedChunk {
@@ -80,7 +80,7 @@ func (c *ChunkFile) Id() *ChunkId {
 
 func (c *ChunkFile) Reader() ChunkReader {
 	// log.Printf("Chunk %d: Reading from file\n", c.id)
-	return c.id.Reader(c.repo.path)
+	return c.id.Reader(c.repo)
 }
 
 func (c *ChunkFile) Len() int {
diff --git a/const.go b/const.go
--- a/const.go
+++ b/const.go
@@ -1,14 +1,8 @@
 package main
 
-// Defined as var to prevent from using them as const as I want to keep
-// beeing able to change tkem at runtime.
-var (
-	chunkSize     = 8 << 10
-	chunksName    = "chunks"
-	chunkIdFmt    = "%015d"
-	versionFmt    = "%05d"
-	filesName     = "files"
-	sketchWSize   = 32
-	sketchSfCount = 3
-	sketchFCount  = 4
+const (
+	chunksName = "chunks"
+	chunkIdFmt = "%015d"
+	versionFmt = "%05d"
+	filesName  = "files"
 )
diff --git a/repo.go b/repo.go
--- a/repo.go
+++ b/repo.go
@@ -43,7 +43,11 @@ type FingerprintMap map[uint64]*ChunkId
 type SketchMap map[uint64][]*ChunkId
 
 type Repo struct {
-	path string
+	path          string
+	chunkSize     int
+	sketchWSize   int
+	sketchSfCount int
+	sketchFCount  int
 }
 
 type File struct {
@@ -53,7 +57,13 @@ type File struct {
 
 func NewRepo(path string) *Repo {
 	os.MkdirAll(path, 0775)
-	return &Repo{path}
+	return &Repo{
+		path:          path,
+		chunkSize:     8 << 10,
+		sketchWSize:   32,
+		sketchSfCount: 3,
+		sketchFCount:  4,
+	}
 }
 
 func (r *Repo) Commit(source string) {
@@ -69,7 +79,7 @@ func (r *Repo) Commit(source string) {
 	files := listFiles(source)
 	go r.loadChunks(versions, oldChunks)
 	go concatFiles(files, writer)
-	fingerprints, _ := hashChunks(oldChunks)
+	fingerprints, _ := r.hashChunks(oldChunks)
 	chunks := r.matchStream(reader, fingerprints)
 	extractTempChunks(chunks)
 	// storeChunks(newChunkPath, newChunks)
@@ -124,14 +134,14 @@ func concatFiles(files []File, stream io.WriteCloser) {
 	stream.Close()
 }
 
-func chunkStream(stream io.Reader, chunks chan<- []byte) {
+func (r *Repo) chunkStream(stream io.Reader, chunks chan<- []byte) {
 	var buff []byte
-	var prev, read = chunkSize, 0
+	var prev, read = r.chunkSize, 0
 	var err error
 
 	for err != io.EOF {
-		if prev == chunkSize {
-			buff = make([]byte, chunkSize)
+		if prev == r.chunkSize {
+			buff = make([]byte, r.chunkSize)
 			prev, err = stream.Read(buff)
 		} else {
 			read, err = stream.Read(buff[prev:])
@@ -140,11 +150,11 @@ func chunkStream(stream io.Reader, chunks chan<- []byte) {
 		if err != nil && err != io.EOF {
 			log.Println(err)
 		}
-		if prev == chunkSize {
+		if prev == r.chunkSize {
 			chunks <- buff
 		}
 	}
-	if prev != chunkSize {
+	if prev != r.chunkSize {
 		chunks <- buff[:prev]
 	}
 	close(chunks)
@@ -212,7 +222,7 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
 // For each chunk, both a fingerprint (hash over the full content) and a sketch
 // (resemblance hash based on maximal values of regions) are calculated and
 // stored in an hashmap which are then returned.
-func hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) {
+func (r *Repo) hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) {
 	fingerprints := make(FingerprintMap)
 	sketches := make(SketchMap)
 	hasher := hash.Hash64(rabinkarp64.New())
@@ -221,7 +231,7 @@ func hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) {
 		io.Copy(hasher, c.Reader())
 		h := hasher.Sum64()
 		fingerprints[h] = c.Id()
-		sketch, _ := SketchChunk(c, sketchWSize, sketchSfCount, sketchFCount)
+		sketch, _ := SketchChunk(c, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
 		for _, s := range sketch {
 			prev := sketches[s]
 			if contains(prev, c.Id()) {
@@ -242,11 +252,11 @@ func contains(s []*ChunkId, id *ChunkId) bool {
 	return false
 }
 
-func findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool) {
+func (r *Repo) findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool) {
 	var similarChunks = make(map[ChunkId]int)
 	var max int
 	var similarChunk *ChunkId
-	sketch, _ := SketchChunk(chunk, sketchWSize, sketchSfCount, sketchFCount)
+	sketch, _ := SketchChunk(chunk, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
 	for _, s := range sketch {
 		chunkIds, exists := sketches[s]
 		if !exists {
@@ -268,10 +278,10 @@ func findSimilarChunk(chunk Chunk, sketches SketchMap) (*ChunkId, bool) {
 func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chunk {
 	var b byte
 	var chunks []Chunk
-	bufStream := bufio.NewReaderSize(stream, chunkSize)
-	buff := make([]byte, 0, chunkSize*2)
-	n, err := io.ReadFull(stream, buff[:chunkSize])
-	if n < chunkSize {
+	bufStream := bufio.NewReaderSize(stream, r.chunkSize)
+	buff := make([]byte, 0, r.chunkSize*2)
+	n, err := io.ReadFull(stream, buff[:r.chunkSize])
+	if n < r.chunkSize {
 		chunks = append(chunks, NewTempChunk(buff[:n]))
 		return chunks
 	}
@@ -281,26 +291,26 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 		h := hasher.Sum64()
 		chunkId, exists := fingerprints[h]
 		if exists {
-			if len(buff) > chunkSize && len(buff) < chunkSize*2 {
-				size := len(buff) - chunkSize
+			if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
+				size := len(buff) - r.chunkSize
 				log.Println("Add new partial chunk of size:", size)
 				chunks = append(chunks, NewTempChunk(buff[:size]))
 			}
 			log.Printf("Add existing chunk: %d\n", chunkId)
 			chunks = append(chunks, NewChunkFile(r, chunkId))
-			buff = make([]byte, 0, chunkSize*2)
-			for i := 0; i < chunkSize && err == nil; i++ {
+			buff = make([]byte, 0, r.chunkSize*2)
+			for i := 0; i < r.chunkSize && err == nil; i++ {
 				b, err = bufStream.ReadByte()
 				hasher.Roll(b)
 				buff = append(buff, b)
 			}
 			continue
 		}
-		if len(buff) == chunkSize*2 {
+		if len(buff) == r.chunkSize*2 {
 			log.Println("Add new chunk")
-			chunks = append(chunks, NewTempChunk(buff[:chunkSize]))
-			tmp := buff[chunkSize:]
-			buff = make([]byte, 0, chunkSize*2)
+			chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+			tmp := buff[r.chunkSize:]
+			buff = make([]byte, 0, r.chunkSize*2)
 			buff = append(buff, tmp...)
 		}
 		b, err = bufStream.ReadByte()
@@ -309,11 +319,11 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 			buff = append(buff, b)
 		}
 	}
-	if len(buff) > chunkSize {
+	if len(buff) > r.chunkSize {
 		log.Println("Add new chunk")
-		chunks = append(chunks, NewTempChunk(buff[:chunkSize]))
-		log.Println("Add new partial chunk of size:", len(buff)-chunkSize)
-		chunks = append(chunks, NewTempChunk(buff[chunkSize:]))
+		chunks = append(chunks, NewTempChunk(buff[:r.chunkSize]))
+		log.Println("Add new partial chunk of size:", len(buff)-r.chunkSize)
+		chunks = append(chunks, NewTempChunk(buff[r.chunkSize:]))
 	} else if len(buff) > 0 {
 		log.Println("Add new partial chunk of size:", len(buff))
 		chunks = append(chunks, NewTempChunk(buff))
@@ -324,13 +334,13 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
 // mergeTempChunks joins temporary partial chunks from an array of chunks if possible.
 // If a chunk is smaller than the size required to calculate a super-feature,
 // it is then appended to the previous consecutive temporary chunk if it exists.
-func mergeTempChunks(chunks []Chunk) (ret []Chunk) {
+func (r *Repo) mergeTempChunks(chunks []Chunk) (ret []Chunk) {
 	var prev *TempChunk
 	var curr *TempChunk
 	for _, c := range chunks {
 		tmp, isTmp := c.(*TempChunk)
 		if !isTmp {
-			if prev != nil && curr.Len() <= SuperFeatureSize(chunkSize, sketchSfCount, sketchFCount) {
+			if prev != nil && curr.Len() <= SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount) {
 				prev.AppendFrom(curr.Reader())
 			} else if curr != nil {
 				ret = append(ret, curr)
diff --git a/repo_test.go b/repo_test.go
index 19e1677..d3aa0d9 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -13,15 +13,15 @@ import (
 	"github.com/gabstv/go-bsdiff/pkg/bsdiff"
 )
 
-func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount int) {
+func chunkCompare(t *testing.T, dataDir string, repo *Repo, testFiles []string, chunkCount int) {
 	reader, writer := io.Pipe()
 	chunks := make(chan []byte)
 	files := listFiles(dataDir)
 	go concatFiles(files, writer)
-	go chunkStream(reader, chunks)
+	go repo.chunkStream(reader, chunks)
 
 	offset := 0
-	buff := make([]byte, chunkSize*chunkCount)
+	buff := make([]byte, repo.chunkSize*chunkCount)
 	for _, f := range testFiles {
 		content, err := os.ReadFile(path.Join(dataDir, f))
 		if err != nil {
@@ -35,8 +35,8 @@ func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount i
 
 	i := 0
 	for c := range chunks {
-		start := i * chunkSize
-		end := (i + 1) * chunkSize
+		start := i * repo.chunkSize
+		end := (i + 1) * repo.chunkSize
 		if end > offset {
 			end = offset
 		}
@@ -46,7 +46,7 @@ func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount i
 			// for i, b := range c {
 			// 	fmt.Printf("E: %d, A: %d\n", b, content[i])
 			// }
-			t.Log("Expected: ", c[:10], "...", c[end%chunkSize-10:])
+			t.Log("Expected: ", c[:10], "...", c[end%repo.chunkSize-10:])
 			t.Log("Actual:", content)
 		}
 		i++
@@ -57,21 +57,24 @@ func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount i
 }
 
 func TestReadFiles1(t *testing.T) {
-	chunkCount := 590/chunkSize + 1
+	repo := NewRepo("")
+	chunkCount := 590/repo.chunkSize + 1
 	dataDir := path.Join("test", "data", "logs", "1")
 	files := []string{"logTest.log"}
-	chunkCompare(t, dataDir, files, chunkCount)
+	chunkCompare(t, dataDir, repo, files, chunkCount)
 }
 
 func TestReadFiles2(t *testing.T) {
-	chunkCount := 22899/chunkSize + 1
+	repo := NewRepo("")
+	chunkCount := 22899/repo.chunkSize + 1
 	dataDir := path.Join("test", "data", "logs", "2")
 	files := []string{"csvParserTest.log", "slipdb.log"}
-	chunkCompare(t, dataDir, files, chunkCount)
+	chunkCompare(t, dataDir, repo, files, chunkCount)
 }
 
 func TestReadFiles3(t *testing.T) {
-	chunkCount := 119398/chunkSize + 1
+	repo := NewRepo("")
+	chunkCount := 119398/repo.chunkSize + 1
 	dataDir := path.Join("test", "data", "logs")
 	files := []string{
 		path.Join("1", "logTest.log"),
@@ -79,7 +82,7 @@ func TestReadFiles3(t *testing.T) {
 		path.Join("2", "slipdb.log"),
 		path.Join("3", "indexingTreeTest.log"),
 	}
-	chunkCompare(t, dataDir, files, chunkCount)
+	chunkCompare(t, dataDir, repo, files, chunkCount)
 }
 
 func TestLoadChunks(t *testing.T) {
@@ -97,8 +100,8 @@ func TestLoadChunks(t *testing.T) {
 	files := listFiles(dataDir)
 	go concatFiles(files, writer1)
 	go concatFiles(files, writer2)
-	go chunkStream(reader1, chunks1)
-	go chunkStream(reader2, chunks2)
+	go repo.chunkStream(reader1, chunks1)
+	go repo.chunkStream(reader2, chunks2)
 	storeChunks(resultChunks, chunks1)
 	versions := []string{resultVersion}
 	go repo.loadChunks(versions, chunks3)
@@ -120,6 +123,7 @@ func TestLoadChunks(t *testing.T) {
 }
 
 func TestExtractNewChunks(t *testing.T) {
+	repo := NewRepo("")
 	chunks := []Chunk{
 		&TempChunk{value: []byte{'a'}},
 		&LoadedChunk{id: &ChunkId{0, 0}},
@@ -127,7 +131,7 @@ func TestExtractNewChunks(t *testing.T) {
 		&TempChunk{value: []byte{'c'}},
 		&LoadedChunk{id: &ChunkId{0, 1}},
 	}
-	newChunks := extractTempChunks(mergeTempChunks(chunks))
+	newChunks := extractTempChunks(repo.mergeTempChunks(chunks))
 	assertLen(t, 2, newChunks, "New chunks:")
 	assertChunkContent(t, []byte{'a'}, newChunks[0], "First new:")
 	assertChunkContent(t, []byte{'b', 'c'}, newChunks[1], "Second New:")
@@ -150,13 +154,13 @@ func TestStoreLoadFiles(t *testing.T) {
 	}
 }
 
-func prepareChunks(dataDir string, resultDir string, streamFunc func([]File, io.WriteCloser)) {
-	resultVersion := path.Join(resultDir, "00000")
+func prepareChunks(dataDir string, repo *Repo, streamFunc func([]File, io.WriteCloser)) {
+	resultVersion := path.Join(repo.path, "00000")
 	resultChunks := path.Join(resultVersion, chunksName)
 	os.MkdirAll(resultChunks, 0775)
 	reader := getDataStream(dataDir, streamFunc)
 	chunks := make(chan []byte, 16)
-	go chunkStream(reader, chunks)
+	go repo.chunkStream(reader, chunks)
 	storeChunks(resultChunks, chunks)
 }
 
@@ -169,10 +173,11 @@ func getDataStream(dataDir string, streamFunc func([]File, io.WriteCloser)) io.R
 
 func TestBsdiff(t *testing.T) {
 	resultDir := t.TempDir()
+	repo := NewRepo(resultDir)
 	dataDir := path.Join("test", "data", "logs")
 	addedFile := path.Join(dataDir, "2", "slogTest.log")
 	// Store initial chunks
-	prepareChunks(dataDir, resultDir, concatFiles)
+	prepareChunks(dataDir, repo, concatFiles)
 
 	// Modify data
 	input := []byte("hello")
@@ -181,26 +186,25 @@ func TestBsdiff(t *testing.T) {
 
 	// Load previously stored chunks
 	oldChunks := make(chan StoredChunk, 16)
-	repo := NewRepo(resultDir)
 	versions := repo.loadVersions()
 	go repo.loadChunks(versions, oldChunks)
-	fingerprints, sketches := hashChunks(oldChunks)
+	fingerprints, sketches := repo.hashChunks(oldChunks)
 
 	// Read new data
 	reader := getDataStream(dataDir, concatFiles)
 	recipe := repo.matchStream(reader, fingerprints)
-	newChunks := extractTempChunks(mergeTempChunks(recipe))
+	newChunks := extractTempChunks(repo.mergeTempChunks(recipe))
 	assertLen(t, 2, newChunks, "New chunks:")
 	for _, c := range newChunks {
-		id, exists := findSimilarChunk(c, sketches)
+		id, exists := repo.findSimilarChunk(c, sketches)
 		log.Println(id, exists)
 		if exists {
 			patch := new(bytes.Buffer)
-			stored := id.Reader(repo.path)
+			stored := id.Reader(repo)
 			new := c.Reader()
 			bsdiff.Reader(stored, new, patch)
 			log.Println("Patch size:", patch.Len())
-			if patch.Len() >= chunkSize/10 {
+			if patch.Len() >= repo.chunkSize/10 {
 				t.Errorf("Bsdiff of chunk is too large: %d", patch.Len())
 			}
 		}
diff --git a/sketch.go b/sketch.go
--- a/sketch.go
+++ b/sketch.go
@@ -15,7 +15,7 @@ const fBytes = 8
 // SketchChunk produces a sketch for a chunk based on wSize: the window size,
 // sfCount: the number of super-features, and fCount: the number of feature
 // per super-feature
-func SketchChunk(chunk Chunk, wSize int, sfCount int, fCount int) (Sketch, error) {
+func SketchChunk(chunk Chunk, chunkSize int, wSize int, sfCount int, fCount int) (Sketch, error) {
 	var fSize = FeatureSize(chunkSize, sfCount, fCount)
 	superfeatures := make([]uint64, 0, sfCount)
 	features := make([]uint64, 0, fCount*sfCount)
diff --git a/sketch_test.go b/sketch_test.go
index 074bffd..962dce0 100644
--- a/sketch_test.go
+++ b/sketch_test.go
@@ -15,7 +15,7 @@ func TestSketchChunk(t *testing.T) {
 	var i int
 	for c := range chunks {
 		if i < 1 {
-			sketch, err := SketchChunk(c, 32, 3, 4)
+			sketch, err := SketchChunk(c, 8<<10, 32, 3, 4)
 			if err != nil {
 				t.Error(err)
 			}
@@ -25,7 +25,7 @@
 			}
 		}
 		if i == 14 {
-			sketch, err := SketchChunk(c, 32, 3, 4)
+			sketch, err := SketchChunk(c, 8<<10, 32, 3, 4)
 			if err != nil {
 				t.Error(err)
 			}
````
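The `TODO.md` hunk at the top of the diff sketches the planned `deltaEncode` pass in pseudocode. Below is a loose, hypothetical Go rendering of that plan; every type and helper here (`planner`, `findSimilar`, `deltaEncode`, `sketchKey`, ...) is a placeholder invented for illustration and is not part of dna-backup:

```go
package main

import "fmt"

// Chunk is a stand-in for the repository's chunk types; everything below is a
// hypothetical sketch of the TODO.md plan, not code from dna-backup.
type Chunk []byte

type planner struct {
	fingerprintMap map[uint64]Chunk // stand-in for FingerprintMap
	sketchMap      map[uint64]Chunk // stand-in for SketchMap (really sketch -> chunk ids)
}

// Placeholder helpers: the real project uses Rabin fingerprints and
// super-feature sketches, which are elided here.
func (p *planner) findSimilar(c Chunk) (Chunk, bool) { return nil, false }
func (p *planner) fingerprint(c Chunk) uint64        { return uint64(len(c)) }
func (p *planner) sketchKey(c Chunk) uint64          { return uint64(len(c)) % 7 }
func deltaEncode(target, base Chunk) []byte          { return target }

// process mirrors the pseudocode: delta encode against a similar stored chunk
// when one exists, otherwise fingerprint the chunk and register it in both maps.
func (p *planner) process(newChunks []Chunk) {
	for _, c := range newChunks {
		if base, ok := p.findSimilar(c); ok {
			delta := deltaEncode(c, base)
			fmt.Println("delta encoded chunk, patch size:", len(delta))
		} else {
			p.fingerprintMap[p.fingerprint(c)] = c
			p.sketchMap[p.sketchKey(c)] = c
		}
	}
}

func main() {
	p := &planner{fingerprintMap: map[uint64]Chunk{}, sketchMap: map[uint64]Chunk{}}
	p.process([]Chunk{Chunk("first chunk"), Chunk("second chunk")})
	fmt.Println("fingerprints stored:", len(p.fingerprintMap))
}
```

As the TODO note about not merging small chunks suggests, the real implementation would also keep candidate chunks at `chunkSize` or below so they can still serve for deduplication when no similar chunk is found.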