From a3c5cae3d204eba00055e3a49f69bd8849a1053a Mon Sep 17 00:00:00 2001 From: n-peugnet Date: Wed, 13 Oct 2021 16:18:24 +0200 Subject: load repo metadata in threads inside Init func --- repo/repo.go | 45 ++++++++++++++++++++++++++------------------- repo/repo_test.go | 21 ++++++++++++--------- 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/repo/repo.go b/repo/repo.go index b8c07f2..864dc9e 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -37,6 +37,7 @@ import ( "path/filepath" "reflect" "strings" + "sync" "github.com/chmduquesne/rollinghash/rabinkarp64" "github.com/n-peugnet/dna-backup/cache" @@ -69,6 +70,7 @@ func (m SketchMap) Set(key []uint64, value *ChunkId) { type Repo struct { path string + versions []string chunkSize int sketchWSize int sketchSfCount int @@ -149,16 +151,13 @@ func (r *Repo) Commit(source string) { if err != nil { logger.Fatal(err) } - versions := r.loadVersions() - newVersion := len(versions) // TODO: add newVersion functino + r.Init() + newVersion := len(r.versions) // TODO: add newVersion functino newPath := filepath.Join(r.path, fmt.Sprintf(versionFmt, newVersion)) newChunkPath := filepath.Join(newPath, chunksName) os.Mkdir(newPath, 0775) // TODO: handle errors os.Mkdir(newChunkPath, 0775) // TODO: handle errors files := listFiles(source) - r.loadHashes(versions) - r.loadFileLists(versions) - r.loadRecipes(versions) storeQueue := make(chan chunkData, 32) storeEnd := make(chan bool) go r.storageWorker(newVersion, storeQueue, storeEnd) @@ -178,12 +177,9 @@ func (r *Repo) Commit(source string) { } func (r *Repo) Restore(destination string) { - versions := r.loadVersions() - r.loadFileLists(versions) - logger.Info("loading previous recipies") - r.loadRecipes(versions) + r.Init() reader, writer := io.Pipe() - logger.Info("restoring latest version") + logger.Info("restore latest version") go r.restoreStream(writer, r.recipe) bufReader := bufio.NewReaderSize(reader, r.chunkSize*2) for _, file := range r.files { @@ -212,8 +208,17 @@ func (r *Repo) Restore(destination string) { } } -func (r *Repo) loadVersions() []string { - var versions []string +func (r *Repo) Init() { + var wg sync.WaitGroup + r.loadVersions() + wg.Add(3) + go r.loadHashes(r.versions, &wg) + go r.loadFileLists(r.versions, &wg) + go r.loadRecipes(r.versions, &wg) + wg.Wait() +} + +func (r *Repo) loadVersions() { files, err := os.ReadDir(r.path) if err != nil { logger.Fatal(err) @@ -222,9 +227,8 @@ func (r *Repo) loadVersions() []string { if !f.IsDir() { continue } - versions = append(versions, filepath.Join(r.path, f.Name())) + r.versions = append(r.versions, filepath.Join(r.path, f.Name())) } - return versions } func listFiles(path string) []File { @@ -405,11 +409,12 @@ func (r *Repo) storeFileList(version int, list []File) { } // loadFileLists loads incrementally the file lists' delta of each given version. -func (r *Repo) loadFileLists(versions []string) { +func (r *Repo) loadFileLists(versions []string, wg *sync.WaitGroup) { logger.Info("load previous file lists") var files []File r.filesRaw = loadDeltas(&files, versions, r.patcher, r.chunkReadWrapper, filesName) r.files = files + wg.Done() } // storageWorker is meant to be started in a goroutine and stores each new chunk's @@ -483,8 +488,8 @@ func (r *Repo) LoadChunkContent(id *ChunkId) *bytes.Reader { } // TODO: use atoi for chunkid ? -func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) { - for i, v := range versions { +func (r *Repo) LoadChunks(chunks chan<- IdentifiedChunk) { + for i, v := range r.versions { p := filepath.Join(v, chunksName) entries, err := os.ReadDir(p) if err != nil { @@ -504,7 +509,7 @@ func (r *Repo) loadChunks(versions []string, chunks chan<- IdentifiedChunk) { // loadHashes loads and aggregates the hashes stored for each given version and // stores them in the repo maps. -func (r *Repo) loadHashes(versions []string) { +func (r *Repo) loadHashes(versions []string, wg *sync.WaitGroup) { logger.Info("load previous hashes") for i, v := range versions { path := filepath.Join(v, hashesName) @@ -528,6 +533,7 @@ func (r *Repo) loadHashes(versions []string) { logger.Warning(err) } } + wg.Done() } func (r *Repo) chunkMinLen() int { @@ -735,7 +741,7 @@ func (r *Repo) storeRecipe(version int, recipe []Chunk) { storeDelta(r.recipeRaw, recipe, dest, r.differ, r.chunkWriteWrapper) } -func (r *Repo) loadRecipes(versions []string) { +func (r *Repo) loadRecipes(versions []string, wg *sync.WaitGroup) { logger.Info("load previous recipies") var recipe []Chunk r.recipeRaw = loadDeltas(&recipe, versions, r.patcher, r.chunkReadWrapper, recipeName) @@ -745,6 +751,7 @@ func (r *Repo) loadRecipes(versions []string) { } } r.recipe = recipe + wg.Done() } func extractDeltaChunks(chunks []Chunk) (ret []*DeltaChunk) { diff --git a/repo/repo_test.go b/repo/repo_test.go index a7d35a4..1384b92 100644 --- a/repo/repo_test.go +++ b/repo/repo_test.go @@ -189,8 +189,7 @@ func TestReadFiles3(t *testing.T) { func TestSymlinks(t *testing.T) { var output bytes.Buffer - multi := io.MultiWriter(&output, os.Stderr) - logger.SetOutput(multi) + logger.SetOutput(&output) defer logger.SetOutput(os.Stderr) tmpDir, err := filepath.EvalSymlinks(t.TempDir()) if err != nil { @@ -251,8 +250,8 @@ func TestLoadChunks(t *testing.T) { go repo.chunkStream(reader1, chunks1) go repo.chunkStream(reader2, chunks2) storeChunks(resultChunks, chunks1) - versions := []string{resultVersion} - go repo.loadChunks(versions, chunks3) + repo.versions = []string{resultVersion} + go repo.LoadChunks(chunks3) i := 0 for c2 := range chunks2 { @@ -312,12 +311,12 @@ func TestBsdiff(t *testing.T) { // Load previously stored chunks oldChunks := make(chan IdentifiedChunk, 16) - versions := repo.loadVersions() - go repo.loadChunks(versions, oldChunks) + repo.loadVersions() + go repo.LoadChunks(oldChunks) repo.hashChunks(oldChunks) // Read new data - newVersion := len(versions) + newVersion := len(repo.versions) newPath := filepath.Join(repo.path, fmt.Sprintf(versionFmt, newVersion)) os.MkdirAll(newPath, 0775) reader := getDataStream(dataDir, concatFiles) @@ -396,7 +395,8 @@ func TestHashes(t *testing.T) { repo1 := NewRepo(source, 8<<10) repo1.chunkReadWrapper = utils.ZlibReader repo1.chunkWriteWrapper = utils.ZlibWriter - go repo1.loadChunks([]string{filepath.Join(source, "00000")}, chunks) + repo1.versions = []string{filepath.Join(source, "00000")} + go repo1.LoadChunks(chunks) for c := range chunks { fp, sk := repo1.hashChunk(c.GetId(), c.Reader()) content, err := io.ReadAll(c.Reader()) @@ -418,7 +418,10 @@ func TestHashes(t *testing.T) { <-storeEnd testutils.AssertLen(t, 0, repo2.fingerprints, "Fingerprints") testutils.AssertLen(t, 0, repo2.sketches, "Sketches") - repo2.loadHashes([]string{filepath.Join(dest, "00000")}) + var wg sync.WaitGroup + wg.Add(1) + go repo2.loadHashes([]string{filepath.Join(dest, "00000")}, &wg) + wg.Wait() testutils.AssertSame(t, repo1.fingerprints, repo2.fingerprints, "Fingerprint maps") testutils.AssertSame(t, repo1.sketches, repo2.sketches, "Sketches maps") } -- cgit v1.2.3