diff options
author | n-peugnet <n.peugnet@free.fr> | 2021-08-28 20:54:34 +0200 |
---|---|---|
committer | n-peugnet <n.peugnet@free.fr> | 2021-08-28 20:54:34 +0200 |
commit | 2cd98a3bcd18870da4404693212119dd8ccfaf68 (patch) | |
tree | 9fb91f1d9bf6401b7d4bc3c2dcdfcb779d230e5a | |
parent | 129a86b3a6780b7aee5a7469cc5adeaf2ea6c20f (diff) | |
download | dna-backup-2cd98a3bcd18870da4404693212119dd8ccfaf68.tar.gz dna-backup-2cd98a3bcd18870da4404693212119dd8ccfaf68.zip |
refactor(chunks): use more interfaces
-rw-r--r-- | TODO.md | 3 | ||||
-rw-r--r-- | chunk.go | 77 | ||||
-rw-r--r-- | chunk_test.go | 14 | ||||
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 3 | ||||
-rw-r--r-- | repo.go | 37 | ||||
-rw-r--r-- | repo_test.go | 44 | ||||
-rw-r--r-- | sketch.go | 5 | ||||
-rw-r--r-- | sketch_test.go | 7 |
9 files changed, 95 insertions, 96 deletions
@@ -2,9 +2,6 @@ priority 1 ---------- - join non-deduplicated chunks - choose when and how to -- detect Similar chunks - - implement "N-Transform SuperFeature" hash from Shilane-2012 - - use the hash for detection priority 2 ---------- @@ -10,6 +10,20 @@ import ( "path" ) +type ChunkReader interface { + io.Reader + io.ByteReader +} + +type Chunk interface { + Reader() ChunkReader +} + +type StoredChunk interface { + Chunk + Id() *ChunkId +} + type ChunkId struct { Ver int Idx uint64 @@ -24,45 +38,50 @@ func (i *ChunkId) Reader(repo string) ChunkReader { return bufio.NewReaderSize(f, chunkSize) } -type ChunkReader interface { - io.Reader - io.ByteReader +func NewLoadedChunk(id *ChunkId, value []byte) *LoadedChunk { + return &LoadedChunk{id: id, value: value} } -type Chunk struct { - Repo *Repo - Id *ChunkId - Value []byte +type LoadedChunk struct { + id *ChunkId + value []byte } -func (c *Chunk) Read(buff []byte) (int, error) { - r, err := c.Reader() - if err != nil { - return 0, err - } - return r.Read(buff) +func (c *LoadedChunk) Id() *ChunkId { + return c.id } -func (c *Chunk) Reader() (ChunkReader, error) { - if c.Value != nil { - log.Printf("Chunk %d: Reading from in-memory value\n", c.Id) - return bytes.NewReader(c.Value), nil - } - if c.Id != nil { - log.Printf("Chunk %d: Reading from file\n", c.Id) - return c.Id.Reader(c.Repo.path), nil - } - return nil, &ChunkError{"Uninitialized chunk"} +func (c *LoadedChunk) Reader() ChunkReader { + // log.Printf("Chunk %d: Reading from in-memory value\n", c.id) + return bytes.NewReader(c.value) +} + +func NewChunkFile(repo *Repo, id *ChunkId) *ChunkFile { + return &ChunkFile{repo: repo, id: id} +} + +type ChunkFile struct { + repo *Repo + id *ChunkId +} + +func (c *ChunkFile) Id() *ChunkId { + return c.id +} + +func (c *ChunkFile) Reader() ChunkReader { + // log.Printf("Chunk %d: Reading from file\n", c.id) + return c.id.Reader(c.repo.path) } -func (c *Chunk) isStored() bool { - return c.Id != nil +func NewTempChunk(value []byte) *TempChunk { + return &TempChunk{value: value} } -type ChunkError struct { - err string +type TempChunk struct { + value []byte } -func (e *ChunkError) Error() string { - return fmt.Sprintf("Chunk error: %s", e.err) +func (c *TempChunk) Reader() ChunkReader { + return bytes.NewReader(c.value) } diff --git a/chunk_test.go b/chunk_test.go deleted file mode 100644 index 4ea6b44..0000000 --- a/chunk_test.go +++ /dev/null @@ -1,14 +0,0 @@ -package main - -import "testing" - -func TestIsStored(t *testing.T) { - stored := Chunk{Id: &ChunkId{0, 0}} - if !stored.isStored() { - t.Error("Chunk ", stored, " should be stored") - } - unstored := Chunk{} - if unstored.isStored() { - t.Error("Chunk ", unstored, " should not be stored") - } -} @@ -5,5 +5,4 @@ go 1.16 require ( github.com/chmduquesne/rollinghash v4.0.0+incompatible github.com/gabstv/go-bsdiff v1.0.5 - github.com/google/go-cmp v0.5.6 ) @@ -4,6 +4,3 @@ github.com/dsnet/compress v0.0.0-20171208185109-cc9eb1d7ad76 h1:eX+pdPPlD279OWgd github.com/dsnet/compress v0.0.0-20171208185109-cc9eb1d7ad76/go.mod h1:KjxHHirfLaw19iGT70HvVjHQsL1vq1SRQB4yOsAfy2s= github.com/gabstv/go-bsdiff v1.0.5 h1:g29MC/38Eaig+iAobW10/CiFvPtin8U3Jj4yNLcNG9k= github.com/gabstv/go-bsdiff v1.0.5/go.mod h1:/Zz6GK+/f/TMylRtVaW3uwZlb0FZITILfA0q12XKGwg= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= -github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -65,7 +65,7 @@ func (r *Repo) Commit(source string) { os.Mkdir(newPath, 0775) os.Mkdir(newChunkPath, 0775) reader, writer := io.Pipe() - oldChunks := make(chan Chunk, 16) + oldChunks := make(chan StoredChunk, 16) files := listFiles(source) go r.loadChunks(versions, oldChunks) go concatFiles(files, writer) @@ -178,7 +178,7 @@ func storeChunks(dest string, chunks <-chan []byte) { } } -func (r *Repo) loadChunks(versions []string, chunks chan<- Chunk) { +func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) { for i, v := range versions { p := path.Join(v, chunksName) entries, err := os.ReadDir(p) @@ -194,14 +194,13 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- Chunk) { if err != nil { log.Printf("Error reading chunk '%s': %s", f, err.Error()) } - c := Chunk{ - Repo: r, - Id: &ChunkId{ + c := NewLoadedChunk( + &ChunkId{ Ver: i, Idx: uint64(j), }, - Value: buff, - } + buff, + ) chunks <- c } } @@ -213,25 +212,26 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- Chunk) { // For each chunk, both a fingerprint (hash over the full content) and a sketch // (resemblance hash based on maximal values of regions) are calculated and // stored in an hashmap which are then returned. -func hashChunks(chunks <-chan Chunk) (FingerprintMap, SketchMap) { +func hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) { fingerprints := make(FingerprintMap) sketches := make(SketchMap) hasher := hash.Hash64(rabinkarp64.New()) for c := range chunks { hasher.Reset() - hasher.Write(c.Value) + io.Copy(hasher, c.Reader()) h := hasher.Sum64() - fingerprints[h] = c.Id + fingerprints[h] = c.Id() sketch, _ := SketchChunk(c, 32, 3, 4) for _, s := range sketch { - sketches[s] = append(sketches[s], c.Id) + sketches[s] = append(sketches[s], c.Id()) } } return fingerprints, sketches } func findSimilarChunks(chunks []Chunk, sketches SketchMap) { - for _, c := range chunks { + for i, c := range chunks { + log.Println("New chunk:", i) sketch, _ := SketchChunk(c, 32, 3, 4) for _, s := range sketch { chunkId, exists := sketches[s] @@ -254,7 +254,7 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun bufStream := bufio.NewReaderSize(stream, chunkSize) buff, err := readChunk(bufStream) if err == io.EOF { - chunks = append(chunks, Chunk{Repo: r, Value: buff}) + chunks = append(chunks, NewTempChunk(buff)) return chunks } hasher := rabinkarp64.New() @@ -266,10 +266,10 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun if exists { if len(buff) > 0 { log.Printf("Add new partial chunk of size: %d\n", len(buff)) - chunks = append(chunks, Chunk{Repo: r, Value: buff[:chunkSize]}) + chunks = append(chunks, NewTempChunk(buff[:chunkSize])) } log.Printf("Add existing chunk: %d\n", chunkId) - chunks = append(chunks, Chunk{Repo: r, Id: chunkId}) + chunks = append(chunks, NewChunkFile(r, chunkId)) buff = make([]byte, 0, chunkSize) for i := 0; i < chunkSize && err == nil; i++ { b, err = bufStream.ReadByte() @@ -279,7 +279,7 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun } if len(buff) == chunkSize { log.Println("Add new chunk") - chunks = append(chunks, Chunk{Repo: r, Value: buff}) + chunks = append(chunks, NewTempChunk(buff)) buff = make([]byte, 0, chunkSize) } b, err = bufStream.ReadByte() @@ -289,7 +289,7 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun } } if len(buff) > 0 { - chunks = append(chunks, Chunk{Repo: r, Value: buff}) + chunks = append(chunks, NewTempChunk(buff)) } return chunks } @@ -300,7 +300,8 @@ func extractNewChunks(chunks []Chunk) (ret [][]Chunk) { var i int ret = append(ret, nil) for _, c := range chunks { - if c.isStored() { + _, isTmp := c.(*TempChunk) + if !isTmp { if len(ret[i]) != 0 { i++ ret = append(ret, nil) diff --git a/repo_test.go b/repo_test.go index d9c938d..40d4374 100644 --- a/repo_test.go +++ b/repo_test.go @@ -7,10 +7,10 @@ import ( "log" "os" "path" + "reflect" "testing" "github.com/gabstv/go-bsdiff/pkg/bsdiff" - "github.com/google/go-cmp/cmp" ) func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount int) { @@ -91,7 +91,7 @@ func TestLoadChunks(t *testing.T) { reader2, writer2 := io.Pipe() chunks1 := make(chan []byte, 16) chunks2 := make(chan []byte, 16) - chunks3 := make(chan Chunk, 16) + chunks3 := make(chan StoredChunk, 16) files := listFiles(dataDir) go concatFiles(files, writer1) go concatFiles(files, writer2) @@ -104,10 +104,14 @@ func TestLoadChunks(t *testing.T) { i := 0 for c2 := range chunks2 { c3 := <-chunks3 - if bytes.Compare(c2, c3.Value) != 0 { + buff, err := io.ReadAll(c3.Reader()) + if err != nil { + t.Errorf("Error reading from chunk %d: %s\n", c3, err) + } + if bytes.Compare(c2, buff) != 0 { t.Errorf("Chunk %d does not match file content", i) t.Log("Expected: ", c2[:10], "...") - t.Log("Actual:", c3.Value) + t.Log("Actual:", buff) } i++ } @@ -115,11 +119,11 @@ func TestLoadChunks(t *testing.T) { func TestExtractNewChunks(t *testing.T) { chunks := []Chunk{ - {Value: []byte{'a'}}, - {Id: &ChunkId{0, 0}}, - {Value: []byte{'b'}}, - {Value: []byte{'c'}}, - {Id: &ChunkId{0, 1}}, + &TempChunk{value: []byte{'a'}}, + &LoadedChunk{id: &ChunkId{0, 0}}, + &TempChunk{value: []byte{'b'}}, + &TempChunk{value: []byte{'c'}}, + &LoadedChunk{id: &ChunkId{0, 1}}, } newChunks := extractNewChunks(chunks) if len(newChunks) != 2 { @@ -130,7 +134,7 @@ func TestExtractNewChunks(t *testing.T) { t.Error("New chunks second slice should contain 2 chunks") t.Log("Actual: ", newChunks[0]) } - if !cmp.Equal(newChunks[1][0], chunks[2]) { + if !reflect.DeepEqual(newChunks[1][0], chunks[2]) { t.Error("New chunks do not match") t.Log("Expected: ", chunks[2]) t.Log("Actual: ", newChunks[1][0]) @@ -170,11 +174,12 @@ func TestBsdiff(t *testing.T) { go chunkStream(reader, chunks) storeChunks(resultChunks, chunks) - input, _ := ioutil.ReadFile(path.Join(dataDir, "1", "logTest.log")) + input := []byte("hello") ioutil.WriteFile(addedFile, input, 0664) + defer os.Remove(addedFile) reader, writer = io.Pipe() - oldChunks := make(chan Chunk, 16) + oldChunks := make(chan StoredChunk, 16) files = listFiles(dataDir) repo := NewRepo(resultDir) versions := repo.loadVersions() @@ -183,20 +188,19 @@ func TestBsdiff(t *testing.T) { fingerprints, sketches := hashChunks(oldChunks) recipe := repo.matchStream(reader, fingerprints) buff := new(bytes.Buffer) - r2, _ := recipe[2].Reader() - r0, _ := recipe[0].Reader() + r2 := recipe[2].Reader() + r0 := recipe[0].Reader() bsdiff.Reader(r2, r0, buff) - if len(buff.Bytes()) < 500 { - t.Errorf("Bsdiff of chunk is too small: %d", len(buff.Bytes())) + log.Println("Diff size:", buff.Len()) + if buff.Len() < 500 { + t.Errorf("Bsdiff of chunk is too small: %d", buff.Len()) } - if len(buff.Bytes()) >= chunkSize { - t.Errorf("Bsdiff of chunk is too large: %d", len(buff.Bytes())) + if buff.Len() >= chunkSize { + t.Errorf("Bsdiff of chunk is too large: %d", buff.Len()) } newChunks := extractNewChunks(recipe) log.Println("Checking new chunks:", len(newChunks[0])) for _, c := range newChunks { findSimilarChunks(c, sketches) } - - os.Remove(addedFile) } @@ -20,10 +20,7 @@ func SketchChunk(chunk Chunk, wSize int, sfCount int, fCount int) (Sketch, error superfeatures := make([]uint64, 0, sfCount) features := make([]uint64, 0, fCount) buff := make([]byte, fBytes*fCount) - r, err := chunk.Reader() - if err != nil { - return nil, err - } + r := chunk.Reader() hasher := rabinkarp64.New() for sf := 0; sf < sfCount; sf++ { features = features[:0] diff --git a/sketch_test.go b/sketch_test.go index 2f568e6..86a2c58 100644 --- a/sketch_test.go +++ b/sketch_test.go @@ -2,14 +2,13 @@ package main import ( "path" + "reflect" "testing" - - "github.com/google/go-cmp/cmp" ) func TestSketchChunk(t *testing.T) { dataDir := path.Join("test", "data", "repo_8k") - chunks := make(chan Chunk, 16) + chunks := make(chan StoredChunk, 16) repo := NewRepo(dataDir) versions := repo.loadVersions() go repo.loadChunks(versions, chunks) @@ -21,7 +20,7 @@ func TestSketchChunk(t *testing.T) { t.Error(err) } expected := Sketch{429857165471867, 6595034117354675, 8697818304802825} - if !cmp.Equal(sketch, expected) { + if !reflect.DeepEqual(sketch, expected) { t.Errorf("Sketch does not match, expected: %d, actual: %d", expected, sketch) } } |