author    n-peugnet <n.peugnet@free.fr>  2021-10-06 11:34:14 +0200
committer n-peugnet <n.peugnet@free.fr>  2021-10-06 11:34:14 +0200
commit    fccc89de47a3ed5fc576f28f7f02b1111a59c0c4 (patch)
tree      9de36f7e6eab07c78d59428b2345aceab34df9cd /repo/repo.go
parent    9b9710511b0dbe51ac030ef908f9468103b0bd0a (diff)
refactor: move repo and delta in their own package
Diffstat (limited to 'repo/repo.go')
-rw-r--r--	repo/repo.go	784
1 file changed, 784 insertions(+), 0 deletions(-)
diff --git a/repo/repo.go b/repo/repo.go
new file mode 100644
index 0000000..97d0b5c
--- /dev/null
+++ b/repo/repo.go
@@ -0,0 +1,784 @@
+/*
+Manage a deduplicated, versioned backup repository.
+
+Sample repository:
+
+```
+repo/
+├── 00000/
+│ ├── chunks/
+│ │ ├── 000000000000000
+│ │ ├── 000000000000001
+│ │ ├── 000000000000002
+│ │ └── 000000000000003
+│ ├── files
+│ ├── hashes
+│ └── recipe
+└── 00001/
+ ├── chunks/
+ │ ├── 000000000000000
+ │ └── 000000000000001
+ ├── files
+ ├── hashes
+ └── recipe
+```
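+
+A minimal usage sketch (paths are placeholders):
+
+```
+r := repo.NewRepo("/path/to/repo")
+r.Commit("/path/to/source") // store a new version of source
+r.Restore("/path/to/dest")  // restore the latest version
+```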
+*/
+
+package repo
+
+import (
+ "bufio"
+ "bytes"
+ "encoding/gob"
+ "fmt"
+ "io"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "reflect"
+ "strings"
+
+ "github.com/chmduquesne/rollinghash/rabinkarp64"
+ "github.com/n-peugnet/dna-backup/cache"
+ "github.com/n-peugnet/dna-backup/delta"
+ "github.com/n-peugnet/dna-backup/logger"
+ "github.com/n-peugnet/dna-backup/sketch"
+ "github.com/n-peugnet/dna-backup/slice"
+ "github.com/n-peugnet/dna-backup/utils"
+)
+
+func init() {
+ // register chunk structs for encoding/decoding using gob
+ gob.RegisterName("*dna-backup.StoredChunk", &StoredChunk{})
+ gob.RegisterName("*dna-backup.TempChunk", &TempChunk{})
+ gob.RegisterName("*dna-backup.DeltaChunk", &DeltaChunk{})
+ gob.RegisterName("dna-backup.File", File{})
+}
+
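+// FingerprintMap maps a chunk's Rabin fingerprint to its id.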
+type FingerprintMap map[uint64]*ChunkId
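+
+// SketchMap maps each superfeature of a chunk's sketch to the ids of the
+// chunks that share it.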
+type SketchMap map[uint64][]*ChunkId
+
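+// Set appends value to the chunk id list of each superfeature in key,
+// skipping superfeatures that already contain it.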
+func (m SketchMap) Set(key []uint64, value *ChunkId) {
+ for _, s := range key {
+ prev := m[s]
+ if contains(prev, value) {
+ continue
+ }
+ m[s] = append(prev, value)
+ }
+}
+
+type Repo struct {
+ path string
+ chunkSize int
+ sketchWSize int
+ sketchSfCount int
+ sketchFCount int
+ pol rabinkarp64.Pol
+ differ delta.Differ
+ patcher delta.Patcher
+ fingerprints FingerprintMap
+ sketches SketchMap
+ recipe []Chunk
+ files []File
+ chunkCache cache.Cacher
+ chunkReadWrapper utils.ReadWrapper
+ chunkWriteWrapper utils.WriteWrapper
+}
+
+type chunkHashes struct {
+ Fp uint64
+ Sk []uint64
+}
+
+type chunkData struct {
+ hashes chunkHashes
+ content []byte
+ id *ChunkId
+}
+
+type File struct {
+ Path string
+ Size int64
+ Link string
+}
+
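+// NewRepo creates the repository directory at path if needed and returns a
+// Repo with the default parameters: 8 KiB chunks, a sketch window of 32,
+// fdelta diffing and patching, zlib chunk compression and a 10000-entry
+// FIFO chunk cache.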
+func NewRepo(path string) *Repo {
+ var err error
+ path, err = filepath.Abs(path)
+ if err != nil {
+ logger.Fatal(err)
+ }
+ err = os.MkdirAll(path, 0775)
+ if err != nil {
+ logger.Panic(err)
+ }
+ var seed int64 = 1
+ p, err := rabinkarp64.RandomPolynomial(seed)
+ if err != nil {
+ logger.Panic(err)
+ }
+ return &Repo{
+ path: path,
+ chunkSize: 8 << 10,
+ sketchWSize: 32,
+ sketchSfCount: 3,
+ sketchFCount: 4,
+ pol: p,
+ differ: delta.Fdelta{},
+ patcher: delta.Fdelta{},
+ fingerprints: make(FingerprintMap),
+ sketches: make(SketchMap),
+ chunkCache: cache.NewFifoCache(10000),
+ chunkReadWrapper: utils.ZlibReader,
+ chunkWriteWrapper: utils.ZlibWriter,
+ }
+}
+
+func (r *Repo) Differ() delta.Differ {
+ return r.differ
+}
+
+func (r *Repo) Patcher() delta.Patcher {
+ return r.patcher
+}
+
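+// Commit stores a new version of source in the repository: it lists the
+// source files, loads the previous versions' hashes, file lists and recipes,
+// then matches the concatenated content of the files against known chunks
+// (in as many passes as needed), sending only new chunks to the storage
+// worker.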
+func (r *Repo) Commit(source string) {
+ source, err := filepath.Abs(source)
+ if err != nil {
+ logger.Fatal(err)
+ }
+ versions := r.loadVersions()
+ newVersion := len(versions) // TODO: add newVersion function
+ newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
+ newChunkPath := filepath.Join(newPath, chunksName)
+ os.Mkdir(newPath, 0775) // TODO: handle errors
+ os.Mkdir(newChunkPath, 0775) // TODO: handle errors
+ logger.Info("listing files")
+ files := listFiles(source)
+ logger.Info("loading previous hashes")
+ r.loadHashes(versions)
+ logger.Info("loading previous file lists")
+ r.loadFileLists(versions)
+ logger.Info("loading previous recipies")
+ r.loadRecipes(versions)
+ storeQueue := make(chan chunkData, 32)
+ storeEnd := make(chan bool)
+ go r.storageWorker(newVersion, storeQueue, storeEnd)
+ var last, nlast, pass uint64
+ var recipe []Chunk
+ for ; nlast > last || pass == 0; pass++ {
+ logger.Infof("matcher pass number %d", pass+1)
+ last = nlast
+ reader, writer := io.Pipe()
+ go concatFiles(&files, writer)
+ recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last)
+ }
+ close(storeQueue)
+ <-storeEnd
+ r.storeFileList(newVersion, unprefixFiles(files, source))
+ r.storeRecipe(newVersion, recipe)
+}
+
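+// Restore writes the files of the latest version into destination,
+// recreating directories and symlinks as needed.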
+func (r *Repo) Restore(destination string) {
+ versions := r.loadVersions()
+ logger.Info("loading previous file lists")
+ r.loadFileLists(versions)
+ logger.Info("loading previous recipies")
+ r.loadRecipes(versions)
+ reader, writer := io.Pipe()
+ logger.Info("restoring latest version")
+ go r.restoreStream(writer, r.recipe)
+ bufReader := bufio.NewReaderSize(reader, r.chunkSize*2)
+ for _, file := range r.files {
+ filePath := filepath.Join(destination, file.Path)
+ dir := filepath.Dir(filePath)
+ os.MkdirAll(dir, 0775) // TODO: handle errors
+ if file.Link != "" {
+ link := file.Link
+ if filepath.IsAbs(link) {
+ link = filepath.Join(destination, file.Link)
+ }
+ err := os.Symlink(link, filePath)
+ if err != nil {
+ logger.Error("restoring symlink ", err)
+ }
+ } else {
+ f, _ := os.Create(filePath) // TODO: handle errors
+ n, err := io.CopyN(f, bufReader, file.Size)
+ if err != nil {
+ logger.Errorf("restored file, written %d/%d bytes: %s", filePath, n, file.Size, err)
+ }
+ if err := f.Close(); err != nil {
+ logger.Errorf("restored file ", err)
+ }
+ }
+ }
+}
+
+func (r *Repo) loadVersions() []string {
+ var versions []string
+ files, err := os.ReadDir(r.path)
+ if err != nil {
+ logger.Fatal(err)
+ }
+ for _, f := range files {
+ if !f.IsDir() {
+ continue
+ }
+ versions = append(versions, filepath.Join(r.path, f.Name()))
+ }
+ return versions
+}
+
+func listFiles(path string) []File {
+ var files []File
+ err := filepath.Walk(path, func(p string, i fs.FileInfo, err error) error {
+ if err != nil {
+ logger.Warning(err)
+ return nil
+ }
+ if i.IsDir() {
+ return nil
+ }
+ var file = File{Path: p, Size: i.Size()}
+ if i.Mode()&fs.ModeSymlink != 0 {
+ file, err = cleanSymlink(path, p, i)
+ if err != nil {
+ logger.Warning("skipping symlink ", err)
+ return nil
+ }
+ }
+ files = append(files, file)
+ return nil
+ })
+ if err != nil {
+ logger.Error(err)
+ }
+ return files
+}
+
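+// cleanSymlink validates the symlink at p, returning an error if its target
+// points outside root. Absolute targets are stored relative to root,
+// relative targets relative to their own directory.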
+func cleanSymlink(root string, p string, i fs.FileInfo) (f File, err error) {
+ dir := filepath.Dir(p)
+ target, err := os.Readlink(p)
+ if err != nil {
+ return
+ }
+ isAbs := filepath.IsAbs(target)
+ cleaned := target
+ if !isAbs {
+ cleaned = filepath.Join(dir, cleaned)
+ }
+ cleaned = filepath.Clean(cleaned)
+ if !strings.HasPrefix(cleaned, root) {
+ err = fmt.Errorf("external %s -> %s", p, cleaned)
+ return
+ }
+ if isAbs {
+ f.Link, err = utils.Unprefix(cleaned, root)
+ } else {
+ f.Link, err = filepath.Rel(dir, filepath.Join(dir, target))
+ }
+ if err != nil {
+ return
+ }
+ if f.Link == "" {
+ err = fmt.Errorf("empty %s", p)
+ return
+ }
+ f.Path = p
+ f.Size = 0
+ return f, nil
+}
+
+func unprefixFiles(files []File, prefix string) (ret []File) {
+ var err error
+ ret = make([]File, len(files))
+ for i, f := range files {
+ if f.Path, err = utils.Unprefix(f.Path, prefix); err != nil {
+ logger.Warning(err)
+ } else {
+ ret[i] = f
+ }
+ }
+ return
+}
+
+// concatFiles reads the content of all the listed files into a continuous stream.
+// If an error is encountered while opening a file, that file is removed from the
+// list.
+//
+// If a read is incomplete, the file's recorded size is set to the number of
+// bytes actually read.
+func concatFiles(files *[]File, stream io.WriteCloser) {
+ actual := make([]File, 0, len(*files))
+ for _, f := range *files {
+ if f.Link != "" {
+ actual = append(actual, f)
+ continue
+ }
+ file, err := os.Open(f.Path)
+ if err != nil {
+ logger.Warning(err)
+ continue
+ }
+ af := f
+ if n, err := io.Copy(stream, file); err != nil {
+ logger.Error("read ", n, " bytes, ", err)
+ af.Size = n
+ }
+ actual = append(actual, af)
+ if err = file.Close(); err != nil {
+ logger.Panic(err)
+ }
+ }
+ stream.Close()
+ *files = actual
+}
+
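+// storeBasicStruct gob-encodes obj into the file dest through the given
+// write wrapper.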
+func storeBasicStruct(dest string, wrapper utils.WriteWrapper, obj interface{}) {
+ file, err := os.Create(dest)
+ if err != nil {
+ logger.Panic(err)
+ }
+ out := wrapper(file)
+ encoder := gob.NewEncoder(out)
+ err = encoder.Encode(obj)
+ if err != nil {
+ logger.Panic(err)
+ }
+ if err = out.Close(); err != nil {
+ logger.Panic(err)
+ }
+ if err = file.Close(); err != nil {
+ logger.Panic(err)
+ }
+}
+
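+// loadBasicStruct gob-decodes the content of the file at path into obj
+// through the given read wrapper.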
+func loadBasicStruct(path string, wrapper utils.ReadWrapper, obj interface{}) {
+ file, err := os.Open(path)
+ if err != nil {
+ logger.Panic(err)
+ }
+ in, err := wrapper(file)
+ if err != nil {
+ logger.Panic(err)
+ }
+ decoder := gob.NewDecoder(in)
+ err = decoder.Decode(obj)
+ if err != nil {
+ logger.Panic(err)
+ }
+ if err = in.Close(); err != nil {
+ logger.Panic(err)
+ }
+ if err = file.Close(); err != nil {
+ logger.Panic(err)
+ }
+}
+
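+// loadDeltas sequentially applies the deltas stored under name in each given
+// version directory, returning the resulting slice.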
+func (r *Repo) loadDeltas(versions []string, wrapper utils.ReadWrapper, name string) (ret slice.Slice) {
+ for _, v := range versions {
+ path := filepath.Join(v, name)
+ var delta slice.Delta
+ loadBasicStruct(path, wrapper, &delta)
+ ret = slice.Patch(ret, delta)
+ }
+ return
+}
+
+func fileList2slice(l []File) (ret slice.Slice) {
+ ret = make(slice.Slice, len(l))
+ for i := range l {
+ ret[i] = l[i]
+ }
+ return
+}
+
+func slice2fileList(s slice.Slice) (ret []File) {
+ ret = make([]File, len(s))
+ for i := range s {
+ if f, ok := s[i].(File); ok {
+ ret[i] = f
+ } else {
+ logger.Warningf("could not convert %s into a File", s[i])
+ }
+ }
+ return
+}
+
+// storeFileList stores the given list in the repo dir as a delta against the
+// previous version's list.
+func (r *Repo) storeFileList(version int, list []File) {
+ dest := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), filesName)
+ delta := slice.Diff(fileList2slice(r.files), fileList2slice(list))
+ logger.Infof("files delta %s", delta.String())
+ storeBasicStruct(dest, r.chunkWriteWrapper, delta)
+}
+
+// loadFileLists incrementally loads the file-list deltas of each given version.
+func (r *Repo) loadFileLists(versions []string) {
+ r.files = slice2fileList(r.loadDeltas(versions, r.chunkReadWrapper, filesName))
+}
+
+// storageWorker is meant to be started in a goroutine and stores each new chunk's
+// data in the repo directory until the store queue channel is closed.
+//
+// It sends true on the end channel once everything is stored.
+func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<- bool) {
+ hashesFile := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), hashesName)
+ file, err := os.Create(hashesFile)
+ if err != nil {
+ logger.Panic(err)
+ }
+ encoder := gob.NewEncoder(file)
+ for data := range storeQueue {
+ if err = encoder.Encode(data.hashes); err != nil {
+ logger.Panic(err)
+ }
+ r.StoreChunkContent(data.id, bytes.NewReader(data.content))
+ // logger.Debug("stored ", data.id)
+ }
+ if err = file.Close(); err != nil {
+ logger.Panic(err)
+ }
+ end <- true
+}
+
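+// StoreChunkContent writes the content of reader, wrapped by the repo's
+// write wrapper, into the chunk file designated by id.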
+func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) {
+ path := id.Path(r.path)
+ file, err := os.Create(path)
+ if err != nil {
+ logger.Panic("chunk store ", err)
+ }
+ wrapper := r.chunkWriteWrapper(file)
+ n, err := io.Copy(wrapper, reader)
+ if err != nil {
+ logger.Errorf("chunk store, %d written, %s", n, err)
+ }
+ if err := wrapper.Close(); err != nil {
+ logger.Warning("chunk store wrapper ", err)
+ }
+ if err := file.Close(); err != nil {
+ logger.Warning("chunk store ", err)
+ }
+}
+
+// LoadChunkContent loads a chunk from the repo directory.
+// If the chunk is in cache, get it from cache, else read it from drive.
+func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
+ value, exists := r.chunkCache.Get(id)
+ if !exists {
+ path := id.Path(r.path)
+ f, err := os.Open(path)
+ if err != nil {
+ logger.Panic("chunk load ", err)
+ }
+ wrapper, err := r.chunkReadWrapper(f)
+ if err != nil {
+ logger.Error("chunk load wrapper ", err)
+ }
+ value, err = io.ReadAll(wrapper)
+ if err != nil {
+ logger.Error("chunk load ", err)
+ }
+ if err = wrapper.Close(); err != nil {
+ logger.Warning("chunk load wrapper", err)
+ }
+ if err = f.Close(); err != nil {
+ logger.Warning("chunk load ", err)
+ }
+ r.chunkCache.Set(id, value)
+ }
+ return bytes.NewReader(value)
+}
+
+// TODO: use atoi for chunkid ?
+func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
+ for i, v := range versions {
+ p := filepath.Join(v, chunksName)
+ entries, err := os.ReadDir(p)
+ if err != nil {
+ logger.Error("version dir ", err)
+ }
+ for j, e := range entries {
+ if e.IsDir() {
+ continue
+ }
+ id := &ChunkId{Ver: i, Idx: uint64(j)}
+ c := NewStoredChunk(r, id)
+ chunks <- c
+ }
+ }
+ close(chunks)
+}
+
+// loadHashes loads and aggregates the hashes stored for each given version and
+// stores them in the repo maps.
+func (r *Repo) loadHashes(versions []string) {
+ for i, v := range versions {
+ path := filepath.Join(v, hashesName)
+ file, err := os.Open(path)
+ if err != nil {
+ logger.Error("hashes ", err)
+ }
+ decoder := gob.NewDecoder(file)
+ for j := 0; err == nil; j++ {
+ var h chunkHashes
+ if err = decoder.Decode(&h); err == nil {
+ id := &ChunkId{i, uint64(j)}
+ r.fingerprints[h.Fp] = id
+ r.sketches.Set(h.Sk, id)
+ }
+ }
+ if err != nil && err != io.EOF {
+ logger.Panic(err)
+ }
+ if err = file.Close(); err != nil {
+ logger.Warning(err)
+ }
+ }
+}
+
+func (r *Repo) chunkMinLen() int {
+ return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
+}
+
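+// contains reports whether s already holds id. Comparing pointers is enough
+// here, as each ChunkId is allocated once and then shared.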
+func contains(s []*ChunkId, id *ChunkId) bool {
+ for _, v := range s {
+ if v == id {
+ return true
+ }
+ }
+ return false
+}
+
+// findSimilarChunk looks in the repo sketch map for a match of the given sketch.
+//
+// There can be multiple matches but only the best one is returned. Indeed, the
+// more superfeature matches, the better the quality of the match. For now we
+// consider that a single superfeature match is enough to count it as valid.
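+//
+// For example, if two superfeatures of the given sketch map to chunk A and
+// only one maps to chunk B, then A is returned.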
+func (r *Repo) findSimilarChunk(sketch []uint64) (*ChunkId, bool) {
+ var similarChunks = make(map[ChunkId]int)
+ var max int
+ var similarChunk *ChunkId
+ for _, s := range sketch {
+ chunkIds, exists := r.sketches[s]
+ if !exists {
+ continue
+ }
+ for _, id := range chunkIds {
+ count := similarChunks[*id]
+ count += 1
+ logger.Debugf("found %d %d time(s)", id, count)
+ if count > max {
+ max = count
+ similarChunk = id
+ }
+ similarChunks[*id] = count
+ }
+ }
+ return similarChunk, max > 0
+}
+
+// encodeTempChunk first tries to delta-encode the given chunk; if that fails
+// and the chunk is full-sized, it is assigned an id and saved into the
+// fingerprints and sketches maps.
+func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) (Chunk, bool) {
+ sk, _ := sketch.SketchChunk(temp.Reader(), r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
+ id, found := r.findSimilarChunk(sk)
+ if found {
+ var buff bytes.Buffer
+ if err := r.differ.Diff(r.LoadChunkContent(id), temp.Reader(), &buff); err != nil {
+ logger.Error("trying delta encode chunk:", temp, "with source:", id, ":", err)
+ } else {
+ logger.Debugf("add new delta chunk of size %d", len(buff.Bytes()))
+ return &DeltaChunk{
+ repo: r,
+ Source: id,
+ Patch: buff.Bytes(),
+ Size: temp.Len(),
+ }, true
+ }
+ }
+ if temp.Len() == r.chunkSize {
+ id := &ChunkId{Ver: version, Idx: *last}
+ *last++
+ hasher := rabinkarp64.NewFromPol(r.pol)
+ io.Copy(hasher, temp.Reader())
+ fp := hasher.Sum64()
+ r.fingerprints[fp] = id
+ r.sketches.Set(sk, id)
+ storeQueue <- chunkData{
+ hashes: chunkHashes{fp, sk},
+ content: temp.Bytes(),
+ id: id,
+ }
+ r.chunkCache.Set(id, temp.Bytes())
+ logger.Debug("add new chunk ", id)
+ return NewStoredChunk(r, id), false
+ }
+ logger.Debug("add new partial chunk of size: ", temp.Len())
+ return temp, false
+}
+
+// encodeTempChunks encodes the current temporary chunks based on the value of the previous one.
+// Temporary chunks can be partial. If the current chunk is smaller than the size of a
+// super-feature and there exists a previous chunk, then both are merged before attempting
+// to delta-encode them.
+func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) []Chunk {
+ if reflect.ValueOf(prev).IsNil() {
+ c, _ := r.encodeTempChunk(curr, version, last, storeQueue)
+ return []Chunk{c}
+ } else if curr.Len() < r.chunkMinLen() {
+ tmp := NewTempChunk(append(prev.Bytes(), curr.Bytes()...))
+ c, success := r.encodeTempChunk(tmp, version, last, storeQueue)
+ if success {
+ return []Chunk{c}
+ }
+ }
+ prevD, _ := r.encodeTempChunk(prev, version, last, storeQueue)
+ currD, _ := r.encodeTempChunk(curr, version, last, storeQueue)
+ return []Chunk{prevD, currD}
+}
+
+// matchStream is the heart of DNA-backup. Thus, it sounded rude not to add some comment to it.
+//
+// It applies a rolling hash on the content of a given stream to look for matching fingerprints
+// in the repo. If no match is found after the equivalent of three chunks of data are processed,
+// then the first unmatched chunk sketch is checked to see if it could be delta-encoded.
+// If not, the chunk is then stored as a new chunk for this version and its fingerprint and
+// sketch are added to the repo maps.
+//
+// If a match happens during the processing of the third chunk then, if possible,
+// the remainder of the second chunk is merged with the first one to try to
+// delta-encode them at once.
+//
+// Each time a new chunk is added it is sent to the store worker through the store queue.
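+//
+// The rolling hash window always covers the last chunkSize bytes of the
+// buffer, so when a fingerprint matches, the unmatched data in front of it
+// never exceeds one chunk in the buffer plus one in prev; it is flushed as
+// temporary chunks before the matched chunk is appended to the recipe.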
+func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) {
+ var b byte
+ var chunks []Chunk
+ var prev *TempChunk
+ var err error
+ bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
+ buff := make([]byte, r.chunkSize, r.chunkSize*2)
+ if n, err := io.ReadFull(bufStream, buff); n < r.chunkSize {
+ if err == io.ErrUnexpectedEOF {
+ c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue)
+ chunks = append(chunks, c)
+ return chunks, last
+ } else {
+ logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err)
+ }
+ }
+ hasher := rabinkarp64.NewFromPol(r.pol)
+ hasher.Write(buff)
+ for err != io.EOF {
+ h := hasher.Sum64()
+ chunkId, exists := r.fingerprints[h]
+ if exists {
+ if len(buff) > r.chunkSize && len(buff) <= r.chunkSize*2 {
+ size := len(buff) - r.chunkSize
+ temp := NewTempChunk(buff[:size])
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
+ prev = nil
+ } else if prev != nil {
+ c, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
+ chunks = append(chunks, c)
+ prev = nil
+ }
+ logger.Debugf("add existing chunk: %d", chunkId)
+ chunks = append(chunks, NewStoredChunk(r, chunkId))
+ buff = make([]byte, 0, r.chunkSize*2)
+ for i := 0; i < r.chunkSize && err == nil; i++ {
+ b, err = bufStream.ReadByte()
+ if err != io.EOF {
+ hasher.Roll(b)
+ buff = append(buff, b)
+ }
+ }
+ continue
+ }
+ if len(buff) == r.chunkSize*2 {
+ if prev != nil {
+ chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
+ chunks = append(chunks, chunk)
+ }
+ prev = NewTempChunk(buff[:r.chunkSize])
+ tmp := buff[r.chunkSize:]
+ buff = make([]byte, r.chunkSize, r.chunkSize*2)
+ copy(buff, tmp)
+ }
+ b, err = bufStream.ReadByte()
+ if err != io.EOF {
+ hasher.Roll(b)
+ buff = append(buff, b)
+ }
+ }
+ if len(buff) > 0 {
+ var temp *TempChunk
+ if len(buff) > r.chunkSize {
+ if prev != nil {
+ chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
+ chunks = append(chunks, chunk)
+ }
+ prev = NewTempChunk(buff[:r.chunkSize])
+ temp = NewTempChunk(buff[r.chunkSize:])
+ } else {
+ temp = NewTempChunk(buff)
+ }
+ chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
+ }
+ return chunks, last
+}
+
+func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {
+ for _, c := range recipe {
+ if n, err := io.Copy(stream, c.Reader()); err != nil {
+ logger.Errorf("copying to stream, read %d bytes from chunk: %s", n, err)
+ }
+ }
+ stream.Close()
+}
+
+func recipe2slice(r []Chunk) (ret slice.Slice) {
+ ret = make(slice.Slice, len(r))
+ for i := range r {
+ ret[i] = r[i]
+ }
+ return
+}
+
+func slice2recipe(s slice.Slice) (ret []Chunk) {
+ ret = make([]Chunk, len(s))
+ for i := range s {
+ if c, ok := s[i].(Chunk); ok {
+ ret[i] = c
+ } else {
+ logger.Warningf("could not convert %s into a Chunk", s[i])
+ }
+ }
+ return
+}
+
+func (r *Repo) storeRecipe(version int, recipe []Chunk) {
+ dest := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), recipeName)
+ delta := slice.Diff(recipe2slice(r.recipe), recipe2slice(recipe))
+ logger.Infof("recipe delta %s", delta.String())
+ storeBasicStruct(dest, r.chunkWriteWrapper, delta)
+}
+
+func (r *Repo) loadRecipes(versions []string) {
+ recipe := slice2recipe(r.loadDeltas(versions, r.chunkReadWrapper, recipeName))
+ for _, c := range recipe {
+ if rc, isRepo := c.(RepoChunk); isRepo {
+ rc.SetRepo(r)
+ }
+ }
+ r.recipe = recipe
+}
+
+func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
+ for _, c := range chunks {
+ tmp, isDelta := c.(*DeltaChunk)
+ if isDelta {
+ ret = append(ret, tmp)
+ }
+ }
+ return
+}