Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import gzip
import logging
import os
import random
import threading
import zlib
Expand Down Expand Up @@ -181,6 +182,28 @@ def __init__(
preferred_temporality, preferred_aggregation
)
self._shutdown = False
if hasattr(os, "register_at_fork"):
os.register_at_fork(after_in_child=self._reset_session_after_fork)

def _reset_session_after_fork(self) -> None:
"""
Reset exporter session in the child process after fork.

We close the existing session to avoid finalizer warnings if file
descriptors were already closed, then create a new session with the same
headers to prevent reusing the parent's connection state.
"""
try:
headers = self._session.headers.copy()
self._session.close()

self._session = requests.Session()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Session() can be customized / modified beyond just the headers.. This sort of breaks people if they are passing in a custom session to the constructor or using the _load_session_from_envvar thing I created.. Both of those features are pretty new and I doubt anyone is really using them yet so not a huge deal..

You could at leastcheck if _load_session_from_envvar returns None or not, and if doesn't use what it returns as the Session..

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could at leastcheck if _load_session_from_envvar returns None or not, and if doesn't use what it returns as the Session..

Are you suggesting to use the Session returned by _load_session_from_envvar as is in the forked process ? I believe this is not correct as both parent and forked process will use the same Session, which is what we are trying to avoid here

self._session.headers.update(headers)
except Exception:
_logger.debug(
"Exception occurred while resetting exporter session",
exc_info=True,
)

def _export(
self, serialized_data: bytes, timeout_sec: Optional[float] = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import threading
import time
import unittest
from logging import WARNING
from os import environ
from unittest import TestCase
Expand Down Expand Up @@ -635,3 +637,34 @@ def test_shutdown_interrupts_retry_backoff(self, mock_post):
)

assert after - before < 0.2

@unittest.skipUnless(
hasattr(os, "register_at_fork"), "fork session reset not available"
)
def test_metric_exporter_register_at_fork_resets_session(self):
initial_session = MagicMock(spec=requests.Session)
initial_session.headers = {"preexisting": "yes"}

new_session = MagicMock(spec=requests.Session)
new_session.headers = {}

with patch("os.register_at_fork") as mock_register_at_fork, patch(
"opentelemetry.exporter.otlp.proto.http.metric_exporter.requests.Session",
return_value=new_session,
):
exporter = OTLPMetricExporter(
session=initial_session, headers={"x-test": "1"}
)
after_in_child = mock_register_at_fork.call_args.kwargs[
"after_in_child"
]
after_in_child()

initial_session.close.assert_called_once()
self.assertEqual(exporter._session, new_session)
self.assertEqual(exporter._session.headers.get("x-test"), "1")
self.assertEqual(
exporter._session.headers.get("Content-Type"),
"application/x-protobuf",
)
self.assertEqual(exporter._session.headers.get("preexisting"), "yes")
Loading