Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions pinecone/adapters/response_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from __future__ import annotations

from multiprocessing.pool import ApplyResult
from concurrent.futures import Future
from typing import TYPE_CHECKING, Any

from pinecone.adapters.protocols import (
Expand Down Expand Up @@ -123,21 +123,21 @@ def adapt_fetch_response(openapi_response: FetchResponseAdapter) -> FetchRespons


class UpsertResponseTransformer:
"""Transformer for converting ApplyResult[OpenAPIUpsertResponse] to UpsertResponse.
"""Transformer for converting a Future[OpenAPIUpsertResponse] to UpsertResponse.

This wrapper transforms the OpenAPI response to our dataclass when .get() is called,
while delegating other methods to the underlying ApplyResult.
while delegating other methods to the underlying Future.

Example:
>>> transformer = UpsertResponseTransformer(async_result)
>>> transformer = UpsertResponseTransformer(future)
>>> response = transformer.get() # Returns UpsertResponse
"""

_apply_result: ApplyResult
_future: Future
""" :meta private: """

def __init__(self, apply_result: ApplyResult) -> None:
self._apply_result = apply_result
def __init__(self, future: Future) -> None:
self._future = future

def get(self, timeout: float | None = None) -> UpsertResponse:
"""Get the transformed UpsertResponse.
Expand All @@ -148,9 +148,9 @@ def get(self, timeout: float | None = None) -> UpsertResponse:
Returns:
The SDK UpsertResponse dataclass.
"""
openapi_response = self._apply_result.get(timeout)
openapi_response = self._future.get(timeout)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future has no .get() method; use .result()

Medium Severity

UpsertResponseTransformer.get() calls self._future.get(timeout), but concurrent.futures.Future does not have a .get() method — its API is .result(timeout). This only works by accident because api_client.py monkey-patches future.get = future.result on the returned future. The internal UpsertResponseTransformer class should call self._future.result(timeout) directly instead of relying on the external monkey-patch, which is intended for backward compatibility of external callers.

Additional Locations (1)
Fix in Cursor Fix in Web

return adapt_upsert_response(openapi_response)

def __getattr__(self, name: str) -> Any:
# Delegate other methods to the underlying ApplyResult
return getattr(self._apply_result, name)
# Delegate other methods to the underlying Future
return getattr(self._future, name)
59 changes: 19 additions & 40 deletions pinecone/openapi_support/api_client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
from __future__ import annotations

import atexit
import io

from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
from multiprocessing.pool import ThreadPool
from concurrent.futures import ThreadPoolExecutor

from .rest_urllib3 import Urllib3RestClient
Expand All @@ -33,7 +31,6 @@ class ApiClient(object):
to the API. More threads means more concurrent API requests.
"""

_pool: "ThreadPool" | None = None
_threadpool_executor: "ThreadPoolExecutor" | None = None

def __init__(
Expand All @@ -59,24 +56,6 @@ def close(self):
if self._threadpool_executor:
self._threadpool_executor.shutdown()
self._threadpool_executor = None
if self._pool:
self._pool.close()
self._pool.join()
self._pool = None
if hasattr(atexit, "unregister"):
atexit.unregister(self.close)

@property
def pool(self) -> "ThreadPool":
"""Create thread pool on first request
avoids instantiating unused threadpool for blocking clients.
"""
if self._pool is None:
from multiprocessing.pool import ThreadPool

atexit.register(self.close)
self._pool = ThreadPool(self.pool_threads)
return self._pool

@property
def threadpool_executor(self) -> "ThreadPoolExecutor":
Expand Down Expand Up @@ -336,27 +315,27 @@ def call_api(
_check_type,
)

return self.pool.apply_async(
future = self.threadpool_executor.submit(
self.__call_api,
(
resource_path,
method,
path_params,
query_params,
header_params,
body,
post_params,
files,
response_type,
auth_settings,
_return_http_data_only,
collection_formats,
_preload_content,
_request_timeout,
_host,
_check_type,
),
resource_path,
method,
path_params,
query_params,
header_params,
body,
post_params,
files,
response_type,
auth_settings,
_return_http_data_only,
collection_formats,
_preload_content,
_request_timeout,
_host,
_check_type,
)
future.get = future.result
return future

def request(
self,
Expand Down