diff --git a/lib/mongo/session.rb b/lib/mongo/session.rb index ab5fa18be4..34d21fd966 100644 --- a/lib/mongo/session.rb +++ b/lib/mongo/session.rb @@ -16,6 +16,7 @@ require 'mongo/session/session_pool' require 'mongo/session/server_session' +require 'mongo/session/with_transaction_runner' module Mongo # A logical session representing a set of sequential operations executed @@ -445,175 +446,8 @@ def with_transaction(options = nil) @inside_with_transaction = true @with_transaction_timeout_ms = options&.dig(:timeout_ms) || @options[:default_timeout_ms] || @client.timeout_ms @with_transaction_deadline = calculate_with_transaction_deadline(options) - deadline = if @with_transaction_deadline - # CSOT enabled, so we have a customer defined deadline. - @with_transaction_deadline - else - # CSOT not enabled, so we use the default deadline, 120 seconds. - Utils.monotonic_time + 120 - end - transaction_in_progress = false - transaction_attempt = 0 - last_error = nil - overload_error_count = 0 - overload_encountered = false - - loop do - if transaction_attempt > 0 - if overload_encountered - delay = @client.retry_policy.backoff_delay(overload_error_count) - if backoff_would_exceed_deadline?(deadline, delay) - make_timeout_error_from(last_error, 'CSOT timeout expired waiting to retry withTransaction') - end - raise(last_error) unless @client.retry_policy.should_retry_overload?(overload_error_count, delay) - - sleep(delay) - else - backoff = backoff_seconds_for_retry(transaction_attempt) - if backoff_would_exceed_deadline?(deadline, backoff) - make_timeout_error_from(last_error, 'CSOT timeout expired waiting to retry withTransaction') - end - - sleep(backoff) - end - end - - commit_options = {} - commit_options[:write_concern] = options[:write_concern] if options - start_transaction(options) - transaction_in_progress = true - transaction_attempt += 1 - - begin - rv = yield self - rescue Exception => e - if within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE) 
- log_warn("Aborting transaction due to #{e.class}: #{e}") - # CSOT: if the deadline is already expired, clear it so that - # abort_transaction uses a fresh timeout (not the expired deadline). - # If the deadline is not yet expired, keep it so abort uses remaining time. - @with_transaction_deadline = nil if @with_transaction_deadline && deadline_expired?(deadline) - abort_transaction - transaction_in_progress = false - end - - if deadline_expired?(deadline) - transaction_in_progress = false - make_timeout_error_from(e, 'CSOT timeout expired during withTransaction callback') - end - - if e.is_a?(Mongo::Error) && e.label?('TransientTransactionError') - last_error = e - if e.label?('SystemOverloadedError') - overload_encountered = true - overload_error_count += 1 - elsif overload_encountered - overload_error_count += 1 - @client.retry_policy.record_non_overload_retry_failure - end - next - end - - raise - else - if within_states?(TRANSACTION_ABORTED_STATE, NO_TRANSACTION_STATE, TRANSACTION_COMMITTED_STATE) - transaction_in_progress = false - return rv - end - - # CSOT: if the timeout has expired before we can commit, abort the - # transaction instead and raise a client-side timeout error. - if @with_transaction_deadline && deadline_expired?(deadline) - transaction_in_progress = false - @with_transaction_deadline = nil - abort_transaction - raise Mongo::Error::TimeoutError, 'CSOT timeout expired before transaction could be committed' - end - - begin - commit_transaction(commit_options) - transaction_in_progress = false - return rv - rescue Mongo::Error => e - if e.label?('UnknownTransactionCommitResult') - if deadline_expired?(deadline) || - (e.is_a?(Error::OperationFailure::Family) && e.max_time_ms_expired?) 
- transaction_in_progress = false - if @with_transaction_timeout_ms && deadline_expired?(deadline) - make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit') - else - raise - end - end - - if e.label?('SystemOverloadedError') - overload_encountered = true - overload_error_count += 1 - elsif overload_encountered - overload_error_count += 1 - @client.retry_policy.record_non_overload_retry_failure - end - - if overload_encountered - delay = @client.retry_policy.backoff_delay(overload_error_count) - if backoff_would_exceed_deadline?(deadline, delay) - transaction_in_progress = false - make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit') - end - unless @client.retry_policy.should_retry_overload?(overload_error_count, delay) - transaction_in_progress = false - raise - end - sleep(delay) - end - - wc_options = case v = commit_options[:write_concern] - when WriteConcern::Base - v.options - when nil - {} - else - v - end - commit_options[:write_concern] = wc_options.merge(w: :majority) - retry - elsif e.label?('TransientTransactionError') - if Utils.monotonic_time >= deadline - transaction_in_progress = false - make_timeout_error_from(e, 'CSOT timeout expired during withTransaction commit') - end - last_error = e - if e.label?('SystemOverloadedError') - overload_encountered = true - overload_error_count += 1 - elsif overload_encountered - overload_error_count += 1 - @client.retry_policy.record_non_overload_retry_failure - end - @state = NO_TRANSACTION_STATE - next - else - transaction_in_progress = false - raise - end - rescue Error::AuthError - transaction_in_progress = false - raise - end - end - end - - # No official return value, but return true so that in interactive - # use the method hints that it succeeded. 
- true + WithTransactionRunner.new(self, options).run { yield self } ensure - if transaction_in_progress - log_warn('with_transaction callback broke out of with_transaction loop, aborting transaction') - begin - abort_transaction - rescue Error::OperationFailure::Family, Error::InvalidTransactionOperation - end - end @with_transaction_deadline = nil @with_transaction_timeout_ms = nil @inside_with_transaction = false @@ -1269,6 +1103,31 @@ def inside_with_transaction? @inside_with_transaction end + # Clears the with_transaction deadline so that subsequent operations issued by + # WithTransactionRunner (e.g. abort_transaction) use a fresh timeout, not an expired one. + # @api private + def clear_with_transaction_deadline! + @with_transaction_deadline = nil + end + + # Readable by WithTransactionRunner to detect CSOT mode. + # @api private + attr_reader :with_transaction_timeout_ms + + # Resets transaction state to NO_TRANSACTION_STATE without calling + # abort_transaction. Used by WithTransactionRunner after a + # TransientTransactionError during commit — the server has already + # rolled back, so no server-side cleanup is needed. + # @api private + def reset_transaction_state! + @state = NO_TRANSACTION_STATE + end + + # @api private + def within_states?(*states) + states.include?(@state) + end + private # Get the read concern the session will use when starting a transaction. @@ -1286,10 +1145,6 @@ def txn_read_concern txn_options[:read_concern] || @client.read_concern end - def within_states?(*states) - states.include?(@state) - end - def check_if_no_transaction! return unless within_states?(NO_TRANSACTION_STATE) @@ -1372,41 +1227,5 @@ def calculate_with_transaction_deadline(opts) calc.call(@client.timeout_ms) end end - - def deadline_expired?(deadline) - if deadline.zero? - false - else - Utils.monotonic_time >= deadline - end - end - - # Exponential backoff settings for with_transaction retries. 
- BACKOFF_INITIAL = 0.005 - BACKOFF_MAX = 0.5 - private_constant :BACKOFF_INITIAL, :BACKOFF_MAX - - def backoff_seconds_for_retry(transaction_attempt) - exponential = BACKOFF_INITIAL * (1.5**(transaction_attempt - 1)) - Random.rand * [ exponential, BACKOFF_MAX ].min - end - - def backoff_would_exceed_deadline?(deadline, backoff_seconds) - return false if deadline.zero? - - Utils.monotonic_time + backoff_seconds >= deadline - end - - # Implements makeTimeoutError(lastError) from the transactions-convenient-api spec. - # In CSOT mode raises TimeoutError with last_error's message included as a substring. - # In non-CSOT mode re-raises last_error directly. - def make_timeout_error_from(last_error, timeout_message) - if @with_transaction_timeout_ms - raise Mongo::Error::TimeoutError, "#{timeout_message}: #{last_error}" - else - raise last_error - end - end - end end diff --git a/lib/mongo/session/with_transaction_runner.rb b/lib/mongo/session/with_transaction_runner.rb new file mode 100644 index 0000000000..b7c627c4bf --- /dev/null +++ b/lib/mongo/session/with_transaction_runner.rb @@ -0,0 +1,316 @@ +# frozen_string_literal: true + +module Mongo + class Session + # Owns the retry loop state and logic for Session#with_transaction. + # + # Control-flow contract: + # return — step succeeded, continue + # raise — unrecoverable error, propagate to caller + # throw :retry — restart the transaction loop from the top + # + # @api private + class WithTransactionRunner + BACKOFF_INITIAL = 0.005 + BACKOFF_MAX = 0.5 + private_constant :BACKOFF_INITIAL, :BACKOFF_MAX + + # Runs one transaction attempt: pre-backoff, start, callback, commit. + # The in-progress flag stays set until commit resolves so that run's ensure + # can abort the transaction if commit is interrupted by an unexpected error. + def run_attempt(&block) + # No transaction is open when an attempt (or a retry) begins. + @transaction_in_progress = false + pre_retry_backoff if @transaction_attempt.positive? + @session.start_transaction(@options) + @transaction_in_progress = true + @transaction_attempt += 1 + result = execute_callback(&block) + commit(result) + end + + # Outer retry loop. Returns the callback's return value on success. 
+ # The ensure runs whenever run exits (return, raise or break) — NOT on throw :retry, + # which is caught by catch(:retry) within this method. + def run(&block) + loop do + catch(:retry) { return run_attempt(&block) } + end + ensure + abort_if_in_progress + end + + def initialize(session, options) + @session = session + @options = options + @csot = !session.with_transaction_timeout_ms.nil? + csot_deadline = session.with_transaction_deadline + # Non-CSOT: apply 120-second safety limit. + # CSOT: use computed deadline (0 = infinite when timeout_ms: 0). + @deadline = csot_deadline.nil? ? (Utils.monotonic_time + 120) : csot_deadline + @last_error = nil + @transaction_attempt = 0 + @overload_encountered = false + @overload_error_count = 0 + @transaction_in_progress = false + end + + private + + def deadline_expired? + @deadline.zero? ? false : Utils.monotonic_time >= @deadline + end + + def backoff_would_exceed_deadline?(secs) + @deadline.zero? ? false : Utils.monotonic_time + secs >= @deadline + end + + def backoff_seconds_for_retry + exponential = BACKOFF_INITIAL * (1.5**(@transaction_attempt - 1)) + Random.rand * [ exponential, BACKOFF_MAX ].min + end + + # -- Timeout error ----------------------------------------------------- + + # Raises TimeoutError (CSOT) or re-raises last_error (non-CSOT). + # Note: @csot is true whenever timeout_ms is non-nil, so timeout_ms: 0 (infinite CSOT) counts as CSOT. + def make_timeout_error_from(last_error, message) + raise Mongo::Error::TimeoutError, "#{message}: #{last_error}" if @csot + + raise last_error + end + + # Updates @overload_encountered and @overload_error_count. + def track_overload(err) + if err.label?('SystemOverloadedError') + @overload_encountered = true + @overload_error_count += 1 + elsif @overload_encountered + @overload_error_count += 1 + @session.client.retry_policy.record_non_overload_retry_failure + end + end + + # -- execute_callback helpers ------------------------------------------ + + # Aborts the transaction if it is currently active. 
+ # Clears the deadline first if it is already expired so that abort gets + # a fresh timeout rather than the expired one. + def abort_in_progress_transaction(err) + return unless @session.within_states?( + Session::STARTING_TRANSACTION_STATE, + Session::TRANSACTION_IN_PROGRESS_STATE + ) + + @session.log_warn("Aborting transaction due to #{err.class}: #{err}") + @session.clear_with_transaction_deadline! if @csot && deadline_expired? + @session.abort_transaction + @transaction_in_progress = false + end + + # Raises if the deadline has passed, clearing the in-progress flag first so run's ensure does not abort again. + # In CSOT mode raises TimeoutError; in non-CSOT mode re-raises err. + def raise_or_retry_on_deadline!(err) + return unless deadline_expired? + @transaction_in_progress = false + make_timeout_error_from(err, 'CSOT timeout expired during withTransaction callback') + end + + # Handles the error from the callback. + # Throws :retry for transient errors; re-raises everything else. + def handle_transient_callback_error(err) + raise err unless err.is_a?(Mongo::Error) && err.label?('TransientTransactionError') + + @last_error = err + track_overload(err) + throw :retry + end + + # Runs the user's block; handles errors using the three helpers above. + # rubocop:disable Lint/RescueException + def execute_callback + yield + rescue Exception => e + abort_in_progress_transaction(e) + raise_or_retry_on_deadline!(e) + handle_transient_callback_error(e) + end + # rubocop:enable Lint/RescueException + + # -- Pre-retry backoff ------------------------------------------------- + + # Sleeps before the next attempt; overload path uses adaptive delay, normal path uses exponential. 
+ def pre_retry_backoff + if @overload_encountered + delay = @session.client.retry_policy.backoff_delay(@overload_error_count) + if backoff_would_exceed_deadline?(delay) + make_timeout_error_from(@last_error, 'CSOT timeout expired waiting to retry withTransaction') + end + raise @last_error unless @session.client.retry_policy.should_retry_overload?(@overload_error_count, delay) + + sleep(delay) + else + backoff = backoff_seconds_for_retry + if backoff_would_exceed_deadline?(backoff) + make_timeout_error_from(@last_error, 'CSOT timeout expired waiting to retry withTransaction') + end + sleep(backoff) + end + end + + # -- Commit helpers ---------------------------------------------------- + + # Returns true if the session is no longer in an active transaction state + # (the callback may have aborted or committed the transaction itself). + def transaction_no_longer_active? + return false unless @session.within_states?( + Session::TRANSACTION_ABORTED_STATE, + Session::NO_TRANSACTION_STATE, + Session::TRANSACTION_COMMITTED_STATE + ) + + @transaction_in_progress = false + true + end + + # CSOT-only: aborts and raises TimeoutError if the deadline expired before commit. + # The in-progress flag is cleared first so a failing abort is not retried by run's ensure. + def check_deadline_before_commit! + return unless @csot && deadline_expired? + + @transaction_in_progress = false + @session.clear_with_transaction_deadline! + @session.abort_transaction + raise Mongo::Error::TimeoutError, 'CSOT timeout expired before transaction could be committed' + end + + # Handles a TransientTransactionError raised during commit. + # Raises on deadline; otherwise records state and throws :retry. + # Note: uses deadline_expired? which correctly returns false for deadline 0 + # (timeout_ms: 0 = infinite CSOT), fixing a bug in the original code that + # used `Utils.monotonic_time >= deadline` — true when deadline == 0. + def handle_transient_commit_error(err) + if deadline_expired? 
+ @transaction_in_progress = false + make_timeout_error_from(err, 'CSOT timeout expired during withTransaction commit') + end + @last_error = err + track_overload(err) + @session.reset_transaction_state! + throw :retry + end + + # On deadline or max_time_ms expiry: raises TimeoutError (CSOT with expired deadline) or re-raises err. + def check_unknown_commit_deadline(err) + return unless deadline_expired? || (err.is_a?(Error::OperationFailure::Family) && err.max_time_ms_expired?) + + @transaction_in_progress = false + raise err unless @csot && deadline_expired? + + make_timeout_error_from(err, 'CSOT timeout expired during withTransaction commit') + end + + # Handles the overload path for unknown-commit retries. + # Sleeps and returns normally to let the caller retry with escalated wc. + # Raises or calls make_timeout_error_from if retry is not possible. + def handle_unknown_commit_overload(err) + return unless @overload_encountered + + delay = @session.client.retry_policy.backoff_delay(@overload_error_count) + if backoff_would_exceed_deadline?(delay) + @transaction_in_progress = false + make_timeout_error_from(err, 'CSOT timeout expired during withTransaction commit') + end + unless @session.client.retry_policy.should_retry_overload?(@overload_error_count, delay) + @transaction_in_progress = false + raise err + end + sleep(delay) + end + + # Handles an UnknownTransactionCommitResult error. + # Raises on deadline or max_time_ms expiry. Sleeps and falls through + # for overload-driven retries (caller escalates write concern). + def handle_unknown_commit_result(err) + check_unknown_commit_deadline(err) + track_overload(err) + handle_unknown_commit_overload(err) + end + + # Routes commit errors to the appropriate handler; returns normally only when the commit should be retried. 
+ def handle_commit_error(err) + if err.label?('UnknownTransactionCommitResult') + handle_unknown_commit_result(err) + elsif err.label?('TransientTransactionError') + handle_transient_commit_error(err) + else + @transaction_in_progress = false + raise err + end + end + + # Inner commit+retry loop. Escalates write concern for retriable paths. + # Error::AuthError < RuntimeError (not < Mongo::Error), so it requires + # its own rescue clause. + def commit_with_escalation(result) + commit_options = @options ? { write_concern: @options[:write_concern] } : {} + loop do + @session.commit_transaction(commit_options) + @transaction_in_progress = false + return result + rescue Mongo::Error => e + handle_commit_error(e) + escalate_write_concern!(commit_options) + rescue Error::AuthError + @transaction_in_progress = false + raise + end + end + + # Top-level commit. Skips if the block already managed the transaction; + # checks the CSOT deadline; then enters the escalation loop. + def commit(result) + return result if transaction_no_longer_active? + + check_deadline_before_commit! + commit_with_escalation(result) + end + + # Escalates write concern to w: :majority for commit retries, mutating opts in place. + # NOTE(review): wtimeout is NOT added here — presumably commit_transaction applies it + # itself on a retry; confirm against Session#commit_transaction. + def escalate_write_concern!(opts) + wc = case (v = opts[:write_concern]) + when WriteConcern::Base then v.options + when nil then {} + else v + end + opts[:write_concern] = wc.merge(w: :majority) + end + + # -- Ensure guard ------------------------------------------------------ + + # Called from run's ensure. Aborts if a break/external Timeout escaped 
+ def abort_if_in_progress + return unless @transaction_in_progress + + @session.log_warn( + 'with_transaction callback broke out of with_transaction loop, ' \ + 'aborting transaction' + ) + begin + @session.abort_transaction + rescue Error::OperationFailure::Family, Error::InvalidTransactionOperation + # Ignore — transaction may already be in an inconsistent state. + end + end + + # -- Sleep wrapper (stubbable in tests) -------------------------------- + + def sleep(secs) + Kernel.sleep(secs) + end + end + end +end diff --git a/spec/mongo/session/with_transaction_overload_spec.rb b/spec/mongo/session/with_transaction_overload_spec.rb index 3b1019db08..847ddc4bb5 100644 --- a/spec/mongo/session/with_transaction_overload_spec.rb +++ b/spec/mongo/session/with_transaction_overload_spec.rb @@ -75,13 +75,13 @@ def make_commit_transient_overload_error allow(retry_policy).to receive(:record_non_overload_retry_failure).and_call_original - allow(session).to receive(:sleep) + allow(Kernel).to receive(:sleep) end context 'when callback raises TransientTransactionError with SystemOverloadedError' do it 'uses the new overload backoff' do call_count = 0 - expect(session).to receive(:sleep).with(0.1).once + expect(Kernel).to receive(:sleep).with(0.1).once session.with_transaction do call_count += 1 @@ -93,8 +93,8 @@ def make_commit_transient_overload_error context 'when callback raises TransientTransactionError without SystemOverloadedError' do it 'uses the existing backoff' do call_count = 0 - expect(session).to receive(:sleep).once - expect(session).not_to receive(:sleep).with(0.1) + expect(Kernel).to receive(:sleep).once + expect(Kernel).not_to receive(:sleep).with(0.1) session.with_transaction do call_count += 1 @@ -150,7 +150,7 @@ def make_commit_transient_overload_error end it 'applies overload backoff during commit retry' do - expect(session).to receive(:sleep).once + expect(Kernel).to receive(:sleep).once session.with_transaction do session.instance_variable_set(:@state, 
Mongo::Session::TRANSACTION_IN_PROGRESS_STATE) diff --git a/spec/mongo/session/with_transaction_timeout_spec.rb b/spec/mongo/session/with_transaction_timeout_spec.rb index 694a4c6028..0e77b9fd44 100644 --- a/spec/mongo/session/with_transaction_timeout_spec.rb +++ b/spec/mongo/session/with_transaction_timeout_spec.rb @@ -43,7 +43,7 @@ allow(session).to receive(:commit_transaction) do session.instance_variable_set(:@state, Mongo::Session::TRANSACTION_COMMITTED_STATE) end - allow(session).to receive(:sleep) + allow(Kernel).to receive(:sleep) end # Stubs Mongo::Utils.monotonic_time: first `initial_calls` invocations @@ -224,6 +224,28 @@ def make_commit_overload_error end end + # Regression test: the original with_transaction used `Utils.monotonic_time >= deadline` + # for the TransientTransactionError commit check, which is always true when + # deadline == 0 (timeout_ms: 0 = infinite CSOT). WithTransactionRunner fixes + # this with deadline_expired? that guards on @deadline.zero?. + context 'when timeout_ms: 0 (infinite CSOT) and commit raises TransientTransactionError' do + let(:commit_error) { make_commit_transient_error } + let(:success_return_value) { 42 } + + it 'retries and succeeds (does not raise TimeoutError)' do + allow(Mongo::Utils).to receive(:monotonic_time).and_return(0.0) + calls = 0 + allow(session).to receive(:commit_transaction) do + calls += 1 + raise commit_error if calls == 1 + + session.instance_variable_set(:@state, Mongo::Session::TRANSACTION_COMMITTED_STATE) + end + result = session.with_transaction(timeout_ms: 0) { success_return_value } + expect(result).to eq(success_return_value) + end + end + context 'when commit overload backoff would exceed CSOT deadline' do let(:commit_error) { make_commit_overload_error }