aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--TODO.md3
-rw-r--r--chunk.go77
-rw-r--r--chunk_test.go14
-rw-r--r--go.mod1
-rw-r--r--go.sum3
-rw-r--r--repo.go37
-rw-r--r--repo_test.go44
-rw-r--r--sketch.go5
-rw-r--r--sketch_test.go7
9 files changed, 95 insertions, 96 deletions
diff --git a/TODO.md b/TODO.md
index eb911f3..a55e618 100644
--- a/TODO.md
+++ b/TODO.md
@@ -2,9 +2,6 @@ priority 1
----------
- join non-deduplicated chunks
- choose when and how to
-- detect Similar chunks
- - implement "N-Transform SuperFeature" hash from Shilane-2012
- - use the hash for detection
priority 2
----------
diff --git a/chunk.go b/chunk.go
index 20dfe58..92e3563 100644
--- a/chunk.go
+++ b/chunk.go
@@ -10,6 +10,20 @@ import (
"path"
)
+type ChunkReader interface {
+ io.Reader
+ io.ByteReader
+}
+
+type Chunk interface {
+ Reader() ChunkReader
+}
+
+type StoredChunk interface {
+ Chunk
+ Id() *ChunkId
+}
+
type ChunkId struct {
Ver int
Idx uint64
@@ -24,45 +38,50 @@ func (i *ChunkId) Reader(repo string) ChunkReader {
return bufio.NewReaderSize(f, chunkSize)
}
-type ChunkReader interface {
- io.Reader
- io.ByteReader
+func NewLoadedChunk(id *ChunkId, value []byte) *LoadedChunk {
+ return &LoadedChunk{id: id, value: value}
}
-type Chunk struct {
- Repo *Repo
- Id *ChunkId
- Value []byte
+type LoadedChunk struct {
+ id *ChunkId
+ value []byte
}
-func (c *Chunk) Read(buff []byte) (int, error) {
- r, err := c.Reader()
- if err != nil {
- return 0, err
- }
- return r.Read(buff)
+func (c *LoadedChunk) Id() *ChunkId {
+ return c.id
}
-func (c *Chunk) Reader() (ChunkReader, error) {
- if c.Value != nil {
- log.Printf("Chunk %d: Reading from in-memory value\n", c.Id)
- return bytes.NewReader(c.Value), nil
- }
- if c.Id != nil {
- log.Printf("Chunk %d: Reading from file\n", c.Id)
- return c.Id.Reader(c.Repo.path), nil
- }
- return nil, &ChunkError{"Uninitialized chunk"}
+func (c *LoadedChunk) Reader() ChunkReader {
+ // log.Printf("Chunk %d: Reading from in-memory value\n", c.id)
+ return bytes.NewReader(c.value)
+}
+
+func NewChunkFile(repo *Repo, id *ChunkId) *ChunkFile {
+ return &ChunkFile{repo: repo, id: id}
+}
+
+type ChunkFile struct {
+ repo *Repo
+ id *ChunkId
+}
+
+func (c *ChunkFile) Id() *ChunkId {
+ return c.id
+}
+
+func (c *ChunkFile) Reader() ChunkReader {
+ // log.Printf("Chunk %d: Reading from file\n", c.id)
+ return c.id.Reader(c.repo.path)
}
-func (c *Chunk) isStored() bool {
- return c.Id != nil
+func NewTempChunk(value []byte) *TempChunk {
+ return &TempChunk{value: value}
}
-type ChunkError struct {
- err string
+type TempChunk struct {
+ value []byte
}
-func (e *ChunkError) Error() string {
- return fmt.Sprintf("Chunk error: %s", e.err)
+func (c *TempChunk) Reader() ChunkReader {
+ return bytes.NewReader(c.value)
}
diff --git a/chunk_test.go b/chunk_test.go
deleted file mode 100644
index 4ea6b44..0000000
--- a/chunk_test.go
+++ /dev/null
@@ -1,14 +0,0 @@
-package main
-
-import "testing"
-
-func TestIsStored(t *testing.T) {
- stored := Chunk{Id: &ChunkId{0, 0}}
- if !stored.isStored() {
- t.Error("Chunk ", stored, " should be stored")
- }
- unstored := Chunk{}
- if unstored.isStored() {
- t.Error("Chunk ", unstored, " should not be stored")
- }
-}
diff --git a/go.mod b/go.mod
index faf604d..59fed50 100644
--- a/go.mod
+++ b/go.mod
@@ -5,5 +5,4 @@ go 1.16
require (
github.com/chmduquesne/rollinghash v4.0.0+incompatible
github.com/gabstv/go-bsdiff v1.0.5
- github.com/google/go-cmp v0.5.6
)
diff --git a/go.sum b/go.sum
index e450c22..24e690c 100644
--- a/go.sum
+++ b/go.sum
@@ -4,6 +4,3 @@ github.com/dsnet/compress v0.0.0-20171208185109-cc9eb1d7ad76 h1:eX+pdPPlD279OWgd
github.com/dsnet/compress v0.0.0-20171208185109-cc9eb1d7ad76/go.mod h1:KjxHHirfLaw19iGT70HvVjHQsL1vq1SRQB4yOsAfy2s=
github.com/gabstv/go-bsdiff v1.0.5 h1:g29MC/38Eaig+iAobW10/CiFvPtin8U3Jj4yNLcNG9k=
github.com/gabstv/go-bsdiff v1.0.5/go.mod h1:/Zz6GK+/f/TMylRtVaW3uwZlb0FZITILfA0q12XKGwg=
-github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
-github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
-golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
diff --git a/repo.go b/repo.go
index 0779bf0..599b8da 100644
--- a/repo.go
+++ b/repo.go
@@ -65,7 +65,7 @@ func (r *Repo) Commit(source string) {
os.Mkdir(newPath, 0775)
os.Mkdir(newChunkPath, 0775)
reader, writer := io.Pipe()
- oldChunks := make(chan Chunk, 16)
+ oldChunks := make(chan StoredChunk, 16)
files := listFiles(source)
go r.loadChunks(versions, oldChunks)
go concatFiles(files, writer)
@@ -178,7 +178,7 @@ func storeChunks(dest string, chunks <-chan []byte) {
}
}
-func (r *Repo) loadChunks(versions []string, chunks chan<- Chunk) {
+func (r *Repo) loadChunks(versions []string, chunks chan<- StoredChunk) {
for i, v := range versions {
p := path.Join(v, chunksName)
entries, err := os.ReadDir(p)
@@ -194,14 +194,13 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- Chunk) {
if err != nil {
log.Printf("Error reading chunk '%s': %s", f, err.Error())
}
- c := Chunk{
- Repo: r,
- Id: &ChunkId{
+ c := NewLoadedChunk(
+ &ChunkId{
Ver: i,
Idx: uint64(j),
},
- Value: buff,
- }
+ buff,
+ )
chunks <- c
}
}
@@ -213,25 +212,26 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- Chunk) {
// For each chunk, both a fingerprint (hash over the full content) and a sketch
// (resemblance hash based on maximal values of regions) are calculated and
// stored in an hashmap which are then returned.
-func hashChunks(chunks <-chan Chunk) (FingerprintMap, SketchMap) {
+func hashChunks(chunks <-chan StoredChunk) (FingerprintMap, SketchMap) {
fingerprints := make(FingerprintMap)
sketches := make(SketchMap)
hasher := hash.Hash64(rabinkarp64.New())
for c := range chunks {
hasher.Reset()
- hasher.Write(c.Value)
+ io.Copy(hasher, c.Reader())
h := hasher.Sum64()
- fingerprints[h] = c.Id
+ fingerprints[h] = c.Id()
sketch, _ := SketchChunk(c, 32, 3, 4)
for _, s := range sketch {
- sketches[s] = append(sketches[s], c.Id)
+ sketches[s] = append(sketches[s], c.Id())
}
}
return fingerprints, sketches
}
func findSimilarChunks(chunks []Chunk, sketches SketchMap) {
- for _, c := range chunks {
+ for i, c := range chunks {
+ log.Println("New chunk:", i)
sketch, _ := SketchChunk(c, 32, 3, 4)
for _, s := range sketch {
chunkId, exists := sketches[s]
@@ -254,7 +254,7 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
bufStream := bufio.NewReaderSize(stream, chunkSize)
buff, err := readChunk(bufStream)
if err == io.EOF {
- chunks = append(chunks, Chunk{Repo: r, Value: buff})
+ chunks = append(chunks, NewTempChunk(buff))
return chunks
}
hasher := rabinkarp64.New()
@@ -266,10 +266,10 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
if exists {
if len(buff) > 0 {
log.Printf("Add new partial chunk of size: %d\n", len(buff))
- chunks = append(chunks, Chunk{Repo: r, Value: buff[:chunkSize]})
+ chunks = append(chunks, NewTempChunk(buff[:chunkSize]))
}
log.Printf("Add existing chunk: %d\n", chunkId)
- chunks = append(chunks, Chunk{Repo: r, Id: chunkId})
+ chunks = append(chunks, NewChunkFile(r, chunkId))
buff = make([]byte, 0, chunkSize)
for i := 0; i < chunkSize && err == nil; i++ {
b, err = bufStream.ReadByte()
@@ -279,7 +279,7 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
}
if len(buff) == chunkSize {
log.Println("Add new chunk")
- chunks = append(chunks, Chunk{Repo: r, Value: buff})
+ chunks = append(chunks, NewTempChunk(buff))
buff = make([]byte, 0, chunkSize)
}
b, err = bufStream.ReadByte()
@@ -289,7 +289,7 @@ func (r *Repo) matchStream(stream io.Reader, fingerprints FingerprintMap) []Chun
}
}
if len(buff) > 0 {
- chunks = append(chunks, Chunk{Repo: r, Value: buff})
+ chunks = append(chunks, NewTempChunk(buff))
}
return chunks
}
@@ -300,7 +300,8 @@ func extractNewChunks(chunks []Chunk) (ret [][]Chunk) {
var i int
ret = append(ret, nil)
for _, c := range chunks {
- if c.isStored() {
+ _, isTmp := c.(*TempChunk)
+ if !isTmp {
if len(ret[i]) != 0 {
i++
ret = append(ret, nil)
diff --git a/repo_test.go b/repo_test.go
index d9c938d..40d4374 100644
--- a/repo_test.go
+++ b/repo_test.go
@@ -7,10 +7,10 @@ import (
"log"
"os"
"path"
+ "reflect"
"testing"
"github.com/gabstv/go-bsdiff/pkg/bsdiff"
- "github.com/google/go-cmp/cmp"
)
func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount int) {
@@ -91,7 +91,7 @@ func TestLoadChunks(t *testing.T) {
reader2, writer2 := io.Pipe()
chunks1 := make(chan []byte, 16)
chunks2 := make(chan []byte, 16)
- chunks3 := make(chan Chunk, 16)
+ chunks3 := make(chan StoredChunk, 16)
files := listFiles(dataDir)
go concatFiles(files, writer1)
go concatFiles(files, writer2)
@@ -104,10 +104,14 @@ func TestLoadChunks(t *testing.T) {
i := 0
for c2 := range chunks2 {
c3 := <-chunks3
- if bytes.Compare(c2, c3.Value) != 0 {
+ buff, err := io.ReadAll(c3.Reader())
+ if err != nil {
+ t.Errorf("Error reading from chunk %d: %s\n", c3, err)
+ }
+ if bytes.Compare(c2, buff) != 0 {
t.Errorf("Chunk %d does not match file content", i)
t.Log("Expected: ", c2[:10], "...")
- t.Log("Actual:", c3.Value)
+ t.Log("Actual:", buff)
}
i++
}
@@ -115,11 +119,11 @@ func TestLoadChunks(t *testing.T) {
func TestExtractNewChunks(t *testing.T) {
chunks := []Chunk{
- {Value: []byte{'a'}},
- {Id: &ChunkId{0, 0}},
- {Value: []byte{'b'}},
- {Value: []byte{'c'}},
- {Id: &ChunkId{0, 1}},
+ &TempChunk{value: []byte{'a'}},
+ &LoadedChunk{id: &ChunkId{0, 0}},
+ &TempChunk{value: []byte{'b'}},
+ &TempChunk{value: []byte{'c'}},
+ &LoadedChunk{id: &ChunkId{0, 1}},
}
newChunks := extractNewChunks(chunks)
if len(newChunks) != 2 {
@@ -130,7 +134,7 @@ func TestExtractNewChunks(t *testing.T) {
t.Error("New chunks second slice should contain 2 chunks")
t.Log("Actual: ", newChunks[0])
}
- if !cmp.Equal(newChunks[1][0], chunks[2]) {
+ if !reflect.DeepEqual(newChunks[1][0], chunks[2]) {
t.Error("New chunks do not match")
t.Log("Expected: ", chunks[2])
t.Log("Actual: ", newChunks[1][0])
@@ -170,11 +174,12 @@ func TestBsdiff(t *testing.T) {
go chunkStream(reader, chunks)
storeChunks(resultChunks, chunks)
- input, _ := ioutil.ReadFile(path.Join(dataDir, "1", "logTest.log"))
+ input := []byte("hello")
ioutil.WriteFile(addedFile, input, 0664)
+ defer os.Remove(addedFile)
reader, writer = io.Pipe()
- oldChunks := make(chan Chunk, 16)
+ oldChunks := make(chan StoredChunk, 16)
files = listFiles(dataDir)
repo := NewRepo(resultDir)
versions := repo.loadVersions()
@@ -183,20 +188,19 @@ func TestBsdiff(t *testing.T) {
fingerprints, sketches := hashChunks(oldChunks)
recipe := repo.matchStream(reader, fingerprints)
buff := new(bytes.Buffer)
- r2, _ := recipe[2].Reader()
- r0, _ := recipe[0].Reader()
+ r2 := recipe[2].Reader()
+ r0 := recipe[0].Reader()
bsdiff.Reader(r2, r0, buff)
- if len(buff.Bytes()) < 500 {
- t.Errorf("Bsdiff of chunk is too small: %d", len(buff.Bytes()))
+ log.Println("Diff size:", buff.Len())
+ if buff.Len() < 500 {
+ t.Errorf("Bsdiff of chunk is too small: %d", buff.Len())
}
- if len(buff.Bytes()) >= chunkSize {
- t.Errorf("Bsdiff of chunk is too large: %d", len(buff.Bytes()))
+ if buff.Len() >= chunkSize {
+ t.Errorf("Bsdiff of chunk is too large: %d", buff.Len())
}
newChunks := extractNewChunks(recipe)
log.Println("Checking new chunks:", len(newChunks[0]))
for _, c := range newChunks {
findSimilarChunks(c, sketches)
}
-
- os.Remove(addedFile)
}
diff --git a/sketch.go b/sketch.go
index f226661..db7e4e6 100644
--- a/sketch.go
+++ b/sketch.go
@@ -20,10 +20,7 @@ func SketchChunk(chunk Chunk, wSize int, sfCount int, fCount int) (Sketch, error
superfeatures := make([]uint64, 0, sfCount)
features := make([]uint64, 0, fCount)
buff := make([]byte, fBytes*fCount)
- r, err := chunk.Reader()
- if err != nil {
- return nil, err
- }
+ r := chunk.Reader()
hasher := rabinkarp64.New()
for sf := 0; sf < sfCount; sf++ {
features = features[:0]
diff --git a/sketch_test.go b/sketch_test.go
index 2f568e6..86a2c58 100644
--- a/sketch_test.go
+++ b/sketch_test.go
@@ -2,14 +2,13 @@ package main
import (
"path"
+ "reflect"
"testing"
-
- "github.com/google/go-cmp/cmp"
)
func TestSketchChunk(t *testing.T) {
dataDir := path.Join("test", "data", "repo_8k")
- chunks := make(chan Chunk, 16)
+ chunks := make(chan StoredChunk, 16)
repo := NewRepo(dataDir)
versions := repo.loadVersions()
go repo.loadChunks(versions, chunks)
@@ -21,7 +20,7 @@ func TestSketchChunk(t *testing.T) {
t.Error(err)
}
expected := Sketch{429857165471867, 6595034117354675, 8697818304802825}
- if !cmp.Equal(sketch, expected) {
+ if !reflect.DeepEqual(sketch, expected) {
t.Errorf("Sketch does not match, expected: %d, actual: %d", expected, sketch)
}
}