Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/optimization/src/ldai_optimization/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,23 @@
from ldai_optimization.dataclasses import (
AIJudgeCallConfig,
OptimizationContext,
OptimizationFromConfigOptions,
OptimizationJudge,
OptimizationJudgeContext,
OptimizationOptions,
ToolDefinition,
)
from ldai_optimization.ld_api_client import LDApiError

__version__ = "0.0.0"

__all__ = [
'__version__',
'AIJudgeCallConfig',
'LDApiError',
'OptimizationClient',
'OptimizationContext',
'OptimizationFromConfigOptions',
'OptimizationJudge',
'OptimizationJudgeContext',
'OptimizationOptions',
Expand Down
179 changes: 172 additions & 7 deletions packages/optimization/src/ldai_optimization/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
import random
import uuid
from typing import Any, Dict, List, Literal, Optional

from ldai import AIAgentConfig, AIJudgeConfig, AIJudgeConfigDefault, LDAIClient
Expand All @@ -16,11 +17,17 @@
AutoCommitConfig,
JudgeResult,
OptimizationContext,
OptimizationFromConfigOptions,
OptimizationJudge,
OptimizationJudgeContext,
OptimizationOptions,
ToolDefinition,
)
from ldai_optimization.ld_api_client import (
AgentOptimizationConfig,
LDApiClient,
OptimizationResultPayload,
)
from ldai_optimization.prompts import (
build_message_history_text,
build_new_variation_prompt,
Expand All @@ -39,6 +46,34 @@
logger = logging.getLogger(__name__)


def _strip_provider_prefix(model: str) -> str:
"""Strip the provider prefix from a model identifier returned by the LD API.

API model keys are formatted as "Provider.model-name" (e.g. "OpenAI.gpt-5",
"Anthropic.claude-opus-4.6"). Only the part after the first period is needed
by the underlying LLM clients. If no period is present the string is returned
unchanged.

:param model: Raw model string from the API.
:return: Model name with provider prefix removed.
"""
return model.split(".", 1)[-1]


# Maps SDK status strings to the API status/activity values expected by
# agent_optimization_result records. Defined at module level to avoid
# allocating the dict on every on_status_update invocation.
# Terminal SDK statuses ("success"/"failure") map to PASSED/FAILED; every
# intermediate status reports "RUNNING" with a finer-grained activity value.
_OPTIMIZATION_STATUS_MAP: Dict[str, Dict[str, str]] = {
    "init": {"status": "RUNNING", "activity": "PENDING"},
    "generating": {"status": "RUNNING", "activity": "GENERATING"},
    "evaluating": {"status": "RUNNING", "activity": "EVALUATING"},
    "generating variation": {"status": "RUNNING", "activity": "GENERATING_VARIATION"},
    "turn completed": {"status": "RUNNING", "activity": "COMPLETED"},
    "success": {"status": "PASSED", "activity": "COMPLETED"},
    "failure": {"status": "FAILED", "activity": "COMPLETED"},
}


class OptimizationClient:
_options: OptimizationOptions
_ldClient: LDAIClient
Expand Down Expand Up @@ -883,21 +918,151 @@ async def _generate_new_variation(
)

async def optimize_from_config(
    self, optimization_config_key: str, options: OptimizationFromConfigOptions
) -> Any:
    """Optimize an agent using a configuration fetched from the LaunchDarkly API.

    The agent key, judge configuration, model choices, and other optimization
    parameters are all sourced from the remote agent optimization config. The
    caller only needs to provide the execution callbacks and evaluation contexts.

    Iteration results are automatically persisted to the LaunchDarkly API so
    the UI can display live run progress.

    :param optimization_config_key: Key of the agent optimization config to fetch.
    :param options: User-provided callbacks and evaluation contexts.
    :return: Optimization result (OptimizationContext from the final iteration).
    :raises ValueError: If LAUNCHDARKLY_API_KEY was not configured, making the
        REST API (and therefore this method) unavailable.
    """
    if not self._has_api_key:
        raise ValueError(
            "LAUNCHDARKLY_API_KEY is not set, so optimize_from_config is not available"
        )

    assert self._api_key is not None  # narrowed by the _has_api_key guard above
    api_client = LDApiClient(
        self._api_key,
        # Only forward base_url when the caller overrides it, so LDApiClient's
        # own default stays in effect otherwise.
        **({"base_url": options.base_url} if options.base_url else {}),
    )
    config = api_client.get_agent_optimization(options.project_key, optimization_config_key)

    self._agent_key = config["aiConfigKey"]
    optimization_id: str = config["id"]
    # Groups every result record persisted during this run.
    run_id = str(uuid.uuid4())

    # A context is required to evaluate the agent's AI config up front.
    context = random.choice(options.context_choices)
    # _get_agent_config calls _initialize_class_members_from_config internally;
    # _run_optimization calls it again to reset history before the loop starts.
    agent_config = await self._get_agent_config(self._agent_key, context)

    optimization_options = self._build_options_from_config(
        config, options, api_client, optimization_id, run_id
    )
    return await self._run_optimization(agent_config, optimization_options)

def _build_options_from_config(
    self,
    config: AgentOptimizationConfig,
    options: OptimizationFromConfigOptions,
    api_client: LDApiClient,
    optimization_id: str,
    run_id: str,
) -> OptimizationOptions:
    """Map a fetched AgentOptimization config + user options into OptimizationOptions.

    Acceptance statements and judge configs from the API are merged into a single
    judges dict. An on_status_update closure is injected to persist each iteration
    result to the LaunchDarkly API; any user-supplied on_status_update is chained
    after the persistence call.

    :param config: Validated AgentOptimizationConfig from the API.
    :param options: User-provided options from optimize_from_config.
    :param api_client: Initialised LDApiClient for result persistence.
    :param optimization_id: UUID id of the parent agent_optimization record.
    :param run_id: UUID that groups all result records for this run.
    :return: A fully populated OptimizationOptions ready for _run_optimization.
    :raises ValueError: If the config provides no evaluation criteria and no
        on_turn callback was supplied.
    :raises NotImplementedError: If ground truth responses are the only
        available evaluation criterion (ground-truth scoring is not yet built).
    """
    judges: Dict[str, OptimizationJudge] = {}

    # Acceptance statements become synthetic judges keyed by their index.
    for i, stmt in enumerate(config["acceptanceStatements"]):
        judges[f"acceptance-statement-{i}"] = OptimizationJudge(
            threshold=float(stmt.get("threshold", 0.95)),
            acceptance_statement=stmt["statement"],
        )

    for judge in config["judges"]:
        judges[judge["key"]] = OptimizationJudge(
            threshold=float(judge.get("threshold", 0.95)),
            judge_key=judge["key"],
        )

    has_ground_truth = bool(config.get("groundTruthResponses"))
    if not judges and not has_ground_truth and options.on_turn is None:
        raise ValueError(
            "The optimization config has no acceptance statements, judges, or ground truth "
            "responses, and no on_turn callback was provided. At least one is required to "
            "evaluate optimization results."
        )
    if not judges and options.on_turn is None:
        # Reaching here means ground truth is the ONLY criterion. Ground-truth
        # scoring is not implemented yet, and without this guard
        # OptimizationOptions.__post_init__ would reject the config with the
        # misleading "Either judges or on_turn must be provided" error.
        raise NotImplementedError(
            "Ground truth evaluation is not yet implemented; the optimization config "
            "must also include acceptance statements or judges, or an on_turn callback "
            "must be provided."
        )

    variable_choices: List[Dict[str, Any]] = config["variableChoices"] or [{}]
    user_input_options: Optional[List[str]] = config["userInputOptions"] or None

    # Hoisted out of the closure so each invocation reads plain locals.
    project_key = options.project_key
    config_version: int = config["version"]

    def _persist_and_forward(
        status: Literal[
            "init",
            "generating",
            "evaluating",
            "generating variation",
            "turn completed",
            "success",
            "failure",
        ],
        ctx: OptimizationContext,
    ) -> None:
        # _safe_status_update (the caller) already wraps this entire function in
        # a try/except, so errors here are caught and logged without aborting the run.
        mapped = _OPTIMIZATION_STATUS_MAP.get(
            status, {"status": "RUNNING", "activity": "PENDING"}
        )
        snapshot = ctx.copy_without_history()
        payload: OptimizationResultPayload = {
            "run_id": run_id,
            "config_optimization_version": config_version,
            "status": mapped["status"],
            "activity": mapped["activity"],
            "iteration": snapshot.iteration,
            "instructions": snapshot.current_instructions,
            "parameters": snapshot.current_parameters,
            "completion_response": snapshot.completion_response,
            "scores": {k: v.to_json() for k, v in snapshot.scores.items()},
            "user_input": snapshot.user_input,
        }
        api_client.post_agent_optimization_result(project_key, optimization_id, payload)

        # Chain the user's callback AFTER persistence so the record is always
        # saved even if the user callback raises.
        if options.on_status_update:
            try:
                options.on_status_update(status, ctx)
            except Exception:
                logger.exception("User on_status_update callback failed for status=%s", status)

    return OptimizationOptions(
        context_choices=options.context_choices,
        max_attempts=config["maxAttempts"],
        # API model keys carry a "Provider." prefix the LLM clients don't expect.
        model_choices=[_strip_provider_prefix(m) for m in config["modelChoices"]],
        judge_model=_strip_provider_prefix(config["judgeModel"]),
        variable_choices=variable_choices,
        handle_agent_call=options.handle_agent_call,
        handle_judge_call=options.handle_judge_call,
        judges=judges or None,
        user_input_options=user_input_options,
        on_turn=options.on_turn,
        on_passing_result=options.on_passing_result,
        on_failing_result=options.on_failing_result,
        on_status_update=_persist_and_forward,
    )

async def _execute_agent_turn(
self,
Expand Down
77 changes: 69 additions & 8 deletions packages/optimization/src/ldai_optimization/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,20 @@ class OptimizationJudgeContext:
variables: Dict[str, Any] = field(default_factory=dict) # variable set used during agent generation


# Shared callback type aliases used by both OptimizationOptions and
# OptimizationFromConfigOptions to avoid duplicating the full signatures.
# Placed here so all referenced types (OptimizationContext, AIJudgeCallConfig,
# OptimizationJudgeContext) are already defined above.
# Each alias accepts either a synchronous callable or an async one returning
# an Awaitable[str]. The final Dict[str, Callable[..., Any]] argument is
# presumably a tool-name -> implementation map — verify against callers.
HandleAgentCall = Union[
    Callable[[str, AIAgentConfig, OptimizationContext, Dict[str, Callable[..., Any]]], str],
    Callable[[str, AIAgentConfig, OptimizationContext, Dict[str, Callable[..., Any]]], Awaitable[str]],
]
HandleJudgeCall = Union[
    Callable[[str, AIJudgeCallConfig, OptimizationJudgeContext, Dict[str, Callable[..., Any]]], str],
    Callable[[str, AIJudgeCallConfig, OptimizationJudgeContext, Dict[str, Callable[..., Any]]], Awaitable[str]],
]


@dataclass
class OptimizationOptions:
"""Options for agent optimization."""
Expand All @@ -218,14 +232,8 @@ class OptimizationOptions:
Dict[str, Any]
] # choices of interpolated variables to be chosen at random per turn, 1 min required
# Actual agent/completion (judge) calls - Required
handle_agent_call: Union[
Callable[[str, AIAgentConfig, OptimizationContext, Dict[str, Callable[..., Any]]], str],
Callable[[str, AIAgentConfig, OptimizationContext, Dict[str, Callable[..., Any]]], Awaitable[str]],
]
handle_judge_call: Union[
Callable[[str, AIJudgeCallConfig, OptimizationJudgeContext, Dict[str, Callable[..., Any]]], str],
Callable[[str, AIJudgeCallConfig, OptimizationJudgeContext, Dict[str, Callable[..., Any]]], Awaitable[str]],
]
handle_agent_call: HandleAgentCall
handle_judge_call: HandleJudgeCall
# Criteria for pass/fail - Optional
user_input_options: Optional[List[str]] = (
None # optional list of user input messages to randomly select from
Expand Down Expand Up @@ -270,3 +278,56 @@ def __post_init__(self):
raise ValueError("Either judges or on_turn must be provided")
if self.judge_model is None:
raise ValueError("judge_model must be provided")


@dataclass
class OptimizationFromConfigOptions:
    """Caller-supplied inputs for OptimizationClient.optimize_from_config.

    Anything the LaunchDarkly API already knows about the optimization —
    max_attempts, model_choices, judge_model, variable_choices,
    user_input_options, judges — is deliberately absent from this dataclass;
    those values are taken from the fetched agent optimization config.

    :param project_key: LaunchDarkly project key used to build API paths.
    :param context_choices: One or more LD evaluation contexts to use.
    :param handle_agent_call: Callback that invokes the agent and returns its response.
    :param handle_judge_call: Callback that invokes a judge and returns its response.
    :param on_turn: Optional manual pass/fail callback; when provided, judge scoring is skipped.
    :param on_passing_result: Called with the winning OptimizationContext on success.
    :param on_failing_result: Called with the final OptimizationContext on failure.
    :param on_status_update: Called on each status transition; chained after the
        automatic result-persistence POST so it always runs after the record is saved.
    :param base_url: Base URL of the LaunchDarkly instance. Defaults to
        https://app.launchdarkly.com. Override to target a staging instance.
    """

    project_key: str
    context_choices: List[Context]
    handle_agent_call: HandleAgentCall
    handle_judge_call: HandleJudgeCall
    on_turn: Optional[Callable[["OptimizationContext"], bool]] = None
    on_passing_result: Optional[Callable[["OptimizationContext"], None]] = None
    on_failing_result: Optional[Callable[["OptimizationContext"], None]] = None
    on_status_update: Optional[
        Callable[
            [
                Literal[
                    "init",
                    "generating",
                    "evaluating",
                    "generating variation",
                    "turn completed",
                    "success",
                    "failure",
                ],
                "OptimizationContext",
            ],
            None,
        ]
    ] = None
    base_url: Optional[str] = None

    def __post_init__(self):
        """Validate required options."""
        # An empty context list would make random.choice fail later; reject it now.
        if not self.context_choices:
            raise ValueError("context_choices must have at least 1 context")
Loading
Loading