From 159cac62b564845ce86484fd505fe130b2303ee4 Mon Sep 17 00:00:00 2001 From: yonatan-genai Date: Fri, 3 Apr 2026 23:42:38 +0300 Subject: [PATCH] Replace multiprocessing.pool.ThreadPool with ThreadPoolExecutor for async_req ThreadPool depends on POSIX named semaphores (sem_open), which fails on platforms without /dev/shm such as AWS Lambda. The async_req code path only needs thread-based parallelism, not multiprocessing. This reuses the existing threadpool_executor property (already used by the async_threadpool_executor path) for async_req as well. A .get() alias is added to the returned Future for backward compatibility with code that calls .get() instead of .result(). Removes the multiprocessing dependency from the client entirely. --- pinecone/adapters/response_adapters.py | 20 ++++----- pinecone/openapi_support/api_client.py | 59 +++++++++----------------- 2 files changed, 29 insertions(+), 50 deletions(-) diff --git a/pinecone/adapters/response_adapters.py b/pinecone/adapters/response_adapters.py index b77d6f32..210f2a59 100644 --- a/pinecone/adapters/response_adapters.py +++ b/pinecone/adapters/response_adapters.py @@ -10,7 +10,7 @@ from __future__ import annotations -from multiprocessing.pool import ApplyResult +from concurrent.futures import Future from typing import TYPE_CHECKING, Any from pinecone.adapters.protocols import ( @@ -123,21 +123,21 @@ def adapt_fetch_response(openapi_response: FetchResponseAdapter) -> FetchRespons class UpsertResponseTransformer: - """Transformer for converting ApplyResult[OpenAPIUpsertResponse] to UpsertResponse. + """Transformer for converting a Future[OpenAPIUpsertResponse] to UpsertResponse. This wrapper transforms the OpenAPI response to our dataclass when .get() is called, - while delegating other methods to the underlying ApplyResult. + while delegating other methods to the underlying Future. 
Example: - >>> transformer = UpsertResponseTransformer(async_result) + >>> transformer = UpsertResponseTransformer(future) >>> response = transformer.get() # Returns UpsertResponse """ - _apply_result: ApplyResult + _future: Future """ :meta private: """ - def __init__(self, apply_result: ApplyResult) -> None: - self._apply_result = apply_result + def __init__(self, future: Future) -> None: + self._future = future def get(self, timeout: float | None = None) -> UpsertResponse: """Get the transformed UpsertResponse. @@ -148,9 +148,9 @@ def get(self, timeout: float | None = None) -> UpsertResponse: Returns: The SDK UpsertResponse dataclass. """ - openapi_response = self._apply_result.get(timeout) + openapi_response = self._future.result(timeout) return adapt_upsert_response(openapi_response) def __getattr__(self, name: str) -> Any: - # Delegate other methods to the underlying ApplyResult - return getattr(self._apply_result, name) + # Delegate other methods to the underlying Future + return getattr(self._future, name) diff --git a/pinecone/openapi_support/api_client.py b/pinecone/openapi_support/api_client.py index afd6b96c..d55edde2 100644 --- a/pinecone/openapi_support/api_client.py +++ b/pinecone/openapi_support/api_client.py @@ -1,12 +1,10 @@ from __future__ import annotations -import atexit import io from typing import Any, TYPE_CHECKING if TYPE_CHECKING: - from multiprocessing.pool import ThreadPool from concurrent.futures import ThreadPoolExecutor from .rest_urllib3 import Urllib3RestClient @@ -33,7 +31,6 @@ class ApiClient(object): to the API. More threads means more concurrent API requests. 
""" - _pool: "ThreadPool" | None = None _threadpool_executor: "ThreadPoolExecutor" | None = None def __init__( @@ -59,24 +56,6 @@ def close(self): if self._threadpool_executor: self._threadpool_executor.shutdown() self._threadpool_executor = None - if self._pool: - self._pool.close() - self._pool.join() - self._pool = None - if hasattr(atexit, "unregister"): - atexit.unregister(self.close) - - @property - def pool(self) -> "ThreadPool": - """Create thread pool on first request - avoids instantiating unused threadpool for blocking clients. - """ - if self._pool is None: - from multiprocessing.pool import ThreadPool - - atexit.register(self.close) - self._pool = ThreadPool(self.pool_threads) - return self._pool @property def threadpool_executor(self) -> "ThreadPoolExecutor": @@ -336,27 +315,27 @@ def call_api( _check_type, ) - return self.pool.apply_async( + future = self.threadpool_executor.submit( self.__call_api, - ( - resource_path, - method, - path_params, - query_params, - header_params, - body, - post_params, - files, - response_type, - auth_settings, - _return_http_data_only, - collection_formats, - _preload_content, - _request_timeout, - _host, - _check_type, - ), + resource_path, + method, + path_params, + query_params, + header_params, + body, + post_params, + files, + response_type, + auth_settings, + _return_http_data_only, + collection_formats, + _preload_content, + _request_timeout, + _host, + _check_type, ) + future.get = future.result + return future def request( self,