From 78251f11c91b2504edfc02b760ef53bd352b856c Mon Sep 17 00:00:00 2001 From: n-peugnet Date: Thu, 26 Aug 2021 19:21:46 +0200 Subject: add SketchChunk function --- repo.go | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) (limited to 'repo.go') diff --git a/repo.go b/repo.go index f893b47..833bfdf 100644 --- a/repo.go +++ b/repo.go @@ -64,7 +64,7 @@ func (r *Repo) Commit(source string) { reader, writer := io.Pipe() oldChunks := make(chan Chunk, 16) files := listFiles(source) - go loadChunks(versions, oldChunks) + go r.loadChunks(versions, oldChunks) go concatFiles(files, writer) hashes := hashChunks(oldChunks) chunks := r.matchStream(reader, hashes) @@ -75,7 +75,7 @@ func (r *Repo) Commit(source string) { } func (r *Repo) loadVersions() []string { - versions := make([]string, 0) + var versions []string files, err := os.ReadDir(r.path) if err != nil { log.Fatalln(err) @@ -155,7 +155,7 @@ func storeFileList(dest string, files []File) { } func loadFileList(path string) []File { - files := make([]File, 0) + var files []File err := readFile(path, &files) if err != nil { log.Println(err) @@ -181,7 +181,7 @@ func storeChunks(dest string, chunks <-chan []byte) { } } -func loadChunks(versions []string, chunks chan<- Chunk) { +func (r *Repo) loadChunks(versions []string, chunks chan<- Chunk) { for i, v := range versions { p := path.Join(v, chunksName) entries, err := os.ReadDir(p) @@ -198,6 +198,7 @@ func loadChunks(versions []string, chunks chan<- Chunk) { log.Printf("Error reading chunk '%s': %s", f, err.Error()) } c := Chunk{ + Repo: r, Id: &ChunkId{ Ver: i, Idx: uint64(j), @@ -230,26 +231,26 @@ func readChunk(stream io.Reader) ([]byte, error) { func (r *Repo) matchStream(stream io.Reader, hashes map[uint64]ChunkId) []Chunk { var b byte - chunks := make([]Chunk, 0) - buff, err := readChunk(stream) + var chunks []Chunk + bufStream := bufio.NewReaderSize(stream, chunkSize) + buff, err := readChunk(bufStream) if err == io.EOF { - chunks = append(chunks, Chunk{Value: buff}) + chunks = append(chunks, Chunk{Repo: r, Value: buff}) return chunks } - bufStream := bufio.NewReader(stream) hasher := rabinkarp64.New() hasher.Write(buff) - buff = make([]byte, 0, chunkSize) + buff = buff[0:0] for err != io.EOF { h := hasher.Sum64() chunkId, exists := hashes[h] if exists { if len(buff) > 0 { log.Printf("Add new partial chunk of size: %d\n", len(buff)) - chunks = append(chunks, Chunk{Value: buff}) + chunks = append(chunks, Chunk{Repo: r, Value: buff}) } - log.Printf("Found existing chunk with offset %d: %d\n", len(buff), chunkId) - chunks = append(chunks, Chunk{Id: &chunkId}) + log.Printf("Add existing chunk: %d\n", chunkId) + chunks = append(chunks, Chunk{Repo: r, Id: &chunkId}) buff = make([]byte, 0, chunkSize) for i := 0; i < chunkSize && err == nil; i++ { b, err = bufStream.ReadByte() @@ -259,7 +260,7 @@ func (r *Repo) matchStream(stream io.Reader, hashes map[uint64]ChunkId) []Chunk } if len(buff) == chunkSize { log.Println("Add new chunk") - chunks = append(chunks, Chunk{Value: buff}) + chunks = append(chunks, Chunk{Repo: r, Value: buff}) buff = make([]byte, 0, chunkSize) } b, err = bufStream.ReadByte() @@ -269,19 +270,21 @@ func (r *Repo) matchStream(stream io.Reader, hashes map[uint64]ChunkId) []Chunk } } if len(buff) > 0 { - chunks = append(chunks, Chunk{Value: buff}) + chunks = append(chunks, Chunk{Repo: r, Value: buff}) } return chunks } +// extractNewChunks extracts new chunks from an array of chunks and +// returns them in an array of consecutive new chunk's array func extractNewChunks(chunks []Chunk) (ret [][]Chunk) { var i int - ret = append(ret, make([]Chunk, 0)) + ret = append(ret, nil) for _, c := range chunks { if c.isStored() { if len(ret[i]) != 0 { i++ - ret = append(ret, make([]Chunk, 0)) + ret = append(ret, nil) } } else { ret[i] = append(ret[i], c) -- cgit v1.2.3