-
Notifications
You must be signed in to change notification settings - Fork 4
small features: add option to save cache in parquet, save judge input… #35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -91,6 +91,28 @@ def compute_pref_summary(prefs: pd.Series) -> dict[str, float | int]: | |
| } | ||
|
|
||
|
|
||
| def _is_retryable_error(e: Exception) -> bool: | ||
| """Return True if the exception is a transient server error that should be retried. | ||
|
|
||
| Handles two formats: | ||
| - String representation contains the HTTP code (most providers) | ||
| - ValueError raised by langchain-openai with a dict arg: {'message': ..., 'code': 429} | ||
| """ | ||
| # langchain-openai raises ValueError(response_dict.get("error")) where the | ||
| # error value is a dict like {'message': '...', 'code': 408} | ||
| _RETRYABLE_CODES = {408, 429, 502, 503, 504} | ||
|
ErlisLushtaku marked this conversation as resolved.
|
||
| if isinstance(e, ValueError) and e.args: | ||
| arg = e.args[0] | ||
| if isinstance(arg, dict) and arg.get("code") in _RETRYABLE_CODES: | ||
| return True | ||
|
|
||
| error_str = str(e) | ||
| return ( | ||
| any(str(code) in error_str for code in _RETRYABLE_CODES) | ||
| or "rate" in error_str.lower() | ||
| ) | ||
|
|
||
|
|
||
| def do_inference(chat_model, inputs, use_tqdm: bool = False): | ||
| # Retries on rate-limit/server errors with exponential backoff. | ||
| # Async path retries individual calls; batch path splits into 4^attempt chunks on failure. | ||
|
|
@@ -109,8 +131,7 @@ async def process_single(input_item, max_retries=5, base_delay=1.0): | |
| pbar.update(1) | ||
| return result | ||
| except Exception as e: | ||
| is_rate_limit = "429" in str(e) or "rate" in str(e).lower() | ||
| if attempt == max_retries - 1 or not is_rate_limit: | ||
| if attempt == max_retries - 1 or not _is_retryable_error(e): | ||
| raise | ||
| delay = base_delay * (2**attempt) | ||
| print( | ||
|
|
@@ -144,14 +165,7 @@ def batch_with_retry(batch_inputs, max_retries=5, base_delay=1.0): | |
| results.extend(chat_model.batch(inputs=chunk, **invoke_kwargs)) | ||
| return results | ||
| except Exception as e: | ||
| is_server_error = ( | ||
| "429" in str(e) | ||
| or "500" in str(e) | ||
| or "502" in str(e) | ||
| or "503" in str(e) | ||
| or "rate" in str(e).lower() | ||
| ) | ||
| if attempt == max_retries - 1 or not is_server_error: | ||
| if attempt == max_retries - 1 or not _is_retryable_error(e): | ||
| raise | ||
| delay = base_delay * (2**attempt) | ||
| next_chunks = 4 ** (attempt + 1) | ||
|
|
@@ -481,80 +495,60 @@ def cache_function_dataframe( | |
| cache_name: str, | ||
| ignore_cache: bool = False, | ||
| cache_path: Path | None = None, | ||
| parquet: bool = False, | ||
| ) -> pd.DataFrame: | ||
| """ | ||
| :param fun: a function whose dataframe result obtained `fun()` will be cached | ||
| :param cache_name: the cache of the function result is written into | ||
| `{cache_path}/{cache_name}.csv.zip` | ||
| :param cache_name: the cache of the function result is written into `{cache_path}/{cache_name}.csv.zip` | ||
| :param ignore_cache: whether to recompute even if the cache is present | ||
| :param cache_path: folder where to write cache files, default to ~/cache-zeroshot/ | ||
| :param parquet: whether to store the data in parquet, if not specified use csv.zip | ||
| :return: result of fun() | ||
| """ | ||
| if cache_path is None: | ||
| cache_path = data_root / "cache" | ||
| cache_file = cache_path / (cache_name + ".csv.zip") | ||
|
|
||
| if parquet: | ||
| cache_file = cache_path / (cache_name + ".parquet") | ||
| else: | ||
| cache_file = cache_path / (cache_name + ".csv.zip") | ||
| cache_file.parent.mkdir(parents=True, exist_ok=True) | ||
| if cache_file.exists() and not ignore_cache: | ||
| print(f"Loading cache {cache_file}") | ||
| return pd.read_csv(cache_file) | ||
| if parquet: | ||
| return pd.read_parquet(cache_file) | ||
| else: | ||
| return pd.read_csv(cache_file) | ||
| else: | ||
| print( | ||
| f"Cache {cache_file} not found or ignore_cache set to True, regenerating the file" | ||
| ) | ||
| with Timeblock("Evaluate function."): | ||
| df = fun() | ||
| assert isinstance(df, pd.DataFrame) | ||
| df.to_csv(cache_file, index=False) | ||
| return pd.read_csv(cache_file) | ||
|
|
||
|
|
||
| def compute_cohen_kappa(y1: list[str], y2: list[str]) -> float: | ||
| """ | ||
| Compute Cohen's kappa coefficient for inter-rater agreement. | ||
|
|
||
| Args: | ||
| y1: List of labels from first rater | ||
| y2: List of labels from second rater | ||
|
|
||
| Returns: | ||
| Cohen's kappa coefficient (float between -1 and 1) | ||
| """ | ||
| if len(y1) != len(y2): | ||
| raise ValueError("Both lists must have the same length") | ||
|
|
||
| if len(y1) == 0: | ||
| raise ValueError("Lists cannot be empty") | ||
|
|
||
| # Get all unique categories | ||
| categories = sorted(set(y1) | set(y2)) | ||
| n = len(y1) | ||
|
|
||
| # Build confusion matrix | ||
| matrix = {} | ||
| for cat1 in categories: | ||
| matrix[cat1] = {cat2: 0 for cat2 in categories} | ||
|
|
||
| for label1, label2 in zip(y1, y2, strict=True): | ||
| matrix[label1][label2] += 1 | ||
|
|
||
| # Compute observed agreement (p_o) | ||
| observed_agreement = sum(matrix[cat][cat] for cat in categories) / n | ||
|
|
||
| # Compute expected agreement (p_e) | ||
| expected_agreement = 0 | ||
| for cat in categories: | ||
| # Marginal probabilities | ||
| p1 = sum(matrix[cat][c] for c in categories) / n # rater 1 | ||
| p2 = sum(matrix[c][cat] for c in categories) / n # rater 2 | ||
| expected_agreement += p1 * p2 | ||
|
|
||
| # Compute Cohen's kappa | ||
| if expected_agreement == 1: | ||
| return 1.0 if observed_agreement == 1 else 0.0 | ||
|
|
||
| kappa = (observed_agreement - expected_agreement) / (1 - expected_agreement) | ||
|
|
||
| return kappa | ||
| if parquet: | ||
| # object cols cannot be saved easily in parquet; numpy arrays must be | ||
| # deep-converted to plain Python so str() produces ast.literal_eval-safe | ||
| # repr (no "array([...])" syntax, which breaks literal_eval) | ||
| import numpy as np | ||
|
|
||
| def _to_python(x): | ||
| """Recursively convert numpy arrays/scalars to Python lists/dicts.""" | ||
| if isinstance(x, np.ndarray): | ||
| return [_to_python(i) for i in x] | ||
| if isinstance(x, dict): | ||
| return {k: _to_python(v) for k, v in x.items()} | ||
| if isinstance(x, list): | ||
| return [_to_python(i) for i in x] | ||
| return x | ||
|
|
||
| for col in df.select_dtypes(include="object").columns: | ||
| df[col] = df[col].apply(_to_python).astype(str) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should be careful here if the dataframe can contain missing values. Calling
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. yes I agree but I don't see another way to serialize to parquet. I agree that this conversion is losing the missingness information but I think all downstream code should probably exclude empty strings too when computing annotations.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I searched a bit and I think we have two safe solutions: Option 1: Parquet + JSON Sidecar and save a small Option 2: Compressed Pickle (.pkl.gz) Otherwise we could leave it like this and note it for later, if you think this is not a big issue at the moment.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Option 1 seems like an overkill given that the option is for util which is deactivated by default. I would be keen to merge as is, possibly adding a comment.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Then we could merge it as is so we can use it too 👍 |
||
| df.to_parquet(cache_file, index=False) | ||
| return pd.read_parquet(cache_file) | ||
| else: | ||
| df.to_csv(cache_file, index=False) | ||
| return pd.read_csv(cache_file) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be added to the
estimate_elo_ratings.py workflow as well? There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It uses
judge_and_parse_prefs from this file so it is updated as well if I am not mistaken. Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but we are dropping it here because we are constructing the Dataframe manually
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, I will change the elo-rating to also store input.