diff --git a/gitindex/catfile.go b/gitindex/catfile.go new file mode 100644 index 000000000..89418fdcc --- /dev/null +++ b/gitindex/catfile.go @@ -0,0 +1,200 @@ +// Copyright 2016 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gitindex + +import ( + "bufio" + "bytes" + "encoding/hex" + "fmt" + "io" + "os/exec" + "strconv" + + "github.com/go-git/go-git/v5/plumbing" +) + +// catfileReader provides streaming access to git blob objects via a pipelined +// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob +// SHAs to stdin while the caller reads responses one at a time, similar to +// archive/tar.Reader. +// +// The --buffer flag switches git's output from per-object flush (write_or_die) +// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git +// calls fflush(stdout) to deliver any remaining output. +// +// Usage: +// +// cr, err := newCatfileReader(repoDir, ids) +// if err != nil { ... } +// defer cr.Close() +// +// for { +// size, missing, err := cr.Next() +// if err == io.EOF { break } +// if missing { continue } +// if size > maxSize { continue } // unread bytes auto-skipped +// content := make([]byte, size) +// io.ReadFull(cr, content) +// } +type catfileReader struct { + cmd *exec.Cmd + reader *bufio.Reader + writeErr <-chan error + + // pending tracks unread content bytes + trailing LF for the current + // entry. 
Next() discards any pending bytes before reading the next header. + pending int64 +} + +// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds +// all ids to its stdin via a background goroutine. The caller must call Close +// when done. +func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, error) { + cmd := exec.Command("git", "cat-file", "--batch", "--buffer") + cmd.Dir = repoDir + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("stdin pipe: %w", err) + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + stdin.Close() + return nil, fmt.Errorf("stdout pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + stdin.Close() + stdout.Close() + return nil, fmt.Errorf("start git cat-file: %w", err) + } + + // Writer goroutine: feed all SHAs then close stdin to trigger flush. + writeErr := make(chan error, 1) + go func() { + defer stdin.Close() + bw := bufio.NewWriterSize(stdin, 64*1024) + var hexBuf [41]byte + hexBuf[40] = '\n' + for _, id := range ids { + hex.Encode(hexBuf[:40], id[:]) + if _, err := bw.Write(hexBuf[:]); err != nil { + writeErr <- err + return + } + } + writeErr <- bw.Flush() + }() + + return &catfileReader{ + cmd: cmd, + reader: bufio.NewReaderSize(stdout, 512*1024), + writeErr: writeErr, + }, nil +} + +// Next advances to the next blob entry. It returns the blob's size and whether +// it is missing. Any unread content from the previous entry is automatically +// discarded. Returns io.EOF when all entries have been consumed. +// +// After Next returns successfully with missing=false, call Read to consume the +// blob content, or call Next again to skip it. +func (cr *catfileReader) Next() (size int64, missing bool, err error) { + // Discard unread content from the previous entry. 
+ if cr.pending > 0 { + if _, err := cr.reader.Discard(int(cr.pending)); err != nil { + return 0, false, fmt.Errorf("discard pending bytes: %w", err) + } + cr.pending = 0 + } + + headerBytes, err := cr.reader.ReadBytes('\n') + if err != nil { + if err == io.EOF { + return 0, false, io.EOF + } + return 0, false, fmt.Errorf("read header: %w", err) + } + header := headerBytes[:len(headerBytes)-1] // trim \n + + if bytes.HasSuffix(header, []byte(" missing")) { + return 0, true, nil + } + + // Parse size from " ". + lastSpace := bytes.LastIndexByte(header, ' ') + if lastSpace == -1 { + return 0, false, fmt.Errorf("unexpected header: %q", header) + } + size, err = strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) + if err != nil { + return 0, false, fmt.Errorf("parse size from %q: %w", header, err) + } + + // Track pending bytes: content + trailing LF. + cr.pending = size + 1 + return size, false, nil +} + +// Read reads from the current blob's content. Implements io.Reader. Returns +// io.EOF when the blob's content has been fully read (the trailing LF +// delimiter is consumed automatically). +func (cr *catfileReader) Read(p []byte) (int, error) { + if cr.pending <= 0 { + return 0, io.EOF + } + + // Don't read into the trailing LF byte — reserve it. + contentRemaining := cr.pending - 1 + if contentRemaining <= 0 { + // Only the trailing LF remains; consume it and signal EOF. + if _, err := cr.reader.ReadByte(); err != nil { + return 0, fmt.Errorf("read trailing LF: %w", err) + } + cr.pending = 0 + return 0, io.EOF + } + + // Limit the read to the remaining content bytes. + if int64(len(p)) > contentRemaining { + p = p[:contentRemaining] + } + n, err := cr.reader.Read(p) + cr.pending -= int64(n) + if err != nil { + return n, err + } + + // If we've consumed all content bytes, also consume the trailing LF. 
+ if cr.pending == 1 { + if _, err := cr.reader.ReadByte(); err != nil { + return n, fmt.Errorf("read trailing LF: %w", err) + } + cr.pending = 0 + } + + return n, nil +} + +// Close shuts down the cat-file process and waits for it to exit. +func (cr *catfileReader) Close() error { + // Drain stdout so git can flush and exit without blocking. + _, _ = io.Copy(io.Discard, cr.reader) + // Wait for writer goroutine. + <-cr.writeErr + return cr.cmd.Wait() +} diff --git a/gitindex/catfile_bench_test.go b/gitindex/catfile_bench_test.go new file mode 100644 index 000000000..ec2626a7b --- /dev/null +++ b/gitindex/catfile_bench_test.go @@ -0,0 +1,159 @@ +package gitindex + +import ( + "fmt" + "io" + "os" + "testing" + + "github.com/go-git/go-git/v5/plumbing" +) + +// Set ZOEKT_BENCH_REPO to a git checkout to enable these benchmarks. +// +// git clone --depth=1 https://github.com/kubernetes/kubernetes /tmp/k8s +// ZOEKT_BENCH_REPO=/tmp/k8s go test ./gitindex/ -bench=BenchmarkBlobRead -benchmem -count=5 -timeout=600s + +func requireBenchGitRepo(b *testing.B) string { + b.Helper() + dir := os.Getenv("ZOEKT_BENCH_REPO") + if dir == "" { + b.Skip("ZOEKT_BENCH_REPO not set") + } + return dir +} + +// collectBlobKeys opens the repo, walks HEAD, and returns all fileKeys with +// their BlobLocations plus the repo directory path. 
+func collectBlobKeys(b *testing.B, repoDir string) (map[fileKey]BlobLocation, string) { + b.Helper() + + repo, closer, err := openRepo(repoDir) + if err != nil { + b.Fatalf("openRepo: %v", err) + } + b.Cleanup(func() { closer.Close() }) + + head, err := repo.Head() + if err != nil { + b.Fatalf("Head: %v", err) + } + + commit, err := repo.CommitObject(head.Hash()) + if err != nil { + b.Fatalf("CommitObject: %v", err) + } + + tree, err := commit.Tree() + if err != nil { + b.Fatalf("Tree: %v", err) + } + + rw := NewRepoWalker(repo, "https://example.com/repo", nil) + if _, err := rw.CollectFiles(tree, "HEAD", nil); err != nil { + b.Fatalf("CollectFiles: %v", err) + } + + return rw.Files, repoDir +} + +// sortedBlobKeys returns fileKeys for deterministic iteration. +func sortedBlobKeys(files map[fileKey]BlobLocation) []fileKey { + keys := make([]fileKey, 0, len(files)) + for k := range files { + keys = append(keys, k) + } + return keys +} + +// BenchmarkBlobRead_GoGit measures the current go-git BlobObject approach: +// sequential calls to repo.GitRepo.BlobObject(hash) for each file. 
+func BenchmarkBlobRead_GoGit(b *testing.B) { + repoDir := requireBenchGitRepo(b) + files, _ := collectBlobKeys(b, repoDir) + keys := sortedBlobKeys(files) + b.Logf("collected %d blob keys", len(keys)) + + for _, n := range []int{1_000, 5_000, len(keys)} { + n = min(n, len(keys)) + subset := keys[:n] + + b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) { + b.ReportAllocs() + var totalBytes int64 + for b.Loop() { + totalBytes = 0 + for _, key := range subset { + loc := files[key] + blob, err := loc.GitRepo.BlobObject(key.ID) + if err != nil { + b.Fatalf("BlobObject(%s): %v", key.ID, err) + } + r, err := blob.Reader() + if err != nil { + b.Fatalf("Reader: %v", err) + } + n, err := io.Copy(io.Discard, r) + r.Close() + if err != nil { + b.Fatalf("Read: %v", err) + } + totalBytes += n + } + } + b.ReportMetric(float64(totalBytes), "content-bytes/op") + b.ReportMetric(float64(len(subset)), "files/op") + }) + } +} + +// BenchmarkBlobRead_CatfileReader measures the streaming catfileReader approach: +// all SHAs written to stdin at once via --buffer, responses read one at a time. +// This is the production path used by indexGitRepo. 
+func BenchmarkBlobRead_CatfileReader(b *testing.B) { + repoDir := requireBenchGitRepo(b) + files, gitDir := collectBlobKeys(b, repoDir) + keys := sortedBlobKeys(files) + b.Logf("collected %d blob keys", len(keys)) + + ids := make([]plumbing.Hash, len(keys)) + for i, k := range keys { + ids[i] = k.ID + } + + for _, n := range []int{1_000, 5_000, len(keys)} { + n = min(n, len(keys)) + subset := ids[:n] + + b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) { + b.ReportAllocs() + var totalBytes int64 + for b.Loop() { + totalBytes = 0 + cr, err := newCatfileReader(gitDir, subset) + if err != nil { + b.Fatalf("newCatfileReader: %v", err) + } + for range subset { + size, missing, err := cr.Next() + if err != nil { + cr.Close() + b.Fatalf("Next: %v", err) + } + if missing { + continue + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + cr.Close() + b.Fatalf("ReadFull: %v", err) + } + totalBytes += int64(len(content)) + } + cr.Close() + } + b.ReportMetric(float64(totalBytes), "content-bytes/op") + b.ReportMetric(float64(len(subset)), "files/op") + }) + } +} diff --git a/gitindex/catfile_test.go b/gitindex/catfile_test.go new file mode 100644 index 000000000..c871bd7d2 --- /dev/null +++ b/gitindex/catfile_test.go @@ -0,0 +1,230 @@ +package gitindex + +import ( + "io" + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/go-git/go-git/v5/plumbing" +) + +// createTestRepo creates a git repo with various test files and returns +// the repo path and a map of filename -> blob SHA. 
+func createTestRepo(t *testing.T) (string, map[string]plumbing.Hash) { + t.Helper() + dir := t.TempDir() + repoDir := filepath.Join(dir, "repo") + + script := ` +set -e +git init -b main repo +cd repo +git config user.email "test@test.com" +git config user.name "Test" + +# Normal text file +echo "hello world" > hello.txt + +# Empty file +touch empty.txt + +# Binary file with newlines embedded +printf '\x00\x01\x02\nhello\nworld\n\x03\x04' > binary.bin + +# Large-ish file (64KB of data) +dd if=/dev/urandom bs=1024 count=64 of=large.bin 2>/dev/null + +git add -A +git commit -m "initial" +` + cmd := exec.Command("/bin/sh", "-c", script) + cmd.Dir = dir + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + t.Fatalf("create test repo: %v", err) + } + + // Get blob SHAs for each file. + blobs := map[string]plumbing.Hash{} + for _, name := range []string{"hello.txt", "empty.txt", "binary.bin", "large.bin"} { + out, err := exec.Command("git", "-C", repoDir, "rev-parse", "HEAD:"+name).Output() + if err != nil { + t.Fatalf("rev-parse %s: %v", name, err) + } + sha := string(out[:len(out)-1]) // trim newline + blobs[name] = plumbing.NewHash(sha) + } + + return repoDir, blobs +} + +func TestCatfileReader(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["empty.txt"], + blobs["binary.bin"], + blobs["large.bin"], + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatalf("newCatfileReader: %v", err) + } + defer cr.Close() + + // hello.txt + size, missing, err := cr.Next() + if err != nil { + t.Fatalf("Next hello.txt: %v", err) + } + if missing { + t.Fatal("hello.txt unexpectedly missing") + } + if size != 12 { + t.Errorf("hello.txt size = %d, want 12", size) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatalf("ReadFull hello.txt: %v", err) + } + if string(content) != "hello world\n" { + t.Errorf("hello.txt content = %q", content) + } + + // 
empty.txt + size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next empty.txt: %v", err) + } + if size != 0 { + t.Errorf("empty.txt size = %d, want 0", size) + } + + // binary.bin — read content and verify binary data survives. + size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next binary.bin: %v", err) + } + binContent := make([]byte, size) + if _, err := io.ReadFull(cr, binContent); err != nil { + t.Fatalf("ReadFull binary.bin: %v", err) + } + if binContent[0] != 0x00 || binContent[3] != '\n' { + t.Errorf("binary.bin unexpected leading bytes: %x", binContent[:5]) + } + + // large.bin + size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next large.bin: %v", err) + } + if size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", size, 64*1024) + } + largeContent := make([]byte, size) + if _, err := io.ReadFull(cr, largeContent); err != nil { + t.Fatalf("ReadFull large.bin: %v", err) + } + + // EOF after all entries. + _, _, err = cr.Next() + if err != io.EOF { + t.Errorf("expected io.EOF after last entry, got %v", err) + } +} + +func TestCatfileReader_Skip(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["large.bin"], + blobs["binary.bin"], + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatalf("newCatfileReader: %v", err) + } + defer cr.Close() + + // Skip hello.txt by calling Next again without reading. + _, _, err = cr.Next() + if err != nil { + t.Fatalf("Next hello.txt: %v", err) + } + + // Skip large.bin too. + size, _, err := cr.Next() + if err != nil { + t.Fatalf("Next large.bin: %v", err) + } + if size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", size, 64*1024) + } + + // Read binary.bin after skipping two entries. 
	size, _, err = cr.Next()
	if err != nil {
		t.Fatalf("Next binary.bin: %v", err)
	}
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatalf("ReadFull binary.bin: %v", err)
	}
	if content[0] != 0x00 {
		t.Errorf("binary.bin first byte = %x, want 0x00", content[0])
	}
}

// TestCatfileReader_Missing verifies that an unknown SHA is reported via the
// missing flag and that subsequent entries still stream correctly.
func TestCatfileReader_Missing(t *testing.T) {
	repoDir, blobs := createTestRepo(t)

	fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef")
	ids := []plumbing.Hash{
		blobs["hello.txt"],
		fakeHash,
		blobs["empty.txt"],
	}

	cr, err := newCatfileReader(repoDir, ids)
	if err != nil {
		t.Fatalf("newCatfileReader: %v", err)
	}
	defer cr.Close()

	// hello.txt — read normally.
	size, missing, err := cr.Next()
	if err != nil || missing {
		t.Fatalf("Next hello.txt: err=%v missing=%v", err, missing)
	}
	content := make([]byte, size)
	if _, err := io.ReadFull(cr, content); err != nil {
		t.Fatalf("ReadFull hello.txt: %v", err)
	}
	if string(content) != "hello world\n" {
		t.Errorf("hello.txt = %q", content)
	}

	// fakeHash — missing.
	_, missing, err = cr.Next()
	if err != nil {
		t.Fatalf("Next fakeHash: %v", err)
	}
	if !missing {
		t.Error("expected fakeHash to be missing")
	}

	// empty.txt — still works after missing entry.
	size, missing, err = cr.Next()
	if err != nil || missing {
		t.Fatalf("Next empty.txt: err=%v missing=%v", err, missing)
	}
	if size != 0 {
		t.Errorf("empty.txt size = %d, want 0", size)
	}
}
diff --git a/gitindex/index.go b/gitindex/index.go
index 5fbeba0d0..0d8bc6697 100644
--- a/gitindex/index.go
+++ b/gitindex/index.go
@@ -585,17 +585,67 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
 	sort.Strings(names)
 	names = uniq(names)
-	log.Printf("attempting to index %d total files", totalFiles)
-	for idx, name := range names {
-		keys := fileKeys[name]
+	// Separate main-repo keys from submodule keys, collecting blob SHAs
+	// for the main repo so we can stream them via git cat-file --batch.
+	mainRepoKeys := make([]fileKey, 0, totalFiles)
+	mainRepoIDs := make([]plumbing.Hash, 0, totalFiles)
+	var submoduleKeys []fileKey
+
+	for _, name := range names {
+		for _, key := range fileKeys[name] {
+			if key.SubRepoPath == "" {
+				mainRepoKeys = append(mainRepoKeys, key)
+				mainRepoIDs = append(mainRepoIDs, key.ID)
+			} else {
+				submoduleKeys = append(submoduleKeys, key)
+			}
+		}
+	}
+
+	log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), len(submoduleKeys))
+
+	// Stream main-repo blobs via pipelined cat-file --batch --buffer.
+	// Large blobs are skipped without reading content into memory.
+	if len(mainRepoIDs) > 0 {
+		cr, err := newCatfileReader(opts.RepoDir, mainRepoIDs)
+		if err != nil {
+			return false, fmt.Errorf("newCatfileReader: %w", err)
+		}
-		for _, key := range keys {
-			doc, err := createDocument(key, repos, opts.BuildOptions)
+		// Entries come back in the same order the SHAs were queued, so
+		// mainRepoKeys[i] corresponds to the i-th cr.Next() result.
+		for idx, key := range mainRepoKeys {
+			size, missing, err := cr.Next()
 			if err != nil {
-				return false, err
+				cr.Close()
+				return false, fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err)
+			}
+
+			branches := repos[key].Branches
+			var doc index.Document
+
+			if missing {
+				// NOTE(review): a blob absent from the object store is
+				// indexed with the same placeholder as an oversized file —
+				// consider a distinct marker or warning log so repository
+				// corruption isn't silently masked.
+				doc = skippedLargeDoc(key, branches)
+			} else {
+				keyFullPath := key.FullPath()
+				if size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) {
+					// Skip without reading content into memory.
+					doc = skippedLargeDoc(key, branches)
+				} else {
+					content := make([]byte, size)
+					if _, err := io.ReadFull(cr, content); err != nil {
+						cr.Close()
+						return false, fmt.Errorf("read blob %s: %w", keyFullPath, err)
+					}
+					doc = index.Document{
+						SubRepositoryPath: key.SubRepoPath,
+						Name:              keyFullPath,
+						Content:           content,
+						Branches:          branches,
+					}
+				}
 			}
 			if err := builder.Add(doc); err != nil {
+				cr.Close()
 				return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
 			}
@@ -603,7 +653,28 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) {
 				builder.CheckMemoryUsage()
 			}
 		}
+
+		if err := cr.Close(); err != nil {
+			return false, fmt.Errorf("close cat-file: %w", err)
+		}
 	}
+
+	// Index submodule blobs via go-git.
+	for idx, key := range submoduleKeys {
+		doc, err := createDocument(key, repos, opts.BuildOptions)
+		if err != nil {
+			return false, err
+		}
+
+		if err := builder.Add(doc); err != nil {
+			return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err)
+		}
+
+		if idx%10_000 == 0 {
+			builder.CheckMemoryUsage()
+		}
+	}
+
 	return true, builder.Finish()
 }