From 4c19d85011ca1788f95edbb3ceb30953ca543a49 Mon Sep 17 00:00:00 2001
From: Konboi
Date: Thu, 2 Apr 2026 14:41:32 +0900
Subject: [PATCH] Parallelize chunk uploads in record tests for better
 throughput

Use ThreadPoolExecutor to POST event chunks concurrently instead of
sequentially. The first chunk is sent synchronously to handle no-build
mode (where the response sets the session ID), then subsequent chunks
are uploaded in parallel with up to 3 workers.

This significantly reduces wall-clock time when uploading millions of
test records by overlapping network I/O with parsing.

Co-Authored-By: Claude Opus 4.6
---
 launchable/commands/record/tests.py | 39 +++++++++++++++++++++--------
 1 file changed, 28 insertions(+), 11 deletions(-)

diff --git a/launchable/commands/record/tests.py b/launchable/commands/record/tests.py
index f86ba9d03..95d27e80e 100644
--- a/launchable/commands/record/tests.py
+++ b/launchable/commands/record/tests.py
@@ -3,6 +3,7 @@
 import os
 import re
 import xml.etree.ElementTree as ET
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from http import HTTPStatus
 from typing import Callable, Dict, Generator, List, Optional, Sequence, Tuple, Union
 
@@ -580,17 +581,33 @@ def recorded_result() -> Tuple[int, int, int, float]:
         start = time_ns()
         exceptions = []
-        for chunk in ichunked(tc, post_chunk):
-            p, es = payload(
-                cases=chunk,
-                test_runner=test_runner,
-                group=group,
-                test_suite_name=test_suite if test_suite else "",
-                flavors=dict(flavor),
-            )
-
-            send(p)
-            exceptions.extend(es)
+        futures = []
+        first = True
+        max_workers = 3
+
+        with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            for chunk in ichunked(tc, post_chunk):
+                p, es = payload(
+                    cases=chunk,
+                    test_runner=test_runner,
+                    group=group,
+                    test_suite_name=test_suite if test_suite else "",
+                    flavors=dict(flavor),
+                )
+
+                exceptions.extend(es)
+
+                if first:
+                    # First chunk must be synchronous to handle no-build mode
+                    # where the response sets the session ID for subsequent requests
+                    send(p)
+                    first = False
+                else:
+                    futures.append(executor.submit(send, p))
+
+            # Wait for all parallel uploads and propagate any exceptions
+            for future in as_completed(futures):
+                future.result()
 
         end = time_ns()
         tracking_client.send_event(
             event_name=Tracking.Event.PERFORMANCE,