| author | n-peugnet <n.peugnet@free.fr> | 2021-10-06 11:34:14 +0200 |
|---|---|---|
| committer | n-peugnet <n.peugnet@free.fr> | 2021-10-06 11:34:14 +0200 |
| commit | fccc89de47a3ed5fc576f28f7f02b1111a59c0c4 (patch) | |
| tree | 9de36f7e6eab07c78d59428b2345aceab34df9cd /repo/repo.go | |
| parent | 9b9710511b0dbe51ac030ef908f9468103b0bd0a (diff) | |
| download | dna-backup-fccc89de47a3ed5fc576f28f7f02b1111a59c0c4.tar.gz, dna-backup-fccc89de47a3ed5fc576f28f7f02b1111a59c0c4.zip | |
refactor: move repo and delta into their own packages
Diffstat (limited to 'repo/repo.go')
-rw-r--r-- | repo/repo.go | 784 |
1 file changed, 784 insertions, 0 deletions
diff --git a/repo/repo.go b/repo/repo.go
new file mode 100644
index 0000000..97d0b5c
--- /dev/null
+++ b/repo/repo.go
@@ -0,0 +1,784 @@
+/*
+Manage a deduplicated, versioned backup repository.
+
+Sample repository:
+
+```
+repo/
+├── 00000/
+│   ├── chunks/
+│   │   ├── 000000000000000
+│   │   ├── 000000000000001
+│   │   ├── 000000000000002
+│   │   └── 000000000000003
+│   ├── files
+│   ├── hashes
+│   └── recipe
+└── 00001/
+    ├── chunks/
+    │   ├── 000000000000000
+    │   └── 000000000000001
+    ├── files
+    ├── hashes
+    └── recipe
+```
+*/
+
+package repo
+
+import (
+	"bufio"
+	"bytes"
+	"encoding/gob"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"reflect"
+	"strings"
+
+	"github.com/chmduquesne/rollinghash/rabinkarp64"
+	"github.com/n-peugnet/dna-backup/cache"
+	"github.com/n-peugnet/dna-backup/delta"
+	"github.com/n-peugnet/dna-backup/logger"
+	"github.com/n-peugnet/dna-backup/sketch"
+	"github.com/n-peugnet/dna-backup/slice"
+	"github.com/n-peugnet/dna-backup/utils"
+)
+
+func init() {
+	// Register chunk structs for encoding/decoding using gob.
+	gob.RegisterName("*dna-backup.StoredChunk", &StoredChunk{})
+	gob.RegisterName("*dna-backup.TempChunk", &TempChunk{})
+	gob.RegisterName("*dna-backup.DeltaChunk", &DeltaChunk{})
+	gob.RegisterName("dna-backup.File", File{})
+}
+
+type FingerprintMap map[uint64]*ChunkId
+type SketchMap map[uint64][]*ChunkId
+
+func (m SketchMap) Set(key []uint64, value *ChunkId) {
+	for _, s := range key {
+		prev := m[s]
+		if contains(prev, value) {
+			continue
+		}
+		m[s] = append(prev, value)
+	}
+}
+
+type Repo struct {
+	path              string
+	chunkSize         int
+	sketchWSize       int
+	sketchSfCount     int
+	sketchFCount      int
+	pol               rabinkarp64.Pol
+	differ            delta.Differ
+	patcher           delta.Patcher
+	fingerprints      FingerprintMap
+	sketches          SketchMap
+	recipe            []Chunk
+	files             []File
+	chunkCache        cache.Cacher
+	chunkReadWrapper  utils.ReadWrapper
+	chunkWriteWrapper utils.WriteWrapper
+}
+
+type chunkHashes struct {
+	Fp uint64
+	Sk []uint64
+}
+
+type chunkData struct {
+	hashes  chunkHashes
+	content []byte
+	id      *ChunkId
+}
+
+type File struct {
+	Path string
+	Size int64
+	Link string
+}
+
+func NewRepo(path string) *Repo {
+	var err error
+	path, err = filepath.Abs(path)
+	if err != nil {
+		logger.Fatal(err)
+	}
+	err = os.MkdirAll(path, 0775)
+	if err != nil {
+		logger.Panic(err)
+	}
+	var seed int64 = 1
+	p, err := rabinkarp64.RandomPolynomial(seed)
+	if err != nil {
+		logger.Panic(err)
+	}
+	return &Repo{
+		path:              path,
+		chunkSize:         8 << 10,
+		sketchWSize:       32,
+		sketchSfCount:     3,
+		sketchFCount:      4,
+		pol:               p,
+		differ:            delta.Fdelta{},
+		patcher:           delta.Fdelta{},
+		fingerprints:      make(FingerprintMap),
+		sketches:          make(SketchMap),
+		chunkCache:        cache.NewFifoCache(10000),
+		chunkReadWrapper:  utils.ZlibReader,
+		chunkWriteWrapper: utils.ZlibWriter,
+	}
+}
+
+func (r *Repo) Differ() delta.Differ {
+	return r.differ
+}
+
+func (r *Repo) Patcher() delta.Patcher {
+	return r.patcher
+}
+
+func (r *Repo) Commit(source string) {
+	source, err := filepath.Abs(source)
+	if err != nil {
+		logger.Fatal(err)
+	}
+	versions := r.loadVersions()
+	newVersion := len(versions) // TODO: add newVersion function
+	newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
+	newChunkPath := filepath.Join(newPath, chunksName)
+	os.Mkdir(newPath, 0775)      // TODO: handle errors
+	os.Mkdir(newChunkPath, 0775) // TODO: handle errors
+	logger.Info("listing files")
+	files := listFiles(source)
+	logger.Info("loading previous hashes")
+	r.loadHashes(versions)
+	logger.Info("loading previous file lists")
+	r.loadFileLists(versions)
+	logger.Info("loading previous recipes")
+	r.loadRecipes(versions)
+	storeQueue := make(chan chunkData, 32)
+	storeEnd := make(chan bool)
+	go r.storageWorker(newVersion, storeQueue, storeEnd)
+	var last, nlast, pass uint64
+	var recipe []Chunk
+	for ; nlast > last || pass == 0; pass++ {
+		logger.Infof("matcher pass number %d", pass+1)
+		last = nlast
+		reader, writer := io.Pipe()
+		go concatFiles(&files, writer)
+		recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last)
+	}
+	close(storeQueue)
+	<-storeEnd
+	r.storeFileList(newVersion, unprefixFiles(files, source))
+	r.storeRecipe(newVersion, recipe)
+}
+
+func (r *Repo) Restore(destination string) {
+	versions := r.loadVersions()
+	logger.Info("loading previous file lists")
+	r.loadFileLists(versions)
+	logger.Info("loading previous recipes")
+	r.loadRecipes(versions)
+	reader, writer := io.Pipe()
+	logger.Info("restoring latest version")
+	go r.restoreStream(writer, r.recipe)
+	bufReader := bufio.NewReaderSize(reader, r.chunkSize*2)
+	for _, file := range r.files {
+		filePath := filepath.Join(destination, file.Path)
+		dir := filepath.Dir(filePath)
+		os.MkdirAll(dir, 0775) // TODO: handle errors
+		if file.Link != "" {
+			link := file.Link
+			if filepath.IsAbs(link) {
+				link = filepath.Join(destination, file.Link)
+			}
+			err := os.Symlink(link, filePath)
+			if err != nil {
+				logger.Error("restoring symlink: ", err)
+			}
+		} else {
+			f, _ := os.Create(filePath) // TODO: handle errors
+			n, err := io.CopyN(f, bufReader, file.Size)
+			if err != nil {
+				logger.Errorf("restoring file %s, written %d/%d bytes: %s", filePath, n, file.Size, err)
+			}
+			if err := f.Close(); err != nil {
+				logger.Error("restoring file: ", err)
+			}
+		}
+	}
+}
+
+func (r *Repo) loadVersions() []string {
+	var versions []string
+	files, err := os.ReadDir(r.path)
+	if err != nil {
+		logger.Fatal(err)
+	}
+	for _, f := range files {
+		if !f.IsDir() {
+			continue
+		}
+		versions = append(versions, filepath.Join(r.path, f.Name()))
+	}
+	return versions
+}
+
+func listFiles(path string) []File {
+	var files []File
+	err := filepath.Walk(path, func(p string, i fs.FileInfo, err error) error {
+		if err != nil {
+			logger.Warning(err)
+			return nil
+		}
+		if i.IsDir() {
+			return nil
+		}
+		var file = File{Path: p, Size: i.Size()}
+		if i.Mode()&fs.ModeSymlink != 0 {
+			file, err = cleanSymlink(path, p, i)
+			if err != nil {
+				logger.Warning("skipping symlink: ", err)
+				return nil
+			}
+		}
+		files = append(files, file)
+		return nil
+	})
+	if err != nil {
+		logger.Error(err)
+	}
+	return files
+}
+
+func cleanSymlink(root string, p string, i fs.FileInfo) (f File, err error) {
+	dir := filepath.Dir(p)
+	target, err := os.Readlink(p)
+	if err != nil {
+		return
+	}
+	isAbs := filepath.IsAbs(target)
+	cleaned := target
+	if !isAbs {
+		cleaned = filepath.Join(dir, cleaned)
+	}
+	cleaned = filepath.Clean(cleaned)
+	if !strings.HasPrefix(cleaned, root) {
+		err = fmt.Errorf("external %s -> %s", p, cleaned)
+		return
+	}
+	if isAbs {
+		f.Link, err = utils.Unprefix(cleaned, root)
+	} else {
+		f.Link, err = filepath.Rel(dir, filepath.Join(dir, target))
+	}
+	if err != nil {
+		return
+	}
+	if f.Link == "" {
+		err = fmt.Errorf("empty %s", p)
+		return
+	}
+	f.Path = p
+	f.Size = 0
+	return f, nil
+}
+
+func unprefixFiles(files []File, prefix string) (ret []File) {
+	var err error
+	ret = make([]File, len(files))
+	for i, f := range files {
+		if f.Path, err = utils.Unprefix(f.Path, prefix); err != nil {
+			logger.Warning(err)
+		} else {
+			ret[i] = f
+		}
+	}
+	return
+}
+
+// concatFiles reads the content of all the listed files into a continuous stream.
+// If any error is encountered while opening a file, that file is removed from the
+// list.
+//
+// If a read is incomplete, the actual read size is used.
+func concatFiles(files *[]File, stream io.WriteCloser) {
+	actual := make([]File, 0, len(*files))
+	for _, f := range *files {
+		if f.Link != "" {
+			actual = append(actual, f)
+			continue
+		}
+		file, err := os.Open(f.Path)
+		if err != nil {
+			logger.Warning(err)
+			continue
+		}
+		af := f
+		if n, err := io.Copy(stream, file); err != nil {
+			logger.Error("read ", n, " bytes, ", err)
+			af.Size = n
+		}
+		actual = append(actual, af)
+		if err = file.Close(); err != nil {
+			logger.Panic(err)
+		}
+	}
+	stream.Close()
+	*files = actual
+}
+
+func storeBasicStruct(dest string, wrapper utils.WriteWrapper, obj interface{}) {
+	file, err := os.Create(dest)
+	if err != nil {
+		logger.Panic(err)
+	}
+	out := wrapper(file)
+	encoder := gob.NewEncoder(out)
+	err = encoder.Encode(obj)
+	if err != nil {
+		logger.Panic(err)
+	}
+	if err = out.Close(); err != nil {
+		logger.Panic(err)
+	}
+	if err = file.Close(); err != nil {
+		logger.Panic(err)
+	}
+}
+
+func loadBasicStruct(path string, wrapper utils.ReadWrapper, obj interface{}) {
+	file, err := os.Open(path)
+	if err != nil {
+		logger.Panic(err)
+	}
+	in, err := wrapper(file)
+	if err != nil {
+		logger.Panic(err)
+	}
+	decoder := gob.NewDecoder(in)
+	err = decoder.Decode(obj)
+	if err != nil {
+		logger.Panic(err)
+	}
+	if err = in.Close(); err != nil {
+		logger.Panic(err)
+	}
+	if err = file.Close(); err != nil {
+		logger.Panic(err)
+	}
+}
+
+func (r *Repo) loadDeltas(versions []string, wrapper utils.ReadWrapper, name string) (ret slice.Slice) {
+	for _, v := range versions {
+		path := filepath.Join(v, name)
+		var delta slice.Delta
+		loadBasicStruct(path, wrapper, &delta)
+		ret = slice.Patch(ret, delta)
+	}
+	return
+}
+
+func fileList2slice(l []File) (ret slice.Slice) {
+	ret = make(slice.Slice, len(l))
+	for i := range l {
+		ret[i] = l[i]
+	}
+	return
+}
+
+func slice2fileList(s slice.Slice) (ret []File) {
+	ret = make([]File, len(s))
+	for i := range s {
+		if f, ok := s[i].(File); ok {
+			ret[i] = f
+		} else {
+			logger.Warningf("could not convert %s into a File", s[i])
+		}
+	}
+	return
+}
+
+// storeFileList stores the given list in the repo dir as a delta against the
+// previous version's one.
+func (r *Repo) storeFileList(version int, list []File) {
+	dest := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), filesName)
+	delta := slice.Diff(fileList2slice(r.files), fileList2slice(list))
+	logger.Infof("files delta %s", delta.String())
+	storeBasicStruct(dest, r.chunkWriteWrapper, delta)
+}
+
+// loadFileLists incrementally loads the file list deltas of each given version.
+func (r *Repo) loadFileLists(versions []string) {
+	r.files = slice2fileList(r.loadDeltas(versions, r.chunkReadWrapper, filesName))
+}
+
+// storageWorker is meant to be started in a goroutine and stores each new chunk's
+// data in the repo directory until the store queue channel is closed.
+//
+// It sends true on the end channel once everything is stored.
+func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<- bool) {
+	hashesFile := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), hashesName)
+	file, err := os.Create(hashesFile)
+	if err != nil {
+		logger.Panic(err)
+	}
+	encoder := gob.NewEncoder(file)
+	for data := range storeQueue {
+		err = encoder.Encode(data.hashes)
+		r.StoreChunkContent(data.id, bytes.NewReader(data.content))
+		// logger.Debug("stored ", data.id)
+	}
+	if err = file.Close(); err != nil {
+		logger.Panic(err)
+	}
+	end <- true
+}
+
+func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) {
+	path := id.Path(r.path)
+	file, err := os.Create(path)
+	if err != nil {
+		logger.Panic("chunk store ", err)
+	}
+	wrapper := r.chunkWriteWrapper(file)
+	n, err := io.Copy(wrapper, reader)
+	if err != nil {
+		logger.Errorf("chunk store, %d written, %s", n, err)
+	}
+	if err := wrapper.Close(); err != nil {
+		logger.Warning("chunk store wrapper ", err)
+	}
+	if err := file.Close(); err != nil {
+		logger.Warning("chunk store ", err)
+	}
+}
+
+// LoadChunkContent loads a chunk from the repo directory.
+// If the chunk is in cache, get it from cache, else read it from drive.
+func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
+	value, exists := r.chunkCache.Get(id)
+	if !exists {
+		path := id.Path(r.path)
+		f, err := os.Open(path)
+		if err != nil {
+			logger.Panic("chunk load ", err)
+		}
+		wrapper, err := r.chunkReadWrapper(f)
+		if err != nil {
+			logger.Error("chunk load wrapper ", err)
+		}
+		value, err = io.ReadAll(wrapper)
+		if err != nil {
+			logger.Error("chunk load ", err)
+		}
+		if err = wrapper.Close(); err != nil {
+			logger.Warning("chunk load wrapper ", err)
+		}
+		if err = f.Close(); err != nil {
+			logger.Warning("chunk load ", err)
+		}
+		r.chunkCache.Set(id, value)
+	}
+	return bytes.NewReader(value)
+}
+
+// TODO: use atoi for chunkid ?
+func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
+	for i, v := range versions {
+		p := filepath.Join(v, chunksName)
+		entries, err := os.ReadDir(p)
+		if err != nil {
+			logger.Error("version dir ", err)
+		}
+		for j, e := range entries {
+			if e.IsDir() {
+				continue
+			}
+			id := &ChunkId{Ver: i, Idx: uint64(j)}
+			c := NewStoredChunk(r, id)
+			chunks <- c
+		}
+	}
+	close(chunks)
+}
+
+// loadHashes loads and aggregates the hashes stored for each given version and
+// stores them in the repo maps.
+func (r *Repo) loadHashes(versions []string) {
+	for i, v := range versions {
+		path := filepath.Join(v, hashesName)
+		file, err := os.Open(path)
+		if err != nil {
+			logger.Error("hashes ", err)
+		}
+		decoder := gob.NewDecoder(file)
+		for j := 0; err == nil; j++ {
+			var h chunkHashes
+			if err = decoder.Decode(&h); err == nil {
+				id := &ChunkId{i, uint64(j)}
+				r.fingerprints[h.Fp] = id
+				r.sketches.Set(h.Sk, id)
+			}
+		}
+		if err != nil && err != io.EOF {
+			logger.Panic(err)
+		}
+		if err = file.Close(); err != nil {
+			logger.Warning(err)
+		}
+	}
+}
+
+func (r *Repo) chunkMinLen() int {
+	return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
+}
+
+func contains(s []*ChunkId, id *ChunkId) bool {
+	for _, v := range s {
+		if v == id {
+			return true
+		}
+	}
+	return false
+}
+
+// findSimilarChunk looks in the repo sketch map for a match of the given sketch.
+//
+// There can be multiple matches, but only the best one is returned: the more
+// superfeatures match, the better the quality of the match. For now, a single
+// superfeature match is enough to count it as valid.
+func (r *Repo) findSimilarChunk(sketch []uint64) (*ChunkId, bool) {
+	var similarChunks = make(map[ChunkId]int)
+	var max int
+	var similarChunk *ChunkId
+	for _, s := range sketch {
+		chunkIds, exists := r.sketches[s]
+		if !exists {
+			continue
+		}
+		for _, id := range chunkIds {
+			count := similarChunks[*id]
+			count += 1
+			logger.Debugf("found %d %d time(s)", id, count)
+			if count > max {
+				max = count
+				similarChunk = id
+			}
+			similarChunks[*id] = count
+		}
+	}
+	return similarChunk, max > 0
+}
+
+// encodeTempChunk first tries to delta-encode the given chunk before assigning
+// it an Id and saving it into the fingerprints and sketches maps.
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) (Chunk, bool) {
+	sk, _ := sketch.SketchChunk(temp.Reader(), r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
+	id, found := r.findSimilarChunk(sk)
+	if found {
+		var buff bytes.Buffer
+		if err := r.differ.Diff(r.LoadChunkContent(id), temp.Reader(), &buff); err != nil {
+			logger.Error("trying delta encode chunk:", temp, "with source:", id, ":", err)
+		} else {
+			logger.Debugf("add new delta chunk of size %d", len(buff.Bytes()))
+			return &DeltaChunk{
+				repo:   r,
+				Source: id,
+				Patch:  buff.Bytes(),
+				Size:   temp.Len(),
+			}, true
+		}
+	}
+	if temp.Len() == r.chunkSize {
+		id := &ChunkId{Ver: version, Idx: *last}
+		*last++
+		hasher := rabinkarp64.NewFromPol(r.pol)
+		io.Copy(hasher, temp.Reader())
+		fp := hasher.Sum64()
+		r.fingerprints[fp] = id
+		r.sketches.Set(sk, id)
+		storeQueue <- chunkData{
+			hashes:  chunkHashes{fp, sk},
+			content: temp.Bytes(),
+			id:      id,
+		}
+		r.chunkCache.Set(id, temp.Bytes())
+		logger.Debug("add new chunk ", id)
+		return NewStoredChunk(r, id), false
+	}
+	logger.Debug("add new partial chunk of size: ", temp.Len())
+	return temp, false
+}
+
+// encodeTempChunks encodes the current temporary chunks based on the value of the previous one.
+// Temporary chunks can be partial. If the current chunk is smaller than the size of a
+// super-feature and there exists a previous chunk, then both are merged before attempting
+// to delta-encode them.
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) []Chunk {
+	if reflect.ValueOf(prev).IsNil() {
+		c, _ := r.encodeTempChunk(curr, version, last, storeQueue)
+		return []Chunk{c}
+	} else if curr.Len() < r.chunkMinLen() {
+		tmp := NewTempChunk(append(prev.Bytes(), curr.Bytes()...))
+		c, success := r.encodeTempChunk(tmp, version, last, storeQueue)
+		if success {
+			return []Chunk{c}
+		}
+	}
+	prevD, _ := r.encodeTempChunk(prev, version, last, storeQueue)
+	currD, _ := r.encodeTempChunk(curr, version, last, storeQueue)
+	return []Chunk{prevD, currD}
+}
+
+// matchStream is the heart of DNA-backup. Thus, it sounded rude not to add some comment to it.
+//
+// It applies a rolling hash on the content of a given stream to look for matching fingerprints
+// in the repo. If no match is found after the equivalent of three chunks of data has been
+// processed, then the first unmatched chunk's sketch is checked to see if it could be
+// delta-encoded. If not, the chunk is stored as a new chunk for this version and its
+// fingerprint and sketch are added to the repo maps.
+//
+// If a match happens during the processing of the third chunk, then, if possible, the remainder
+// of the second chunk is merged with the first one to try to delta-encode it at once.
+//
+// Each time a new chunk is added, it is sent to the store worker through the store queue.
+func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) {
+	var b byte
+	var chunks []Chunk
+	var prev *TempChunk
+	var err error
+	bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
+	buff := make([]byte, r.chunkSize, r.chunkSize*2)
+	if n, err := io.ReadFull(stream, buff); n < r.chunkSize {
+		if err == io.ErrUnexpectedEOF {
+			c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue)
+			chunks = append(chunks, c)
+			return chunks, last
+		} else {
+			logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err)
+		}
+	}
+	hasher := rabinkarp64.NewFromPol(r.pol)
+	hasher.Write(buff)
+	for err != io.EOF {
+		h := hasher.Sum64()
+		chunkId, exists := r.fingerprints[h]
+		if exists {
+			if len(buff) > r.chunkSize && len(buff) <= r.chunkSize*2 {
+				size := len(buff) - r.chunkSize
+				temp := NewTempChunk(buff[:size])
+				chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
+				prev = nil
+			} else if prev != nil {
+				c, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
+				chunks = append(chunks, c)
+				prev = nil
+			}
+			logger.Debugf("add existing chunk: %d", chunkId)
+			chunks = append(chunks, NewStoredChunk(r, chunkId))
+			buff = make([]byte, 0, r.chunkSize*2)
+			for i := 0; i < r.chunkSize && err == nil; i++ {
+				b, err = bufStream.ReadByte()
+				if err != io.EOF {
+					hasher.Roll(b)
+					buff = append(buff, b)
+				}
+			}
+			continue
+		}
+		if len(buff) == r.chunkSize*2 {
+			if prev != nil {
+				chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
+				chunks = append(chunks, chunk)
+			}
+			prev = NewTempChunk(buff[:r.chunkSize])
+			tmp := buff[r.chunkSize:]
+			buff = make([]byte, r.chunkSize, r.chunkSize*2)
+			copy(buff, tmp)
+		}
+		b, err = bufStream.ReadByte()
+		if err != io.EOF {
+			hasher.Roll(b)
+			buff = append(buff, b)
+		}
+	}
+	if len(buff) > 0 {
+		var temp *TempChunk
+		if len(buff) > r.chunkSize {
+			if prev != nil {
+				chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
+				chunks = append(chunks, chunk)
+			}
+			prev = NewTempChunk(buff[:r.chunkSize])
+			temp = NewTempChunk(buff[r.chunkSize:])
+		} else {
+			temp = NewTempChunk(buff)
+		}
+		chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
+	}
+	return chunks, last
+}
+
+func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {
+	for _, c := range recipe {
+		if n, err := io.Copy(stream, c.Reader()); err != nil {
+			logger.Errorf("copying to stream, read %d bytes from chunk: %s", n, err)
+		}
+	}
+	stream.Close()
+}
+
+func recipe2slice(r []Chunk) (ret slice.Slice) {
+	ret = make(slice.Slice, len(r))
+	for i := range r {
+		ret[i] = r[i]
+	}
+	return
+}
+
+func slice2recipe(s slice.Slice) (ret []Chunk) {
+	ret = make([]Chunk, len(s))
+	for i := range s {
+		if c, ok := s[i].(Chunk); ok {
+			ret[i] = c
+		} else {
+			logger.Warningf("could not convert %s into a Chunk", s[i])
+		}
+	}
+	return
+}
+
+func (r *Repo) storeRecipe(version int, recipe []Chunk) {
+	dest := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), recipeName)
+	delta := slice.Diff(recipe2slice(r.recipe), recipe2slice(recipe))
+	logger.Infof("recipe delta %s", delta.String())
+	storeBasicStruct(dest, r.chunkWriteWrapper, delta)
+}
+
+func (r *Repo) loadRecipes(versions []string) {
+	recipe := slice2recipe(r.loadDeltas(versions, r.chunkReadWrapper, recipeName))
+	for _, c := range recipe {
+		if rc, isRepo := c.(RepoChunk); isRepo {
+			rc.SetRepo(r)
+		}
+	}
+	r.recipe = recipe
+}
+
+func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
+	for _, c := range chunks {
+		tmp, isDelta := c.(*DeltaChunk)
+		if isDelta {
+			ret = append(ret, tmp)
+		}
+	}
+	return
+}
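
For context, here is a minimal usage sketch of the exported API introduced by this file (`NewRepo`, `Commit`, `Restore`). It is not part of the commit; the import path is an assumption based on the module shown in this diff, and the directory names are placeholders.

```go
package main

import (
	"github.com/n-peugnet/dna-backup/repo" // assumed import path after this refactor
)

func main() {
	// Back up the contents of "./data" into the repository directory "./backup".
	// Each Commit call creates a new version directory (00000, 00001, ...).
	r := repo.NewRepo("./backup")
	r.Commit("./data")

	// Restore the latest version of the repository into "./restore".
	r = repo.NewRepo("./backup")
	r.Restore("./restore")
}
```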