|
12 | 12 | from datetime import datetime, timedelta |
13 | 13 | from threading import Event, Thread |
14 | 14 | from types import GeneratorType |
15 | | -from typing import Any, Generator, Optional, Sequence, TypeVar, Union |
| 15 | +from typing import Any, Generator, Iterator, Optional, Sequence, TypeVar |
16 | 16 |
|
17 | 17 | import grpc |
18 | 18 | from google.protobuf import empty_pb2 |
|
30 | 30 | # If `opentelemetry-sdk` is available, enable the tracer |
31 | 31 | try: |
32 | 32 | from opentelemetry import trace |
33 | | - from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator |
| 33 | + from opentelemetry.trace.propagation.tracecontext import \ |
| 34 | + TraceContextTextMapPropagator |
34 | 35 |
|
35 | 36 | otel_propagator = TraceContextTextMapPropagator() |
36 | 37 | otel_tracer = trace.get_tracer(__name__) |
@@ -283,7 +284,7 @@ class TaskHubGrpcWorker: |
283 | 284 | activity function. |
284 | 285 | """ |
285 | 286 |
|
286 | | - _response_stream: Optional[grpc.Future] = None |
| 287 | + _response_stream: Optional[Iterator[Any]] = None |
287 | 288 | _interceptors: Optional[list[shared.ClientInterceptor]] = None |
288 | 289 |
|
289 | 290 | def __init__( |
@@ -418,10 +419,10 @@ def create_fresh_connection(): |
418 | 419 |
|
419 | 420 | def invalidate_connection(): |
420 | 421 | nonlocal current_channel, current_stub, current_reader_thread |
421 | | - # Cancel the response stream first to signal the reader thread to stop |
| 422 | + # Close the response stream first to signal the reader thread to stop |
422 | 423 | if self._response_stream is not None: |
423 | 424 | try: |
424 | | - self._response_stream.cancel() |
| 425 | + self._response_stream.close() |
425 | 426 | except Exception: |
426 | 427 | pass |
427 | 428 | self._response_stream = None |
@@ -740,7 +741,10 @@ def stop(self): |
740 | 741 |
|
741 | 742 | self._logger.info("Stopping gRPC worker...") |
742 | 743 | if self._response_stream is not None: |
743 | | - self._response_stream.cancel() |
| 744 | + try: |
| 745 | + self._response_stream.close() |
| 746 | + except Exception as e: |
| 747 | + self._logger.exception(f"Error stopping response stream: {e}") |
744 | 748 | self._shutdown.set() |
745 | 749 | # Explicitly close the gRPC channel to ensure OTel interceptors and other resources are cleaned up |
746 | 750 | if self._current_channel is not None: |
|
0 commit comments