commit    a3c5cae3d204eba00055e3a49f69bd8849a1053a
author    n-peugnet <n.peugnet@free.fr>  2021-10-13 16:18:24 +0200
committer n-peugnet <n.peugnet@free.fr>  2021-10-13 16:18:24 +0200
tree      93da71deb36e8e117510f5a324eac9e71a272c7f /repo
parent    e2c176a084f46fcc235ae1bb69dda6965c4e80a2
load repo metadata in threads inside Init func
Diffstat (limited to 'repo')
 -rw-r--r--  repo/repo.go      | 45
 -rw-r--r--  repo/repo_test.go | 21
 2 files changed, 38 insertions(+), 28 deletions(-)
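The heart of this commit is the new Init method: instead of Commit and Restore each loading hashes, file lists and recipes sequentially, Init starts the three loaders as goroutines and joins them with a sync.WaitGroup. A minimal, self-contained sketch of that fan-out/join pattern follows; the loader bodies are hypothetical stand-ins, not the repo's real ones.

package main

import (
	"fmt"
	"sync"
)

// Hypothetical stand-ins for the repo's three metadata loaders.
// Like the real ones, each calls Done as its last statement.
func loadHashes(versions []string, wg *sync.WaitGroup) {
	fmt.Println("hashes loaded for", len(versions), "versions")
	wg.Done()
}

func loadFileLists(versions []string, wg *sync.WaitGroup) {
	fmt.Println("file lists loaded")
	wg.Done()
}

func loadRecipes(versions []string, wg *sync.WaitGroup) {
	fmt.Println("recipes loaded")
	wg.Done()
}

func main() {
	versions := []string{"00000", "00001"}
	var wg sync.WaitGroup
	wg.Add(3) // one slot per loader, added before any goroutine starts
	go loadHashes(versions, &wg)
	go loadFileLists(versions, &wg)
	go loadRecipes(versions, &wg)
	wg.Wait() // blocks until all three have called Done
}

Calling Add before starting the goroutines matters: if each goroutine added its own slot instead, Wait could observe a zero counter and return before any loader had begun.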
diff --git a/repo/repo.go b/repo/repo.go
index b8c07f2..864dc9e 100644
--- a/repo/repo.go
+++ b/repo/repo.go
@@ -37,6 +37,7 @@ import (
"path/filepath"
"reflect"
"strings"
+ "sync"
"github.com/chmduquesne/rollinghash/rabinkarp64"
"github.com/n-peugnet/dna-backup/cache"
@@ -69,6 +70,7 @@ func (m SketchMap) Set(key []uint64, value *ChunkId) {
type Repo struct {
path string
+ versions []string
chunkSize int
sketchWSize int
sketchSfCount int
@@ -149,16 +151,13 @@ func (r *Repo) Commit(source string) {
if err != nil {
logger.Fatal(err)
}
- versions := r.loadVersions()
- newVersion := len(versions) // TODO: add newVersion function
+ r.Init()
+ newVersion := len(r.versions) // TODO: add newVersion function
newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion))
newChunkPath := filepath.Join(newPath, chunksName)
os.Mkdir(newPath, 0775) // TODO: handle errors
os.Mkdir(newChunkPath, 0775) // TODO: handle errors
files := listFiles(source)
- r.loadHashes(versions)
- r.loadFileLists(versions)
- r.loadRecipes(versions)
storeQueue := make(chan chunkData, 32)
storeEnd := make(chan bool)
go r.storageWorker(newVersion, storeQueue, storeEnd)
@@ -178,12 +177,9 @@ func (r *Repo) Commit(source string) {
}
func (r *Repo) Restore(destination string) {
- versions := r.loadVersions()
- r.loadFileLists(versions)
- logger.Info("loading previous recipies")
- r.loadRecipes(versions)
+ r.Init()
reader, writer := io.Pipe()
- logger.Info("restoring latest version")
+ logger.Info("restore latest version")
go r.restoreStream(writer, r.recipe)
bufReader := bufio.NewReaderSize(reader, r.chunkSize*2)
for _, file := range r.files {
@@ -212,8 +208,17 @@ func (r *Repo) Restore(destination string) {
}
}
-func (r *Repo) loadVersions() []string {
- var versions []string
+func (r *Repo) Init() {
+ var wg sync.WaitGroup
+ r.loadVersions()
+ wg.Add(3)
+ go r.loadHashes(r.versions, &wg)
+ go r.loadFileLists(r.versions, &wg)
+ go r.loadRecipes(r.versions, &wg)
+ wg.Wait()
+}
+
+func (r *Repo) loadVersions() {
files, err := os.ReadDir(r.path)
if err != nil {
logger.Fatal(err)
@@ -222,9 +227,8 @@ func (r *Repo) loadVersions() []string {
if !f.IsDir() {
continue
}
- versions = append(versions, filepath.Join(r.path, f.Name()))
+ r.versions = append(r.versions, filepath.Join(r.path, f.Name()))
}
- return versions
}
func listFiles(path string) []File {
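Running the three loaders concurrently is only safe because they write disjoint fields of Repo (fingerprints and sketches, files and filesRaw, recipe and recipeRaw). If two loaders ever had to update the same map, the writes would need a lock. A sketch of that variant, assuming a hypothetical sharedIndex type that is not part of the repo:

package main

import "sync"

// sharedIndex is hypothetical: a map written by several goroutines
// at once, which therefore needs a mutex. The repo's loaders avoid
// this by never touching the same field.
type sharedIndex struct {
	mu sync.Mutex
	m  map[string]int
}

func (s *sharedIndex) set(k string, v int) {
	s.mu.Lock() // serialize concurrent writers
	defer s.mu.Unlock()
	s.m[k] = v
}

func main() {
	idx := sharedIndex{m: make(map[string]int)}
	var wg sync.WaitGroup
	wg.Add(2)
	for i := 0; i < 2; i++ {
		go func(n int) {
			idx.set("latest", n) // safely serialized by the mutex
			wg.Done()
		}(i)
	}
	wg.Wait()
}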
@@ -405,11 +409,12 @@ func (r *Repo) storeFileList(version int, list []File) {
}
// loadFileLists loads incrementally the file lists' delta of each given version.
-func (r *Repo) loadFileLists(versions []string) {
+func (r *Repo) loadFileLists(versions []string, wg *sync.WaitGroup) {
logger.Info("load previous file lists")
var files []File
r.filesRaw = loadDeltas(&files, versions, r.patcher, r.chunkReadWrapper, filesName)
r.files = files
+ wg.Done()
}
// storageWorker is meant to be started in a goroutine and stores each new chunk's
@@ -483,8 +488,8 @@ func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader {
}
// TODO: use atoi for chunkid ?
-func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
- for i, v := range versions {
+func (r *Repo) LoadChunks(chunks chan<- IdentifiedChunk) {
+ for i, v := range r.versions {
p := filepath.Join(v, chunksName)
entries, err := os.ReadDir(p)
if err != nil {
@@ -504,7 +509,7 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) {
// loadHashes loads and aggregates the hashes stored for each given version and
// stores them in the repo maps.
-func (r *Repo) loadHashes(versions []string) {
+func (r *Repo) loadHashes(versions []string, wg *sync.WaitGroup) {
logger.Info("load previous hashes")
for i, v := range versions {
path := filepath.Join(v, hashesName)
@@ -528,6 +533,7 @@ func (r *Repo) loadHashes(versions []string) {
logger.Warning(err)
}
}
+ wg.Done()
}
func (r *Repo) chunkMinLen() int {
@@ -735,7 +741,7 @@ func (r *Repo) storeRecipe(version int, recipe []Chunk) {
storeDelta(r.recipeRaw, recipe, dest, r.differ, r.chunkWriteWrapper)
}
-func (r *Repo) loadRecipes(versions []string) {
+func (r *Repo) loadRecipes(versions []string, wg *sync.WaitGroup) {
logger.Info("load previous recipies")
var recipe []Chunk
r.recipeRaw = loadDeltas(&recipe, versions, r.patcher, r.chunkReadWrapper, recipeName)
@@ -745,6 +751,7 @@ func (r *Repo) loadRecipes(versions []string) {
}
}
r.recipe = recipe
+ wg.Done()
}
func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) {
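One detail worth flagging in the loaders above: each calls wg.Done() as its last statement, so a panic or an early return before that point would leave the WaitGroup counter unbalanced and block Wait forever (the logger.Fatal paths are exempt only if that logger exits the process, as the standard log.Fatal does). The defensive Go idiom is to defer the call at the top of the function, as in this sketch (loadSafe is a hypothetical name):

package main

import "sync"

// loadSafe defers Done so the counter is decremented even if the
// body returns early or panics.
func loadSafe(wg *sync.WaitGroup) {
	defer wg.Done()
	// ... do the actual loading work here ...
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go loadSafe(&wg)
	wg.Wait()
}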
diff --git a/repo/repo_test.go b/repo/repo_test.go
index a7d35a4..1384b92 100644
--- a/repo/repo_test.go
+++ b/repo/repo_test.go
@@ -189,8 +189,7 @@ func TestReadFiles3(t *testing.T) {
func TestSymlinks(t *testing.T) {
var output bytes.Buffer
- multi := io.MultiWriter(&output, os.Stderr)
- logger.SetOutput(multi)
+ logger.SetOutput(&output)
defer logger.SetOutput(os.Stderr)
tmpDir, err := filepath.EvalSymlinks(t.TempDir())
if err != nil {
@@ -251,8 +250,8 @@ func TestLoadChunks(t *testing.T) {
go repo.chunkStream(reader1, chunks1)
go repo.chunkStream(reader2, chunks2)
storeChunks(resultChunks, chunks1)
- versions := []string{resultVersion}
- go repo.loadChunks(versions, chunks3)
+ repo.versions = []string{resultVersion}
+ go repo.LoadChunks(chunks3)
i := 0
for c2 := range chunks2 {
@@ -312,12 +311,12 @@ func TestBsdiff(t *testing.T) {
// Load previously stored chunks
oldChunks := make(chan IdentifiedChunk, 16)
- versions := repo.loadVersions()
- go repo.loadChunks(versions, oldChunks)
+ repo.loadVersions()
+ go repo.LoadChunks(oldChunks)
repo.hashChunks(oldChunks)
// Read new data
- newVersion := len(versions)
+ newVersion := len(repo.versions)
newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion))
os.MkdirAll(newPath, 0775)
reader := getDataStream(dataDir, concatFiles)
@@ -396,7 +395,8 @@ func TestHashes(t *testing.T) {
repo1 := NewRepo(source, 8<<10)
repo1.chunkReadWrapper = utils.ZlibReader
repo1.chunkWriteWrapper = utils.ZlibWriter
- go repo1.loadChunks([]string{filepath.Join(source, "00000")}, chunks)
+ repo1.versions = []string{filepath.Join(source, "00000")}
+ go repo1.LoadChunks(chunks)
for c := range chunks {
fp, sk := repo1.hashChunk(c.GetId(), c.Reader())
content, err := io.ReadAll(c.Reader())
@@ -418,7 +418,10 @@ func TestHashes(t *testing.T) {
<-storeEnd
testutils.AssertLen(t, 0, repo2.fingerprints, "Fingerprints")
testutils.AssertLen(t, 0, repo2.sketches, "Sketches")
- repo2.loadHashes([]string{filepath.Join(dest, "00000")})
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go repo2.loadHashes([]string{filepath.Join(dest, "00000")}, &wg)
+ wg.Wait()
testutils.AssertSame(t, repo1.fingerprints, repo2.fingerprints, "Fingerprint maps")
testutils.AssertSame(t, repo1.sketches, repo2.sketches, "Sketches maps")
}
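Since every loader now takes a *sync.WaitGroup, tests that exercise a single loader must set up a one-shot group, as TestHashes does above. If that boilerplate spreads across more tests, a small helper can absorb it; runAndWait below is a hypothetical sketch, not part of the repo:

// runAndWait runs fn in a goroutine and blocks until fn calls
// Done on the WaitGroup it receives.
func runAndWait(fn func(*sync.WaitGroup)) {
	var wg sync.WaitGroup
	wg.Add(1)
	go fn(&wg)
	wg.Wait()
}

With it, the end of TestHashes would shrink to:

	runAndWait(func(wg *sync.WaitGroup) {
		repo2.loadHashes([]string{filepath.Join(dest, "00000")}, wg)
	})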