/*
Manage a deduplicated, versioned backup repository.

Sample repository:

```
repo/
├── 00000/
│   ├── chunks/
│   │   ├── 000000000000000
│   │   ├── 000000000000001
│   │   ├── 000000000000002
│   │   ├── 000000000000003
│   ├── files
│   └── recipe
└── 00001/
    ├── chunks/
    │   ├── 000000000000000
    │   ├── 000000000000001
    ├── files
    └── recipe
```
*/
package main

import (
	"bufio"
	"bytes"
	"encoding/gob"
	"fmt"
	"hash"
	"io"
	"io/fs"
	"log"
	"os"
	"path"
	"path/filepath"
	"reflect"
	"strings"

	"github.com/chmduquesne/rollinghash/rabinkarp64"

	"github.com/n-peugnet/dna-backup/cache"
	"github.com/n-peugnet/dna-backup/sketch"
	"github.com/n-peugnet/dna-backup/utils"
)

type FingerprintMap map[uint64]*ChunkId
type SketchMap map[uint64][]*ChunkId

type Repo struct {
	path          string
	chunkSize     int
	sketchWSize   int
	sketchSfCount int
	sketchFCount  int
	pol           rabinkarp64.Pol
	differ        Differ
	patcher       Patcher
	fingerprints  FingerprintMap
	sketches      SketchMap
	chunkCache    cache.Cacher
}

type File struct {
	Path string
	Size int64
}

func NewRepo(path string) *Repo {
	err := os.MkdirAll(path, 0775)
	if err != nil {
		log.Panicln(err)
	}
	var seed int64 = 1
	p, err := rabinkarp64.RandomPolynomial(seed)
	if err != nil {
		log.Panicln(err)
	}
	return &Repo{
		path:          path,
		chunkSize:     8 << 10,
		sketchWSize:   32,
		sketchSfCount: 3,
		sketchFCount:  4,
		pol:           p,
		differ:        &Bsdiff{},
		patcher:       &Bsdiff{},
		fingerprints:  make(FingerprintMap),
		sketches:      make(SketchMap),
		chunkCache:    cache.NewFifoCache(1000),
	}
}

func (r *Repo) Differ() Differ {
	return r.differ
}

func (r *Repo) Patcher() Patcher {
	return r.patcher
}

func (r *Repo) Commit(source string) {
	source = utils.TrimTrailingSeparator(source)
	versions := r.loadVersions()
	newVersion := len(versions) // TODO: add newVersion function
	newPath := path.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
	newChunkPath := path.Join(newPath, chunksName)
	newFilesPath := path.Join(newPath, filesName)
	newRecipePath := path.Join(newPath, recipeName)
	os.Mkdir(newPath, 0775)      // TODO: handle errors
	os.Mkdir(newChunkPath, 0775) // TODO: handle errors
	reader, writer := io.Pipe()
	oldChunks := make(chan IdentifiedChunk, 16)
	files := listFiles(source)
	go r.loadChunks(versions, oldChunks)
	go concatFiles(files, writer)
	r.hashChunks(oldChunks)
	recipe := r.matchStream(reader, newVersion)
	storeRecipe(newRecipePath, recipe)
	storeFileList(newFilesPath, unprefixFiles(files, source))
	fmt.Println(files)
}

func (r *Repo) Restore(destination string) {
	versions := r.loadVersions()
	latest := versions[len(versions)-1]
	latestFilesPath := path.Join(latest, filesName)
	latestRecipePath := path.Join(latest, recipeName)
	files := loadFileList(latestFilesPath)
	recipe := loadRecipe(latestRecipePath)
	reader, writer := io.Pipe()
	go r.restoreStream(writer, recipe)
	bufReader := bufio.NewReaderSize(reader, r.chunkSize*2)
	for _, file := range files {
		filePath := path.Join(destination, file.Path)
		dir := filepath.Dir(filePath)
		os.MkdirAll(dir, 0775)      // TODO: handle errors
		f, _ := os.Create(filePath) // TODO: handle errors
		n, err := io.CopyN(f, bufReader, file.Size)
		if err != nil {
			log.Printf("Error storing file content for '%s', written %d/%d bytes: %s\n", filePath, n, file.Size, err)
		}
		f.Close()
	}
}
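
// Usage sketch: how Commit and Restore tie together to produce the layout
// shown in the package comment. This is illustrative only; the real entry
// point lives outside this file and the directory paths are hypothetical.
//
//	repo := NewRepo("repo")
//	repo.Commit("/path/to/source") // writes the next version directory (00000, 00001, ...)
//	repo.Restore("/path/to/dest")  // rebuilds the files of the latest version
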
func (r *Repo) loadVersions() []string {
	var versions []string
	files, err := os.ReadDir(r.path)
	if err != nil {
		log.Fatalln(err)
	}
	for _, f := range files {
		if !f.IsDir() {
			continue
		}
		versions = append(versions, path.Join(r.path, f.Name()))
	}
	return versions
}

func listFiles(path string) []File {
	var files []File
	err := filepath.Walk(path, func(p string, i fs.FileInfo, err error) error {
		if err != nil {
			log.Println(err)
			return err
		}
		if i.IsDir() {
			return nil
		}
		files = append(files, File{p, i.Size()})
		return nil
	})
	if err != nil {
		log.Println(err)
	}
	return files
}

func unprefixFiles(files []File, prefix string) (ret []File) {
	ret = make([]File, len(files))
	preSize := len(prefix)
	for i, f := range files {
		if !strings.HasPrefix(f.Path, prefix) {
			log.Println("Warning", f.Path, "is not prefixed by", prefix)
		} else {
			f.Path = f.Path[preSize:]
		}
		ret[i] = f
	}
	return
}

func concatFiles(files []File, stream io.WriteCloser) {
	for _, f := range files {
		file, err := os.Open(f.Path)
		if err != nil {
			log.Printf("Error reading file '%s': %s\n", f.Path, err)
			continue
		}
		io.Copy(stream, file)
		file.Close()
	}
	stream.Close()
}

func storeFileList(dest string, files []File) {
	file, err := os.Create(dest)
	if err == nil {
		encoder := gob.NewEncoder(file)
		err = encoder.Encode(files)
	}
	if err != nil {
		log.Panicln(err)
	}
	if err = file.Close(); err != nil {
		log.Panicln(err)
	}
}

func loadFileList(path string) []File {
	var files []File
	file, err := os.Open(path)
	if err == nil {
		decoder := gob.NewDecoder(file)
		err = decoder.Decode(&files)
	}
	if err != nil {
		log.Panicln(err)
	}
	if err = file.Close(); err != nil {
		log.Panicln(err)
	}
	return files
}

func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) error {
	path := id.Path(r.path)
	file, err := os.Create(path)
	if err != nil {
		return fmt.Errorf("Error creating chunk for '%s': %s", path, err)
	}
	n, err := io.Copy(file, reader)
	if err != nil {
		return fmt.Errorf("Error writing chunk content for '%s', written %d bytes: %s", path, n, err)
	}
	if err := file.Close(); err != nil {
		return fmt.Errorf("Error closing chunk for '%s': %s", path, err)
	}
	return nil
}

// LoadChunkContent loads a chunk from the repo.
// If the chunk is in cache, get it from cache, else read it from drive.
func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
	value, exists := r.chunkCache.Get(id)
	if !exists {
		path := id.Path(r.path)
		f, err := os.Open(path)
		if err != nil {
			log.Printf("Cannot open chunk '%s': %s\n", path, err)
		}
		value, err = io.ReadAll(f)
		if err != nil {
			log.Panicf("Could not read from chunk '%s': %s\n", path, err)
		}
		f.Close()
		r.chunkCache.Set(id, value)
	}
	return bytes.NewReader(value)
}

// TODO: use atoi for chunkid
func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
	for i, v := range versions {
		p := path.Join(v, chunksName)
		entries, err := os.ReadDir(p)
		if err != nil {
			log.Printf("Error reading version '%05d' in '%s' chunks: %s", i, v, err)
		}
		for j, e := range entries {
			if e.IsDir() {
				continue
			}
			f := path.Join(p, e.Name())
			buff, err := os.ReadFile(f)
			if err != nil {
				log.Printf("Error reading chunk '%s': %s", f, err.Error())
			}
			c := NewLoadedChunk(
				&ChunkId{
					Ver: i,
					Idx: uint64(j),
				},
				buff,
			)
			chunks <- c
		}
	}
	close(chunks)
}

func (r *Repo) chunkMinLen() int {
	return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
}

// hashChunks calculates the hashes for a channel of chunks.
//
// For each chunk, both a fingerprint (hash over the full content) and a sketch
// (resemblance hash based on maximal values of regions) are calculated and
// stored in a hashmap.
func (r *Repo) hashChunks(chunks <-chan IdentifiedChunk) {
	hasher := rabinkarp64.NewFromPol(r.pol)
	for c := range chunks {
		r.hashAndStoreChunk(c.GetId(), c.Reader(), hasher)
	}
}

func (r *Repo) hashAndStoreChunk(id *ChunkId, reader io.Reader, hasher hash.Hash64) {
	var chunk bytes.Buffer
	hasher.Reset()
	reader = io.TeeReader(reader, &chunk)
	io.Copy(hasher, reader)
	fingerprint := hasher.Sum64()
	sketch, _ := sketch.SketchChunk(&chunk, r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
	r.storeChunkId(id, fingerprint, sketch)
}

func (r *Repo) storeChunkId(id *ChunkId, fingerprint uint64, sketch []uint64) {
	r.fingerprints[fingerprint] = id
	for _, s := range sketch {
		prev := r.sketches[s]
		if contains(prev, id) {
			continue
		}
		r.sketches[s] = append(prev, id)
	}
}

func contains(s []*ChunkId, id *ChunkId) bool {
	for _, v := range s {
		if v == id {
			return true
		}
	}
	return false
}

func (r *Repo) findSimilarChunk(chunk Chunk) (*ChunkId, bool) {
	var similarChunks = make(map[ChunkId]int)
	var max int
	var similarChunk *ChunkId
	sketch, _ := sketch.SketchChunk(chunk.Reader(), r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
	for _, s := range sketch {
		chunkIds, exists := r.sketches[s]
		if !exists {
			continue
		}
		for _, id := range chunkIds {
			count := similarChunks[*id]
			count += 1
			log.Printf("Found %v %d time(s)", id, count)
			if count > max {
				max = count
				similarChunk = id
			}
			similarChunks[*id] = count
		}
	}
	return similarChunk, similarChunk != nil
}

func (r *Repo) tryDeltaEncodeChunk(temp BufferedChunk) (Chunk, bool) {
	id, found := r.findSimilarChunk(temp)
	if found {
		var buff bytes.Buffer
		if err := r.differ.Diff(r.LoadChunkContent(id), temp.Reader(), &buff); err != nil {
			log.Println("Error trying delta encode chunk:", temp, "with source:", id, ":", err)
		} else {
			return &DeltaChunk{
				repo:   r,
				Source: id,
				Patch:  buff.Bytes(),
				Size:   temp.Len(),
			}, true
		}
	}
	return temp, false
}

// encodeTempChunk first tries to delta-encode the given chunk before attributing
// it an Id and saving it into the fingerprints and sketches maps.
func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64) (chunk Chunk, isDelta bool) {
	chunk, isDelta = r.tryDeltaEncodeChunk(temp)
	if isDelta {
		log.Println("Add new delta chunk")
		return
	}
	if chunk.Len() == r.chunkSize {
		id := &ChunkId{Ver: version, Idx: *last}
		*last++
		hasher := rabinkarp64.NewFromPol(r.pol)
		r.hashAndStoreChunk(id, temp.Reader(), hasher)
		r.StoreChunkContent(id, temp.Reader())
		log.Println("Add new chunk", id)
		return NewStoredChunk(r, id), false
	}
	log.Println("Add new partial chunk of size:", chunk.Len())
	return
}

// encodeTempChunks encodes the current temporary chunks based on the value of the previous one.
// Temporary chunks can be partial. If the current chunk is smaller than the size of a
// super-feature and there exists a previous chunk, then both are merged before attempting
// to delta-encode them.
func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64) []Chunk {
	if reflect.ValueOf(prev).IsNil() {
		c, _ := r.encodeTempChunk(curr, version, last)
		return []Chunk{c}
	} else if curr.Len() < r.chunkMinLen() {
		c, success := r.encodeTempChunk(NewTempChunk(append(prev.Bytes(), curr.Bytes()...)), version, last)
		if success {
			return []Chunk{c}
		} else {
			return []Chunk{prev, curr}
		}
	} else {
		prevD, _ := r.encodeTempChunk(prev, version, last)
		currD, _ := r.encodeTempChunk(curr, version, last)
		return []Chunk{prevD, currD}
	}
}

func (r *Repo) matchStream(stream io.Reader, version int) []Chunk {
	var b byte
	var chunks []Chunk
	var prev *TempChunk
	var last uint64
	var err error
	bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
	buff := make([]byte, r.chunkSize, r.chunkSize*2)
	if n, err := io.ReadFull(bufStream, buff); n < r.chunkSize {
		if err == io.EOF {
			chunks = append(chunks, NewTempChunk(buff[:n]))
			return chunks
		} else {
			log.Panicf("Error: read only %d bytes with error '%s'\n", n, err)
		}
	}
	hasher := rabinkarp64.NewFromPol(r.pol)
	hasher.Write(buff)
	for err != io.EOF {
		h := hasher.Sum64()
		chunkId, exists := r.fingerprints[h]
		if exists {
			if len(buff) > r.chunkSize && len(buff) < r.chunkSize*2 {
				size := len(buff) - r.chunkSize
				temp := NewTempChunk(buff[:size])
				chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
				prev = nil
			} else if prev != nil {
				c, _ := r.encodeTempChunk(prev, version, &last)
				chunks = append(chunks, c)
				prev = nil
			}
			log.Printf("Add existing chunk: %v\n", chunkId)
			chunks = append(chunks, NewStoredChunk(r, chunkId))
			buff = make([]byte, 0, r.chunkSize*2)
			for i := 0; i < r.chunkSize && err == nil; i++ {
				b, err = bufStream.ReadByte()
				if err != io.EOF {
					hasher.Roll(b)
					buff = append(buff, b)
				}
			}
			continue
		}
		if len(buff) == r.chunkSize*2 {
			if prev != nil {
				chunk, _ := r.encodeTempChunk(prev, version, &last)
				chunks = append(chunks, chunk)
			}
			prev = NewTempChunk(buff[:r.chunkSize])
			tmp := buff[r.chunkSize:]
			buff = make([]byte, r.chunkSize, r.chunkSize*2)
			copy(buff, tmp)
		}
		b, err = bufStream.ReadByte()
		if err != io.EOF {
			hasher.Roll(b)
			buff = append(buff, b)
		}
	}
	if len(buff) > 0 {
		var temp *TempChunk
		if len(buff) > r.chunkSize {
			if prev != nil {
				chunk, _ := r.encodeTempChunk(prev, version, &last)
				chunks = append(chunks, chunk)
			}
			prev = NewTempChunk(buff[:r.chunkSize])
			temp = NewTempChunk(buff[r.chunkSize:])
		} else {
			temp = NewTempChunk(buff)
		}
		chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last)...)
	}
	return chunks
}

func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {
	for _, c := range recipe {
		if rc, isRepo := c.(RepoChunk); isRepo {
			rc.SetRepo(r)
		}
		if n, err := io.Copy(stream, c.Reader()); err != nil {
			log.Printf("Error copying to stream, read %d bytes from chunk: %s\n", n, err)
		}
	}
	stream.Close()
}

func storeRecipe(dest string, recipe []Chunk) {
	gob.Register(&StoredChunk{})
	gob.Register(&TempChunk{})
	gob.Register(&DeltaChunk{})
	file, err := os.Create(dest)
	if err == nil {
		encoder := gob.NewEncoder(file)
		for _, c := range recipe {
			if err = encoder.Encode(&c); err != nil {
				log.Panicln(err)
			}
		}
	}
	if err != nil {
		log.Panicln(err)
	}
	if err = file.Close(); err != nil {
		log.Panicln(err)
	}
}

func loadRecipe(path string) []Chunk {
	var recipe []Chunk
	gob.Register(&StoredChunk{})
	gob.Register(&TempChunk{})
	gob.Register(&DeltaChunk{})
	file, err := os.Open(path)
	if err == nil {
		decoder := gob.NewDecoder(file)
		for i := 0; err == nil; i++ {
			var c Chunk
			if err = decoder.Decode(&c); err == nil {
				recipe = append(recipe, c)
			}
		}
	}
	if err != nil && err != io.EOF {
		log.Panicln(err)
	}
	if err = file.Close(); err != nil {
		log.Panicln(err)
	}
	return recipe
}

func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
	for _, c := range chunks {
		tmp, isDelta := c.(*DeltaChunk)
		if isDelta {
			ret = append(ret, tmp)
		}
	}
	return
}
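
// Recipe inspection sketch: loading a stored version's recipe back and
// counting its delta chunks. Illustrative only; the "repo"/"00000" path
// components are hypothetical, recipeName is the constant used above.
//
//	recipe := loadRecipe(path.Join("repo", "00000", recipeName))
//	deltas := extractDeltaChunks(recipe)
//	log.Println(len(deltas), "delta chunk(s) in this version")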