-
Notifications
You must be signed in to change notification settings - Fork 576
Add support for GRACEFUL_DISCONNECT events #1285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -674,6 +674,9 @@ class Connection(object): | |
|
|
||
| CALLBACK_ERR_THREAD_THRESHOLD = 100 | ||
|
|
||
| supports_graceful_disconnect = False | ||
| is_draining = False | ||
|
|
||
| in_buffer_size = 4096 | ||
| out_buffer_size = 4096 | ||
|
|
||
|
|
@@ -1061,16 +1064,29 @@ def get_request_id(self): | |
| return self.highest_request_id | ||
|
|
||
| def handle_pushed(self, response): | ||
| if response.event_type == 'GRACEFUL_DISCONNECT': | ||
| self._handle_graceful_disconnect() | ||
| log.debug("Message pushed from server: %r", response) | ||
| for cb in self._push_watchers.get(response.event_type, []): | ||
| try: | ||
| cb(response.event_args) | ||
| except Exception: | ||
| log.exception("Pushed event handler errored, ignoring:") | ||
|
|
||
| def _handle_graceful_disconnect(self): | ||
| log.info("Received GRACEFUL_DISCONNECT from %s. Draining connection...", self.endpoint) | ||
| self.is_draining = True | ||
| self._socket_writable = False | ||
|
|
||
| with self.lock: | ||
| if self.in_flight == 0: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the several occurrences of |
||
| self.close() | ||
|
|
||
| def send_msg(self, msg, request_id, cb, encoder=ProtocolHandler.encode_message, decoder=ProtocolHandler.decode_message, result_metadata=None): | ||
| if self.is_defunct: | ||
| raise ConnectionShutdown("Connection to %s is defunct" % self.endpoint) | ||
| if self.is_draining: | ||
| raise ConnectionShutdown("Connection to %s is draining" % self.endpoint) | ||
| elif self.is_closed: | ||
| raise ConnectionShutdown("Connection to %s is closed" % self.endpoint) | ||
| elif not self._socket_writable: | ||
|
|
@@ -1397,6 +1413,7 @@ def _handle_options_response(self, options_response): | |
| locally_supported_compressions[compression_type] | ||
|
|
||
| self._send_startup_message(compression_type, no_compact=self.no_compact) | ||
| self.supports_graceful_disconnect = options_response.options.get('GRACEFUL_DISCONNECT') == ['true'] | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering where is the code that regsiters to listen on the GRACEFUL_DISCONNECT event? |
||
|
|
||
| @defunct_on_error | ||
| def _send_startup_message(self, compression=None, no_compact=False): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to remove those print statements before we merge the PR for sure