aboutsummaryrefslogtreecommitdiff
path: root/repo/repo.go
diff options
context:
space:
mode:
authorn-peugnet <n.peugnet@free.fr>2021-10-13 16:18:24 +0200
committern-peugnet <n.peugnet@free.fr>2021-10-13 16:18:24 +0200
commita3c5cae3d204eba00055e3a49f69bd8849a1053a (patch)
tree93da71deb36e8e117510f5a324eac9e71a272c7f /repo/repo.go
parente2c176a084f46fcc235ae1bb69dda6965c4e80a2 (diff)
downloaddna-backup-a3c5cae3d204eba00055e3a49f69bd8849a1053a.tar.gz
dna-backup-a3c5cae3d204eba00055e3a49f69bd8849a1053a.zip
load repo metadata in threads inside Init func
Diffstat (limited to 'repo/repo.go')
-rw-r--r--repo/repo.go45
1 files changed, 26 insertions, 19 deletions
diff --git a/repo/repo.go b/repo/repo.go
index b8c07f2..864dc9e 100644
--- a/repo/repo.go
+++ b/repo/repo.go
@@ -37,6 +37,7 @@ import (
"path/filepath"
"reflect"
"strings"
+ "sync"
"github.com/chmduquesne/rollinghash/rabinkarp64"
"github.com/n-peugnet/dna-backup/cache"
@@ -69,6 +70,7 @@ func (m SketchMap) Set(key []uint64, value *ChunkId) {
type Repo struct {
path string
+ versions []string
chunkSize int
sketchWSize int
sketchSfCount int
@@ -149,16 +151,13 @@ func (r *Repo) Commit(source string) {
if err != nil {
logger.Fatal(err)
}
- versions := r.loadVersions()
- newVersion := len(versions) // TODO: add newVersion functino
+ r.Init()
+ newVersion := len(r.versions) // TODO: add newVersion functino
newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
newChunkPath := filepath.Join(newPath, chunksName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
files := listFiles(source)
- r.loadHashes(versions)
- r.loadFileLists(versions)
- r.loadRecipes(versions)
storeQueue := make(chan chunkData, 32)
storeEnd := make(chan bool)
go r.storageWorker(newVersion, storeQueue, storeEnd)
@@ -178,12 +177,9 @@ func (r *Repo) Commit(source string) {
}
func (r *Repo) Restore(destination string) {
- versions := r.loadVersions()
- r.loadFileLists(versions)
- logger.Info("loading previous recipies")
- r.loadRecipes(versions)
+ r.Init()
reader, writer := io.Pipe()
- logger.Info("restoring latest version")
+ logger.Info("restore latest version")
go r.restoreStream(writer, r.recipe)
bufReader := bufio.NewReaderSize(reader, r.chunkSize*2)
for _, file := range r.files {
@@ -212,8 +208,17 @@ func (r *Repo) Restore(destination string) {
}
}
-func (r *Repo) loadVersions() []string {
- var versions []string
+func (r *Repo) Init() {
+ var wg sync.WaitGroup
+ r.loadVersions()
+ wg.Add(3)
+ go r.loadHashes(r.versions, &wg)
+ go r.loadFileLists(r.versions, &wg)
+ go r.loadRecipes(r.versions, &wg)
+ wg.Wait()
+}
+
+func (r *Repo) loadVersions() {
files, err := os.ReadDir(r.path)
if err != nil {
logger.Fatal(err)
@@ -222,9 +227,8 @@ func (r *Repo) loadVersions() []string {
if !f.IsDir() {
continue
}
- versions = append(versions, filepath.Join(r.path, f.Name()))
+ r.versions = append(r.versions, filepath.Join(r.path, f.Name()))
}
- return versions
}
func listFiles(path string) []File {
@@ -405,11 +409,12 @@ func (r *Repo) storeFileList(version int, list []File) {
}
// loadFileLists loads incrementally the file lists' delta of each given version.
-func (r *Repo) loadFileLists(versions []string) {
+func (r *Repo) loadFileLists(versions []string, wg *sync.WaitGroup) {
logger.Info("load previous file lists")
var files []File
r.filesRaw = loadDeltas(&files, versions, r.patcher, r.chunkReadWrapper, filesName)
r.files = files
+ wg.Done()
}
// storageWorker is meant to be started in a goroutine and stores each new chunk's
@@ -483,8 +488,8 @@ func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
}
// TODO: use atoi for chunkid ?
-func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
- for i, v := range versions {
+func (r *Repo) LoadChunks(chunks chan<- IdentifiedChunk) {
+ for i, v := range r.versions {
p := filepath.Join(v, chunksName)
entries, err := os.ReadDir(p)
if err != nil {
@@ -504,7 +509,7 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
// loadHashes loads and aggregates the hashes stored for each given version and
// stores them in the repo maps.
-func (r *Repo) loadHashes(versions []string) {
+func (r *Repo) loadHashes(versions []string, wg *sync.WaitGroup) {
logger.Info("load previous hashes")
for i, v := range versions {
path := filepath.Join(v, hashesName)
@@ -528,6 +533,7 @@ func (r *Repo) loadHashes(versions []string) {
logger.Warning(err)
}
}
+ wg.Done()
}
func (r *Repo) chunkMinLen() int {
@@ -735,7 +741,7 @@ func (r *Repo) storeRecipe(version int, recipe []Chunk) {
storeDelta(r.recipeRaw, recipe, dest, r.differ, r.chunkWriteWrapper)
}
-func (r *Repo) loadRecipes(versions []string) {
+func (r *Repo) loadRecipes(versions []string, wg *sync.WaitGroup) {
logger.Info("load previous recipies")
var recipe []Chunk
r.recipeRaw = loadDeltas(&recipe, versions, r.patcher, r.chunkReadWrapper, recipeName)
@@ -745,6 +751,7 @@ func (r *Repo) loadRecipes(versions []string) {
}
}
r.recipe = recipe
+ wg.Done()
}
func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {