From 10c0cd1de482b062b29bd0ba49a2b32b10eafca4 Mon Sep 17 00:00:00 2001 From: Trey Date: Mon, 23 Sep 2024 13:11:59 -0400 Subject: [PATCH 1/8] Don't start event source until Rails is ready. (#117) * Don't start event source until rails is ready. * Fix up when HTTP subscriber bindings are resolved. * Properly name parameter. * Fix http subscriber routing. * Add more documentation. --- .github/workflows/rspec.yml | 2 +- .rspec | 1 + .rspec_rails_specs | 3 + Gemfile | 1 + lib/event_source.rb | 28 +++++- lib/event_source/boot_registry.rb | 96 +++++++++++++++++++ lib/event_source/protocols/amqp_protocol.rb | 2 +- .../protocols/http/faraday_queue_proxy.rb | 10 +- lib/event_source/publisher.rb | 17 ++-- lib/event_source/queue.rb | 16 +++- lib/event_source/railtie.rb | 8 +- lib/event_source/subscriber.rb | 7 +- spec/event_source/command_spec.rb | 5 + spec/event_source/rails_application_spec.rb | 25 +++++ .../events/determinations/eval.rb | 11 +++ .../event_source/publishers/mitc_publisher.rb | 9 ++ .../publishers/parties/mitc_publisher.rb | 12 --- .../subscribers/mitc_response_subscriber.rb | 12 +++ .../subscribers/parties/mitc_subscriber.rb | 14 --- .../parties/organization_subscriber.rb | 21 ---- .../events/parties/organization/created.rb | 2 +- .../organizations/organization_model.rb | 15 --- spec/rails_app/config/application.rb | 2 - spec/rails_app/config/environments/test.rb | 2 +- .../config/initializers/event_source.rb | 13 ++- .../spec/http_service_integration_spec.rb | 20 ++++ spec/rails_app/spec/rails_helper.rb | 5 + spec/rails_app/spec/railtie_spec.rb | 11 +++ spec/support/async_api_files/http_mitc.yml | 58 +++++++++++ 29 files changed, 330 insertions(+), 98 deletions(-) create mode 100644 .rspec_rails_specs create mode 100644 lib/event_source/boot_registry.rb create mode 100644 spec/event_source/rails_application_spec.rb create mode 100644 spec/rails_app/app/event_source/events/determinations/eval.rb create mode 100644 spec/rails_app/app/event_source/publishers/mitc_publisher.rb delete mode 100644 spec/rails_app/app/event_source/publishers/parties/mitc_publisher.rb create mode 100644 spec/rails_app/app/event_source/subscribers/mitc_response_subscriber.rb delete mode 100644 spec/rails_app/app/event_source/subscribers/parties/mitc_subscriber.rb delete mode 100644 spec/rails_app/app/event_source/subscribers/parties/organization_subscriber.rb delete mode 100644 spec/rails_app/app/models/organizations/organization_model.rb create mode 100644 spec/rails_app/spec/http_service_integration_spec.rb create mode 100644 spec/rails_app/spec/rails_helper.rb create mode 100644 spec/rails_app/spec/railtie_spec.rb create mode 100644 spec/support/async_api_files/http_mitc.yml diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index 57363046..3a6a9315 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -6,7 +6,7 @@ jobs: strategy: fail-fast: false matrix: - ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2'] + ruby_version: ['2.7.5', '3.0.5', '3.1.4', '3.2.2'] runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 diff --git a/.rspec b/.rspec index 34c5164d..8433ecd9 100644 --- a/.rspec +++ b/.rspec @@ -1,3 +1,4 @@ --format documentation --color --require spec_helper +--exclude-pattern "spec/rails_app/**/*" diff --git a/.rspec_rails_specs b/.rspec_rails_specs new file mode 100644 index 00000000..791d0cba --- /dev/null +++ b/.rspec_rails_specs @@ -0,0 +1,3 @@ +--format documentation +--color +--exclude-pattern "**/*" diff --git a/Gemfile b/Gemfile index 55e26d2d..df419cb2 100644 --- a/Gemfile +++ b/Gemfile @@ -8,6 +8,7 @@ gemspec group :development, :test do gem "rails", '>= 6.1.4' gem "rspec-rails" + gem "parallel_tests" gem "pry", platform: :mri, require: false gem "pry-byebug", platform: :mri, require: false gem 'rubocop' diff --git a/lib/event_source.rb b/lib/event_source.rb index 09fb8514..5ee9d3a1 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -42,6 +42,7 @@ require 'event_source/operations/fetch_session' require 'event_source/operations/build_message_options' require 'event_source/operations/build_message' +require 'event_source/boot_registry' # Event source provides ability to compose, publish and subscribe to events module EventSource @@ -65,14 +66,23 @@ class << self :async_api_schemas= def configure + @configured = true yield(config) end - def initialize! - load_protocols - create_connections - load_async_api_resources - load_components + def initialize!(force = false) + # Don't boot if I was never configured. + return unless @configured + boot_registry.boot!(force) do + load_protocols + create_connections + load_async_api_resources + load_components + end + end + + def boot_registry + @boot_registry ||= EventSource::BootRegistry.new end def config @@ -90,6 +100,14 @@ def build_async_api_resource(resource) .call(resource) .success end + + def register_subscriber(subscriber_klass) + boot_registry.register_subscriber(subscriber_klass) + end + + def register_publisher(subscriber_klass) + boot_registry.register_publisher(subscriber_klass) + end end class EventSourceLogger diff --git a/lib/event_source/boot_registry.rb b/lib/event_source/boot_registry.rb new file mode 100644 index 00000000..6c0a7b02 --- /dev/null +++ b/lib/event_source/boot_registry.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require 'set' +require 'monitor' + +module EventSource + # This class manages correct/loading of subscribers and publishers + # based on the current stage of the EventSource lifecycle. + # + # Depending on both the time the initialization of EventSource is invoked + # and when subscriber/publisher code is loaded, this can become complicated. + # This is largely caused by two confounding factors: + # 1. We want to delay initialization of EventSource until Rails is fully + # 'ready' + # 2. Based on the Rails environment, such as production, development, or + # test (primarily how those different environments treat lazy vs. eager + # loading of classes in a Rails application), subscriber and publisher + # code can be loaded before, after, or sometimes even DURING the + # EventSource boot process - we need to support all models + class BootRegistry + def initialize + @unbooted_publishers = Set.new + @unbooted_subscribers = Set.new + @booted_publishers = Set.new + @booted_subscribers = Set.new + # This is our re-entrant mutex. We're going to use it to make sure that + # registration and boot methods aren't allowed to simultaneously alter + # our state. You'll notice most methods on this class are wrapped in + # synchronize calls against this. + @bootex = Monitor.new + @booted = false + end + + def boot!(force = false) + @bootex.synchronize do + return if @booted && !force + yield + boot_publishers! + boot_subscribers! + @booted = true + end + end + + # Register a publisher for EventSource. + # + # If the EventSource hasn't been booted, save publisher for later. + # Otherwise, boot it now. + def register_publisher(publisher_klass) + @bootex.synchronize do + if @booted + publisher_klass.validate + @booted_publishers << publisher_klass + else + @unbooted_publishers << publisher_klass + end + end + end + + # Register a subscriber for EventSource. + # + # If the EventSource hasn't been booted, save the subscriber for later. + # Otherwise, boot it now. + def register_subscriber(subscriber_klass) + @bootex.synchronize do + if @booted + subscriber_klass.create_subscription + @booted_subscribers << subscriber_klass + else + @unbooted_subscribers << subscriber_klass + end + end + end + + # Boot the publishers. + def boot_publishers! + @bootex.synchronize do + @unbooted_publishers.each do |pk| + pk.validate + @booted_publishers << pk + end + @unbooted_publishers = Set.new + end + end + + # Boot the subscribers. + def boot_subscribers! + @bootex.synchronize do + @unbooted_subscribers.each do |sk| + sk.create_subscription + @booted_subscribers << sk + end + @unbooted_subscribers = Set.new + end + end + end +end \ No newline at end of file diff --git a/lib/event_source/protocols/amqp_protocol.rb b/lib/event_source/protocols/amqp_protocol.rb index c40594fe..5717a421 100644 --- a/lib/event_source/protocols/amqp_protocol.rb +++ b/lib/event_source/protocols/amqp_protocol.rb @@ -15,7 +15,7 @@ require_relative 'amqp/contracts/contract' Gem - .find_files('event_source/protocols/amqp/contracts/**/*.rb') + .find_files('event_source/protocols/amqp/contracts/**/*.rb', false) .sort .each { |f| require(f) } diff --git a/lib/event_source/protocols/http/faraday_queue_proxy.rb b/lib/event_source/protocols/http/faraday_queue_proxy.rb index 6489bd7a..9a53923b 100644 --- a/lib/event_source/protocols/http/faraday_queue_proxy.rb +++ b/lib/event_source/protocols/http/faraday_queue_proxy.rb @@ -49,12 +49,12 @@ def actions # @param [Object] subscriber_klass Subscriber class # @return [Queue] Queue instance def subscribe(subscriber_klass, _options) + subscription_key = [app_name, formatted_exchange_name].join(delimiter) subscriber_suffix = subscriber_klass.name.downcase.gsub('::', '_') - unique_key = [app_name, formatted_exchange_name].join(delimiter) + "_#{subscriber_suffix}" - logger.debug "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}" - logger.debug "FaradayQueueProxy#register_subscription Unique_key #{unique_key}" - executable = subscriber_klass.executable_for(unique_key) - @subject.actions.push(executable) + unique_key = subscription_key + "_#{subscriber_suffix}" + logger.info "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}" + logger.info "FaradayQueueProxy#register_subscription Unique_key #{unique_key}" + @subject.register_action(subscriber_klass, unique_key) end def consumer_proxy_for(operation_bindings) diff --git a/lib/event_source/publisher.rb b/lib/event_source/publisher.rb index d5aa9b3c..fc47e110 100644 --- a/lib/event_source/publisher.rb +++ b/lib/event_source/publisher.rb @@ -26,6 +26,16 @@ def self.publisher_container @publisher_container ||= Concurrent::Map.new end + def self.initialization_registry + @initialization_registry ||= Concurrent::Array.new + end + + def self.initialize_publishers + self.initialization_registry.each do |pub| + pub.validate + end + end + def self.[](exchange_ref) # TODO: validate publisher already exists # raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id) @@ -46,12 +56,7 @@ def included(base) } base.extend(ClassMethods) - TracePoint.trace(:end) do |t| - if base == t.self - base.validate - t.disable - end - end + EventSource.register_publisher(base) end # methods to register events diff --git a/lib/event_source/queue.rb b/lib/event_source/queue.rb index 21dc7caa..bafeba03 100644 --- a/lib/event_source/queue.rb +++ b/lib/event_source/queue.rb @@ -6,15 +6,14 @@ class Queue # @attr_reader [Object] queue_proxy the protocol-specific class supporting this DSL # @attr_reader [String] name # @attr_reader [Hash] bindings - # @attr_reader [Hash] actions - attr_reader :queue_proxy, :name, :bindings, :actions + attr_reader :queue_proxy, :name, :bindings def initialize(queue_proxy, name, bindings = {}) @queue_proxy = queue_proxy @name = name @bindings = bindings @subject = ::Queue.new - @actions = [] + @registered_actions = [] end # def subscribe(subscriber_klass, &block) @@ -49,5 +48,16 @@ def close def closed? @subject.closed? end + + # Register an action to be performed, with a resolver class and key. + def register_action(resolver, key) + @registered_actions << [resolver, key] + end + + def actions + @registered_actions.map do |ra| + ra.first.executable_for(ra.last) + end + end end end diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb index ea971274..77dcc66b 100644 --- a/lib/event_source/railtie.rb +++ b/lib/event_source/railtie.rb @@ -2,7 +2,9 @@ module EventSource # :nodoc: - class Railtie < Rails::Railtie - + module Railtie + Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do + EventSource.initialize! + end end -end \ No newline at end of file +end diff --git a/lib/event_source/subscriber.rb b/lib/event_source/subscriber.rb index d62257a6..937f2aba 100644 --- a/lib/event_source/subscriber.rb +++ b/lib/event_source/subscriber.rb @@ -49,12 +49,7 @@ def included(base) base.extend ClassMethods base.include InstanceMethods - TracePoint.trace(:end) do |t| - if base == t.self - base.create_subscription - t.disable - end - end + EventSource.register_subscriber(base) end module InstanceMethods diff --git a/spec/event_source/command_spec.rb b/spec/event_source/command_spec.rb index 7ad0f985..b44d0a6e 100644 --- a/spec/event_source/command_spec.rb +++ b/spec/event_source/command_spec.rb @@ -22,7 +22,12 @@ def call # binding.pry end + before :each do + EventSource.initialize!(true) + end + context '.event' do + let(:organization_params) do { hbx_id: '553234', diff --git a/spec/event_source/rails_application_spec.rb b/spec/event_source/rails_application_spec.rb new file mode 100644 index 00000000..7f77fa79 --- /dev/null +++ b/spec/event_source/rails_application_spec.rb @@ -0,0 +1,25 @@ +require "spec_helper" +require "parallel_tests" +require "parallel_tests/rspec/runner" + +RSpec.describe EventSource, "rails specs" do + it "runs the rails tests in the rails application context" do + ParallelTests.with_pid_file do + specs_run_result = ParallelTests::RSpec::Runner.run_tests( + [ + "spec/rails_app/spec/railtie_spec.rb", + "spec/rails_app/spec/http_service_integration_spec.rb" + ], + 1, + 1, + { + serialize_stdout: true, + test_options: ["-O", ".rspec_rails_specs", "--format", "documentation"] + } + ) + if specs_run_result[:exit_status] != 0 + fail(specs_run_result[:stdout] + "\n\n") + end + end + end +end \ No newline at end of file diff --git a/spec/rails_app/app/event_source/events/determinations/eval.rb b/spec/rails_app/app/event_source/events/determinations/eval.rb new file mode 100644 index 00000000..0cd6b90d --- /dev/null +++ b/spec/rails_app/app/event_source/events/determinations/eval.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module Events + module Determinations + # Eval will register event publisher for MiTC + class Eval < EventSource::Event + publisher_path 'publishers.mitc_publisher' + + end + end +end \ No newline at end of file diff --git a/spec/rails_app/app/event_source/publishers/mitc_publisher.rb b/spec/rails_app/app/event_source/publishers/mitc_publisher.rb new file mode 100644 index 00000000..5fc3ed43 --- /dev/null +++ b/spec/rails_app/app/event_source/publishers/mitc_publisher.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +module Publishers + class MitcPublisher + # Publisher will send request payload to MiTC for determinations + include ::EventSource::Publisher[http: '/determinations/eval'] + register_event '/determinations/eval' + end +end diff --git a/spec/rails_app/app/event_source/publishers/parties/mitc_publisher.rb b/spec/rails_app/app/event_source/publishers/parties/mitc_publisher.rb deleted file mode 100644 index 3e007812..00000000 --- a/spec/rails_app/app/event_source/publishers/parties/mitc_publisher.rb +++ /dev/null @@ -1,12 +0,0 @@ -# frozen_string_literal: true - -module Parties - class MitcPublisher - # include ::EventSource::Publisher[http: '/determinations/eval'] - - # # register_event '/determinations/eval' - end -end - - - diff --git a/spec/rails_app/app/event_source/subscribers/mitc_response_subscriber.rb b/spec/rails_app/app/event_source/subscribers/mitc_response_subscriber.rb new file mode 100644 index 00000000..d9d253eb --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/mitc_response_subscriber.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +module Subscribers + class MitcResponseSubscriber + include ::EventSource::Subscriber[http: '/determinations/eval'] + extend EventSource::Logging + + subscribe(:on_determinations_eval) do |body, status, headers| + $GLOBAL_TEST_FLAG = true + end + end +end diff --git a/spec/rails_app/app/event_source/subscribers/parties/mitc_subscriber.rb b/spec/rails_app/app/event_source/subscribers/parties/mitc_subscriber.rb deleted file mode 100644 index a6a404f9..00000000 --- a/spec/rails_app/app/event_source/subscribers/parties/mitc_subscriber.rb +++ /dev/null @@ -1,14 +0,0 @@ -# frozen_string_literal: true - -module MagiMedicaid - class EligbilityDeterminationsSubscriber - # include ::EventSource::Subscriber[http: '/determinations/eval'] - - # # # from: MagiMedicaidEngine of EA after Application's submission - # # # { event: magi_medicaid_application_submitted, payload: :magi_medicaid_application } - # subscribe(:on_determinations_eval) do |headers, payload| - # puts "block headers------#{headers}" - # puts "block payload-----#{payload}" - # end - end -end diff --git a/spec/rails_app/app/event_source/subscribers/parties/organization_subscriber.rb b/spec/rails_app/app/event_source/subscribers/parties/organization_subscriber.rb deleted file mode 100644 index b4d0786a..00000000 --- a/spec/rails_app/app/event_source/subscribers/parties/organization_subscriber.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true - -module Parties - class OrganizationSubscriber - # include ::EventSource::Subscriber[amqp: 'enroll.parties.organizations.fein_corrected'] - - # # subscribe(:on_enroll_parties_organizations_fein_corrected) do |delivery_info, metadata, payload| - # # # Sequence of steps that are executed as single operation - # # puts "triggered --> on_enroll_parties_organizations_fein_corrected block -- #{delivery_info} -- #{metadata} -- #{payload}" - # # end - - # subscribe("on_faa.enroll.parties.organizations") do |delivery_info, metadata, payload| - # # Sequence of steps that are executed as single operation - # puts "triggered --> on_enroll_parties_organizations_fein_corrected block -- #{delivery_info} -- #{metadata} -- #{payload}" - # end - # # def on_enroll_parties_organizations_fein_corrected(payload) - # # # Set of independent reactors for the given event that execute asynchronously - # # puts "triggered --> on_enroll_parties_organizations_fein_corrected method --#{payload}" - # # end - end -end \ No newline at end of file diff --git a/spec/rails_app/app/events/parties/organization/created.rb b/spec/rails_app/app/events/parties/organization/created.rb index 42866e23..49e6c4ff 100644 --- a/spec/rails_app/app/events/parties/organization/created.rb +++ b/spec/rails_app/app/events/parties/organization/created.rb @@ -6,7 +6,7 @@ class Created < EventSource::Event publisher_path 'parties.organization_publisher' # Schema used to validaate Event payload - contract_class 'Parties::Organization::CreateContract' + # contract_class 'Parties::Organization::CreateContract' attribute_keys :hbx_id, :legal_name, :fein, :entity_kind end end diff --git a/spec/rails_app/app/models/organizations/organization_model.rb b/spec/rails_app/app/models/organizations/organization_model.rb deleted file mode 100644 index 233218fc..00000000 --- a/spec/rails_app/app/models/organizations/organization_model.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -module Organizations - class OrganizationModel - include Mongoid::Document - include Mongoid::Timestamps - - field :legal_name, type: String - field :entity_kind, type: Symbol - field :fein, type: String - - # Track Events for this model - has_many :events, as: :event_stream, class_name: 'EventSource::EventStream' - end -end diff --git a/spec/rails_app/config/application.rb b/spec/rails_app/config/application.rb index bfe50a09..80441f98 100644 --- a/spec/rails_app/config/application.rb +++ b/spec/rails_app/config/application.rb @@ -7,8 +7,6 @@ Bundler.require(*Rails.groups) -require "event_source" - module RailsApp class Application < Rails::Application config.root = File.expand_path('../..', __FILE__) diff --git a/spec/rails_app/config/environments/test.rb b/spec/rails_app/config/environments/test.rb index 1ace3c3f..b2d790a8 100644 --- a/spec/rails_app/config/environments/test.rb +++ b/spec/rails_app/config/environments/test.rb @@ -12,7 +12,7 @@ # Do not eager load code on boot. This avoids loading your whole application # just for the purpose of running a single test. If you are using a tool that # preloads Rails for running tests, you may have to set it to true. - config.eager_load = false + config.eager_load = true # Configure static file server for tests with Cache-Control for performance. config.serve_static_files = true diff --git a/spec/rails_app/config/initializers/event_source.rb b/spec/rails_app/config/initializers/event_source.rb index 346d942c..d192c56d 100644 --- a/spec/rails_app/config/initializers/event_source.rb +++ b/spec/rails_app/config/initializers/event_source.rb @@ -57,6 +57,14 @@ rabbitmq.default_content_type = 'application/json' end + server.http do |http| + http.ref = 'http://mitc:3001' + http.host = ENV['MITC_HOST'] || 'http://localhost' + http.port = ENV['MITC_PORT'] || '3000' + http.url = ENV['MITC_URL'] || 'http://localhost:3000' + http.default_content_type = 'application/json' + end + server.http do |http| http.host = "https://api.github.com" http.default_content_type = 'application/json' @@ -119,7 +127,8 @@ # AcaEntities::Operations::AsyncApi::FindResource.new.call(self) end -dir = Pathname.pwd.join('spec', 'support', 'async_api_files') +config_dir = File.dirname(__FILE__) +dir = File.join(config_dir, '..', '..', '..', 'support', 'async_api_files') EventSource.async_api_schemas = ::Dir[::File.join(dir, '**', '*')].reject { |p| ::File.directory? p }.sort.reduce([]) do |memo, file| # read # serialize yaml to hash @@ -127,4 +136,4 @@ memo << EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath.new.call(path: file).success end -EventSource.initialize! +# EventSource.initialize! diff --git a/spec/rails_app/spec/http_service_integration_spec.rb b/spec/rails_app/spec/http_service_integration_spec.rb new file mode 100644 index 00000000..007eb983 --- /dev/null +++ b/spec/rails_app/spec/http_service_integration_spec.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require_relative './rails_helper' + +RSpec.describe "with an http subscriber service" do + include EventSource::Command + + it "runs when invoked" do + $GLOBAL_TEST_FLAG = false + + WebMock.stub_request( + :post, + /http:\/\/localhost:3000\/determinations\/eval/ + ).to_return({body: "{}"}) + response = event("events.determinations.eval", attributes: {}).success.publish + expect(response.status).to eq 200 + sleep(0.5) + expect($GLOBAL_TEST_FLAG).to eq(true) + end +end diff --git a/spec/rails_app/spec/rails_helper.rb b/spec/rails_app/spec/rails_helper.rb new file mode 100644 index 00000000..42d5ccf5 --- /dev/null +++ b/spec/rails_app/spec/rails_helper.rb @@ -0,0 +1,5 @@ +ENV['RAILS_ENV'] ||= 'test' +require 'bundler/setup' +require 'webmock/rspec' +require 'rails' +require_relative '../config/environment' diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb new file mode 100644 index 00000000..7c93c25f --- /dev/null +++ b/spec/rails_app/spec/railtie_spec.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require_relative './rails_helper' + +RSpec.describe EventSource::Railtie do + it "runs when invoked" do + manager = EventSource::ConnectionManager.instance + connection = manager.connections_for(:amqp).first + expect(connection).not_to be_nil + end +end diff --git a/spec/support/async_api_files/http_mitc.yml b/spec/support/async_api_files/http_mitc.yml new file mode 100644 index 00000000..e7e2f4b6 --- /dev/null +++ b/spec/support/async_api_files/http_mitc.yml @@ -0,0 +1,58 @@ +asyncapi: "2.0.0" +info: + title: MAGI in the Cloud (MitC) + version: 0.1.0 + description: Configuration for accessing MitC Medicaid and CHIP eligibility determination services + contact: + name: IdeaCrew + url: https://ideacrew.com + email: info@ideacrew.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + production: + url: http://mitc:3001 + protocol: http + protocolVersion: 0.1.0 + description: MitC Development Server + development: + url: http://mitc:3001 + protocol: http + protocolVersion: 0.1.0 + description: MitC Development Server + test: + url: http://mitc:3001 + protocol: http + protocolVersion: 0.1.0 + description: MitC Test Server + +defaultContentType: application/json + +channels: + /determinations/eval: + publish: + operationId: /determinations/eval + description: HTTP endpoint for MitC eligibility determination requests + bindings: + http: + type: request + method: POST + headers: + Content-Type: application/json + Accept: application/json + subscribe: + operationId: /on/determinations/eval + description: EventSource Subscriber that publishes MitC eligibility determination responses + bindings: + http: + type: response + method: GET + headers: + Content-Type: application/json + Accept: application/json + +tags: + - name: linter_tag + description: placeholder that satisfies the linter \ No newline at end of file From 55c2452a65da6c59525777c949f368b30cc96ff1 Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Thu, 2 Jan 2025 16:23:14 -0500 Subject: [PATCH 2/8] add support for amqp payload compress/decompress (#119) * add support for amqp payload compress/decompress * publish compressed messages based on api configuration * publish binary data directly to rabbitmq * check if message bindings available * verify encoding before passing payload to consumer handler * refactor payload_codec, add specs * add mime encode/decode operations * add specs for mime encode/decode operations * update bunny queue proxy and remove payload_codec * remove zlib gem reference * log file size before and after encoding * update method names * update errors for encode/decode exceptions * add documentation for encode/decode operations --- lib/event_source.rb | 2 + lib/event_source/error.rb | 2 + lib/event_source/operations/mime_decode.rb | 80 ++++++++++++++++ lib/event_source/operations/mime_encode.rb | 87 +++++++++++++++++ .../protocols/amqp/bunny_exchange_proxy.rb | 11 ++- .../protocols/amqp/bunny_queue_proxy.rb | 29 +++++- lib/event_source/publish_operation.rb | 31 ++++++ .../operations/mime_decode_spec.rb | 56 +++++++++++ .../operations/mime_encode_spec.rb | 55 +++++++++++ .../amqp/bunny_exchange_proxy_spec.rb | 54 ++++++++++- .../protocols/amqp/bunny_queue_proxy_spec.rb | 95 +++++++++++++++++++ spec/event_source/publish_operation_spec.rb | 77 +++++++++++++++ 12 files changed, 575 insertions(+), 4 deletions(-) create mode 100644 lib/event_source/operations/mime_decode.rb create mode 100644 lib/event_source/operations/mime_encode.rb create mode 100644 spec/event_source/operations/mime_decode_spec.rb create mode 100644 spec/event_source/operations/mime_encode_spec.rb diff --git a/lib/event_source.rb b/lib/event_source.rb index 5ee9d3a1..b24ca3a1 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -38,6 +38,8 @@ require 'event_source/event' require 'event_source/subscriber' require 'event_source/operations/codec64' +require 'event_source/operations/mime_encode' +require 'event_source/operations/mime_decode' require 'event_source/operations/create_message' require 'event_source/operations/fetch_session' require 'event_source/operations/build_message_options' diff --git a/lib/event_source/error.rb b/lib/event_source/error.rb index ade7918a..e7dc002d 100644 --- a/lib/event_source/error.rb +++ b/lib/event_source/error.rb @@ -32,5 +32,7 @@ class Error < StandardError ServerConfigurationNotFound = Class.new(Error) ServerConfigurationInvalid = Class.new(Error) MessageBuildError = Class.new(Error) + PayloadEncodeError = Class.new(Error) + PayloadDecodeError = Class.new(Error) end end diff --git a/lib/event_source/operations/mime_decode.rb b/lib/event_source/operations/mime_decode.rb new file mode 100644 index 00000000..f147182d --- /dev/null +++ b/lib/event_source/operations/mime_decode.rb @@ -0,0 +1,80 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # Operation for decoding payloads, including decompression using Zlib. + class MimeDecode + include Dry::Monads[:result, :do] + + # Supported MIME types for decoding. + MIME_TYPES = %w[application/zlib application/json].freeze + + # Decodes the payload based on the specified MIME type. + # For example, decompresses the payload using Zlib for 'application/zlib'. + # + # @param mime_type [String] the MIME type of the payload (e.g., 'application/zlib', 'application/json') + # @param payload [String] the encoded payload to decode + # + # @return [Dry::Monads::Success] if decoding is successful + # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, decoding failure) + def call(mime_type, payload) + valid_payload = yield validate_payload(payload, mime_type) + decoded_data = yield decode(valid_payload, mime_type) + + Success(decoded_data) + end + + private + + # Validates the payload based on the MIME type. + # Ensures the payload is binary-encoded for 'application/zlib' MIME type. + # + # @param payload [String] the payload to validate + # @param mime_type [String] the MIME type of the payload + # + # @return [Dry::Monads::Success] if the payload is valid + # @return [Dry::Monads::Failure] if the payload is invalid + def validate_payload(payload, mime_type) + unless MIME_TYPES.include?(mime_type.to_s) + return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") + end + + if mime_type.to_s == 'application/zlib' && !binary_payload?(payload) + return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + + Success(payload) + end + + # Decodes the payload using the specified MIME type. + # For 'application/zlib', it decompresses the payload using Zlib. + # + # @param payload [String] the payload to decode + # @param mime_type [String] the MIME type of the payload + # + # @return [Dry::Monads::Success] if decoding is successful + # @return [Dry::Monads::Failure] if decoding fails + def decode(payload, mime_type) + decoded_data = Zlib.inflate(payload) if mime_type.to_s == 'application/zlib' + + Success(decoded_data || payload) + rescue Zlib::Error => e + Failure("Failed to decode payload using Zlib: #{e.message}") + end + + # Checks whether the payload is binary-encoded. + # + # @param payload [String] the payload to check + # + # @return [Boolean] true if the payload is binary-encoded, false otherwise + def binary_payload?(payload) + return false unless payload.respond_to?(:encoding) + + payload.encoding == Encoding::BINARY + end + end + end +end diff --git a/lib/event_source/operations/mime_encode.rb b/lib/event_source/operations/mime_encode.rb new file mode 100644 index 00000000..c5b6f8e7 --- /dev/null +++ b/lib/event_source/operations/mime_encode.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # Operation for encoding payloads into specified MIME types. + # For example, it supports compression using Zlib for 'application/zlib'. + class MimeEncode + include Dry::Monads[:result, :do] + include EventSource::Logging + + # Supported MIME types for encoding. + MIME_TYPES = %w[application/zlib application/json].freeze + + # Encodes the given payload into the specified MIME type. + # For example, compresses the payload using Zlib for 'application/zlib'. + # + # @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json') + # @param payload [String, Hash] the payload to encode; must be a Hash or String + # + # @return [Dry::Monads::Success] if encoding is successful + # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure) + def call(mime_type, payload) + json_payload = yield validate_payload(payload, mime_type) + encoded_data = yield encode(json_payload, mime_type) + + Success(encoded_data) + end + + private + + # Validates the payload and MIME type before encoding. + # Ensures the MIME type is supported and the payload is either a Hash or a String. + # + # @param payload [String, Hash] the payload to validate + # @param mime_type [String] the MIME type for encoding + # + # @return [Dry::Monads::Success] if the payload and MIME type are valid + # @return [Dry::Monads::Failure] if the MIME type is unsupported or the payload is invalid + def validate_payload(payload, mime_type) + unless MIME_TYPES.include?(mime_type.to_s) + return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") + end + + unless payload.is_a?(Hash) || payload.is_a?(String) + return Failure("Invalid payload type. Expected a Hash or String, but received #{payload.class}.") + end + + Success(payload.is_a?(Hash) ? payload.to_json : payload) + end + + # Encodes the payload based on the MIME type. + # For 'application/zlib', compresses the payload using Zlib. + # Logs the original and encoded payload sizes for debugging. + # + # @param json_payload [String] the JSON stringified payload to encode + # @param mime_type [String] the MIME type for encoding + # + # @return [Dry::Monads::Success] if encoding is successful + # @return [Dry::Monads::Failure] if encoding fails + def encode(json_payload, mime_type) + encoded_data = Zlib.deflate(json_payload) if mime_type.to_s == 'application/zlib' + + logger.debug "*" * 80 + logger.debug "Starting payload encoding for MIME type: '#{mime_type}'" + logger.debug "Original payload size: #{data_size_in_kb(json_payload)} KB" + logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" if encoded_data + logger.debug "*" * 80 + + Success(encoded_data || json_payload) + rescue Zlib::Error => e + Failure("Failed to compress payload using Zlib: #{e.message}") + end + + # Calculates the size of the data in kilobytes (KB). + # + # @param data [String] the data whose size is to be calculated + # + # @return [Float] the size of the data in KB, rounded to two decimal places + def data_size_in_kb(data) + (data.bytesize / 1024.0).round(2) + end + end + end +end diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 28e735c1..4dd1b2c4 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -49,7 +49,10 @@ def publish(payload:, publish_bindings:, headers: {}) bunny_publish_bindings[:headers] = headers unless headers.empty? logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}" - @subject.publish(payload.to_json, bunny_publish_bindings) + + payload = payload.to_json unless is_binary?(payload) + @subject.publish(payload, bunny_publish_bindings) + logger.debug "BunnyExchange#publish published message: #{payload}" logger.debug "BunnyExchange#publish published message to exchange: #{@subject.name}" end @@ -67,6 +70,12 @@ def message_id SecureRandom.uuid end + def is_binary?(payload) + return false unless payload.respond_to?(:encoding) + + payload.encoding == Encoding::BINARY + end + # Filtering and renaming AsyncAPI Operation bindings to Bunny/RabitMQ # bindings # diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 89c07b58..616d9c41 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -25,6 +25,7 @@ class BunnyQueueProxy # @return [Bunny::Queue] def initialize(channel_proxy, async_api_channel_item) @channel_proxy = channel_proxy + @async_api_channel_item = async_api_channel_item bindings = async_api_channel_item.bindings @consumers = [] @@ -134,7 +135,7 @@ def on_receive_message( subscriber = subscriber_klass.new subscriber.channel = @subject.channel - + payload = decode_payload(payload) subscription_handler = EventSource::Protocols::Amqp::BunnyConsumerHandler.new( subscriber, @@ -143,7 +144,6 @@ def on_receive_message( payload, &executable ) - subscription_handler.run rescue Bunny::Exception => e logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}" @@ -151,6 +151,31 @@ def on_receive_message( subscriber = nil end + # Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings. + # + # For example, if `contentEncoding` is set to `application/zlib`, the payload will be decompressed using zlib. + # If no `contentEncoding` is provided, the payload will be returned unchanged. + # + # @param payload [String] The payload to be decoded. + # @return [String] The decoded payload, or the original payload if no encoding is specified. + # @raise [EventSource::Error::PayloadDecodeError] if the decoding process fails. + def decode_payload(payload) + async_api_subscribe_operation = @async_api_channel_item.subscribe + return payload unless async_api_subscribe_operation.message + + message_bindings = async_api_subscribe_operation.message['bindings'] + encoding = message_bindings.first[1]['contentEncoding'] if message_bindings + return payload unless encoding + + output = EventSource::Operations::MimeDecode.new.call(encoding, payload) + if output.success? + output.value! + else + logger.error "Failed to decompress message \n due to: #{output.failure}" + raise EventSource::Error::PayloadDecodeError, output.failure + end + end + def find_executable(subscriber_klass, delivery_info) subscriber_suffix = subscriber_klass_name_to_suffix(subscriber_klass) diff --git a/lib/event_source/publish_operation.rb b/lib/event_source/publish_operation.rb index e6cd6c05..ddb7fb9f 100644 --- a/lib/event_source/publish_operation.rb +++ b/lib/event_source/publish_operation.rb @@ -3,6 +3,8 @@ module EventSource # Publish {EventSource::Event} messages class PublishOperation + include EventSource::Logging + # @attr_reader [EventSource::Channel] channel the channel instance used by # this PublishOperation # @attr_reader [Object] subject instance of the protocol's publish class @@ -26,11 +28,40 @@ def initialize(channel, publish_proxy, async_api_publish_operation) # @example # #publish("Message", :headers => { }) def call(payload, options = {}) + payload = encode_payload(payload) @subject.publish( payload: payload, publish_bindings: @async_api_publish_operation[:bindings], headers: options[:headers] || {} ) end + + # Encodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_publish.yml message bindings. + # + # For example, if `contentEncoding` is set to `application/zlib`, the payload will be compressed using zlib. + # If no `contentEncoding` is provided, the payload will be returned as-is without modification. + # + # Note: + # - Encoding is not needed for the HTTP protocol, as encoding is handled at the server level. + # - For other protocols like AMQP, encoding is supported to ensure proper message transmission. + # + # @param payload [String, Hash] The payload to be encoded. + # @return [String] The encoded payload, or the original payload if no encoding is specified. + # @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails. + def encode_payload(payload) + return payload unless @async_api_publish_operation.message + + message_bindings = @async_api_publish_operation.message['bindings'] + encoding = message_bindings.first[1]['contentEncoding'] if message_bindings + return payload unless encoding + + output = EventSource::Operations::MimeEncode.new.call(encoding, payload) + if output.success? + output.value! + else + logger.error "Failed to decompress message \n due to: #{output.failure}" + raise EventSource::Error::PayloadEncodeError, output.failure + end + end end end diff --git a/spec/event_source/operations/mime_decode_spec.rb b/spec/event_source/operations/mime_decode_spec.rb new file mode 100644 index 00000000..7045177a --- /dev/null +++ b/spec/event_source/operations/mime_decode_spec.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +RSpec.describe EventSource::Operations::MimeDecode do + subject { described_class.new } + + describe "#call" do + context "when the payload and mime type are valid" do + let(:payload) { { message: "Hello, World!" } } + let(:compressed_payload) { Zlib.deflate(payload.to_json) } + let(:mime_type) { "application/zlib" } + + it "successfully decodes the payload" do + result = subject.call(mime_type, compressed_payload) + + expect(result).to be_success + expect(result.value!).to eq(payload.to_json) + end + end + + context "when the payload is not binary for application/zlib" do + let(:invalid_payload) { "Not binary data" } + let(:mime_type) { "application/zlib" } + + it "returns a failure" do + result = subject.call(mime_type, invalid_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + + context "when the mime type is invalid" do + let(:payload) { { message: "Hello, World!" }.to_json } + let(:mime_type) { "text/plain" } + + it "returns a failure" do + result = subject.call(mime_type, payload) + + expect(result).to be_failure + expect(result.failure).to eq("Invalid MIME type 'text/plain'. Supported types are: application/zlib, application/json.") + end + end + + context "when decoding fails" do + let(:invalid_compressed_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + it "returns a failure with an error message" do + result = subject.call(mime_type, invalid_compressed_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + end +end diff --git a/spec/event_source/operations/mime_encode_spec.rb b/spec/event_source/operations/mime_encode_spec.rb new file mode 100644 index 00000000..ca91814c --- /dev/null +++ b/spec/event_source/operations/mime_encode_spec.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +RSpec.describe EventSource::Operations::MimeEncode do + subject { described_class.new } + + describe "#call" do + context "when the payload and mime type are valid" do + let(:payload) { { message: "Hello, World!" } } + let(:mime_type) { "application/zlib" } + + it "successfully encodes the payload" do + result = subject.call(mime_type, payload) + + expect(result).to be_success + expect(Zlib.inflate(result.value!)).to eq(payload.to_json) + end + end + + context "when the payload is a string and mime type is valid" do + let(:payload) { "Hello, World!" } + let(:mime_type) { "application/json" } + + it "returns the payload as JSON" do + result = subject.call(mime_type, payload) + + expect(result).to be_success + expect(result.value!).to eq(payload) + end + end + + context "when the mime type is invalid" do + let(:payload) { { message: "Hello, World!" } } + let(:mime_type) { "text/plain" } + + it "returns a failure" do + result = subject.call(mime_type, payload) + + expect(result).to be_failure + expect(result.failure).to eq("Invalid MIME type 'text/plain'. Supported types are: application/zlib, application/json.") + end + end + + context "when the payload is invalid" do + let(:payload) { 1000 } + let(:mime_type) { "application/json" } + + it "returns a failure" do + result = subject.call(mime_type, payload) + + expect(result).to be_failure + expect(result.failure).to eq("Invalid payload type. Expected a Hash or String, but received Integer.") + end + end + end +end diff --git a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb index 5b289934..d9a2015c 100644 --- a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb @@ -4,5 +4,57 @@ require 'config_helper' RSpec.describe EventSource::Protocols::Amqp::BunnyExchangeProxy do - it 'the publish method should include all bindings, including message_id, persistance & message level bindings' + let(:channel_proxy) { instance_double('BunnyChannelProxy', subject: double) } + let(:exchange_bindings) do + { + type: :direct, + name: 'test_exchange', + durable: true, + auto_delete: false, + vhost: 'test_vhost' + } + end + let(:bunny_exchange) { instance_double('Bunny::Exchange', name: 'test_exchange') } + let(:payload) { { message: 'test message' } } + let(:publish_bindings) { { routing_key: 'test.key', persistent: true } } + let(:headers) { { correlation_id: '12345', custom_header: 'test_value' } } + + subject { described_class.new(channel_proxy, exchange_bindings) } + + before do + allow_any_instance_of(described_class).to receive(:bunny_exchange_for).and_return(bunny_exchange) + allow(bunny_exchange).to receive(:publish) + end + + describe '#publish' do + it 'publishes the payload with the correct bindings and headers' do + subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers) + + expect(bunny_exchange).to have_received(:publish).with(payload.to_json, { + correlation_id: '12345', + headers: { custom_header: 'test_value' } + }) + end + + it 'logs the publishing process' do + expect(subject.logger).to receive(:debug).with(/publishing message with bindings:/) + expect(subject.logger).to receive(:debug).with(/published message:/) + expect(subject.logger).to receive(:debug).with(/published message to exchange:/) + + subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers) + end + + context 'when the payload is binary' do + let(:binary_payload) { Zlib.deflate("binary data") } + + it 'does not convert the payload to JSON' do + subject.publish(payload: binary_payload, publish_bindings: publish_bindings, headers: headers) + + expect(bunny_exchange).to have_received(:publish).with(binary_payload, { + correlation_id: '12345', + headers: { custom_header: 'test_value' } + }) + end + end + end end diff --git a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb index a1758b38..5bbe2041 100644 --- a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb @@ -321,4 +321,99 @@ end end end + + describe '#decode_payload' do + let(:payload) { 'test_payload' } + let(:mime_decode_operation) { instance_double(EventSource::Operations::MimeDecode) } + let(:channel_proxy) { instance_double('ChannelProxy', subject: double) } + let(:async_api_channel_item) { instance_double('AsyncApiChannelItem', bindings: channel_bindings) } + + let(:subscribe_operation) do + EventSource::AsyncApi::SubscribeOperation.new( + operationId: 'subscribe_message', + bindings: { amqp: { key: 'value' } }, + message: { + 'bindings' => { + 'amqp' => { + 'contentEncoding' => 'application/zlib' + } + } + } + ) + end + + let(:instance) { described_class.new(channel_proxy, async_api_channel_item) } + let(:channel_bindings) do + { + amqp: { + is: :queue, + queue: { + name: 'on_event_source.test_queue', + durable: true, + exclusive: false, + auto_delete: false, + vhost: 'event_source' + } + } + } + end + + let(:bunny_queue) { instance_double('BunnyQueue') } + + before do + allow_any_instance_of(described_class).to receive(:bunny_queue_for).and_return(bunny_queue) + allow_any_instance_of(described_class).to receive(:bind_exchange).and_return(true) + allow(async_api_channel_item).to receive(:subscribe).and_return(subscribe_operation) + allow(EventSource::Operations::MimeDecode).to receive(:new).and_return(mime_decode_operation) + end + + context 'when there is no message in the subscribe operation' do + let(:subscribe_operation) { EventSource::AsyncApi::SubscribeOperation.new(operationId: 'subscribe_message' ) } + + it 'returns the original payload' do + expect(instance.decode_payload(payload)).to eq(payload) + end + end + + context 'when there is no contentEncoding in the message bindings' do + let(:subscribe_operation) { EventSource::AsyncApi::SubscribeOperation.new(operationId: 'subscribe_message', message: { }) } + + it 'returns the original payload' do + expect(instance.decode_payload(payload)).to eq(payload) + end + end + + context 'when contentEncoding is provided' do + context 'when decoding is successful' do + let(:decoded_payload) { 'decoded_payload' } + + before do + allow(mime_decode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Success(decoded_payload)) + end + + it 'returns the decoded payload' do + expect(instance.decode_payload(payload)).to eq(decoded_payload) + end + end + + context 'when decoding fails' do + let(:failure_message) { 'Decoding error' } + + before do + allow(mime_decode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Failure(failure_message)) + end + + it 'logs the error and raises a PayloadDecodeError' do + expect(instance.logger).to receive(:error).with("Failed to decompress message \n due to: #{failure_message}") + expect do + instance.decode_payload(payload) + end.to raise_error(EventSource::Error::PayloadDecodeError, failure_message) + end + end + end + end end diff --git a/spec/event_source/publish_operation_spec.rb b/spec/event_source/publish_operation_spec.rb index 19f2d535..105e2c75 100644 --- a/spec/event_source/publish_operation_spec.rb +++ b/spec/event_source/publish_operation_spec.rb @@ -120,6 +120,83 @@ it 'should forward the message to a queue bound to the exchange' do end end + + describe '#encode_payload' do + + let(:channel) { instance_double('EventSource::Channel') } + let(:publish_proxy) { instance_double('PublishProxy', publish: true) } + let(:async_api_publish_operation) do + EventSource::AsyncApi::PublishOperation.new({ + operationId: 'publish_message', + bindings: { amqp: { key: 'value' } }, + message: { + 'bindings' => { 'amqp' => { 'contentEncoding' => 'application/zlib' } } + } + }) + end + + let(:instance) { described_class.new(channel, publish_proxy, async_api_publish_operation) } + + describe '#encode_payload' do + let(:payload) { 'test_payload' } + let(:mime_encode_operation) { instance_double(EventSource::Operations::MimeEncode) } + + before do + allow(EventSource::Operations::MimeEncode).to receive(:new).and_return(mime_encode_operation) + end + + context 'when there is no message in the async API publish operation' do + let(:async_api_publish_operation) { EventSource::AsyncApi::PublishOperation.new({ operationId: 'publish_message', bindings: {} }) } + + it 'returns the original payload' do + expect(instance.encode_payload(payload)).to eq(payload) + end + end + + context 'when there is no contentEncoding in the message bindings' do + let(:async_api_publish_operation) do + EventSource::AsyncApi::PublishOperation.new({ operationId: 'publish_message', message: { } }) + end + + it 'returns the original payload' do + expect(instance.encode_payload(payload)).to eq(payload) + end + end + + context 'when contentEncoding is provided' do + context 'when encoding is successful' do + let(:encoded_payload) { 'encoded_payload' } + + before do + allow(mime_encode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Success(encoded_payload)) + end + + it 'returns the encoded payload' do + expect(instance.encode_payload(payload)).to eq(encoded_payload) + end + end + + context 'when encoding fails' do + let(:failure_message) { 'Encoding error' } + + before do + allow(mime_encode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Failure(failure_message)) + end + + it 'logs the error and raises a PayloadEncodeError' do + expect(instance.logger).to receive(:error).with("Failed to decompress message \n due to: #{failure_message}") + expect do + instance.encode_payload(payload) + end.to raise_error(EventSource::Error::PayloadEncodeError, failure_message) + end + end + end + end + end end # RSpec.describe EventSource::PublishOperation do From 9a84bc9fce020f002ad9402c38c8caa82b035160 Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Mon, 3 Feb 2025 10:20:38 -0500 Subject: [PATCH 3/8] add default mime type encode as json for amqp protocol (#120) * add default mime type encode json * update documentation * spec fixes * refactor json string check and specs --- lib/event_source/operations/mime_decode.rb | 22 +++++--- lib/event_source/operations/mime_encode.rb | 51 +++++++++++-------- .../protocols/amqp/bunny_exchange_proxy.rb | 9 +--- lib/event_source/publish_operation.rb | 19 +++++-- .../operations/mime_decode_spec.rb | 42 +++++++++++++++ .../operations/mime_encode_spec.rb | 51 +++++++++++++++---- .../amqp/bunny_exchange_proxy_spec.rb | 4 +- 7 files changed, 148 insertions(+), 50 deletions(-) diff --git a/lib/event_source/operations/mime_decode.rb b/lib/event_source/operations/mime_decode.rb index f147182d..1f5bc8e4 100644 --- a/lib/event_source/operations/mime_decode.rb +++ b/lib/event_source/operations/mime_decode.rb @@ -21,7 +21,7 @@ class MimeDecode # @return [Dry::Monads::Success] if decoding is successful # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, decoding failure) def call(mime_type, payload) - valid_payload = yield validate_payload(payload, mime_type) + valid_payload, mime_type = yield validate_payload(payload, mime_type.to_s) decoded_data = yield decode(valid_payload, mime_type) Success(decoded_data) @@ -38,15 +38,18 @@ def call(mime_type, payload) # @return [Dry::Monads::Success] if the payload is valid # @return [Dry::Monads::Failure] if the payload is invalid def validate_payload(payload, mime_type) - unless MIME_TYPES.include?(mime_type.to_s) + unless MIME_TYPES.include?(mime_type) return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") end - if mime_type.to_s == 'application/zlib' && !binary_payload?(payload) - return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") + # Allow JSON string payloads to pass validation to avoid processing failures + # for existing JSON messages in the queue. These messages may have been queued + # with the wrong MIME type ('application/zlib') but are still valid JSON. + if mime_type == 'application/zlib' + return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") unless binary_payload?(payload) || valid_json_string?(payload) end - Success(payload) + Success([payload, mime_type]) end # Decodes the payload using the specified MIME type. @@ -58,7 +61,7 @@ def validate_payload(payload, mime_type) # @return [Dry::Monads::Success] if decoding is successful # @return [Dry::Monads::Failure] if decoding fails def decode(payload, mime_type) - decoded_data = Zlib.inflate(payload) if mime_type.to_s == 'application/zlib' + decoded_data = Zlib.inflate(payload) if mime_type == 'application/zlib' && binary_payload?(payload) Success(decoded_data || payload) rescue Zlib::Error => e @@ -75,6 +78,13 @@ def binary_payload?(payload) payload.encoding == Encoding::BINARY end + + def valid_json_string?(data) + data.is_a?(String) && JSON.parse(data) + true + rescue JSON::ParserError + false + end end end end diff --git a/lib/event_source/operations/mime_encode.rb b/lib/event_source/operations/mime_encode.rb index c5b6f8e7..4bd551cb 100644 --- a/lib/event_source/operations/mime_encode.rb +++ b/lib/event_source/operations/mime_encode.rb @@ -18,60 +18,69 @@ class MimeEncode # For example, compresses the payload using Zlib for 'application/zlib'. # # @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json') - # @param payload [String, Hash] the payload to encode; must be a Hash or String + # @param payload [Any] the payload to encode; # # @return [Dry::Monads::Success] if encoding is successful # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure) def call(mime_type, payload) - json_payload = yield validate_payload(payload, mime_type) - encoded_data = yield encode(json_payload, mime_type) + mime_type = yield validate(mime_type) + encoded_data = yield encode(mime_type, payload) Success(encoded_data) end private - # Validates the payload and MIME type before encoding. - # Ensures the MIME type is supported and the payload is either a Hash or a String. + # Validates theMIME type before encoding. + # Ensures the MIME type is supported # - # @param payload [String, Hash] the payload to validate # @param mime_type [String] the MIME type for encoding # # @return [Dry::Monads::Success] if the payload and MIME type are valid # @return [Dry::Monads::Failure] if the MIME type is unsupported or the payload is invalid - def validate_payload(payload, mime_type) + def validate(mime_type) unless MIME_TYPES.include?(mime_type.to_s) return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") end - unless payload.is_a?(Hash) || payload.is_a?(String) - return Failure("Invalid payload type. Expected a Hash or String, but received #{payload.class}.") - end - - Success(payload.is_a?(Hash) ? payload.to_json : payload) + Success(mime_type.to_s) end # Encodes the payload based on the MIME type. # For 'application/zlib', compresses the payload using Zlib. # Logs the original and encoded payload sizes for debugging. # - # @param json_payload [String] the JSON stringified payload to encode + # @param data [String] the JSON stringified payload to encode # @param mime_type [String] the MIME type for encoding # # @return [Dry::Monads::Success] if encoding is successful # @return [Dry::Monads::Failure] if encoding fails - def encode(json_payload, mime_type) - encoded_data = Zlib.deflate(json_payload) if mime_type.to_s == 'application/zlib' + def encode(mime_type, payload) + case mime_type + when 'application/zlib' + json_payload = payload.to_json + encoded_data = Zlib.deflate(json_payload) + log_encoding_details(mime_type, json_payload, encoded_data) + when 'application/json' + encoded_data = payload.to_json + end + + Success(encoded_data || payload) + rescue JSON::GeneratorError => e + Failure("Failed to encode payload to JSON: #{e.message}") + rescue Zlib::Error => e + Failure("Failed to compress payload using Zlib: #{e.message}") + rescue StandardError => e + Failure("Unexpected error during encoding: #{e.message}") + end + # Logs details of the encoding process. + def log_encoding_details(mime_type, payload, encoded_data) logger.debug "*" * 80 logger.debug "Starting payload encoding for MIME type: '#{mime_type}'" - logger.debug "Original payload size: #{data_size_in_kb(json_payload)} KB" - logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" if encoded_data + logger.debug "Original payload size: #{data_size_in_kb(payload)} KB" + logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" logger.debug "*" * 80 - - Success(encoded_data || json_payload) - rescue Zlib::Error => e - Failure("Failed to compress payload using Zlib: #{e.message}") end # Calculates the size of the data in kilobytes (KB). diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 4dd1b2c4..a73e6d3a 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -15,6 +15,8 @@ class BunnyExchangeProxy # @attr_reader [EventSource::Protcols::Amqp::BunnyChannelProxy] channel_proxy the channel_proxy used to create this exchange attr_reader :subject, :channel_proxy + DefaultMimeType = 'application/json'.freeze + # @param [EventSource::AsyncApi::Channel] channel_proxy instance on which to open this Exchange # @param [Hash] exchange_bindings instance with configuration for this Exchange def initialize(channel_proxy, exchange_bindings) @@ -50,7 +52,6 @@ def publish(payload:, publish_bindings:, headers: {}) logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}" - payload = payload.to_json unless is_binary?(payload) @subject.publish(payload, bunny_publish_bindings) logger.debug "BunnyExchange#publish published message: #{payload}" @@ -70,12 +71,6 @@ def message_id SecureRandom.uuid end - def is_binary?(payload) - return false unless payload.respond_to?(:encoding) - - payload.encoding == Encoding::BINARY - end - # Filtering and renaming AsyncAPI Operation bindings to Bunny/RabitMQ # bindings # diff --git a/lib/event_source/publish_operation.rb b/lib/event_source/publish_operation.rb index ddb7fb9f..13f12113 100644 --- a/lib/event_source/publish_operation.rb +++ b/lib/event_source/publish_operation.rb @@ -49,10 +49,7 @@ def call(payload, options = {}) # @return [String] The encoded payload, or the original payload if no encoding is specified. # @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails. def encode_payload(payload) - return payload unless @async_api_publish_operation.message - - message_bindings = @async_api_publish_operation.message['bindings'] - encoding = message_bindings.first[1]['contentEncoding'] if message_bindings + encoding = determine_encoding return payload unless encoding output = EventSource::Operations::MimeEncode.new.call(encoding, payload) @@ -63,5 +60,19 @@ def encode_payload(payload) raise EventSource::Error::PayloadEncodeError, output.failure end end + + # Determines the encoding for the payload based on message bindings or protocol defaults. + # - If message bindings are present, uses the 'contentEncoding' value from the bindings. + # - If no message bindings are present and the protocol is AMQP, uses the default encoding for the AMQP protocol. Other protocols return nil. + def determine_encoding + message_bindings = @async_api_publish_operation.message&.dig('bindings') + return message_bindings.first[1]['contentEncoding'] if message_bindings.present? + + amqp_protocol? ? "#{subject.class}::DefaultMimeType".constantize : nil + end + + def amqp_protocol? + subject.is_a?(EventSource::Protocols::Amqp::BunnyExchangeProxy) + end end end diff --git a/spec/event_source/operations/mime_decode_spec.rb b/spec/event_source/operations/mime_decode_spec.rb index 7045177a..17ddf627 100644 --- a/spec/event_source/operations/mime_decode_spec.rb +++ b/spec/event_source/operations/mime_decode_spec.rb @@ -52,5 +52,47 @@ expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") end end + + context "when the mime_type is 'application/zlib'" do + context "and the payload is a JSON string but not binary" do + let(:json_string) { "Invalid compressed data".to_json } + let(:mime_type) { "application/zlib" } + + it "passes validation" do + result = subject.call(mime_type, json_string) + + expect(result).to be_success + expect(result.value!).to eq(json_string) + end + end + + context "and the payload is neither binary nor valid JSON" do + let(:non_json_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + it "returns a failure with a validation error message" do + result = subject.call(mime_type, non_json_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + + context "and the payload is not binary and raises an error when parsed as JSON" do + let(:corrupted_json_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + before do + allow(JSON).to receive(:parse).with(corrupted_json_payload).and_raise(JSON::ParserError) + end + + it "returns a failure with a validation error message" do + result = subject.call(mime_type, corrupted_json_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + end end end diff --git a/spec/event_source/operations/mime_encode_spec.rb b/spec/event_source/operations/mime_encode_spec.rb index ca91814c..b2475d00 100644 --- a/spec/event_source/operations/mime_encode_spec.rb +++ b/spec/event_source/operations/mime_encode_spec.rb @@ -4,11 +4,15 @@ subject { described_class.new } describe "#call" do - context "when the payload and mime type are valid" do + + let(:valid_payload) { { key: 'value' } } + let(:invalid_payload) { -> {} } + + context "when MIME type is application/zlib" do let(:payload) { { message: "Hello, World!" } } let(:mime_type) { "application/zlib" } - it "successfully encodes the payload" do + it "compresses the payload using Zlib" do result = subject.call(mime_type, payload) expect(result).to be_success @@ -16,7 +20,7 @@ end end - context "when the payload is a string and mime type is valid" do + context "when MIME type is application/json" do let(:payload) { "Hello, World!" } let(:mime_type) { "application/json" } @@ -24,7 +28,7 @@ result = subject.call(mime_type, payload) expect(result).to be_success - expect(result.value!).to eq(payload) + expect(result.value!).to eq(payload.to_json) end end @@ -40,15 +44,42 @@ end end - context "when the payload is invalid" do - let(:payload) { 1000 } - let(:mime_type) { "application/json" } + context 'when payload cannot be converted to JSON' do + before do + allow(invalid_payload).to receive(:to_json).and_raise(JSON::GeneratorError) + end - it "returns a failure" do - result = subject.call(mime_type, payload) + it 'returns a failure with JSON::GeneratorError' do + result = subject.call('application/json', invalid_payload) + + expect(result).to be_failure + expect(result.failure).to match(/Failed to encode payload to JSON:/) + end + end + + context 'when Zlib compression fails' do + before do + allow(Zlib).to receive(:deflate).and_raise(Zlib::Error, 'Compression failed') + end + + it 'returns a failure with Zlib::Error' do + result = subject.call('application/zlib', valid_payload) + + expect(result).to be_failure + expect(result.failure).to eq('Failed to compress payload using Zlib: Compression failed') + end + end + + context 'when an unexpected error occurs' do + before do + allow(valid_payload).to receive(:to_json).and_raise(StandardError, 'something went wrong') + end + + it 'returns a failure with StandardError' do + result = subject.call('application/json', valid_payload) expect(result).to be_failure - expect(result.failure).to eq("Invalid payload type. Expected a Hash or String, but received Integer.") + expect(result.failure).to eq('Unexpected error during encoding: something went wrong') end end end diff --git a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb index d9a2015c..5cee63b4 100644 --- a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb @@ -28,7 +28,7 @@ describe '#publish' do it 'publishes the payload with the correct bindings and headers' do - subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers) + subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers) expect(bunny_exchange).to have_received(:publish).with(payload.to_json, { correlation_id: '12345', @@ -41,7 +41,7 @@ expect(subject.logger).to receive(:debug).with(/published message:/) expect(subject.logger).to receive(:debug).with(/published message to exchange:/) - subject.publish(payload: payload, publish_bindings: publish_bindings, headers: headers) + subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers) end context 'when the payload is binary' do From 4dd07bc72f0f57d5a75528f5095c4d6aa2bd353f Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Mon, 3 Feb 2025 14:21:34 -0500 Subject: [PATCH 4/8] make sure decode errors don't block the queue (#121) * add default mime type encode json * update documentation * spec fixes * refactor json string check and specs * don't block messages from processing when encountered decode error - added subscriber.ack to acknowledge the message * add spec fixes * added more logger messages --------- Signed-off-by: Raghu Ram --- lib/event_source/operations/mime_decode.rb | 9 +++++++++ .../protocols/amqp/bunny_queue_proxy.rb | 16 ++++++++++++++-- lib/event_source/publish_operation.rb | 2 +- .../protocols/amqp/bunny_queue_proxy_spec.rb | 5 ++++- spec/event_source/publish_operation_spec.rb | 2 +- 5 files changed, 29 insertions(+), 5 deletions(-) diff --git a/lib/event_source/operations/mime_decode.rb b/lib/event_source/operations/mime_decode.rb index 1f5bc8e4..12291695 100644 --- a/lib/event_source/operations/mime_decode.rb +++ b/lib/event_source/operations/mime_decode.rb @@ -79,6 +79,15 @@ def binary_payload?(payload) payload.encoding == Encoding::BINARY end + # For future reference, here is the implementation of the `zlib_compressed?` method: + # Binary encoding check alone is unreliable since the payload may not be zlib-compressed. + # Instead, verify if the payload starts with "\x78" to determine zlib compression. + def zlib_compressed?(payload) + return false unless payload.is_a?(String) + + payload.start_with?("\x78") + end + def valid_json_string?(data) data.is_a?(String) && JSON.parse(data) true diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 616d9c41..421c3303 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -135,7 +135,16 @@ def on_receive_message( subscriber = subscriber_klass.new subscriber.channel = @subject.channel - payload = decode_payload(payload) + + begin + payload = decode_payload(payload) + rescue EventSource::Error::PayloadDecodeError, StandardError => e + logger.error "Payload decoding failed: #{e.message} \n backtrace: #{e.backtrace.join("\n")} \n payload: #{payload}" + # Acknowledge the message so it doesn't block the queue + subscriber.ack(delivery_info.delivery_tag) + return + end + subscription_handler = EventSource::Protocols::Amqp::BunnyConsumerHandler.new( subscriber, @@ -171,7 +180,10 @@ def decode_payload(payload) if output.success? output.value! else - logger.error "Failed to decompress message \n due to: #{output.failure}" + logger.error '*' * 40 + logger.error payload.encoding if payload.respond_to?(:encoding) + logger.error payload.inspect + logger.error "Failed to decode message \n due to: #{output.failure} \n payload: #{payload}" raise EventSource::Error::PayloadDecodeError, output.failure end end diff --git a/lib/event_source/publish_operation.rb b/lib/event_source/publish_operation.rb index 13f12113..f8724dea 100644 --- a/lib/event_source/publish_operation.rb +++ b/lib/event_source/publish_operation.rb @@ -56,7 +56,7 @@ def encode_payload(payload) if output.success? output.value! else - logger.error "Failed to decompress message \n due to: #{output.failure}" + logger.error "Failed to encode message \n due to: #{output.failure}" raise EventSource::Error::PayloadEncodeError, output.failure end end diff --git a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb index 5bbe2041..213d4387 100644 --- a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb @@ -408,7 +408,10 @@ end it 'logs the error and raises a PayloadDecodeError' do - expect(instance.logger).to receive(:error).with("Failed to decompress message \n due to: #{failure_message}") + expect(instance.logger).to receive(:error).with('*' * 40) + expect(instance.logger).to receive(:error).with(payload.encoding) if payload.respond_to?(:encoding) + expect(instance.logger).to receive(:error).with(payload.inspect) + expect(instance.logger).to receive(:error).with("Failed to decode message \n due to: #{failure_message} \n payload: #{payload}") expect do instance.decode_payload(payload) end.to raise_error(EventSource::Error::PayloadDecodeError, failure_message) diff --git a/spec/event_source/publish_operation_spec.rb b/spec/event_source/publish_operation_spec.rb index 105e2c75..25560506 100644 --- a/spec/event_source/publish_operation_spec.rb +++ b/spec/event_source/publish_operation_spec.rb @@ -188,7 +188,7 @@ end it 'logs the error and raises a PayloadEncodeError' do - expect(instance.logger).to receive(:error).with("Failed to decompress message \n due to: #{failure_message}") + expect(instance.logger).to receive(:error).with("Failed to encode message \n due to: #{failure_message}") expect do instance.encode_payload(payload) end.to raise_error(EventSource::Error::PayloadEncodeError, failure_message) From 1d1f6ebe33b4527c4f4b902c0f217846736caa0f Mon Sep 17 00:00:00 2001 From: Raghu Ram Date: Tue, 4 Feb 2025 09:11:15 -0500 Subject: [PATCH 5/8] add exception handling for zlib compressed data errors (#122) --- lib/event_source/operations/mime_decode.rb | 35 +++++++++---------- .../protocols/amqp/bunny_queue_proxy.rb | 4 --- .../operations/mime_decode_spec.rb | 16 ++++++++- .../protocols/amqp/bunny_queue_proxy_spec.rb | 4 --- 4 files changed, 32 insertions(+), 27 deletions(-) diff --git a/lib/event_source/operations/mime_decode.rb b/lib/event_source/operations/mime_decode.rb index 12291695..490aa0c8 100644 --- a/lib/event_source/operations/mime_decode.rb +++ b/lib/event_source/operations/mime_decode.rb @@ -8,6 +8,7 @@ module Operations # Operation for decoding payloads, including decompression using Zlib. class MimeDecode include Dry::Monads[:result, :do] + include EventSource::Logging # Supported MIME types for decoding. MIME_TYPES = %w[application/zlib application/json].freeze @@ -52,20 +53,27 @@ def validate_payload(payload, mime_type) Success([payload, mime_type]) end - # Decodes the payload using the specified MIME type. - # For 'application/zlib', it decompresses the payload using Zlib. + # Decodes the payload based on the specified MIME type. + # For 'application/zlib', it attempts to decompress the payload using Zlib. + # If decompression fails due to an error, the original payload is returned unmodified. # # @param payload [String] the payload to decode # @param mime_type [String] the MIME type of the payload # - # @return [Dry::Monads::Success] if decoding is successful - # @return [Dry::Monads::Failure] if decoding fails + # @return [Dry::Monads::Success] if decoding is successful or if the MIME type is not 'application/zlib'. + # @return [Dry::Monads::Success] if decompression fails, returning the original payload without modification. + # + # @note If the MIME type is 'application/zlib' and decompression fails, the original payload is returned as is, and no error is raised. def decode(payload, mime_type) - decoded_data = Zlib.inflate(payload) if mime_type == 'application/zlib' && binary_payload?(payload) - - Success(decoded_data || payload) - rescue Zlib::Error => e - Failure("Failed to decode payload using Zlib: #{e.message}") + return Success(payload) unless mime_type == 'application/zlib' + + begin + decoded_data = Zlib.inflate(payload) + Success(decoded_data) + rescue Zlib::Error => e + logger.error "Zlib errored while inflating payload: #{payload} \n with #{e.class}: #{e.message}, \n returning original payload." + Success(payload) + end end # Checks whether the payload is binary-encoded. @@ -79,15 +87,6 @@ def binary_payload?(payload) payload.encoding == Encoding::BINARY end - # For future reference, here is the implementation of the `zlib_compressed?` method: - # Binary encoding check alone is unreliable since the payload may not be zlib-compressed. - # Instead, verify if the payload starts with "\x78" to determine zlib compression. - def zlib_compressed?(payload) - return false unless payload.is_a?(String) - - payload.start_with?("\x78") - end - def valid_json_string?(data) data.is_a?(String) && JSON.parse(data) true diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 421c3303..b4e9b523 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -180,10 +180,6 @@ def decode_payload(payload) if output.success? output.value! else - logger.error '*' * 40 - logger.error payload.encoding if payload.respond_to?(:encoding) - logger.error payload.inspect - logger.error "Failed to decode message \n due to: #{output.failure} \n payload: #{payload}" raise EventSource::Error::PayloadDecodeError, output.failure end end diff --git a/spec/event_source/operations/mime_decode_spec.rb b/spec/event_source/operations/mime_decode_spec.rb index 17ddf627..65039570 100644 --- a/spec/event_source/operations/mime_decode_spec.rb +++ b/spec/event_source/operations/mime_decode_spec.rb @@ -93,6 +93,20 @@ expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") end end - end + + context 'when Zlib.inflate raises an exception' do + let(:payload) { { message: "Hello, World!" } } + let(:invalid_compressed_payload) { Zlib.deflate(payload.to_json) } + + it 'returns the original payload wrapped in Success' do + allow(Zlib).to receive(:inflate).and_raise(Zlib::DataError, "invalid compressed data") + + result = subject.call('application/zlib', invalid_compressed_payload) + + expect(result).to be_a(Dry::Monads::Success) + expect(result.value!).to eq(invalid_compressed_payload) + end + end + end end end diff --git a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb index 213d4387..0880f92e 100644 --- a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb @@ -408,10 +408,6 @@ end it 'logs the error and raises a PayloadDecodeError' do - expect(instance.logger).to receive(:error).with('*' * 40) - expect(instance.logger).to receive(:error).with(payload.encoding) if payload.respond_to?(:encoding) - expect(instance.logger).to receive(:error).with(payload.inspect) - expect(instance.logger).to receive(:error).with("Failed to decode message \n due to: #{failure_message} \n payload: #{payload}") expect do instance.decode_payload(payload) end.to raise_error(EventSource::Error::PayloadDecodeError, failure_message) From 841beb926bd8cd8a49a3ddf89235a051bf0a2ec6 Mon Sep 17 00:00:00 2001 From: vishal kalletla Date: Wed, 25 Feb 2026 14:31:36 -0500 Subject: [PATCH 6/8] handle graceful shutdown of consumers (#137) * handle graceful shutdown of Bunny (AMQP client) * rearrange methods * add more changes and specs * remove unused * add more specs * add railtie spec * fix typo * spec fix * fix path * revert spec config * revert spec * test spec * test one more change * remove unused code --- lib/event_source.rb | 21 ++++ lib/event_source/channel.rb | 7 ++ lib/event_source/configure/config.rb | 3 +- lib/event_source/connection_manager.rb | 46 +++++++++ .../protocols/amqp/bunny_queue_proxy.rb | 17 +++- lib/event_source/railtie.rb | 32 +++++++ spec/event_source/channel_spec.rb | 18 ++++ spec/event_source/connection_manager_spec.rb | 90 +++++++++++++++++ spec/rails_app/spec/railtie_spec.rb | 96 ++++++++++++++++++- 9 files changed, 323 insertions(+), 7 deletions(-) diff --git a/lib/event_source.rb b/lib/event_source.rb index b24ca3a1..2fdd8f5c 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -110,6 +110,27 @@ def register_subscriber(subscriber_klass) def register_publisher(subscriber_klass) boot_registry.register_publisher(subscriber_klass) end + + def inflight_messages_count + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize { @inflight_messages_count ||= 0 } + end + + def increment_inflight_messages + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize do + @inflight_messages_count ||= 0 + @inflight_messages_count += 1 + end + end + + def decrement_inflight_messages + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize do + @inflight_messages_count ||= 0 + @inflight_messages_count = [@inflight_messages_count - 1, 0].max + end + end end class EventSourceLogger diff --git a/lib/event_source/channel.rb b/lib/event_source/channel.rb index fc6d951a..ac73fd2c 100644 --- a/lib/event_source/channel.rb +++ b/lib/event_source/channel.rb @@ -110,5 +110,12 @@ def add_subscribe_operation(async_api_channel_item) ) logger.info " Subscribe Operation Added: #{operation_id}" end + + def cancel_consumers + subscribe_operations.each_value do |sub_op| + subject = sub_op.subject + subject.cancel_consumers! if subject.respond_to?(:cancel_consumers!) + end + end end end diff --git a/lib/event_source/configure/config.rb b/lib/event_source/configure/config.rb index 6f74a670..796e5768 100644 --- a/lib/event_source/configure/config.rb +++ b/lib/event_source/configure/config.rb @@ -10,11 +10,12 @@ class Config def initialize @log_level = :warn + @shutdown_timeouts = { amqp_drain: 5, http_drain: 5 } end # TODO: add default for pub_sub_root attr_writer :pub_sub_root, :protocols, :server_configurations - attr_accessor :app_name, :log_level + attr_accessor :app_name, :log_level, :auto_shutdown, :shutdown_timeouts def load_protocols @protocols.each do |protocol| diff --git a/lib/event_source/connection_manager.rb b/lib/event_source/connection_manager.rb index 58df84ab..e032af6b 100644 --- a/lib/event_source/connection_manager.rb +++ b/lib/event_source/connection_manager.rb @@ -160,8 +160,54 @@ def drop_connection(connection_uri) connections end + def cancel_consumers_for(protocol, timeout: 5) + protocol_value = protocol.to_s.upcase + logger.info "Cancelling #{protocol_value} consumers prior to shutdown" + connections = connections_for(protocol) + any_consumers = connections.any? { |connection| connection_has_consumers?(connection) } + logger.info "#{protocol_value} inflight handlers (before_cancel): #{EventSource.inflight_messages_count}, any_consumers=#{any_consumers}" + + connections.each do |connection| + connection.channels.each_value do |channel| + channel.cancel_consumers + end + end + + wait_for_connections_to_drain(connections, timeout) + + logger.info "#{protocol_value} consumer cancellation complete" + logger.info "#{protocol_value} inflight handlers (end): #{EventSource.inflight_messages_count}" + end + private + def wait_for_connections_to_drain(connections, timeout) + return if connections.empty? + + protocol = connections.first.protocol.to_s.upcase + start = Time.now + + loop do + inflight = EventSource.inflight_messages_count + any_consumers = connections.any? { |connection| connection_has_consumers?(connection) } + + logger.info "#{protocol} inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}" + break if inflight.zero? && !any_consumers + break if (Time.now - start) >= timeout + sleep 0.1 + end + end + + def connection_has_consumers?(connection) + connection.channels.values.any? do |channel| + begin + channel.channel_proxy.subject.any_consumers? + rescue StandardError + false + end + end + end + # Find connection proxy class for given protocol # @param [Symbol] protocol the protocol name, `:http` or `:amqp` # @return [Class] Protocol Specific Connection Proxy Class diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index b4e9b523..a8cefa10 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -91,7 +91,7 @@ def spawn_thread(options) end def add_consumer(subscriber_klass, options) - @subject.subscribe(options) do |delivery_info, metadata, payload| + consumer = @subject.subscribe(options) do |delivery_info, metadata, payload| on_receive_message( subscriber_klass, delivery_info, @@ -99,6 +99,7 @@ def add_consumer(subscriber_klass, options) payload ) end + @consumers << consumer if consumer end def convert_to_subscribe_options(options) @@ -124,6 +125,7 @@ def on_receive_message( metadata, payload ) + EventSource.increment_inflight_messages logger.debug '**************************' logger.debug subscriber_klass.inspect logger.debug delivery_info.inspect @@ -158,6 +160,7 @@ def on_receive_message( logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}" ensure subscriber = nil + EventSource.decrement_inflight_messages end # Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings. @@ -198,6 +201,18 @@ def method_missing(name, *args) @subject.send(name, *args) end + # Cancel all registered consumers for this queue + def cancel_consumers! + @consumers.each do |consumer| + begin + consumer.cancel + rescue StandardError => e + logger.info "Consumer cancellation error: #{e.message}" + end + end + @consumers.clear + end + private def subscriber_klass_name_to_suffix(subscriber_klass) diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb index 77dcc66b..3b0e7254 100644 --- a/lib/event_source/railtie.rb +++ b/lib/event_source/railtie.rb @@ -1,9 +1,41 @@ # frozen_string_literal: true +require 'logger' + module EventSource # :nodoc: module Railtie Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do + logger = Logger.new($stdout) + logger.progname = 'EventSource graceful shutdown' + timeouts = EventSource.config.shutdown_timeouts || {} + amqp_timeout = timeouts[:amqp_drain] || 5 + + # Perform shutdown work outside of trap/at_exit context to avoid + # ThreadError from mutex operations within Bunny (AMQP client). + shutdown = lambda do |reason| + Thread.new do + begin + logger.info "#{reason}, starting graceful shutdown" + logger.info "AMQP inflight handlers at shutdown start: #{EventSource.inflight_messages_count}" + cm = EventSource::ConnectionManager.instance + + # Stop consuming and allow in-flight handlers to drain briefly + cm.cancel_consumers_for(:amqp, timeout: amqp_timeout) + cm.drop_connections_for(:amqp) + rescue => e + logger.error "graceful shutdown error: #{e.class}: #{e.message}" + end + end.join + end + + if EventSource.config.auto_shutdown + at_exit { shutdown.call('at_exit received') } + + %w[TERM INT].each do |sig| + Signal.trap(sig) { shutdown.call("signal=#{sig} received") } + end + end EventSource.initialize! end end diff --git a/spec/event_source/channel_spec.rb b/spec/event_source/channel_spec.rb index 3f1020cc..ada8f37e 100644 --- a/spec/event_source/channel_spec.rb +++ b/spec/event_source/channel_spec.rb @@ -92,5 +92,23 @@ expect(audit_log_queue.pop.last).to eq 'test message from enterprise events!!' expect(audit_log_queue.pop.last).to eq 'test message from enrollment events!!' end + + context '.cancel_consumers' do + before do + subject.subscribe_operations.values.each do |sub_op| + subscriber_klass = double('SubscriberKlass') + sub_op.subscribe(subscriber_klass) + end + end + + it 'cancels consumers via channel' do + channel = subject + expect(channel.channel_proxy.any_consumers?).to be_truthy + expect(channel.subscribe_operations.values.first.subject.consumers.count).to be > 0 + channel.cancel_consumers + expect(channel.channel_proxy.any_consumers?).to be_falsey + expect(channel.subscribe_operations.values.first.subject.consumers.count).to eq 0 + end + end end end diff --git a/spec/event_source/connection_manager_spec.rb b/spec/event_source/connection_manager_spec.rb index da9d30cf..fb24a997 100644 --- a/spec/event_source/connection_manager_spec.rb +++ b/spec/event_source/connection_manager_spec.rb @@ -89,6 +89,70 @@ ).to eq Hash.new end end + + context '.cancel_consumers_for' do + let(:connection) { connection_manager.connections[connection_url] } + + let(:async_api_file) do + Pathname.pwd.join('spec', 'support', 'asyncapi', 'polypress_amqp.yml') + end + + let(:async_api_channels) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: async_api_file) + .success + .channels + end + + let(:channel) do + connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] + end + + before do + connection.start unless connection.active? + connection.add_channels(async_api_channels) + channel.subscribe_operations.values.each do |sub_op| + subscriber_klass = double('SubscriberKlass') + sub_op.subscribe(subscriber_klass) + end + end + + after { connection.disconnect if connection.active? } + + context 'when inflight messages are present' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(5) + end + + it 'waits for timeout' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end + end + + context 'when inflight messages are draining' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(1, 0) + end + + it 'waits for drain' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 5) + end + end + + context 'when no inflight messages are present' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + it 'cancels AMQP consumers on each channel without waiting' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end + end + end end end end @@ -160,5 +224,31 @@ end end end + + context 'when no connections are present + - .cancel_consumers_for' do + let(:protocol) { :amqp } + let(:connection) { instance_double('EventSource::Connection', protocol: protocol, channels: { default: channel }) } + let(:channel) { instance_double('EventSource::Channel') } + + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + context 'does not raise error' do + before do + allow(connection_manager).to receive(:connections_for).with(protocol).and_return([]) + allow(connection_manager).to receive(:wait_for_connections_to_drain) + end + + it 'and still calls drain helper' do + expect do + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end.not_to raise_error + + expect(connection_manager).to have_received(:wait_for_connections_to_drain).with([], 1) + end + end + end end end diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb index 7c93c25f..ce6c8a1f 100644 --- a/spec/rails_app/spec/railtie_spec.rb +++ b/spec/rails_app/spec/railtie_spec.rb @@ -1,11 +1,97 @@ # frozen_string_literal: true -require_relative './rails_helper' +require_relative 'rails_helper' RSpec.describe EventSource::Railtie do - it "runs when invoked" do - manager = EventSource::ConnectionManager.instance - connection = manager.connections_for(:amqp).first - expect(connection).not_to be_nil + before do + @original_auto_shutdown = EventSource.config.auto_shutdown + EventSource.config.auto_shutdown = auto_shutdown_enabled + @at_exit_handler = nil + allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| + @at_exit_handler = blk + end + initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } + initializer.run(Rails.application) + end + + context '.auto_shutdown' do + let(:protocol) { :amqp } + let(:url) { 'amqp://localhost:5672/' } + let(:protocol_version) { '0.9.1' } + let(:description) { 'Development RabbitMQ Server' } + let(:server_config) do + { + ref: url, + url: url, + protocol: protocol, + protocol_version: protocol_version, + description: description + } + end + + let(:connection_manager) { EventSource::ConnectionManager.instance } + let(:connection) do + connection_manager.add_connection(server_config) + connection_manager.connections_for(:amqp).first + end + + let(:async_api_file) do + Pathname.new(__dir__).join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml').expand_path + end + + let(:async_api_channels) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: async_api_file) + .success + .channels + end + + let(:channel) do + connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] + end + + before do + connection_manager.drop_connections_for(:amqp) + connection_manager.drop_connections_for(:http) + connection.start unless connection.active? + connection.add_channels(async_api_channels) + + channel.subscribe_operations.each_value do |subscribe_operation| + subscriber_klass = double('SubscriberKlass') + subscribe_operation.subscribe(subscriber_klass) + end + + @original_timeouts = EventSource.config.shutdown_timeouts + EventSource.config.shutdown_timeouts = { amqp_drain: 2 } + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + after do + EventSource.config.shutdown_timeouts = @original_timeouts + EventSource.config.auto_shutdown = @original_auto_shutdown + end + + context 'when auto_shutdown is enabled' do + let(:auto_shutdown_enabled) { true } + + it 'cancels AMQP consumers and drops AMQP connections on shutdown' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + expect(connection.channels).not_to be_empty + expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + expect(@at_exit_handler).not_to be_nil + @at_exit_handler.call + + expect(connection_manager.connections_for(:amqp)).to be_empty + end + end + + context 'when auto_shutdown is disabled' do + let(:auto_shutdown_enabled) { false } + + it 'does not register at_exit handler' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + end + end end end From 61e82fd8ea22fbc64bea179cfff265dad8acfe28 Mon Sep 17 00:00:00 2001 From: vkghub Date: Fri, 27 Feb 2026 12:52:31 -0500 Subject: [PATCH 7/8] spec fix --- spec/rails_app/spec/http_service_integration_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/rails_app/spec/http_service_integration_spec.rb b/spec/rails_app/spec/http_service_integration_spec.rb index 007eb983..13061b85 100644 --- a/spec/rails_app/spec/http_service_integration_spec.rb +++ b/spec/rails_app/spec/http_service_integration_spec.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -require_relative './rails_helper' +require_relative 'rails_helper' RSpec.describe "with an http subscriber service" do include EventSource::Command From c2da07f86b09e9f646e41b6a1d390ae8bb3660e7 Mon Sep 17 00:00:00 2001 From: vkghub Date: Fri, 27 Feb 2026 12:57:03 -0500 Subject: [PATCH 8/8] fix one more spec --- spec/event_source/rails_application_spec.rb | 25 --------------------- 1 file changed, 25 deletions(-) delete mode 100644 spec/event_source/rails_application_spec.rb diff --git a/spec/event_source/rails_application_spec.rb b/spec/event_source/rails_application_spec.rb deleted file mode 100644 index 7f77fa79..00000000 --- a/spec/event_source/rails_application_spec.rb +++ /dev/null @@ -1,25 +0,0 @@ -require "spec_helper" -require "parallel_tests" -require "parallel_tests/rspec/runner" - -RSpec.describe EventSource, "rails specs" do - it "runs the rails tests in the rails application context" do - ParallelTests.with_pid_file do - specs_run_result = ParallelTests::RSpec::Runner.run_tests( - [ - "spec/rails_app/spec/railtie_spec.rb", - "spec/rails_app/spec/http_service_integration_spec.rb" - ], - 1, - 1, - { - serialize_stdout: true, - test_options: ["-O", ".rspec_rails_specs", "--format", "documentation"] - } - ) - if specs_run_result[:exit_status] != 0 - fail(specs_run_result[:stdout] + "\n\n") - end - end - end -end \ No newline at end of file