aboutsummaryrefslogtreecommitdiff
path: root/repo.go
diff options
context:
space:
mode:
Diffstat (limited to 'repo.go')
-rw-r--r--repo.go783
1 files changed, 0 insertions, 783 deletions
diff --git a/repo.go b/repo.go
deleted file mode 100644
index 543f6c3..0000000
--- a/repo.go
+++ /dev/null
@@ -1,783 +0,0 @@
-/*
-Manage a deduplicated versionned backups repository.
-
-Sample repository:
-
-```
-repo/
-├── 00000/
-│ ├── chunks/
-│ │ ├── 000000000000000
-│ │ ├── 000000000000001
-│ │ ├── 000000000000002
-│ │ └── 000000000000003
-│ ├── files
-│ ├── hashes
-│ └── recipe
-└── 00001/
- ├── chunks/
- │ ├── 000000000000000
- │ └── 000000000000001
- ├── files
- ├── hashes
- └── recipe
-```
-*/
-
-package main
-
-import (
- "bufio"
- "bytes"
- "encoding/gob"
- "fmt"
- "io"
- "io/fs"
- "os"
- "path/filepath"
- "reflect"
- "strings"
-
- "github.com/chmduquesne/rollinghash/rabinkarp64"
- "github.com/n-peugnet/dna-backup/cache"
- "github.com/n-peugnet/dna-backup/logger"
- "github.com/n-peugnet/dna-backup/sketch"
- "github.com/n-peugnet/dna-backup/slice"
- "github.com/n-peugnet/dna-backup/utils"
-)
-
-func init() {
- // register chunk structs for encoding/decoding using gob
- gob.RegisterName("*dna-backup.StoredChunk", &StoredChunk{})
- gob.RegisterName("*dna-backup.TempChunk", &TempChunk{})
- gob.RegisterName("*dna-backup.DeltaChunk", &DeltaChunk{})
- gob.RegisterName("dna-backup.File", File{})
-}
-
-type FingerprintMap map[uint64]*ChunkId
-type SketchMap map[uint64][]*ChunkId
-
-func (m SketchMap) Set(key []uint64, value *ChunkId) {
- for _, s := range key {
- prev := m[s]
- if contains(prev, value) {
- continue
- }
- m[s] = append(prev, value)
- }
-}
-
-type Repo struct {
- path string
- chunkSize int
- sketchWSize int
- sketchSfCount int
- sketchFCount int
- pol rabinkarp64.Pol
- differ Differ
- patcher Patcher
- fingerprints FingerprintMap
- sketches SketchMap
- recipe []Chunk
- files []File
- chunkCache cache.Cacher
- chunkReadWrapper utils.ReadWrapper
- chunkWriteWrapper utils.WriteWrapper
-}
-
-type chunkHashes struct {
- Fp uint64
- Sk []uint64
-}
-
-type chunkData struct {
- hashes chunkHashes
- content []byte
- id *ChunkId
-}
-
-type File struct {
- Path string
- Size int64
- Link string
-}
-
-func NewRepo(path string) *Repo {
- var err error
- path, err = filepath.Abs(path)
- if err != nil {
- logger.Fatal(err)
- }
- err = os.MkdirAll(path, 0775)
- if err != nil {
- logger.Panic(err)
- }
- var seed int64 = 1
- p, err := rabinkarp64.RandomPolynomial(seed)
- if err != nil {
- logger.Panic(err)
- }
- return &Repo{
- path: path,
- chunkSize: 8 << 10,
- sketchWSize: 32,
- sketchSfCount: 3,
- sketchFCount: 4,
- pol: p,
- differ: Fdelta{},
- patcher: Fdelta{},
- fingerprints: make(FingerprintMap),
- sketches: make(SketchMap),
- chunkCache: cache.NewFifoCache(10000),
- chunkReadWrapper: utils.ZlibReader,
- chunkWriteWrapper: utils.ZlibWriter,
- }
-}
-
-func (r *Repo) Differ() Differ {
- return r.differ
-}
-
-func (r *Repo) Patcher() Patcher {
- return r.patcher
-}
-
-func (r *Repo) Commit(source string) {
- source, err := filepath.Abs(source)
- if err != nil {
- logger.Fatal(err)
- }
- versions := r.loadVersions()
- newVersion := len(versions) // TODO: add newVersion functino
- newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
- newChunkPath := filepath.Join(newPath, chunksName)
- os.Mkdir(newPath, 0775) // TODO: handle errors
- os.Mkdir(newChunkPath, 0775) // TODO: handle errors
- logger.Info("listing files")
- files := listFiles(source)
- logger.Info("loading previous hashes")
- r.loadHashes(versions)
- logger.Info("loading previous file lists")
- r.loadFileLists(versions)
- logger.Info("loading previous recipies")
- r.loadRecipes(versions)
- storeQueue := make(chan chunkData, 32)
- storeEnd := make(chan bool)
- go r.storageWorker(newVersion, storeQueue, storeEnd)
- var last, nlast, pass uint64
- var recipe []Chunk
- for ; nlast > last || pass == 0; pass++ {
- logger.Infof("matcher pass number %d", pass+1)
- last = nlast
- reader, writer := io.Pipe()
- go concatFiles(&files, writer)
- recipe, nlast = r.matchStream(reader, storeQueue, newVersion, last)
- }
- close(storeQueue)
- <-storeEnd
- r.storeFileList(newVersion, unprefixFiles(files, source))
- r.storeRecipe(newVersion, recipe)
-}
-
-func (r *Repo) Restore(destination string) {
- versions := r.loadVersions()
- logger.Info("loading previous file lists")
- r.loadFileLists(versions)
- logger.Info("loading previous recipies")
- r.loadRecipes(versions)
- reader, writer := io.Pipe()
- logger.Info("restoring latest version")
- go r.restoreStream(writer, r.recipe)
- bufReader := bufio.NewReaderSize(reader, r.chunkSize*2)
- for _, file := range r.files {
- filePath := filepath.Join(destination, file.Path)
- dir := filepath.Dir(filePath)
- os.MkdirAll(dir, 0775) // TODO: handle errors
- if file.Link != "" {
- link := file.Link
- if filepath.IsAbs(link) {
- filepath.Join(destination, file.Link)
- }
- err := os.Symlink(link, filePath)
- if err != nil {
- logger.Errorf("restored symlink ", err)
- }
- } else {
- f, _ := os.Create(filePath) // TODO: handle errors
- n, err := io.CopyN(f, bufReader, file.Size)
- if err != nil {
- logger.Errorf("restored file, written %d/%d bytes: %s", filePath, n, file.Size, err)
- }
- if err := f.Close(); err != nil {
- logger.Errorf("restored file ", err)
- }
- }
- }
-}
-
-func (r *Repo) loadVersions() []string {
- var versions []string
- files, err := os.ReadDir(r.path)
- if err != nil {
- logger.Fatal(err)
- }
- for _, f := range files {
- if !f.IsDir() {
- continue
- }
- versions = append(versions, filepath.Join(r.path, f.Name()))
- }
- return versions
-}
-
-func listFiles(path string) []File {
- var files []File
- err := filepath.Walk(path, func(p string, i fs.FileInfo, err error) error {
- if err != nil {
- logger.Warning(err)
- return nil
- }
- if i.IsDir() {
- return nil
- }
- var file = File{Path: p, Size: i.Size()}
- if i.Mode()&fs.ModeSymlink != 0 {
- file, err = cleanSymlink(path, p, i)
- if err != nil {
- logger.Warning("skipping symlink ", err)
- return nil
- }
- }
- files = append(files, file)
- return nil
- })
- if err != nil {
- logger.Error(err)
- }
- return files
-}
-
-func cleanSymlink(root string, p string, i fs.FileInfo) (f File, err error) {
- dir := filepath.Dir(p)
- target, err := os.Readlink(p)
- if err != nil {
- return
- }
- isAbs := filepath.IsAbs(target)
- cleaned := target
- if !isAbs {
- cleaned = filepath.Join(dir, cleaned)
- }
- cleaned = filepath.Clean(cleaned)
- if !strings.HasPrefix(cleaned, root) {
- err = fmt.Errorf("external %s -> %s", p, cleaned)
- return
- }
- if isAbs {
- f.Link, err = utils.Unprefix(cleaned, root)
- } else {
- f.Link, err = filepath.Rel(dir, filepath.Join(dir, target))
- }
- if err != nil {
- return
- }
- if f.Link == "" {
- err = fmt.Errorf("empty %s", p)
- return
- }
- f.Path = p
- f.Size = 0
- return f, nil
-}
-
-func unprefixFiles(files []File, prefix string) (ret []File) {
- var err error
- ret = make([]File, len(files))
- for i, f := range files {
- if f.Path, err = utils.Unprefix(f.Path, prefix); err != nil {
- logger.Warning(err)
- } else {
- ret[i] = f
- }
- }
- return
-}
-
-// concatFiles reads the content of all the listed files into a continuous stream.
-// If any errors are encoutered while opening a file, it is then removed from the
-// list.
-//
-// If read is incomplete, then the actual read size is used.
-func concatFiles(files *[]File, stream io.WriteCloser) {
- actual := make([]File, 0, len(*files))
- for _, f := range *files {
- if f.Link != "" {
- actual = append(actual, f)
- continue
- }
- file, err := os.Open(f.Path)
- if err != nil {
- logger.Warning(err)
- continue
- }
- af := f
- if n, err := io.Copy(stream, file); err != nil {
- logger.Error("read ", n, " bytes, ", err)
- af.Size = n
- }
- actual = append(actual, af)
- if err = file.Close(); err != nil {
- logger.Panic(err)
- }
- }
- stream.Close()
- *files = actual
-}
-
-func storeBasicStruct(dest string, wrapper utils.WriteWrapper, obj interface{}) {
- file, err := os.Create(dest)
- if err != nil {
- logger.Panic(err)
- }
- out := wrapper(file)
- encoder := gob.NewEncoder(out)
- err = encoder.Encode(obj)
- if err != nil {
- logger.Panic(err)
- }
- if err = out.Close(); err != nil {
- logger.Panic(err)
- }
- if err = file.Close(); err != nil {
- logger.Panic(err)
- }
-}
-
-func loadBasicStruct(path string, wrapper utils.ReadWrapper, obj interface{}) {
- file, err := os.Open(path)
- if err != nil {
- logger.Panic(err)
- }
- in, err := wrapper(file)
- if err != nil {
- logger.Panic(err)
- }
- decoder := gob.NewDecoder(in)
- err = decoder.Decode(obj)
- if err != nil {
- logger.Panic(err)
- }
- if err = in.Close(); err != nil {
- logger.Panic(err)
- }
- if err = file.Close(); err != nil {
- logger.Panic(err)
- }
-}
-
-func (r *Repo) loadDeltas(versions []string, wrapper utils.ReadWrapper, name string) (ret slice.Slice) {
- for _, v := range versions {
- path := filepath.Join(v, name)
- var delta slice.Delta
- loadBasicStruct(path, wrapper, &delta)
- ret = slice.Patch(ret, delta)
- }
- return
-}
-
-func fileList2slice(l []File) (ret slice.Slice) {
- ret = make(slice.Slice, len(l))
- for i := range l {
- ret[i] = l[i]
- }
- return
-}
-
-func slice2fileList(s slice.Slice) (ret []File) {
- ret = make([]File, len(s))
- for i := range s {
- if f, ok := s[i].(File); ok {
- ret[i] = f
- } else {
- logger.Warningf("could not convert %s into a File", s[i])
- }
- }
- return
-}
-
-// storeFileList stores the given list in the repo dir as a delta against the
-// previous version's one.
-func (r *Repo) storeFileList(version int, list []File) {
- dest := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), filesName)
- delta := slice.Diff(fileList2slice(r.files), fileList2slice(list))
- logger.Infof("files delta %s", delta.String())
- storeBasicStruct(dest, r.chunkWriteWrapper, delta)
-}
-
-// loadFileLists loads incrementally the file lists' delta of each given version.
-func (r *Repo) loadFileLists(versions []string) {
- r.files = slice2fileList(r.loadDeltas(versions, r.chunkReadWrapper, filesName))
-}
-
-// storageWorker is meant to be started in a goroutine and stores each new chunk's
-// data in the repo directory until the store queue channel is closed.
-//
-// it will put true in the end channel once everything is stored.
-func (r *Repo) storageWorker(version int, storeQueue <-chan chunkData, end chan<- bool) {
- hashesFile := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), hashesName)
- file, err := os.Create(hashesFile)
- if err != nil {
- logger.Panic(err)
- }
- encoder := gob.NewEncoder(file)
- for data := range storeQueue {
- err = encoder.Encode(data.hashes)
- r.StoreChunkContent(data.id, bytes.NewReader(data.content))
- // logger.Debug("stored ", data.id)
- }
- if err = file.Close(); err != nil {
- logger.Panic(err)
- }
- end <- true
-}
-
-func (r *Repo) StoreChunkContent(id *ChunkId, reader io.Reader) {
- path := id.Path(r.path)
- file, err := os.Create(path)
- if err != nil {
- logger.Panic("chunk store ", err)
- }
- wrapper := r.chunkWriteWrapper(file)
- n, err := io.Copy(wrapper, reader)
- if err != nil {
- logger.Errorf("chunk store, %d written, %s", n, err)
- }
- if err := wrapper.Close(); err != nil {
- logger.Warning("chunk store wrapper ", err)
- }
- if err := file.Close(); err != nil {
- logger.Warning("chunk store ", err)
- }
-}
-
-// LoadChunkContent loads a chunk from the repo directory.
-// If the chunk is in cache, get it from cache, else read it from drive.
-func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
- value, exists := r.chunkCache.Get(id)
- if !exists {
- path := id.Path(r.path)
- f, err := os.Open(path)
- if err != nil {
- logger.Panic("chunk load ", err)
- }
- wrapper, err := r.chunkReadWrapper(f)
- if err != nil {
- logger.Error("chunk load wrapper ", err)
- }
- value, err = io.ReadAll(wrapper)
- if err != nil {
- logger.Error("chunk load ", err)
- }
- if err = wrapper.Close(); err != nil {
- logger.Warning("chunk load wrapper", err)
- }
- if err = f.Close(); err != nil {
- logger.Warning("chunk load ", err)
- }
- r.chunkCache.Set(id, value)
- }
- return bytes.NewReader(value)
-}
-
-// TODO: use atoi for chunkid ?
-func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
- for i, v := range versions {
- p := filepath.Join(v, chunksName)
- entries, err := os.ReadDir(p)
- if err != nil {
- logger.Error("version dir ", err)
- }
- for j, e := range entries {
- if e.IsDir() {
- continue
- }
- id := &ChunkId{Ver: i, Idx: uint64(j)}
- c := NewStoredChunk(r, id)
- chunks <- c
- }
- }
- close(chunks)
-}
-
-// loadHashes loads and aggregates the hashes stored for each given version and
-// stores them in the repo maps.
-func (r *Repo) loadHashes(versions []string) {
- for i, v := range versions {
- path := filepath.Join(v, hashesName)
- file, err := os.Open(path)
- if err != nil {
- logger.Error("hashes ", err)
- }
- decoder := gob.NewDecoder(file)
- for j := 0; err == nil; j++ {
- var h chunkHashes
- if err = decoder.Decode(&h); err == nil {
- id := &ChunkId{i, uint64(j)}
- r.fingerprints[h.Fp] = id
- r.sketches.Set(h.Sk, id)
- }
- }
- if err != nil && err != io.EOF {
- logger.Panic(err)
- }
- if err = file.Close(); err != nil {
- logger.Warning(err)
- }
- }
-}
-
-func (r *Repo) chunkMinLen() int {
- return sketch.SuperFeatureSize(r.chunkSize, r.sketchSfCount, r.sketchFCount)
-}
-
-func contains(s []*ChunkId, id *ChunkId) bool {
- for _, v := range s {
- if v == id {
- return true
- }
- }
- return false
-}
-
-// findSimilarChunk looks in the repo sketch map for a match of the given sketch.
-//
-// There can be multiple matches but only the best one is returned. Indeed, the
-// more superfeature matches, the better the quality of the match. For now we
-// consider that a single superfeature match is enough to count it as valid.
-func (r *Repo) findSimilarChunk(sketch []uint64) (*ChunkId, bool) {
- var similarChunks = make(map[ChunkId]int)
- var max int
- var similarChunk *ChunkId
- for _, s := range sketch {
- chunkIds, exists := r.sketches[s]
- if !exists {
- continue
- }
- for _, id := range chunkIds {
- count := similarChunks[*id]
- count += 1
- logger.Debugf("found %d %d time(s)", id, count)
- if count > max {
- max = count
- similarChunk = id
- }
- similarChunks[*id] = count
- }
- }
- return similarChunk, max > 0
-}
-
-// encodeTempChunk first tries to delta-encode the given chunk before attributing
-// it an Id and saving it into the fingerprints and sketches maps.
-func (r *Repo) encodeTempChunk(temp BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) (Chunk, bool) {
- sk, _ := sketch.SketchChunk(temp.Reader(), r.pol, r.chunkSize, r.sketchWSize, r.sketchSfCount, r.sketchFCount)
- id, found := r.findSimilarChunk(sk)
- if found {
- var buff bytes.Buffer
- if err := r.differ.Diff(r.LoadChunkContent(id), temp.Reader(), &buff); err != nil {
- logger.Error("trying delta encode chunk:", temp, "with source:", id, ":", err)
- } else {
- logger.Debugf("add new delta chunk of size %d", len(buff.Bytes()))
- return &DeltaChunk{
- repo: r,
- Source: id,
- Patch: buff.Bytes(),
- Size: temp.Len(),
- }, true
- }
- }
- if temp.Len() == r.chunkSize {
- id := &ChunkId{Ver: version, Idx: *last}
- *last++
- hasher := rabinkarp64.NewFromPol(r.pol)
- io.Copy(hasher, temp.Reader())
- fp := hasher.Sum64()
- r.fingerprints[fp] = id
- r.sketches.Set(sk, id)
- storeQueue <- chunkData{
- hashes: chunkHashes{fp, sk},
- content: temp.Bytes(),
- id: id,
- }
- r.chunkCache.Set(id, temp.Bytes())
- logger.Debug("add new chunk ", id)
- return NewStoredChunk(r, id), false
- }
- logger.Debug("add new partial chunk of size: ", temp.Len())
- return temp, false
-}
-
-// encodeTempChunks encodes the current temporary chunks based on the value of the previous one.
-// Temporary chunks can be partial. If the current chunk is smaller than the size of a
-// super-feature and there exists a previous chunk, then both are merged before attempting
-// to delta-encode them.
-func (r *Repo) encodeTempChunks(prev BufferedChunk, curr BufferedChunk, version int, last *uint64, storeQueue chan<- chunkData) []Chunk {
- if reflect.ValueOf(prev).IsNil() {
- c, _ := r.encodeTempChunk(curr, version, last, storeQueue)
- return []Chunk{c}
- } else if curr.Len() < r.chunkMinLen() {
- tmp := NewTempChunk(append(prev.Bytes(), curr.Bytes()...))
- c, success := r.encodeTempChunk(tmp, version, last, storeQueue)
- if success {
- return []Chunk{c}
- }
- }
- prevD, _ := r.encodeTempChunk(prev, version, last, storeQueue)
- currD, _ := r.encodeTempChunk(curr, version, last, storeQueue)
- return []Chunk{prevD, currD}
-}
-
-// matchStream is the heart of DNA-backup. Thus, it sounded rude not to add some comment to it.
-//
-// It applies a rolling hash on the content of a given stream to look for matching fingerprints
-// in the repo. If no match is found after the equivalent of three chunks of data are processed,
-// then the first unmatched chunk sketch is checked to see if it could be delta-encoded.
-// If not, the chunk is then stored as a new chunk for this version and its fingerprint and
-// sketch are added to the repo maps.
-//
-// If a match happens during the processing of the third chunk, then, if possible, the remaining
-// of the second chunk is merged with the first one to try to delta encode it at once.
-//
-// Each time a new chunk is added it is sent to the store worker through the store queue.
-func (r *Repo) matchStream(stream io.Reader, storeQueue chan<- chunkData, version int, last uint64) ([]Chunk, uint64) {
- var b byte
- var chunks []Chunk
- var prev *TempChunk
- var err error
- bufStream := bufio.NewReaderSize(stream, r.chunkSize*2)
- buff := make([]byte, r.chunkSize, r.chunkSize*2)
- if n, err := io.ReadFull(stream, buff); n < r.chunkSize {
- if err == io.ErrUnexpectedEOF {
- c, _ := r.encodeTempChunk(NewTempChunk(buff[:n]), version, &last, storeQueue)
- chunks = append(chunks, c)
- return chunks, last
- } else {
- logger.Panicf("matching stream, read only %d bytes with error '%s'", n, err)
- }
- }
- hasher := rabinkarp64.NewFromPol(r.pol)
- hasher.Write(buff)
- for err != io.EOF {
- h := hasher.Sum64()
- chunkId, exists := r.fingerprints[h]
- if exists {
- if len(buff) > r.chunkSize && len(buff) <= r.chunkSize*2 {
- size := len(buff) - r.chunkSize
- temp := NewTempChunk(buff[:size])
- chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
- prev = nil
- } else if prev != nil {
- c, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
- chunks = append(chunks, c)
- prev = nil
- }
- logger.Debugf("add existing chunk: %d", chunkId)
- chunks = append(chunks, NewStoredChunk(r, chunkId))
- buff = make([]byte, 0, r.chunkSize*2)
- for i := 0; i < r.chunkSize && err == nil; i++ {
- b, err = bufStream.ReadByte()
- if err != io.EOF {
- hasher.Roll(b)
- buff = append(buff, b)
- }
- }
- continue
- }
- if len(buff) == r.chunkSize*2 {
- if prev != nil {
- chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
- chunks = append(chunks, chunk)
- }
- prev = NewTempChunk(buff[:r.chunkSize])
- tmp := buff[r.chunkSize:]
- buff = make([]byte, r.chunkSize, r.chunkSize*2)
- copy(buff, tmp)
- }
- b, err = bufStream.ReadByte()
- if err != io.EOF {
- hasher.Roll(b)
- buff = append(buff, b)
- }
- }
- if len(buff) > 0 {
- var temp *TempChunk
- if len(buff) > r.chunkSize {
- if prev != nil {
- chunk, _ := r.encodeTempChunk(prev, version, &last, storeQueue)
- chunks = append(chunks, chunk)
- }
- prev = NewTempChunk(buff[:r.chunkSize])
- temp = NewTempChunk(buff[r.chunkSize:])
- } else {
- temp = NewTempChunk(buff)
- }
- chunks = append(chunks, r.encodeTempChunks(prev, temp, version, &last, storeQueue)...)
- }
- return chunks, last
-}
-
-func (r *Repo) restoreStream(stream io.WriteCloser, recipe []Chunk) {
- for _, c := range recipe {
- if n, err := io.Copy(stream, c.Reader()); err != nil {
- logger.Errorf("copying to stream, read %d bytes from chunk: %s", n, err)
- }
- }
- stream.Close()
-}
-
-func recipe2slice(r []Chunk) (ret slice.Slice) {
- ret = make(slice.Slice, len(r))
- for i := range r {
- ret[i] = r[i]
- }
- return
-}
-
-func slice2recipe(s slice.Slice) (ret []Chunk) {
- ret = make([]Chunk, len(s))
- for i := range s {
- if c, ok := s[i].(Chunk); ok {
- ret[i] = c
- } else {
- logger.Warningf("could not convert %s into a Chunk", s[i])
- }
- }
- return
-}
-
-func (r *Repo) storeRecipe(version int, recipe []Chunk) {
- dest := filepath.Join(r.path, fmt.Sprintf(versionFmt, version), recipeName)
- delta := slice.Diff(recipe2slice(r.recipe), recipe2slice(recipe))
- logger.Infof("recipe delta %s", delta.String())
- storeBasicStruct(dest, r.chunkWriteWrapper, delta)
-}
-
-func (r *Repo) loadRecipes(versions []string) {
- recipe := slice2recipe(r.loadDeltas(versions, r.chunkReadWrapper, recipeName))
- for _, c := range recipe {
- if rc, isRepo := c.(RepoChunk); isRepo {
- rc.SetRepo(r)
- }
- }
- r.recipe = recipe
-}
-
-func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
- for _, c := range chunks {
- tmp, isDelta := c.(*DeltaChunk)
- if isDelta {
- ret = append(ret, tmp)
- }
- }
- return
-}