Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions judgearena/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,11 @@ def get_output(df_outputs: pd.DataFrame, dataset: str, method: str):

@dataclass
class JudgeAnnotation:
judge_completion: str
instruction: str
completion_A: str
completion_B: str
instruction: str # instruction from the user
completion_A: str # completion of the first model
completion_B: str # completion of the second model
judge_completion: str # output of the judge
judge_input: str | None = None # input that was passed to the judge
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be added to the estimate_elo_ratings.py workflow as well?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It uses judge_and_parse_prefs from this file so it is updated as well if I am not mistaken.

Copy link
Copy Markdown
Collaborator

@ErlisLushtaku ErlisLushtaku Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we are dropping it here because we are constructing the Dataframe manually

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, I will change the elo-rating to also store input.



def annotate_battles(
Expand Down Expand Up @@ -338,11 +339,17 @@ def truncate(s: str, max_len: int | None = None):
)

annotations = []
for judge_completion, instruction, completion_A, completion_B in zip(
judge_completions, instructions, completions_A, completions_B, strict=True
for judge_input, judge_completion, instruction, completion_A, completion_B in zip(
inputs,
judge_completions,
instructions,
completions_A,
completions_B,
strict=True,
):
annotations.append(
JudgeAnnotation(
judge_input=judge_input,
judge_completion=judge_completion,
instruction=instruction,
completion_A=completion_A,
Expand Down
124 changes: 59 additions & 65 deletions judgearena/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,28 @@ def compute_pref_summary(prefs: pd.Series) -> dict[str, float | int]:
}


def _is_retryable_error(e: Exception) -> bool:
"""Return True if the exception is a transient server error that should be retried.

Handles two formats:
- String representation contains the HTTP code (most providers)
- ValueError raised by langchain-openai with a dict arg: {'message': ..., 'code': 429}
"""
# langchain-openai raises ValueError(response_dict.get("error")) where the
# error value is a dict like {'message': '...', 'code': 408}
_RETRYABLE_CODES = {408, 429, 502, 503, 504}
Comment thread
ErlisLushtaku marked this conversation as resolved.
if isinstance(e, ValueError) and e.args:
arg = e.args[0]
if isinstance(arg, dict) and arg.get("code") in _RETRYABLE_CODES:
return True

error_str = str(e)
return (
any(str(code) in error_str for code in _RETRYABLE_CODES)
or "rate" in error_str.lower()
)


def do_inference(chat_model, inputs, use_tqdm: bool = False):
# Retries on rate-limit/server errors with exponential backoff.
# Async path retries individual calls; batch path splits into 4^attempt chunks on failure.
Expand All @@ -109,8 +131,7 @@ async def process_single(input_item, max_retries=5, base_delay=1.0):
pbar.update(1)
return result
except Exception as e:
is_rate_limit = "429" in str(e) or "rate" in str(e).lower()
if attempt == max_retries - 1 or not is_rate_limit:
if attempt == max_retries - 1 or not _is_retryable_error(e):
raise
delay = base_delay * (2**attempt)
print(
Expand Down Expand Up @@ -144,14 +165,7 @@ def batch_with_retry(batch_inputs, max_retries=5, base_delay=1.0):
results.extend(chat_model.batch(inputs=chunk, **invoke_kwargs))
return results
except Exception as e:
is_server_error = (
"429" in str(e)
or "500" in str(e)
or "502" in str(e)
or "503" in str(e)
or "rate" in str(e).lower()
)
if attempt == max_retries - 1 or not is_server_error:
if attempt == max_retries - 1 or not _is_retryable_error(e):
raise
delay = base_delay * (2**attempt)
next_chunks = 4 ** (attempt + 1)
Expand Down Expand Up @@ -481,80 +495,60 @@ def cache_function_dataframe(
cache_name: str,
ignore_cache: bool = False,
cache_path: Path | None = None,
parquet: bool = False,
) -> pd.DataFrame:
"""
:param fun: a function whose dataframe result obtained `fun()` will be cached
:param cache_name: the cache of the function result is written into
`{cache_path}/{cache_name}.csv.zip`
:param cache_name: the cache of the function result is written into `{cache_path}/{cache_name}.csv.zip`
:param ignore_cache: whether to recompute even if the cache is present
:param cache_path: folder where to write cache files, default to ~/cache-zeroshot/
:param parquet: whether to store the data in parquet, if not specified use csv.zip
:return: result of fun()
"""
if cache_path is None:
cache_path = data_root / "cache"
cache_file = cache_path / (cache_name + ".csv.zip")

if parquet:
cache_file = cache_path / (cache_name + ".parquet")
else:
cache_file = cache_path / (cache_name + ".csv.zip")
cache_file.parent.mkdir(parents=True, exist_ok=True)
if cache_file.exists() and not ignore_cache:
print(f"Loading cache {cache_file}")
return pd.read_csv(cache_file)
if parquet:
return pd.read_parquet(cache_file)
else:
return pd.read_csv(cache_file)
else:
print(
f"Cache {cache_file} not found or ignore_cache set to True, regenerating the file"
)
with Timeblock("Evaluate function."):
df = fun()
assert isinstance(df, pd.DataFrame)
df.to_csv(cache_file, index=False)
return pd.read_csv(cache_file)


def compute_cohen_kappa(y1: list[str], y2: list[str]) -> float:
"""
Compute Cohen's kappa coefficient for inter-rater agreement.

Args:
y1: List of labels from first rater
y2: List of labels from second rater

Returns:
Cohen's kappa coefficient (float between -1 and 1)
"""
if len(y1) != len(y2):
raise ValueError("Both lists must have the same length")

if len(y1) == 0:
raise ValueError("Lists cannot be empty")

# Get all unique categories
categories = sorted(set(y1) | set(y2))
n = len(y1)

# Build confusion matrix
matrix = {}
for cat1 in categories:
matrix[cat1] = {cat2: 0 for cat2 in categories}

for label1, label2 in zip(y1, y2, strict=True):
matrix[label1][label2] += 1

# Compute observed agreement (p_o)
observed_agreement = sum(matrix[cat][cat] for cat in categories) / n

# Compute expected agreement (p_e)
expected_agreement = 0
for cat in categories:
# Marginal probabilities
p1 = sum(matrix[cat][c] for c in categories) / n # rater 1
p2 = sum(matrix[c][cat] for c in categories) / n # rater 2
expected_agreement += p1 * p2

# Compute Cohen's kappa
if expected_agreement == 1:
return 1.0 if observed_agreement == 1 else 0.0

kappa = (observed_agreement - expected_agreement) / (1 - expected_agreement)

return kappa
if parquet:
# object cols cannot be saved easily in parquet; numpy arrays must be
# deep-converted to plain Python so str() produces ast.literal_eval-safe
# repr (no "array([...])" syntax, which breaks literal_eval)
import numpy as np

def _to_python(x):
"""Recursively convert numpy arrays/scalars to Python lists/dicts."""
if isinstance(x, np.ndarray):
return [_to_python(i) for i in x]
if isinstance(x, dict):
return {k: _to_python(v) for k, v in x.items()}
if isinstance(x, list):
return [_to_python(i) for i in x]
return x

for col in df.select_dtypes(include="object").columns:
df[col] = df[col].apply(_to_python).astype(str)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be careful here if the dataframe can contain missing values. Calling .astype(str) on missing values (None or np.nan) converts them into strings "None" and "nan". When the parquet file is read back, they would be processed as strings instead of missing values.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes I agree, but I don't see another way to serialize to parquet. I agree that this conversion is losing the missingness information, but I think all downstream code should probably exclude empty strings too when computing annotations.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I searched a bit and I think we have two safe solutions:

Option 1: Parquet + JSON Sidecar
We selectively stringify only the complex objects (like dicts/lists), leave None/NaN untouched by filtering something like this:

df_cache[col] = df_cache[col].apply(
    lambda x: x if x is None or (isinstance(x, float) and np.isnan(x)) else repr(_to_python(x))
)

and save a small .meta.json tracking which columns were altered. Then when reading we again filter to call ast.literal_eval only on cells that don't contain these missing values and were serialized.

Option 2: Compressed Pickle (.pkl.gz)
We could switch to df.to_pickle(cache_file, compression="gzip"). It would be simpler, but probably less storage-efficient.

Otherwise we could leave it like this and note it for later, if you think this is not a big issue at the moment.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Option 1 seems like overkill given that the option is for a util which is deactivated by default.
Option 2 is a strong no, as we would be replacing the issue with a security issue (also, we won't be getting parquet's benefit of columnar storage compression).

I would be keen to merge as is, possibly adding a comment.
Alternatively, I am also fine to drop the feature and keep it in my branch.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we could merge it as is so we can use it too 👍

df.to_parquet(cache_file, index=False)
return pd.read_parquet(cache_file)
else:
df.to_csv(cache_file, index=False)
return pd.read_csv(cache_file)


if __name__ == "__main__":
Expand Down
Loading