diff --git a/pkg/evaluation/build.go b/pkg/evaluation/build.go
index a5b1d11e8..a5ed77fc2 100644
--- a/pkg/evaluation/build.go
+++ b/pkg/evaluation/build.go
@@ -26,6 +26,10 @@ var (
 
 // getOrBuildImage returns a cached image ID or builds a new one.
 // Images are cached by working directory to avoid redundant builds.
+// Concurrent calls for the same working directory are deduplicated
+// using singleflight so that only one build runs at a time per key.
+// The build runs with the context of the caller that starts the flight, so a
+// failure caused by that context (e.g. cancellation) is returned to every waiter.
 func (r *Runner) getOrBuildImage(ctx context.Context, workingDir string) (string, error) {
 	r.imageCacheMu.Lock()
 	if imageID, ok := r.imageCache[workingDir]; ok {
@@ -34,16 +38,36 @@ func (r *Runner) getOrBuildImage(ctx context.Context, workingDir string) (string
 	}
 	r.imageCacheMu.Unlock()
 
-	imageID, err := r.buildEvalImage(ctx, workingDir)
+	// singleflight ensures only one build per working directory runs at a time.
+	// The callback re-checks the cache because a caller can miss the lookup
+	// above and then enter Do after an earlier flight has already stored its
+	// result and released the key; without the re-check it would rebuild.
+	v, err, _ := r.imageBuildGroup.Do(workingDir, func() (any, error) {
+		// Second lookup, now serialized per key by singleflight: if a previous
+		// flight populated the cache, reuse its image instead of building again.
+		r.imageCacheMu.Lock()
+		cachedID, ok := r.imageCache[workingDir]
+		r.imageCacheMu.Unlock()
+		if ok {
+			return cachedID, nil
+		}
+
+		imageID, err := r.buildEvalImage(ctx, workingDir)
+		if err != nil {
+			return "", err
+		}
+
+		r.imageCacheMu.Lock()
+		r.imageCache[workingDir] = imageID
+		r.imageCacheMu.Unlock()
+
+		return imageID, nil
+	})
 	if err != nil {
 		return "", err
 	}
 
-	r.imageCacheMu.Lock()
-	r.imageCache[workingDir] = imageID
-	r.imageCacheMu.Unlock()
-
-	return imageID, nil
+	return v.(string), nil
 }
 
 func (r *Runner) buildEvalImage(ctx context.Context, workingDir string) (string, error) {
diff --git a/pkg/evaluation/eval.go b/pkg/evaluation/eval.go
index cfcf2a498..53e7f3f18 100644
--- a/pkg/evaluation/eval.go
+++ b/pkg/evaluation/eval.go
@@ -19,6 +19,7 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"golang.org/x/sync/singleflight"
 
 	"github.com/docker/docker-agent/pkg/config"
 	"github.com/docker/docker-agent/pkg/config/latest"
@@ -39,6 +40,9 @@ type Runner struct {
 	// Key is the working directory (empty string for no working dir).
 	imageCache   map[string]string
 	imageCacheMu sync.Mutex
+
+	// imageBuildGroup deduplicates concurrent image builds for the same working directory.
+	imageBuildGroup singleflight.Group
 }
 
 // newRunner creates a new evaluation runner.
@@ -290,10 +294,12 @@ func (r *Runner) runSingleEval(ctx context.Context, evalSess *InputSession) (Res
 		evals = &session.EvalCriteria{}
 	}
 
+	userMessages := getUserMessages(evalSess.Session)
+
 	result := Result{
 		InputPath:         evalSess.SourcePath,
 		Title:             evalSess.Title,
-		Question:          strings.Join(getUserMessages(evalSess.Session), "\n"),
+		Question:          strings.Join(userMessages, "\n"),
 		SizeExpected:      evals.Size,
 		RelevanceExpected: float64(len(evals.Relevance)),
 	}
@@ -310,7 +316,7 @@ func (r *Runner) runSingleEval(ctx context.Context, evalSess *InputSession) (Res
 		return result, fmt.Errorf("building eval image: %w", err)
 	}
 
-	events, err := r.runDockerAgentInContainer(ctx, imageID, getUserMessages(evalSess.Session), evals.Setup)
+	events, err := r.runDockerAgentInContainer(ctx, imageID, userMessages, evals.Setup)
 	if err != nil {
 		return result, fmt.Errorf("running docker agent in container: %w", err)
 	}
@@ -323,7 +329,7 @@ func (r *Runner) runSingleEval(ctx context.Context, evalSess *InputSession) (Res
 	result.Size = getResponseSize(result.Response)
 
 	// Build session from events for database storage
-	result.Session = SessionFromEvents(events, evalSess.Title, getUserMessages(evalSess.Session))
+	result.Session = SessionFromEvents(events, evalSess.Title, userMessages)
 	result.Session.Evals = evals
 
 	if len(expectedToolCalls) > 0 || len(actualToolCalls) > 0 {
diff --git a/pkg/evaluation/judge.go b/pkg/evaluation/judge.go
index 33b441841..5f2fe50a6 100644
--- a/pkg/evaluation/judge.go
+++ b/pkg/evaluation/judge.go
@@ -60,6 +60,13 @@ type Judge struct {
 	model       provider.Provider
 	runConfig   *config.RuntimeConfig
 	concurrency int
+
+	// judgeWithSchema is a provider pre-configured with structured output.
+	// Created lazily on first use and reused across all relevance checks.
+	// Protected by judgeWithSchemaMu; only cached on success so that
+	// transient errors (e.g. context cancellation) can be retried.
+	judgeWithSchema   provider.Provider
+	judgeWithSchemaMu sync.Mutex
 }
 
 // NewJudge creates a new Judge that runs relevance checks with the given concurrency.
@@ -141,16 +148,37 @@ func (j *Judge) CheckRelevance(ctx context.Context, response string, criteria []
 	return passed, failed, errs
 }
 
-// checkSingle checks a single relevance criterion against the response.
-// It returns whether the check passed, the reason provided by the judge, and any error.
-func (j *Judge) checkSingle(ctx context.Context, response, criterion string) (passed bool, reason string, err error) {
+// getOrCreateJudgeWithSchema returns a provider pre-configured with structured output.
+// The provider is created once and reused across all relevance checks.
+// Unlike sync.Once, transient failures (e.g. context cancellation) are not
+// cached, allowing subsequent calls to retry.
+func (j *Judge) getOrCreateJudgeWithSchema(ctx context.Context) (provider.Provider, error) {
+	j.judgeWithSchemaMu.Lock()
+	defer j.judgeWithSchemaMu.Unlock()
+
+	if j.judgeWithSchema != nil {
+		return j.judgeWithSchema, nil
+	}
+
 	modelCfg := j.model.BaseConfig().ModelConfig
-	judgeWithSchema, err := provider.New(
+	p, err := provider.New(
 		ctx,
 		&modelCfg,
 		j.runConfig.EnvProvider(),
 		options.WithStructuredOutput(judgeResponseSchema),
 	)
+	if err != nil {
+		return nil, err
+	}
+
+	j.judgeWithSchema = p
+	return j.judgeWithSchema, nil
+}
+
+// checkSingle checks a single relevance criterion against the response.
+// It returns whether the check passed, the reason provided by the judge, and any error.
+func (j *Judge) checkSingle(ctx context.Context, response, criterion string) (passed bool, reason string, err error) {
+	judgeWithSchema, err := j.getOrCreateJudgeWithSchema(ctx)
 	if err != nil {
 		return false, "", fmt.Errorf("creating judge provider with structured output: %w", err)
 	}