diff options
author | n-peugnet <n.peugnet@free.fr> | 2021-10-18 15:51:55 +0200 |
---|---|---|
committer | n-peugnet <n.peugnet@free.fr> | 2021-10-18 15:51:55 +0200 |
commit | 5539220a8839519becd45b63be65ed86fa9286a4 (patch) | |
tree | 426b532b525de92604be9602ec88edb47a2ad024 | |
parent | 89fb0a85f78a415477e450b0091d8c2b994b687d (diff) | |
download | dna-backup-5539220a8839519becd45b63be65ed86fa9286a4.tar.gz dna-backup-5539220a8839519becd45b63be65ed86fa9286a4.zip |
do the real export in a dir containing multiple pool files
-rw-r--r-- | dna/drive.go | 181 | ||||
-rw-r--r-- | dna/writer.go | 45 | ||||
-rw-r--r-- | repo/export_dir.go | 62 |
3 files changed, 195 insertions, 93 deletions
diff --git a/dna/drive.go b/dna/drive.go index f99e7d4..013c4b4 100644 --- a/dna/drive.go +++ b/dna/drive.go @@ -17,14 +17,189 @@ package dna -import "io" +import ( + "bytes" + "encoding/gob" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/n-peugnet/dna-backup/logger" + "github.com/n-peugnet/dna-backup/utils" +) + +type Direction int + +const ( + Forward Direction = iota + Backward +) type DnaDrive struct { poolCount int trackSize int tracksPerPool int + pools []Pool + writeWrapper utils.WriteWrapper + readWrapper utils.ReadWrapper +} + +type Pool struct { + Data io.ReadWriteCloser + TrackCount int +} + +type Header struct { + Chunks uint64 + Recipe uint64 + Files uint64 +} + +type dnaVersion struct { + Input dnaInput + Output dnaOutput +} + +type dnaInput struct { + Chunks io.WriteCloser + Recipe io.WriteCloser + Files io.WriteCloser +} + +type dnaOutput struct { + Chunks io.ReadCloser + Recipe io.ReadCloser + Files io.ReadCloser +} + +func New( + destination string, + poolCount int, + trackSize int, + tracksPerPool int, + writeWrapper utils.WriteWrapper, + readWrapper utils.ReadWrapper, +) *DnaDrive { + pools := make([]Pool, poolCount) + os.MkdirAll(destination, 0755) + for i := range pools { + path := filepath.Join(destination, fmt.Sprintf("%02d", i)) + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + logger.Panic(err) + } + stat, err := file.Stat() + if err != nil { + logger.Panic(err) + } + pools[i] = Pool{file, int(stat.Size()) / trackSize} + } + return &DnaDrive{ + poolCount: poolCount, + trackSize: trackSize, + tracksPerPool: tracksPerPool, + pools: pools, + writeWrapper: writeWrapper, + readWrapper: readWrapper, + } +} + +func (d *DnaDrive) VersionInput() (dnaInput, <-chan bool) { + rChunks, wChunks := io.Pipe() + rRecipe, wRecipe := io.Pipe() + rFiles, wFiles := io.Pipe() + version := dnaVersion{ + Input: dnaInput{wChunks, wRecipe, wFiles}, + Output: dnaOutput{rChunks, rRecipe, rFiles}, + } + end := make(chan bool) + go d.writeVersion(version.Output, end) + return version.Input, end +} + +func (d *DnaDrive) writeVersion(output dnaOutput, end chan<- bool) { + var err error + var recipe, files, version bytes.Buffer + n := d.write(output.Chunks, d.pools[1:], Forward) + _, err = io.Copy(&recipe, output.Recipe) + if err != nil { + logger.Error("dna export recipe ", err) + } + _, err = io.Copy(&files, output.Files) + if err != nil { + logger.Error("dna export files ", err) + } + header := Header{ + uint64(n), + uint64(recipe.Len()), + uint64(files.Len()), + } + e := gob.NewEncoder(&version) + err = e.Encode(header) + if err != nil { + logger.Error("dna export version header: ", err) + } + logger.Debugf("version len %d", version.Len()) + rest := int64(d.trackSize - version.Len()) + logger.Debugf("version rest %d", rest) + n, err = io.CopyN(&version, &recipe, rest) + logger.Debugf("recipe copied in version %d", n) + rest -= n + logger.Debugf("version rest %d", rest) + if err == io.EOF && rest > 0 { // recipe is written to version but there is space left + n, err = io.CopyN(&version, &files, rest) + logger.Debugf("files copied in version %d", n) + rest -= n + logger.Debugf("version rest %d", rest) + if err == io.EOF && rest > 0 { // files is writtent to version but there is space left + // version.Write(make([]byte, rest)) + } else if err != nil { // another error than EOF happened + logger.Error("dna export files: ", err) + } else { // files has not been fully written so we write what is left to pools + d.write(&files, d.pools[1:], Backward) + } + } else if err != nil { // another error than EOF happened + logger.Error("dna export recipe: ", err) + } else { // recipe has not been fully written so we concat with files and write what is left to pools + io.Copy(&recipe, &files) + d.write(&recipe, d.pools[1:], Backward) + } + d.write(&version, d.pools[:1], Forward) + end <- true } -func (d *DnaDrive) Writer(w io.Writer) io.WriteCloser { - return NewWriter(w, d.trackSize) +func (d *DnaDrive) write(r io.Reader, pools []Pool, direction Direction) int64 { + var err error + var i, n int + var count int64 + if direction == Backward { + i = len(pools) - 1 + } + for err != io.ErrUnexpectedEOF && err != io.EOF { + if pools[i].TrackCount == d.tracksPerPool { + if direction == Backward { + i-- + } else { + i++ + } + if i < 0 || i >= len(pools) { + logger.Panic("dna export: no space left") + } + continue + } + buf := make([]byte, d.trackSize) + n, err = io.ReadFull(r, buf) + if err == io.EOF { + break + } + logger.Debug("written track:", n, err) + count += int64(n) + n, errw := pools[i].Data.Write(buf) + if errw != nil { + logger.Error("dna export: pool %d: %d/%d bytes written: %s", i, n, len(buf), errw) + } + pools[i].TrackCount++ + } + return count } diff --git a/dna/writer.go b/dna/writer.go deleted file mode 100644 index 6b232cd..0000000 --- a/dna/writer.go +++ /dev/null @@ -1,45 +0,0 @@ -/* Copyright (C) 2021 Nicolas Peugnet <n.peugnet@free.fr> - - This file is part of dna-backup. - - dna-backup is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - dna-backup is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with dna-backup. If not, see <https://www.gnu.org/licenses/>. */ - -package dna - -import ( - "io" - - "github.com/n-peugnet/dna-backup/utils" -) - -type writer struct { - *utils.WriteCounter - trackSize int -} - -func NewWriter(w io.Writer, trackSize int) io.WriteCloser { - return &writer{ - WriteCounter: utils.NewWriteCounter(w), - trackSize: trackSize, - } -} - -func (d *writer) Close() (err error) { - // add padding for the last track - padding := make([]byte, d.trackSize-d.Count()%d.trackSize) - if _, err = d.Write(padding); err != nil { - return err - } - return nil -} diff --git a/repo/export_dir.go b/repo/export_dir.go index 8c63fdb..bffa7f0 100644 --- a/repo/export_dir.go +++ b/repo/export_dir.go @@ -18,9 +18,6 @@ package repo import ( - "bytes" - "compress/zlib" - "encoding/binary" "io" "github.com/n-peugnet/dna-backup/dna" @@ -28,61 +25,36 @@ import ( "github.com/n-peugnet/dna-backup/utils" ) -type Version struct { - Chunks uint64 - Recipe uint64 - Files uint64 -} - func (r *Repo) ExportDir(dest string, trackSize int) { r.Init() - versions := make([]Version, len(r.versions)) + exporter := dna.New(dest, 96, trackSize, 10000, utils.ZlibWriter, utils.ZlibReader) chunks := r.loadChunks(r.versions) - for i := range versions { - var count int64 - var content bytes.Buffer // replace with a reader capable of switching files - var recipe, fileList []byte + for i := range r.versions { var err error - tracker := dna.NewWriter(&content, trackSize) - counter := utils.NewWriteCounter(tracker) - compressor := zlib.NewWriter(counter) - for _, c := range chunks[i] { - n, err := io.Copy(compressor, c.Reader()) - if err != nil { - logger.Error(err) + input, end := exporter.VersionInput() + if len(chunks[i]) > 0 { + for _, c := range chunks[i] { + _, err := io.Copy(input.Chunks, c.Reader()) + if err != nil { + logger.Error(err) + } } - count += n + input.Chunks.Close() } - compressor.Close() - tracker.Close() - readDelta(r.versions[i], recipeName, utils.NopReadWrapper, func(rc io.ReadCloser) { - recipe, err = io.ReadAll(rc) + readDelta(r.versions[i], recipeName, r.chunkReadWrapper, func(rc io.ReadCloser) { + _, err = io.Copy(input.Recipe, rc) if err != nil { logger.Error("load recipe ", err) } + input.Recipe.Close() }) - readDelta(r.versions[i], filesName, utils.NopReadWrapper, func(rc io.ReadCloser) { - fileList, err = io.ReadAll(rc) + readDelta(r.versions[i], filesName, r.chunkReadWrapper, func(rc io.ReadCloser) { + _, err = io.Copy(input.Files, rc) if err != nil { logger.Error("load files ", err) } + input.Files.Close() }) - versions[i] = Version{ - uint64(counter.Count()), - uint64(len(recipe)), - uint64(len(fileList)), - } - header := versions[i].createHeader() - logger.Info(header) - } -} - -func (v Version) createHeader() []byte { - buf := make([]byte, binary.MaxVarintLen64*3) - i := 0 - for _, x := range []uint64{v.Chunks, v.Recipe, v.Files} { - n := binary.PutUvarint(buf[i:], x) - i += n + <-end } - return buf[:i] } |