-rw-r--r--  dna/drive.go        181
-rw-r--r--  dna/writer.go        45
-rw-r--r--  repo/export_dir.go   62
3 files changed, 195 insertions, 93 deletions
diff --git a/dna/drive.go b/dna/drive.go
index f99e7d4..013c4b4 100644
--- a/dna/drive.go
+++ b/dna/drive.go
@@ -17,14 +17,189 @@
package dna
-import "io"
+import (
+ "bytes"
+ "encoding/gob"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+
+ "github.com/n-peugnet/dna-backup/logger"
+ "github.com/n-peugnet/dna-backup/utils"
+)
+
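+// Direction selects the order in which pools are filled when writing
+// tracks: Forward starts from the first pool, Backward from the last.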
+type Direction int
+
+const (
+ Forward Direction = iota
+ Backward
+)
type DnaDrive struct {
poolCount int
trackSize int
tracksPerPool int
+ pools []Pool
+ writeWrapper utils.WriteWrapper
+ readWrapper utils.ReadWrapper
+}
+
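+// Pool is the backing storage of a single DNA pool together with the
+// number of tracks it already contains.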
+type Pool struct {
+ Data io.ReadWriteCloser
+ TrackCount int
+}
+
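+// Header describes one exported version: the byte counts of its chunks,
+// recipe and file-list streams.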
+type Header struct {
+ Chunks uint64
+ Recipe uint64
+ Files uint64
+}
+
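+// dnaVersion pairs the two ends of the pipes used to stream one version:
+// the caller writes to Input while writeVersion reads from Output.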
+type dnaVersion struct {
+ Input dnaInput
+ Output dnaOutput
+}
+
+type dnaInput struct {
+ Chunks io.WriteCloser
+ Recipe io.WriteCloser
+ Files io.WriteCloser
+}
+
+type dnaOutput struct {
+ Chunks io.ReadCloser
+ Recipe io.ReadCloser
+ Files io.ReadCloser
+}
+
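+// New opens (or creates) one file per pool under destination and derives
+// each pool's current track count from the file size.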
+func New(
+ destination string,
+ poolCount int,
+ trackSize int,
+ tracksPerPool int,
+ writeWrapper utils.WriteWrapper,
+ readWrapper utils.ReadWrapper,
+) *DnaDrive {
+ pools := make([]Pool, poolCount)
+ if err := os.MkdirAll(destination, 0755); err != nil {
+ logger.Panic(err)
+ }

+ for i := range pools {
+ path := filepath.Join(destination, fmt.Sprintf("%02d", i))
+ file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0644)
+ if err != nil {
+ logger.Panic(err)
+ }
+ stat, err := file.Stat()
+ if err != nil {
+ logger.Panic(err)
+ }
+ pools[i] = Pool{file, int(stat.Size()) / trackSize}
+ }
+ return &DnaDrive{
+ poolCount: poolCount,
+ trackSize: trackSize,
+ tracksPerPool: tracksPerPool,
+ pools: pools,
+ writeWrapper: writeWrapper,
+ readWrapper: readWrapper,
+ }
+}
+
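+// VersionInput returns the write ends of the chunks, recipe and files pipes
+// for a new version, plus a channel that receives true once the version has
+// been written to the pools.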
+func (d *DnaDrive) VersionInput() (dnaInput, <-chan bool) {
+ rChunks, wChunks := io.Pipe()
+ rRecipe, wRecipe := io.Pipe()
+ rFiles, wFiles := io.Pipe()
+ version := dnaVersion{
+ Input: dnaInput{wChunks, wRecipe, wFiles},
+ Output: dnaOutput{rChunks, rRecipe, rFiles},
+ }
+ end := make(chan bool)
+ go d.writeVersion(version.Output, end)
+ return version.Input, end
+}
+
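+// writeVersion stores one version: chunk data is written forward to the data
+// pools (all but the first), then a track holding the header, the recipe and
+// as much of the file list as fits is written to the first pool, and any
+// recipe/files overflow is written backward to the data pools.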
+func (d *DnaDrive) writeVersion(output dnaOutput, end chan<- bool) {
+ var err error
+ var recipe, files, version bytes.Buffer
+ n := d.write(output.Chunks, d.pools[1:], Forward)
+ _, err = io.Copy(&recipe, output.Recipe)
+ if err != nil {
+ logger.Error("dna export recipe ", err)
+ }
+ _, err = io.Copy(&files, output.Files)
+ if err != nil {
+ logger.Error("dna export files ", err)
+ }
+ header := Header{
+ uint64(n),
+ uint64(recipe.Len()),
+ uint64(files.Len()),
+ }
+ e := gob.NewEncoder(&version)
+ err = e.Encode(header)
+ if err != nil {
+ logger.Error("dna export version header: ", err)
+ }
+ logger.Debugf("version len %d", version.Len())
+ rest := int64(d.trackSize - version.Len())
+ logger.Debugf("version rest %d", rest)
+ n, err = io.CopyN(&version, &recipe, rest)
+ logger.Debugf("recipe copied in version %d", n)
+ rest -= n
+ logger.Debugf("version rest %d", rest)
+ if err == io.EOF && rest > 0 { // recipe is written to version but there is space left
+ n, err = io.CopyN(&version, &files, rest)
+ logger.Debugf("files copied in version %d", n)
+ rest -= n
+ logger.Debugf("version rest %d", rest)
+ if err == io.EOF && rest > 0 { // files is written to version but there is space left
+ // version.Write(make([]byte, rest))
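+ // nothing left to do: write zero-pads the last track of version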
+ } else if err != nil { // another error than EOF happened
+ logger.Error("dna export files: ", err)
+ } else { // files has not been fully written so we write what is left to pools
+ d.write(&files, d.pools[1:], Backward)
+ }
+ } else if err != nil { // another error than EOF happened
+ logger.Error("dna export recipe: ", err)
+ } else { // recipe has not been fully written so we concat with files and write what is left to pools
+ io.Copy(&recipe, &files)
+ d.write(&recipe, d.pools[1:], Backward)
+ }
+ d.write(&version, d.pools[:1], Forward)
+ end <- true
}
-func (d *DnaDrive) Writer(w io.Writer) io.WriteCloser {
- return NewWriter(w, d.trackSize)
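+// write reads r in trackSize blocks and appends each block as one track to
+// the given pools, switching to the next pool in the given direction once a
+// pool holds tracksPerPool tracks. It returns the number of bytes read from r.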
+func (d *DnaDrive) write(r io.Reader, pools []Pool, direction Direction) int64 {
+ var err error
+ var i, n int
+ var count int64
+ if direction == Backward {
+ i = len(pools) - 1
+ }
+ for err != io.ErrUnexpectedEOF && err != io.EOF {
+ if pools[i].TrackCount == d.tracksPerPool {
+ if direction == Backward {
+ i--
+ } else {
+ i++
+ }
+ if i < 0 || i >= len(pools) {
+ logger.Panic("dna export: no space left")
+ }
+ continue
+ }
+ buf := make([]byte, d.trackSize)
+ n, err = io.ReadFull(r, buf)
+ if err == io.EOF {
+ break
+ }
+ logger.Debug("read track:", n, err)
+ count += int64(n)
+ n, errw := pools[i].Data.Write(buf)
+ if errw != nil {
+ logger.Errorf("dna export: pool %d: %d/%d bytes written: %s", i, n, len(buf), errw)
+ }
+ pools[i].TrackCount++
+ }
+ return count
}
diff --git a/dna/writer.go b/dna/writer.go
deleted file mode 100644
index 6b232cd..0000000
--- a/dna/writer.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/* Copyright (C) 2021 Nicolas Peugnet <n.peugnet@free.fr>
-
- This file is part of dna-backup.
-
- dna-backup is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation, either version 3 of the License, or
- (at your option) any later version.
-
- dna-backup is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with dna-backup. If not, see <https://www.gnu.org/licenses/>. */
-
-package dna
-
-import (
- "io"
-
- "github.com/n-peugnet/dna-backup/utils"
-)
-
-type writer struct {
- *utils.WriteCounter
- trackSize int
-}
-
-func NewWriter(w io.Writer, trackSize int) io.WriteCloser {
- return &writer{
- WriteCounter: utils.NewWriteCounter(w),
- trackSize: trackSize,
- }
-}
-
-func (d *writer) Close() (err error) {
- // add padding for the last track
- padding := make([]byte, d.trackSize-d.Count()%d.trackSize)
- if _, err = d.Write(padding); err != nil {
- return err
- }
- return nil
-}
diff --git a/repo/export_dir.go b/repo/export_dir.go
index 8c63fdb..bffa7f0 100644
--- a/repo/export_dir.go
+++ b/repo/export_dir.go
@@ -18,9 +18,6 @@
package repo
import (
- "bytes"
- "compress/zlib"
- "encoding/binary"
"io"
"github.com/n-peugnet/dna-backup/dna"
@@ -28,61 +25,36 @@ import (
"github.com/n-peugnet/dna-backup/utils"
)
-type Version struct {
- Chunks uint64
- Recipe uint64
- Files uint64
-}
-
func (r *Repo) ExportDir(dest string, trackSize int) {
r.Init()
- versions := make([]Version, len(r.versions))
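+ // hard-coded layout: 96 pools of 10000 tracks each, zlib read/write wrappers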
+ exporter := dna.New(dest, 96, trackSize, 10000, utils.ZlibWriter, utils.ZlibReader)
chunks := r.loadChunks(r.versions)
- for i := range versions {
- var count int64
- var content bytes.Buffer // replace with a reader capable of switching files
- var recipe, fileList []byte
+ for i := range r.versions {
var err error
- tracker := dna.NewWriter(&content, trackSize)
- counter := utils.NewWriteCounter(tracker)
- compressor := zlib.NewWriter(counter)
- for _, c := range chunks[i] {
- n, err := io.Copy(compressor, c.Reader())
- if err != nil {
- logger.Error(err)
+ input, end := exporter.VersionInput()
+ if len(chunks[i]) > 0 {
+ for _, c := range chunks[i] {
+ _, err := io.Copy(input.Chunks, c.Reader())
+ if err != nil {
+ logger.Error(err)
+ }
}
- count += n
+ input.Chunks.Close()
}
- compressor.Close()
- tracker.Close()
- readDelta(r.versions[i], recipeName, utils.NopReadWrapper, func(rc io.ReadCloser) {
- recipe, err = io.ReadAll(rc)
+ readDelta(r.versions[i], recipeName, r.chunkReadWrapper, func(rc io.ReadCloser) {
+ _, err = io.Copy(input.Recipe, rc)
if err != nil {
logger.Error("load recipe ", err)
}
+ input.Recipe.Close()
})
- readDelta(r.versions[i], filesName, utils.NopReadWrapper, func(rc io.ReadCloser) {
- fileList, err = io.ReadAll(rc)
+ readDelta(r.versions[i], filesName, r.chunkReadWrapper, func(rc io.ReadCloser) {
+ _, err = io.Copy(input.Files, rc)
if err != nil {
logger.Error("load files ", err)
}
+ input.Files.Close()
})
- versions[i] = Version{
- uint64(counter.Count()),
- uint64(len(recipe)),
- uint64(len(fileList)),
- }
- header := versions[i].createHeader()
- logger.Info(header)
- }
-}
-
-func (v Version) createHeader() []byte {
- buf := make([]byte, binary.MaxVarintLen64*3)
- i := 0
- for _, x := range []uint64{v.Chunks, v.Recipe, v.Files} {
- n := binary.PutUvarint(buf[i:], x)
- i += n
+ <-end
}
- return buf[:i]
}