From 00162152173380785ebcc1923fe9498818aa44eb Mon Sep 17 00:00:00 2001 From: Trey Evans Date: Wed, 13 Sep 2023 16:11:00 -0400 Subject: [PATCH] Use proper threading to encourage work completion of AMQP subscribers. --- lib/event_source.rb | 10 ++++++++++ .../protocols/amqp/bunny_queue_proxy.rb | 3 ++- lib/event_source/threaded.rb | 14 ++++++++++++++ 3 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 lib/event_source/threaded.rb diff --git a/lib/event_source.rb b/lib/event_source.rb index 67615316..baa9d76a 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -15,6 +15,7 @@ require 'event_source/version' require 'event_source/ruby_versions' require 'event_source/error' +require 'event_source/threaded' require 'event_source/inflector' require 'event_source/logging' require 'event_source/uris/uri' @@ -63,6 +64,7 @@ def configure end def initialize! + boot_threading! load_protocols create_connections load_async_api_resources @@ -84,6 +86,14 @@ def build_async_api_resource(resource) .call(resource) .success end + + def boot_threading! + @threaded = EventSource::Threaded.new + end + + def threaded + @threaded + end end class EventSourceLogger diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 2b431e55..4818dda8 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -188,11 +188,12 @@ def on_receive_message( payload, &executable ) - + EventSource.threaded.amqp_consumer_lock.mon_enter subscription_handler.run rescue Bunny::Exception => e logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}" ensure + EventSource.threaded.amqp_consumer_lock.mon_exit subscriber = nil end diff --git a/lib/event_source/threaded.rb b/lib/event_source/threaded.rb new file mode 100644 index 00000000..43de85a6 --- /dev/null +++ b/lib/event_source/threaded.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module EventSource + # Manages concurrent resource access in a threaded environment. + class Threaded + + attr_reader :amqp_consumer_lock, :worker_lock + + def initialize + @amqp_consumer_lock = ::Monitor.new + @worker_lock = ::Monitor.new + end + end +end