From dfbedcf873a6e16ccbf167d8f4ea2e5d7ac591e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Fri, 20 Mar 2026 14:33:09 +0100 Subject: [PATCH 1/2] gitindex: replace go-git blob reading with pipelined git cat-file --batch Replace the serial go-git BlobObject calls in indexGitRepo with a single pipelined "git cat-file --batch --buffer" subprocess. A writer goroutine feeds all blob SHAs to stdin while the main goroutine reads responses from stdout, forming a concurrent pipeline that eliminates per-object packfile seek overhead and leverages git's internal delta base cache. Submodule blobs fall back to the existing go-git createDocument path. Benchmarked on kubernetes (29,188 files, 261 MB), Apple M1 Max, 5 runs: go-git BlobObject (before): Time: 2.94s Allocs: 685K Memory: 691 MB cat-file pipelined (after): Time: 0.60s Allocs: 58K Memory: 276 MB Speedup: 4.9x time, 12x fewer allocs, 2.5x less memory --- gitindex/catfile.go | 172 +++++++++++++++++++++++++++++++++ gitindex/catfile_bench_test.go | 148 ++++++++++++++++++++++++++++ gitindex/catfile_test.go | 138 ++++++++++++++++++++++++++ gitindex/index.go | 77 ++++++++++++--- 4 files changed, 524 insertions(+), 11 deletions(-) create mode 100644 gitindex/catfile.go create mode 100644 gitindex/catfile_bench_test.go create mode 100644 gitindex/catfile_test.go diff --git a/gitindex/catfile.go b/gitindex/catfile.go new file mode 100644 index 000000000..df9252ae5 --- /dev/null +++ b/gitindex/catfile.go @@ -0,0 +1,172 @@ +// Copyright 2016 Google Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gitindex + +import ( + "bufio" + "bytes" + "encoding/hex" + "fmt" + "io" + "os/exec" + "strconv" + + "github.com/go-git/go-git/v5/plumbing" +) + +// blobResult holds the result of reading a single blob from a pipelined +// cat-file --batch --buffer process. +type blobResult struct { + ID plumbing.Hash + Content []byte + Size int64 + Missing bool + Err error +} + +// readBlobsPipelined reads all blobs for the given IDs using a single +// "git cat-file --batch --buffer" process. A writer goroutine feeds SHAs +// to stdin while the main goroutine reads responses from stdout, forming a +// concurrent pipeline. The --buffer flag switches git's output from per-object +// flush (write_or_die) to libc stdio buffering (fwrite), reducing syscalls. +// After stdin EOF, git calls fflush(stdout) to deliver any remaining output. +// Results are returned in the same order as ids. +func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, error) { + cmd := exec.Command("git", "cat-file", "--batch", "--buffer") + cmd.Dir = repoDir + + stdin, err := cmd.StdinPipe() + if err != nil { + return nil, fmt.Errorf("stdin pipe: %w", err) + } + + stdout, err := cmd.StdoutPipe() + if err != nil { + stdin.Close() + return nil, fmt.Errorf("stdout pipe: %w", err) + } + + if err := cmd.Start(); err != nil { + stdin.Close() + stdout.Close() + return nil, fmt.Errorf("start git cat-file: %w", err) + } + + // Writer goroutine: feed all SHAs then close stdin to trigger flush. + // Uses bufio.Writer to coalesce small writes into fewer syscalls. 
+ // Stack-allocated hex buffer avoids per-SHA heap allocation. + writeErr := make(chan error, 1) + go func() { + defer stdin.Close() + bw := bufio.NewWriterSize(stdin, 64*1024) // 64KB write buffer + var hexBuf [41]byte + hexBuf[40] = '\n' + for _, id := range ids { + hex.Encode(hexBuf[:40], id[:]) + if _, err := bw.Write(hexBuf[:]); err != nil { + writeErr <- err + return + } + } + writeErr <- bw.Flush() + }() + + // Reader: consume all responses in order. + // Manual header parsing avoids SplitN allocation. + reader := bufio.NewReaderSize(stdout, 512*1024) + results := make([]blobResult, len(ids)) + var readErr error + + for i, id := range ids { + results[i].ID = id + + headerBytes, err := reader.ReadBytes('\n') + if err != nil { + readErr = fmt.Errorf("read header for %s: %w", id, err) + results[i].Err = readErr + break + } + header := headerBytes[:len(headerBytes)-1] // trim \n + + if bytes.HasSuffix(header, []byte(" missing")) { + results[i].Missing = true + continue + } + + // Parse size from "<oid> <type> <size>". + lastSpace := bytes.LastIndexByte(header, ' ') + if lastSpace == -1 { + readErr = fmt.Errorf("unexpected header: %q", header) + results[i].Err = readErr + break + } + size, err := strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) + if err != nil { + readErr = fmt.Errorf("parse size from %q: %w", header, err) + results[i].Err = readErr + break + } + results[i].Size = size + + // Read exactly size bytes into a dedicated slice (must survive + // until consumed by builder.Add). Exact-size avoids allocator + // rounding waste (e.g. make(4097) → 8192 bytes). + content := make([]byte, size) + if _, err := io.ReadFull(reader, content); err != nil { + readErr = fmt.Errorf("read content (%d bytes): %w", size, err) + results[i].Err = readErr + break + } + results[i].Content = content + + // Consume trailing LF delimiter. 
+ if _, err := reader.ReadByte(); err != nil { + readErr = fmt.Errorf("read trailing LF: %w", err) + results[i].Err = readErr + break + } + } + + // Mark all unprocessed results as failed if we broke out early. + if readErr != nil { + for j := range results { + if results[j].Err == nil && results[j].Content == nil && !results[j].Missing { + results[j].Err = readErr + } + } + } + + // Drain stdout so git can exit without blocking on a full pipe buffer. + _, _ = io.Copy(io.Discard, reader) + + // Wait for writer goroutine to finish. + wErr := <-writeErr + + // Wait for the git process to exit. + waitErr := cmd.Wait() + + // Return the first meaningful error. + if readErr != nil { + return results, readErr + } + if wErr != nil { + return results, fmt.Errorf("write to cat-file: %w", wErr) + } + if waitErr != nil { + return results, waitErr + } + + return results, nil +} diff --git a/gitindex/catfile_bench_test.go b/gitindex/catfile_bench_test.go new file mode 100644 index 000000000..fe7f96892 --- /dev/null +++ b/gitindex/catfile_bench_test.go @@ -0,0 +1,148 @@ +package gitindex + +import ( + "fmt" + "io" + "os" + "testing" + + "github.com/go-git/go-git/v5/plumbing" +) + +// Set ZOEKT_BENCH_REPO to a git checkout to enable these benchmarks. +// +// git clone --depth=1 https://github.com/kubernetes/kubernetes /tmp/k8s +// ZOEKT_BENCH_REPO=/tmp/k8s go test ./gitindex/ -bench=BenchmarkBlobRead -benchmem -count=5 -timeout=600s + +func requireBenchGitRepo(b *testing.B) string { + b.Helper() + dir := os.Getenv("ZOEKT_BENCH_REPO") + if dir == "" { + b.Skip("ZOEKT_BENCH_REPO not set") + } + return dir +} + +// collectBlobKeys opens the repo, walks HEAD, and returns all fileKeys with +// their BlobLocations plus the repo directory path. 
+func collectBlobKeys(b *testing.B, repoDir string) (map[fileKey]BlobLocation, string) { + b.Helper() + + repo, closer, err := openRepo(repoDir) + if err != nil { + b.Fatalf("openRepo: %v", err) + } + b.Cleanup(func() { closer.Close() }) + + head, err := repo.Head() + if err != nil { + b.Fatalf("Head: %v", err) + } + + commit, err := repo.CommitObject(head.Hash()) + if err != nil { + b.Fatalf("CommitObject: %v", err) + } + + tree, err := commit.Tree() + if err != nil { + b.Fatalf("Tree: %v", err) + } + + rw := NewRepoWalker(repo, "https://example.com/repo", nil) + if _, err := rw.CollectFiles(tree, "HEAD", nil); err != nil { + b.Fatalf("CollectFiles: %v", err) + } + + return rw.Files, repoDir +} + +// sortedBlobKeys returns fileKeys for deterministic iteration. +func sortedBlobKeys(files map[fileKey]BlobLocation) []fileKey { + keys := make([]fileKey, 0, len(files)) + for k := range files { + keys = append(keys, k) + } + return keys +} + +// BenchmarkBlobRead_GoGit measures the current go-git BlobObject approach: +// sequential calls to repo.GitRepo.BlobObject(hash) for each file. 
+func BenchmarkBlobRead_GoGit(b *testing.B) { + repoDir := requireBenchGitRepo(b) + files, _ := collectBlobKeys(b, repoDir) + keys := sortedBlobKeys(files) + b.Logf("collected %d blob keys", len(keys)) + + for _, n := range []int{1_000, 5_000, len(keys)} { + n = min(n, len(keys)) + subset := keys[:n] + + b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) { + b.ReportAllocs() + var totalBytes int64 + for b.Loop() { + totalBytes = 0 + for _, key := range subset { + loc := files[key] + blob, err := loc.GitRepo.BlobObject(key.ID) + if err != nil { + b.Fatalf("BlobObject(%s): %v", key.ID, err) + } + r, err := blob.Reader() + if err != nil { + b.Fatalf("Reader: %v", err) + } + n, err := io.Copy(io.Discard, r) + r.Close() + if err != nil { + b.Fatalf("Read: %v", err) + } + totalBytes += n + } + } + b.ReportMetric(float64(totalBytes), "content-bytes/op") + b.ReportMetric(float64(len(subset)), "files/op") + }) + } +} + +// BenchmarkBlobRead_CatfilePipelined measures the pipelined approach: +// all SHAs written to stdin at once via --buffer, then all responses read. +// This is the production path used by indexGitRepo. 
+func BenchmarkBlobRead_CatfilePipelined(b *testing.B) { + repoDir := requireBenchGitRepo(b) + files, gitDir := collectBlobKeys(b, repoDir) + keys := sortedBlobKeys(files) + b.Logf("collected %d blob keys", len(keys)) + + ids := make([]plumbing.Hash, len(keys)) + for i, k := range keys { + ids[i] = k.ID + } + + for _, n := range []int{1_000, 5_000, len(keys)} { + n = min(n, len(keys)) + subset := ids[:n] + + b.Run(fmt.Sprintf("files=%d", n), func(b *testing.B) { + b.ReportAllocs() + var totalBytes int64 + for b.Loop() { + totalBytes = 0 + results, err := readBlobsPipelined(gitDir, subset) + if err != nil { + b.Fatalf("readBlobsPipelined: %v", err) + } + for _, r := range results { + if r.Err != nil { + b.Fatalf("blob %s: %v", r.ID, r.Err) + } + totalBytes += int64(len(r.Content)) + } + } + b.ReportMetric(float64(totalBytes), "content-bytes/op") + b.ReportMetric(float64(len(subset)), "files/op") + }) + } +} diff --git a/gitindex/catfile_test.go b/gitindex/catfile_test.go new file mode 100644 index 000000000..b6eb99841 --- /dev/null +++ b/gitindex/catfile_test.go @@ -0,0 +1,138 @@ +package gitindex + +import ( + "os" + "os/exec" + "path/filepath" + "testing" + + "github.com/go-git/go-git/v5/plumbing" +) + +// createTestRepo creates a git repo with various test files and returns +// the repo path and a map of filename -> blob SHA. 
+func createTestRepo(t *testing.T) (string, map[string]plumbing.Hash) { + t.Helper() + dir := t.TempDir() + repoDir := filepath.Join(dir, "repo") + + script := ` +set -e +git init -b main repo +cd repo +git config user.email "test@test.com" +git config user.name "Test" + +# Normal text file +echo "hello world" > hello.txt + +# Empty file +touch empty.txt + +# Binary file with newlines embedded +printf '\x00\x01\x02\nhello\nworld\n\x03\x04' > binary.bin + +# Large-ish file (64KB of data) +dd if=/dev/urandom bs=1024 count=64 of=large.bin 2>/dev/null + +git add -A +git commit -m "initial" +` + cmd := exec.Command("/bin/sh", "-c", script) + cmd.Dir = dir + cmd.Stderr = os.Stderr + if err := cmd.Run(); err != nil { + t.Fatalf("create test repo: %v", err) + } + + // Get blob SHAs for each file. + blobs := map[string]plumbing.Hash{} + for _, name := range []string{"hello.txt", "empty.txt", "binary.bin", "large.bin"} { + out, err := exec.Command("git", "-C", repoDir, "rev-parse", "HEAD:"+name).Output() + if err != nil { + t.Fatalf("rev-parse %s: %v", name, err) + } + sha := string(out[:len(out)-1]) // trim newline + blobs[name] = plumbing.NewHash(sha) + } + + return repoDir, blobs +} + +func TestReadBlobsPipelined(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["empty.txt"], + blobs["binary.bin"], + blobs["large.bin"], + } + + results, err := readBlobsPipelined(repoDir, ids) + if err != nil { + t.Fatalf("readBlobsPipelined: %v", err) + } + + if len(results) != 4 { + t.Fatalf("got %d results, want 4", len(results)) + } + + // hello.txt + if results[0].Err != nil { + t.Fatalf("hello.txt err: %v", results[0].Err) + } + if string(results[0].Content) != "hello world\n" { + t.Errorf("hello.txt = %q", results[0].Content) + } + + // empty.txt + if results[1].Err != nil { + t.Fatalf("empty.txt err: %v", results[1].Err) + } + if len(results[1].Content) != 0 { + t.Errorf("empty.txt len = %d, want 0", 
len(results[1].Content)) + } + + // binary.bin — verify exact content survived the pipeline. + if results[2].Err != nil { + t.Fatalf("binary.bin err: %v", results[2].Err) + } + if results[2].Content[0] != 0x00 { + t.Errorf("binary.bin first byte = %x, want 0x00", results[2].Content[0]) + } + + // large.bin + if results[3].Err != nil { + t.Fatalf("large.bin err: %v", results[3].Err) + } + if results[3].Size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", results[3].Size, 64*1024) + } +} + +func TestReadBlobsPipelined_WithMissing(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") + ids := []plumbing.Hash{ + blobs["hello.txt"], + fakeHash, + blobs["empty.txt"], + } + + results, err := readBlobsPipelined(repoDir, ids) + if err != nil { + t.Fatalf("readBlobsPipelined: %v", err) + } + + if !results[1].Missing { + t.Errorf("expected result[1] to be missing") + } + if string(results[0].Content) != "hello world\n" { + t.Errorf("hello.txt = %q", results[0].Content) + } + if len(results[2].Content) != 0 { + t.Errorf("empty.txt len = %d, want 0", len(results[2].Content)) + } +} diff --git a/gitindex/index.go b/gitindex/index.go index 5fbeba0d0..77de23d1b 100644 --- a/gitindex/index.go +++ b/gitindex/index.go @@ -585,23 +585,78 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) { sort.Strings(names) names = uniq(names) - log.Printf("attempting to index %d total files", totalFiles) - for idx, name := range names { - keys := fileKeys[name] + // Build the ordered list of (name, key) pairs and collect blob SHAs for + // the main repo so we can read them all at once via git cat-file --batch. 
+ type indexEntry struct { + key fileKey + blobIndex int // index into blobResults, or -1 for submodule blobs + } + entries := make([]indexEntry, 0, totalFiles) + mainRepoIDs := make([]plumbing.Hash, 0, totalFiles) + + for _, name := range names { + for _, key := range fileKeys[name] { + if key.SubRepoPath == "" { + entries = append(entries, indexEntry{key: key, blobIndex: len(mainRepoIDs)}) + mainRepoIDs = append(mainRepoIDs, key.ID) + } else { + entries = append(entries, indexEntry{key: key, blobIndex: -1}) + } + } + } + + // Bulk-read all main-repo blobs via pipelined cat-file --batch --buffer. + var blobResults []blobResult + if len(mainRepoIDs) > 0 { + var err error + blobResults, err = readBlobsPipelined(opts.RepoDir, mainRepoIDs) + if err != nil { + return false, fmt.Errorf("readBlobsPipelined: %w", err) + } + } + + log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), totalFiles-len(mainRepoIDs)) + for idx, entry := range entries { + var doc index.Document + + if entry.blobIndex >= 0 { + // Main repo blob: use pipelined result. + r := blobResults[entry.blobIndex] + if r.Err != nil { + return false, fmt.Errorf("read blob %s: %w", r.ID, r.Err) + } - for _, key := range keys { - doc, err := createDocument(key, repos, opts.BuildOptions) + branches := repos[entry.key].Branches + if r.Missing { + doc = skippedLargeDoc(entry.key, branches) + } else { + keyFullPath := entry.key.FullPath() + if r.Size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { + doc = skippedLargeDoc(entry.key, branches) + } else { + doc = index.Document{ + SubRepositoryPath: entry.key.SubRepoPath, + Name: keyFullPath, + Content: r.Content, + Branches: branches, + } + } + } + } else { + // Submodule blob: fall back to go-git. 
+ var err error + doc, err = createDocument(entry.key, repos, opts.BuildOptions) if err != nil { return false, err } + } - if err := builder.Add(doc); err != nil { - return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) - } + if err := builder.Add(doc); err != nil { + return false, fmt.Errorf("error adding document with name %s: %w", entry.key.FullPath(), err) + } - if idx%10_000 == 0 { - builder.CheckMemoryUsage() - } + if idx%10_000 == 0 { + builder.CheckMemoryUsage() } } return true, builder.Finish() From dec1a91f40ddd9bbbc40d9a92f3823def8d2f564 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?= Date: Fri, 20 Mar 2026 18:06:50 +0100 Subject: [PATCH 2/2] gitindex: streaming catfileReader API, skip large blobs without reading MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the bulk readBlobsPipelined (which read all blobs into a []blobResult slice) with a streaming catfileReader modeled after archive/tar.Reader: cr, _ := newCatfileReader(repoDir, ids) for { size, missing, err := cr.Next() if size > maxSize { continue } // auto-skipped, never read content := make([]byte, size) io.ReadFull(cr, content) } Next() reads the cat-file header and returns the blob's size. The caller decides whether to Read the content or skip it — calling Next() again automatically discards unread bytes via bufio.Reader.Discard. Large blobs over SizeMax are never allocated or read into Go memory. Also split the single interleaved loop into two: one for main-repo blobs streamed via cat-file, one for submodule blobs via go-git's createDocument. The builder sorts documents internally so ordering between the loops does not matter. Peak memory is now bounded by ShardMax (one shard's worth of content) rather than total repository size. 
--- gitindex/catfile.go | 206 +++++++++++++++++++-------------- gitindex/catfile_bench_test.go | 29 +++-- gitindex/catfile_test.go | 158 +++++++++++++++++++------ gitindex/index.go | 94 ++++++++------- 4 files changed, 317 insertions(+), 170 deletions(-) diff --git a/gitindex/catfile.go b/gitindex/catfile.go index df9252ae5..89418fdcc 100644 --- a/gitindex/catfile.go +++ b/gitindex/catfile.go @@ -26,24 +26,43 @@ import ( "github.com/go-git/go-git/v5/plumbing" ) -// blobResult holds the result of reading a single blob from a pipelined -// cat-file --batch --buffer process. -type blobResult struct { - ID plumbing.Hash - Content []byte - Size int64 - Missing bool - Err error +// catfileReader provides streaming access to git blob objects via a pipelined +// "git cat-file --batch --buffer" process. A writer goroutine feeds all blob +// SHAs to stdin while the caller reads responses one at a time, similar to +// archive/tar.Reader. +// +// The --buffer flag switches git's output from per-object flush (write_or_die) +// to libc stdio buffering (fwrite), reducing syscalls. After stdin EOF, git +// calls fflush(stdout) to deliver any remaining output. +// +// Usage: +// +// cr, err := newCatfileReader(repoDir, ids) +// if err != nil { ... } +// defer cr.Close() +// +// for { +// size, missing, err := cr.Next() +// if err == io.EOF { break } +// if missing { continue } +// if size > maxSize { continue } // unread bytes auto-skipped +// content := make([]byte, size) +// io.ReadFull(cr, content) +// } +type catfileReader struct { + cmd *exec.Cmd + reader *bufio.Reader + writeErr <-chan error + + // pending tracks unread content bytes + trailing LF for the current + // entry. Next() discards any pending bytes before reading the next header. + pending int64 } -// readBlobsPipelined reads all blobs for the given IDs using a single -// "git cat-file --batch --buffer" process. 
A writer goroutine feeds SHAs -// to stdin while the main goroutine reads responses from stdout, forming a -// concurrent pipeline. The --buffer flag switches git's output from per-object -// flush (write_or_die) to libc stdio buffering (fwrite), reducing syscalls. -// After stdin EOF, git calls fflush(stdout) to deliver any remaining output. -// Results are returned in the same order as ids. -func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, error) { +// newCatfileReader starts a "git cat-file --batch --buffer" process and feeds +// all ids to its stdin via a background goroutine. The caller must call Close +// when done. +func newCatfileReader(repoDir string, ids []plumbing.Hash) (*catfileReader, error) { cmd := exec.Command("git", "cat-file", "--batch", "--buffer") cmd.Dir = repoDir @@ -65,12 +84,10 @@ func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, erro } // Writer goroutine: feed all SHAs then close stdin to trigger flush. - // Uses bufio.Writer to coalesce small writes into fewer syscalls. - // Stack-allocated hex buffer avoids per-SHA heap allocation. writeErr := make(chan error, 1) go func() { defer stdin.Close() - bw := bufio.NewWriterSize(stdin, 64*1024) // 64KB write buffer + bw := bufio.NewWriterSize(stdin, 64*1024) var hexBuf [41]byte hexBuf[40] = '\n' for _, id := range ids { @@ -83,90 +100,101 @@ func readBlobsPipelined(repoDir string, ids []plumbing.Hash) ([]blobResult, erro writeErr <- bw.Flush() }() - // Reader: consume all responses in order. - // Manual header parsing avoids SplitN allocation. 
- reader := bufio.NewReaderSize(stdout, 512*1024) - results := make([]blobResult, len(ids)) - var readErr error - - for i, id := range ids { - results[i].ID = id - - headerBytes, err := reader.ReadBytes('\n') - if err != nil { - readErr = fmt.Errorf("read header for %s: %w", id, err) - results[i].Err = readErr - break - } - header := headerBytes[:len(headerBytes)-1] // trim \n + return &catfileReader{ + cmd: cmd, + reader: bufio.NewReaderSize(stdout, 512*1024), + writeErr: writeErr, + }, nil +} - if bytes.HasSuffix(header, []byte(" missing")) { - results[i].Missing = true - continue +// Next advances to the next blob entry. It returns the blob's size and whether +// it is missing. Any unread content from the previous entry is automatically +// discarded. Returns io.EOF when all entries have been consumed. +// +// After Next returns successfully with missing=false, call Read to consume the +// blob content, or call Next again to skip it. +func (cr *catfileReader) Next() (size int64, missing bool, err error) { + // Discard unread content from the previous entry. + if cr.pending > 0 { + if _, err := cr.reader.Discard(int(cr.pending)); err != nil { + return 0, false, fmt.Errorf("discard pending bytes: %w", err) } + cr.pending = 0 + } - // Parse size from "<oid> <type> <size>". - lastSpace := bytes.LastIndexByte(header, ' ') - if lastSpace == -1 { - readErr = fmt.Errorf("unexpected header: %q", header) - results[i].Err = readErr - break - } - size, err := strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) - if err != nil { - readErr = fmt.Errorf("parse size from %q: %w", header, err) - results[i].Err = readErr - break - } - results[i].Size = size - - // Read exactly size bytes into a dedicated slice (must survive - // until consumed by builder.Add). Exact-size avoids allocator - // rounding waste (e.g. make(4097) → 8192 bytes). 
- content := make([]byte, size) - if _, err := io.ReadFull(reader, content); err != nil { - readErr = fmt.Errorf("read content (%d bytes): %w", size, err) - results[i].Err = readErr - break + headerBytes, err := cr.reader.ReadBytes('\n') + if err != nil { + if err == io.EOF { + return 0, false, io.EOF } - results[i].Content = content + return 0, false, fmt.Errorf("read header: %w", err) + } + header := headerBytes[:len(headerBytes)-1] // trim \n - // Consume trailing LF delimiter. - if _, err := reader.ReadByte(); err != nil { - readErr = fmt.Errorf("read trailing LF: %w", err) - results[i].Err = readErr - break - } + if bytes.HasSuffix(header, []byte(" missing")) { + return 0, true, nil } - // Mark all unprocessed results as failed if we broke out early. - if readErr != nil { - for j := range results { - if results[j].Err == nil && results[j].Content == nil && !results[j].Missing { - results[j].Err = readErr - } - } + // Parse size from "<oid> <type> <size>". + lastSpace := bytes.LastIndexByte(header, ' ') + if lastSpace == -1 { + return 0, false, fmt.Errorf("unexpected header: %q", header) + } + size, err = strconv.ParseInt(string(header[lastSpace+1:]), 10, 64) + if err != nil { + return 0, false, fmt.Errorf("parse size from %q: %w", header, err) } - // Drain stdout so git can exit without blocking on a full pipe buffer. - _, _ = io.Copy(io.Discard, reader) + // Track pending bytes: content + trailing LF. + cr.pending = size + 1 + return size, false, nil +} - // Wait for writer goroutine to finish. - wErr := <-writeErr +// Read reads from the current blob's content. Implements io.Reader. Returns +// io.EOF when the blob's content has been fully read (the trailing LF +// delimiter is consumed automatically). +func (cr *catfileReader) Read(p []byte) (int, error) { + if cr.pending <= 0 { + return 0, io.EOF + } - // Wait for the git process to exit. - waitErr := cmd.Wait() + // Don't read into the trailing LF byte — reserve it. 
+ contentRemaining := cr.pending - 1 + if contentRemaining <= 0 { + // Only the trailing LF remains; consume it and signal EOF. + if _, err := cr.reader.ReadByte(); err != nil { + return 0, fmt.Errorf("read trailing LF: %w", err) + } + cr.pending = 0 + return 0, io.EOF + } - // Return the first meaningful error. - if readErr != nil { - return results, readErr + // Limit the read to the remaining content bytes. + if int64(len(p)) > contentRemaining { + p = p[:contentRemaining] } - if wErr != nil { - return results, fmt.Errorf("write to cat-file: %w", wErr) + n, err := cr.reader.Read(p) + cr.pending -= int64(n) + if err != nil { + return n, err } - if waitErr != nil { - return results, waitErr + + // If we've consumed all content bytes, also consume the trailing LF. + if cr.pending == 1 { + if _, err := cr.reader.ReadByte(); err != nil { + return n, fmt.Errorf("read trailing LF: %w", err) + } + cr.pending = 0 } - return results, nil + return n, nil +} + +// Close shuts down the cat-file process and waits for it to exit. +func (cr *catfileReader) Close() error { + // Drain stdout so git can flush and exit without blocking. + _, _ = io.Copy(io.Discard, cr.reader) + // Wait for writer goroutine. + <-cr.writeErr + return cr.cmd.Wait() } diff --git a/gitindex/catfile_bench_test.go b/gitindex/catfile_bench_test.go index fe7f96892..ec2626a7b 100644 --- a/gitindex/catfile_bench_test.go +++ b/gitindex/catfile_bench_test.go @@ -107,10 +107,10 @@ func BenchmarkBlobRead_GoGit(b *testing.B) { } } -// BenchmarkBlobRead_CatfilePipelined measures the pipelined approach: -// all SHAs written to stdin at once via --buffer, then all responses read. +// BenchmarkBlobRead_CatfileReader measures the streaming catfileReader approach: +// all SHAs written to stdin at once via --buffer, responses read one at a time. // This is the production path used by indexGitRepo. 
-func BenchmarkBlobRead_CatfilePipelined(b *testing.B) { +func BenchmarkBlobRead_CatfileReader(b *testing.B) { repoDir := requireBenchGitRepo(b) files, gitDir := collectBlobKeys(b, repoDir) keys := sortedBlobKeys(files) @@ -130,16 +130,27 @@ func BenchmarkBlobRead_CatfilePipelined(b *testing.B) { var totalBytes int64 for b.Loop() { totalBytes = 0 - results, err := readBlobsPipelined(gitDir, subset) + cr, err := newCatfileReader(gitDir, subset) if err != nil { - b.Fatalf("readBlobsPipelined: %v", err) + b.Fatalf("newCatfileReader: %v", err) } - for _, r := range results { - if r.Err != nil { - b.Fatalf("blob %s: %v", r.ID, r.Err) + for range subset { + size, missing, err := cr.Next() + if err != nil { + cr.Close() + b.Fatalf("Next: %v", err) + } + if missing { + continue + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + cr.Close() + b.Fatalf("ReadFull: %v", err) } - totalBytes += int64(len(r.Content)) + totalBytes += int64(len(content)) } + cr.Close() } b.ReportMetric(float64(totalBytes), "content-bytes/op") b.ReportMetric(float64(len(subset)), "files/op") diff --git a/gitindex/catfile_test.go b/gitindex/catfile_test.go index b6eb99841..c871bd7d2 100644 --- a/gitindex/catfile_test.go +++ b/gitindex/catfile_test.go @@ -1,6 +1,7 @@ package gitindex import ( + "io" "os" "os/exec" "path/filepath" @@ -59,7 +60,7 @@ git commit -m "initial" return repoDir, blobs } -func TestReadBlobsPipelined(t *testing.T) { +func TestCatfileReader(t *testing.T) { repoDir, blobs := createTestRepo(t) ids := []plumbing.Hash{ @@ -69,49 +70,118 @@ func TestReadBlobsPipelined(t *testing.T) { blobs["large.bin"], } - results, err := readBlobsPipelined(repoDir, ids) + cr, err := newCatfileReader(repoDir, ids) if err != nil { - t.Fatalf("readBlobsPipelined: %v", err) - } - - if len(results) != 4 { - t.Fatalf("got %d results, want 4", len(results)) + t.Fatalf("newCatfileReader: %v", err) } + defer cr.Close() // hello.txt - if results[0].Err != nil { - 
t.Fatalf("hello.txt err: %v", results[0].Err) + size, missing, err := cr.Next() + if err != nil { + t.Fatalf("Next hello.txt: %v", err) + } + if missing { + t.Fatal("hello.txt unexpectedly missing") + } + if size != 12 { + t.Errorf("hello.txt size = %d, want 12", size) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatalf("ReadFull hello.txt: %v", err) } - if string(results[0].Content) != "hello world\n" { - t.Errorf("hello.txt = %q", results[0].Content) + if string(content) != "hello world\n" { + t.Errorf("hello.txt content = %q", content) } // empty.txt - if results[1].Err != nil { - t.Fatalf("empty.txt err: %v", results[1].Err) + size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next empty.txt: %v", err) } - if len(results[1].Content) != 0 { - t.Errorf("empty.txt len = %d, want 0", len(results[1].Content)) + if size != 0 { + t.Errorf("empty.txt size = %d, want 0", size) } - // binary.bin — verify exact content survived the pipeline. - if results[2].Err != nil { - t.Fatalf("binary.bin err: %v", results[2].Err) + // binary.bin — read content and verify binary data survives. 
+ size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next binary.bin: %v", err) + } + binContent := make([]byte, size) + if _, err := io.ReadFull(cr, binContent); err != nil { + t.Fatalf("ReadFull binary.bin: %v", err) } - if results[2].Content[0] != 0x00 { - t.Errorf("binary.bin first byte = %x, want 0x00", results[2].Content[0]) + if binContent[0] != 0x00 || binContent[3] != '\n' { + t.Errorf("binary.bin unexpected leading bytes: %x", binContent[:5]) } // large.bin - if results[3].Err != nil { - t.Fatalf("large.bin err: %v", results[3].Err) + size, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next large.bin: %v", err) + } + if size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", size, 64*1024) + } + largeContent := make([]byte, size) + if _, err := io.ReadFull(cr, largeContent); err != nil { + t.Fatalf("ReadFull large.bin: %v", err) + } + + // EOF after all entries. + _, _, err = cr.Next() + if err != io.EOF { + t.Errorf("expected io.EOF after last entry, got %v", err) + } +} + +func TestCatfileReader_Skip(t *testing.T) { + repoDir, blobs := createTestRepo(t) + + ids := []plumbing.Hash{ + blobs["hello.txt"], + blobs["large.bin"], + blobs["binary.bin"], + } + + cr, err := newCatfileReader(repoDir, ids) + if err != nil { + t.Fatalf("newCatfileReader: %v", err) + } + defer cr.Close() + + // Skip hello.txt by calling Next again without reading. + _, _, err = cr.Next() + if err != nil { + t.Fatalf("Next hello.txt: %v", err) + } + + // Skip large.bin too. + size, _, err := cr.Next() + if err != nil { + t.Fatalf("Next large.bin: %v", err) + } + if size != 64*1024 { + t.Errorf("large.bin size = %d, want %d", size, 64*1024) } - if results[3].Size != 64*1024 { - t.Errorf("large.bin size = %d, want %d", results[3].Size, 64*1024) + + // Read binary.bin after skipping two entries. 
+ size, _, err = cr.Next() + if err != nil { + t.Fatalf("Next binary.bin: %v", err) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatalf("ReadFull binary.bin: %v", err) + } + if content[0] != 0x00 { + t.Errorf("binary.bin first byte = %x, want 0x00", content[0]) } } -func TestReadBlobsPipelined_WithMissing(t *testing.T) { +func TestCatfileReader_Missing(t *testing.T) { repoDir, blobs := createTestRepo(t) fakeHash := plumbing.NewHash("deadbeefdeadbeefdeadbeefdeadbeefdeadbeef") @@ -121,18 +191,40 @@ func TestReadBlobsPipelined_WithMissing(t *testing.T) { blobs["empty.txt"], } - results, err := readBlobsPipelined(repoDir, ids) + cr, err := newCatfileReader(repoDir, ids) if err != nil { - t.Fatalf("readBlobsPipelined: %v", err) + t.Fatalf("newCatfileReader: %v", err) } + defer cr.Close() - if !results[1].Missing { - t.Errorf("expected result[1] to be missing") + // hello.txt — read normally. + size, missing, err := cr.Next() + if err != nil || missing { + t.Fatalf("Next hello.txt: err=%v missing=%v", err, missing) + } + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + t.Fatalf("ReadFull hello.txt: %v", err) + } + if string(content) != "hello world\n" { + t.Errorf("hello.txt = %q", content) } - if string(results[0].Content) != "hello world\n" { - t.Errorf("hello.txt = %q", results[0].Content) + + // fakeHash — missing. + _, missing, err = cr.Next() + if err != nil { + t.Fatalf("Next fakeHash: %v", err) + } + if !missing { + t.Error("expected fakeHash to be missing") + } + + // empty.txt — still works after missing entry. 
+ size, missing, err = cr.Next() + if err != nil || missing { + t.Fatalf("Next empty.txt: err=%v missing=%v", err, missing) } - if len(results[2].Content) != 0 { - t.Errorf("empty.txt len = %d, want 0", len(results[2].Content)) + if size != 0 { + t.Errorf("empty.txt size = %d, want 0", size) } } diff --git a/gitindex/index.go b/gitindex/index.go index 77de23d1b..0d8bc6697 100644 --- a/gitindex/index.go +++ b/gitindex/index.go @@ -585,80 +585,96 @@ func indexGitRepo(opts Options, config gitIndexConfig) (bool, error) { sort.Strings(names) names = uniq(names) - // Build the ordered list of (name, key) pairs and collect blob SHAs for - // the main repo so we can read them all at once via git cat-file --batch. - type indexEntry struct { - key fileKey - blobIndex int // index into blobResults, or -1 for submodule blobs - } - entries := make([]indexEntry, 0, totalFiles) + // Separate main-repo keys from submodule keys, collecting blob SHAs + // for the main repo so we can stream them via git cat-file --batch. + mainRepoKeys := make([]fileKey, 0, totalFiles) mainRepoIDs := make([]plumbing.Hash, 0, totalFiles) + var submoduleKeys []fileKey for _, name := range names { for _, key := range fileKeys[name] { if key.SubRepoPath == "" { - entries = append(entries, indexEntry{key: key, blobIndex: len(mainRepoIDs)}) + mainRepoKeys = append(mainRepoKeys, key) mainRepoIDs = append(mainRepoIDs, key.ID) } else { - entries = append(entries, indexEntry{key: key, blobIndex: -1}) + submoduleKeys = append(submoduleKeys, key) } } } - // Bulk-read all main-repo blobs via pipelined cat-file --batch --buffer. - var blobResults []blobResult + log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), len(submoduleKeys)) + + // Stream main-repo blobs via pipelined cat-file --batch --buffer. + // Large blobs are skipped without reading content into memory. 
if len(mainRepoIDs) > 0 { - var err error - blobResults, err = readBlobsPipelined(opts.RepoDir, mainRepoIDs) + cr, err := newCatfileReader(opts.RepoDir, mainRepoIDs) if err != nil { - return false, fmt.Errorf("readBlobsPipelined: %w", err) + return false, fmt.Errorf("newCatfileReader: %w", err) } - } - log.Printf("attempting to index %d total files (%d via cat-file, %d via go-git)", totalFiles, len(mainRepoIDs), totalFiles-len(mainRepoIDs)) - for idx, entry := range entries { - var doc index.Document - - if entry.blobIndex >= 0 { - // Main repo blob: use pipelined result. - r := blobResults[entry.blobIndex] - if r.Err != nil { - return false, fmt.Errorf("read blob %s: %w", r.ID, r.Err) + for idx, key := range mainRepoKeys { + size, missing, err := cr.Next() + if err != nil { + cr.Close() + return false, fmt.Errorf("cat-file next for %s: %w", key.FullPath(), err) } - branches := repos[entry.key].Branches - if r.Missing { - doc = skippedLargeDoc(entry.key, branches) + branches := repos[key].Branches + var doc index.Document + + if missing { + doc = skippedLargeDoc(key, branches) } else { - keyFullPath := entry.key.FullPath() - if r.Size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { - doc = skippedLargeDoc(entry.key, branches) + keyFullPath := key.FullPath() + if size > int64(opts.BuildOptions.SizeMax) && !opts.BuildOptions.IgnoreSizeMax(keyFullPath) { + // Skip without reading content into memory. + doc = skippedLargeDoc(key, branches) } else { + content := make([]byte, size) + if _, err := io.ReadFull(cr, content); err != nil { + cr.Close() + return false, fmt.Errorf("read blob %s: %w", keyFullPath, err) + } doc = index.Document{ - SubRepositoryPath: entry.key.SubRepoPath, + SubRepositoryPath: key.SubRepoPath, Name: keyFullPath, - Content: r.Content, + Content: content, Branches: branches, } } } - } else { - // Submodule blob: fall back to go-git. 
- var err error - doc, err = createDocument(entry.key, repos, opts.BuildOptions) - if err != nil { - return false, err + + if err := builder.Add(doc); err != nil { + cr.Close() + return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) + } + + if idx%10_000 == 0 { + builder.CheckMemoryUsage() } } + if err := cr.Close(); err != nil { + return false, fmt.Errorf("close cat-file: %w", err) + } + } + + // Index submodule blobs via go-git. + for idx, key := range submoduleKeys { + doc, err := createDocument(key, repos, opts.BuildOptions) + if err != nil { + return false, err + } + if err := builder.Add(doc); err != nil { - return false, fmt.Errorf("error adding document with name %s: %w", entry.key.FullPath(), err) + return false, fmt.Errorf("error adding document with name %s: %w", key.FullPath(), err) } if idx%10_000 == 0 { builder.CheckMemoryUsage() } } + return true, builder.Finish() }