diff options
-rw-r--r-- | repo.go | 56 | ||||
-rw-r--r-- | repo_test.go | 25 |
2 files changed, 51 insertions, 30 deletions
@@ -60,11 +60,13 @@ func (r *Repo) Commit(source string) { // newFilesPath := path.Join(newPath, filesName) os.Mkdir(newPath, 0775) os.Mkdir(newChunkPath, 0775) + reader, writer := io.Pipe() newChunks := make(chan []byte, 16) oldChunks := make(chan Chunk, 16) files := listFiles(source) go loadChunks(versions, oldChunks) - go readFiles(files, newChunks) + go concatFiles(files, writer) + go chunkStream(reader, newChunks) hashes := hashChunks(oldChunks) chunks := r.matchChunks(newChunks, hashes) extractNewChunks(chunks) @@ -108,44 +110,52 @@ func listFiles(path string) []File { return files } -func readFiles(files []File, chunks chan<- []byte) { - var buff []byte - var prev, read = chunkSize, 0 - +func concatFiles(files []File, stream io.WriteCloser) { for _, f := range files { file, err := os.Open(f.Path) if err != nil { - log.Println(err) + log.Printf("Error reading file '%s': %s\n", f.Path, err) continue } - for err != io.EOF { - if prev == chunkSize { - buff = make([]byte, chunkSize) - prev, err = file.Read(buff) - } else { - read, err = file.Read(buff[prev:]) - prev += read - } - if err != nil && err != io.EOF { - log.Println(err) - } - if prev == chunkSize { - chunks <- buff - } + io.Copy(stream, file) + } + stream.Close() +} + +func chunkStream(stream io.Reader, chunks chan<- []byte) { + var buff []byte + var prev, read = chunkSize, 0 + var err error + + for err != io.EOF { + if prev == chunkSize { + buff = make([]byte, chunkSize) + prev, err = stream.Read(buff) + } else { + read, err = stream.Read(buff[prev:]) + prev += read } + if err != nil && err != io.EOF { + log.Println(err) + } + if prev == chunkSize { + chunks <- buff + } + } + if prev != chunkSize { + chunks <- buff } - chunks <- buff close(chunks) } -func storeFiles(dest string, files []File) { +func storeFileList(dest string, files []File) { err := writeFile(dest, files) if err != nil { log.Println(err) } } -func loadFiles(path string) []File { +func loadFileList(path string) []File { files := make([]File, 0) err := readFile(path, &files) if err != nil { diff --git a/repo_test.go b/repo_test.go index 6a7a526..ba45e0e 100644 --- a/repo_test.go +++ b/repo_test.go @@ -2,6 +2,7 @@ package main import ( "bytes" + "io" "io/ioutil" "os" "path" @@ -12,9 +13,11 @@ import ( ) func chunkCompare(t *testing.T, dataDir string, testFiles []string, chunkCount int) { + reader, writer := io.Pipe() chunks := make(chan []byte) files := listFiles(dataDir) - go readFiles(files, chunks) + go concatFiles(files, writer) + go chunkStream(reader, chunks) offset := 0 buff := make([]byte, chunkSize*chunkCount) @@ -79,12 +82,16 @@ func TestLoadChunks(t *testing.T) { resultVersion := path.Join(resultDir, "00000") resultChunks := path.Join(resultVersion, chunksName) os.MkdirAll(resultChunks, 0775) + reader1, writer1 := io.Pipe() + reader2, writer2 := io.Pipe() chunks1 := make(chan []byte, 16) chunks2 := make(chan []byte, 16) chunks3 := make(chan Chunk, 16) files := listFiles(dataDir) - go readFiles(files, chunks1) - go readFiles(files, chunks2) + go concatFiles(files, writer1) + go concatFiles(files, writer2) + go chunkStream(reader1, chunks1) + go chunkStream(reader2, chunks2) storeChunks(resultChunks, chunks1) versions := []string{resultVersion} go loadChunks(versions, chunks3) @@ -130,8 +137,8 @@ func TestStoreLoadFiles(t *testing.T) { dataDir := path.Join("test", "data") resultFiles := path.Join(resultDir, filesName) files1 := listFiles(dataDir) - storeFiles(resultFiles, files1) - files2 := loadFiles(resultFiles) + storeFileList(resultFiles, files1) + files2 := loadFileList(resultFiles) for i, f := range files1 { if f != files2[i] { t.Errorf("Loaded file data %d does not match stored one", i) @@ -148,21 +155,25 @@ func TestBsdiff(t *testing.T) { resultVersion := path.Join(resultDir, "00000") resultChunks := path.Join(resultVersion, chunksName) os.MkdirAll(resultChunks, 0775) + reader, writer := io.Pipe() chunks := make(chan []byte, 16) files := listFiles(dataDir) - go readFiles(files, chunks) + go concatFiles(files, writer) + go chunkStream(reader, chunks) storeChunks(resultChunks, chunks) input, _ := ioutil.ReadFile(path.Join(dataDir, "logs.1", "logTest.log")) ioutil.WriteFile(addedFile, input, 0664) + reader, writer = io.Pipe() newChunks := make(chan []byte, 16) oldChunks := make(chan Chunk, 16) files = listFiles(dataDir) repo := NewRepo(resultDir) versions := repo.loadVersions() go loadChunks(versions, oldChunks) - go readFiles(files, newChunks) + go concatFiles(files, writer) + go chunkStream(reader, newChunks) hashes := hashChunks(oldChunks) recipe := repo.matchChunks(newChunks, hashes) buff := new(bytes.Buffer) |