diff --git a/.github/workflows/rspec.yml b/.github/workflows/rspec.yml index 57363046..3a6a9315 100644 --- a/.github/workflows/rspec.yml +++ b/.github/workflows/rspec.yml @@ -6,7 +6,7 @@ jobs: strategy: fail-fast: false matrix: - ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2'] + ruby_version: ['2.7.5', '3.0.5', '3.1.4', '3.2.2'] runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 diff --git a/.rspec b/.rspec index 34c5164d..8433ecd9 100644 --- a/.rspec +++ b/.rspec @@ -1,3 +1,4 @@ --format documentation --color --require spec_helper +--exclude-pattern "spec/rails_app/**/*" diff --git a/.rspec_rails_specs b/.rspec_rails_specs new file mode 100644 index 00000000..791d0cba --- /dev/null +++ b/.rspec_rails_specs @@ -0,0 +1,3 @@ +--format documentation +--color +--exclude-pattern "**/*" diff --git a/Gemfile b/Gemfile index 55e26d2d..df419cb2 100644 --- a/Gemfile +++ b/Gemfile @@ -8,6 +8,7 @@ gemspec group :development, :test do gem "rails", '>= 6.1.4' gem "rspec-rails" + gem "parallel_tests" gem "pry", platform: :mri, require: false gem "pry-byebug", platform: :mri, require: false gem 'rubocop' diff --git a/lib/event_source.rb b/lib/event_source.rb index 09fb8514..2fdd8f5c 100644 --- a/lib/event_source.rb +++ b/lib/event_source.rb @@ -38,10 +38,13 @@ require 'event_source/event' require 'event_source/subscriber' require 'event_source/operations/codec64' +require 'event_source/operations/mime_encode' +require 'event_source/operations/mime_decode' require 'event_source/operations/create_message' require 'event_source/operations/fetch_session' require 'event_source/operations/build_message_options' require 'event_source/operations/build_message' +require 'event_source/boot_registry' # Event source provides ability to compose, publish and subscribe to events module EventSource @@ -65,14 +68,23 @@ class << self :async_api_schemas= def configure + @configured = true yield(config) end - def initialize! - load_protocols - create_connections - load_async_api_resources - load_components + def initialize!(force = false) + # Don't boot if I was never configured. + return unless @configured + boot_registry.boot!(force) do + load_protocols + create_connections + load_async_api_resources + load_components + end + end + + def boot_registry + @boot_registry ||= EventSource::BootRegistry.new end def config @@ -90,6 +102,35 @@ def build_async_api_resource(resource) .call(resource) .success end + + def register_subscriber(subscriber_klass) + boot_registry.register_subscriber(subscriber_klass) + end + + def register_publisher(subscriber_klass) + boot_registry.register_publisher(subscriber_klass) + end + + def inflight_messages_count + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize { @inflight_messages_count ||= 0 } + end + + def increment_inflight_messages + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize do + @inflight_messages_count ||= 0 + @inflight_messages_count += 1 + end + end + + def decrement_inflight_messages + @inflight_mutex ||= Mutex.new + @inflight_mutex.synchronize do + @inflight_messages_count ||= 0 + @inflight_messages_count = [@inflight_messages_count - 1, 0].max + end + end end class EventSourceLogger diff --git a/lib/event_source/boot_registry.rb b/lib/event_source/boot_registry.rb new file mode 100644 index 00000000..6c0a7b02 --- /dev/null +++ b/lib/event_source/boot_registry.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require 'set' +require 'monitor' + +module EventSource + # This class manages correct/loading of subscribers and publishers + # based on the current stage of the EventSource lifecycle. + # + # Depending on both the time the initialization of EventSource is invoked + # and when subscriber/publisher code is loaded, this can become complicated. + # This is largely caused by two confounding factors: + # 1. We want to delay initialization of EventSource until Rails is fully + # 'ready' + # 2. Based on the Rails environment, such as production, development, or + # test (primarily how those different environments treat lazy vs. eager + # loading of classes in a Rails application), subscriber and publisher + # code can be loaded before, after, or sometimes even DURING the + # EventSource boot process - we need to support all models + class BootRegistry + def initialize + @unbooted_publishers = Set.new + @unbooted_subscribers = Set.new + @booted_publishers = Set.new + @booted_subscribers = Set.new + # This is our re-entrant mutex. We're going to use it to make sure that + # registration and boot methods aren't allowed to simultaneously alter + # our state. You'll notice most methods on this class are wrapped in + # synchronize calls against this. + @bootex = Monitor.new + @booted = false + end + + def boot!(force = false) + @bootex.synchronize do + return if @booted && !force + yield + boot_publishers! + boot_subscribers! + @booted = true + end + end + + # Register a publisher for EventSource. + # + # If the EventSource hasn't been booted, save publisher for later. + # Otherwise, boot it now. + def register_publisher(publisher_klass) + @bootex.synchronize do + if @booted + publisher_klass.validate + @booted_publishers << publisher_klass + else + @unbooted_publishers << publisher_klass + end + end + end + + # Register a subscriber for EventSource. + # + # If the EventSource hasn't been booted, save the subscriber for later. + # Otherwise, boot it now. + def register_subscriber(subscriber_klass) + @bootex.synchronize do + if @booted + subscriber_klass.create_subscription + @booted_subscribers << subscriber_klass + else + @unbooted_subscribers << subscriber_klass + end + end + end + + # Boot the publishers. + def boot_publishers! + @bootex.synchronize do + @unbooted_publishers.each do |pk| + pk.validate + @booted_publishers << pk + end + @unbooted_publishers = Set.new + end + end + + # Boot the subscribers. + def boot_subscribers! + @bootex.synchronize do + @unbooted_subscribers.each do |sk| + sk.create_subscription + @booted_subscribers << sk + end + @unbooted_subscribers = Set.new + end + end + end +end \ No newline at end of file diff --git a/lib/event_source/channel.rb b/lib/event_source/channel.rb index fc6d951a..ac73fd2c 100644 --- a/lib/event_source/channel.rb +++ b/lib/event_source/channel.rb @@ -110,5 +110,12 @@ def add_subscribe_operation(async_api_channel_item) ) logger.info " Subscribe Operation Added: #{operation_id}" end + + def cancel_consumers + subscribe_operations.each_value do |sub_op| + subject = sub_op.subject + subject.cancel_consumers! if subject.respond_to?(:cancel_consumers!) + end + end end end diff --git a/lib/event_source/configure/config.rb b/lib/event_source/configure/config.rb index 6f74a670..796e5768 100644 --- a/lib/event_source/configure/config.rb +++ b/lib/event_source/configure/config.rb @@ -10,11 +10,12 @@ class Config def initialize @log_level = :warn + @shutdown_timeouts = { amqp_drain: 5, http_drain: 5 } end # TODO: add default for pub_sub_root attr_writer :pub_sub_root, :protocols, :server_configurations - attr_accessor :app_name, :log_level + attr_accessor :app_name, :log_level, :auto_shutdown, :shutdown_timeouts def load_protocols @protocols.each do |protocol| diff --git a/lib/event_source/connection_manager.rb b/lib/event_source/connection_manager.rb index 58df84ab..e032af6b 100644 --- a/lib/event_source/connection_manager.rb +++ b/lib/event_source/connection_manager.rb @@ -160,8 +160,54 @@ def drop_connection(connection_uri) connections end + def cancel_consumers_for(protocol, timeout: 5) + protocol_value = protocol.to_s.upcase + logger.info "Cancelling #{protocol_value} consumers prior to shutdown" + connections = connections_for(protocol) + any_consumers = connections.any? { |connection| connection_has_consumers?(connection) } + logger.info "#{protocol_value} inflight handlers (before_cancel): #{EventSource.inflight_messages_count}, any_consumers=#{any_consumers}" + + connections.each do |connection| + connection.channels.each_value do |channel| + channel.cancel_consumers + end + end + + wait_for_connections_to_drain(connections, timeout) + + logger.info "#{protocol_value} consumer cancellation complete" + logger.info "#{protocol_value} inflight handlers (end): #{EventSource.inflight_messages_count}" + end + private + def wait_for_connections_to_drain(connections, timeout) + return if connections.empty? + + protocol = connections.first.protocol.to_s.upcase + start = Time.now + + loop do + inflight = EventSource.inflight_messages_count + any_consumers = connections.any? { |connection| connection_has_consumers?(connection) } + + logger.info "#{protocol} inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}" + break if inflight.zero? && !any_consumers + break if (Time.now - start) >= timeout + sleep 0.1 + end + end + + def connection_has_consumers?(connection) + connection.channels.values.any? do |channel| + begin + channel.channel_proxy.subject.any_consumers? + rescue StandardError + false + end + end + end + # Find connection proxy class for given protocol # @param [Symbol] protocol the protocol name, `:http` or `:amqp` # @return [Class] Protocol Specific Connection Proxy Class diff --git a/lib/event_source/error.rb b/lib/event_source/error.rb index ade7918a..e7dc002d 100644 --- a/lib/event_source/error.rb +++ b/lib/event_source/error.rb @@ -32,5 +32,7 @@ class Error < StandardError ServerConfigurationNotFound = Class.new(Error) ServerConfigurationInvalid = Class.new(Error) MessageBuildError = Class.new(Error) + PayloadEncodeError = Class.new(Error) + PayloadDecodeError = Class.new(Error) end end diff --git a/lib/event_source/operations/mime_decode.rb b/lib/event_source/operations/mime_decode.rb new file mode 100644 index 00000000..490aa0c8 --- /dev/null +++ b/lib/event_source/operations/mime_decode.rb @@ -0,0 +1,98 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # Operation for decoding payloads, including decompression using Zlib. + class MimeDecode + include Dry::Monads[:result, :do] + include EventSource::Logging + + # Supported MIME types for decoding. + MIME_TYPES = %w[application/zlib application/json].freeze + + # Decodes the payload based on the specified MIME type. + # For example, decompresses the payload using Zlib for 'application/zlib'. + # + # @param mime_type [String] the MIME type of the payload (e.g., 'application/zlib', 'application/json') + # @param payload [String] the encoded payload to decode + # + # @return [Dry::Monads::Success] if decoding is successful + # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, decoding failure) + def call(mime_type, payload) + valid_payload, mime_type = yield validate_payload(payload, mime_type.to_s) + decoded_data = yield decode(valid_payload, mime_type) + + Success(decoded_data) + end + + private + + # Validates the payload based on the MIME type. + # Ensures the payload is binary-encoded for 'application/zlib' MIME type. + # + # @param payload [String] the payload to validate + # @param mime_type [String] the MIME type of the payload + # + # @return [Dry::Monads::Success] if the payload is valid + # @return [Dry::Monads::Failure] if the payload is invalid + def validate_payload(payload, mime_type) + unless MIME_TYPES.include?(mime_type) + return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") + end + + # Allow JSON string payloads to pass validation to avoid processing failures + # for existing JSON messages in the queue. These messages may have been queued + # with the wrong MIME type ('application/zlib') but are still valid JSON. + if mime_type == 'application/zlib' + return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") unless binary_payload?(payload) || valid_json_string?(payload) + end + + Success([payload, mime_type]) + end + + # Decodes the payload based on the specified MIME type. + # For 'application/zlib', it attempts to decompress the payload using Zlib. + # If decompression fails due to an error, the original payload is returned unmodified. + # + # @param payload [String] the payload to decode + # @param mime_type [String] the MIME type of the payload + # + # @return [Dry::Monads::Success] if decoding is successful or if the MIME type is not 'application/zlib'. + # @return [Dry::Monads::Success] if decompression fails, returning the original payload without modification. + # + # @note If the MIME type is 'application/zlib' and decompression fails, the original payload is returned as is, and no error is raised. + def decode(payload, mime_type) + return Success(payload) unless mime_type == 'application/zlib' + + begin + decoded_data = Zlib.inflate(payload) + Success(decoded_data) + rescue Zlib::Error => e + logger.error "Zlib errored while inflating payload: #{payload} \n with #{e.class}: #{e.message}, \n returning original payload." + Success(payload) + end + end + + # Checks whether the payload is binary-encoded. + # + # @param payload [String] the payload to check + # + # @return [Boolean] true if the payload is binary-encoded, false otherwise + def binary_payload?(payload) + return false unless payload.respond_to?(:encoding) + + payload.encoding == Encoding::BINARY + end + + def valid_json_string?(data) + data.is_a?(String) && JSON.parse(data) + true + rescue JSON::ParserError + false + end + end + end +end diff --git a/lib/event_source/operations/mime_encode.rb b/lib/event_source/operations/mime_encode.rb new file mode 100644 index 00000000..4bd551cb --- /dev/null +++ b/lib/event_source/operations/mime_encode.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +require "dry/monads" +require "dry/monads/do" + +module EventSource + module Operations + # Operation for encoding payloads into specified MIME types. + # For example, it supports compression using Zlib for 'application/zlib'. + class MimeEncode + include Dry::Monads[:result, :do] + include EventSource::Logging + + # Supported MIME types for encoding. + MIME_TYPES = %w[application/zlib application/json].freeze + + # Encodes the given payload into the specified MIME type. + # For example, compresses the payload using Zlib for 'application/zlib'. + # + # @param mime_type [String] the MIME type for encoding (e.g., 'application/zlib', 'application/json') + # @param payload [Any] the payload to encode; + # + # @return [Dry::Monads::Success] if encoding is successful + # @return [Dry::Monads::Failure] if an error occurs (e.g., invalid MIME type, payload type, or encoding failure) + def call(mime_type, payload) + mime_type = yield validate(mime_type) + encoded_data = yield encode(mime_type, payload) + + Success(encoded_data) + end + + private + + # Validates theMIME type before encoding. + # Ensures the MIME type is supported + # + # @param mime_type [String] the MIME type for encoding + # + # @return [Dry::Monads::Success] if the payload and MIME type are valid + # @return [Dry::Monads::Failure] if the MIME type is unsupported or the payload is invalid + def validate(mime_type) + unless MIME_TYPES.include?(mime_type.to_s) + return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.") + end + + Success(mime_type.to_s) + end + + # Encodes the payload based on the MIME type. + # For 'application/zlib', compresses the payload using Zlib. + # Logs the original and encoded payload sizes for debugging. + # + # @param data [String] the JSON stringified payload to encode + # @param mime_type [String] the MIME type for encoding + # + # @return [Dry::Monads::Success] if encoding is successful + # @return [Dry::Monads::Failure] if encoding fails + def encode(mime_type, payload) + case mime_type + when 'application/zlib' + json_payload = payload.to_json + encoded_data = Zlib.deflate(json_payload) + log_encoding_details(mime_type, json_payload, encoded_data) + when 'application/json' + encoded_data = payload.to_json + end + + Success(encoded_data || payload) + rescue JSON::GeneratorError => e + Failure("Failed to encode payload to JSON: #{e.message}") + rescue Zlib::Error => e + Failure("Failed to compress payload using Zlib: #{e.message}") + rescue StandardError => e + Failure("Unexpected error during encoding: #{e.message}") + end + + # Logs details of the encoding process. + def log_encoding_details(mime_type, payload, encoded_data) + logger.debug "*" * 80 + logger.debug "Starting payload encoding for MIME type: '#{mime_type}'" + logger.debug "Original payload size: #{data_size_in_kb(payload)} KB" + logger.debug "Encoded payload size: #{data_size_in_kb(encoded_data)} KB" + logger.debug "*" * 80 + end + + # Calculates the size of the data in kilobytes (KB). + # + # @param data [String] the data whose size is to be calculated + # + # @return [Float] the size of the data in KB, rounded to two decimal places + def data_size_in_kb(data) + (data.bytesize / 1024.0).round(2) + end + end + end +end diff --git a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb index 28e735c1..a73e6d3a 100644 --- a/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_exchange_proxy.rb @@ -15,6 +15,8 @@ class BunnyExchangeProxy # @attr_reader [EventSource::Protcols::Amqp::BunnyChannelProxy] channel_proxy the channel_proxy used to create this exchange attr_reader :subject, :channel_proxy + DefaultMimeType = 'application/json'.freeze + # @param [EventSource::AsyncApi::Channel] channel_proxy instance on which to open this Exchange # @param [Hash] exchange_bindings instance with configuration for this Exchange def initialize(channel_proxy, exchange_bindings) @@ -49,7 +51,9 @@ def publish(payload:, publish_bindings:, headers: {}) bunny_publish_bindings[:headers] = headers unless headers.empty? logger.debug "BunnyExchange#publish publishing message with bindings: #{bunny_publish_bindings.inspect}" - @subject.publish(payload.to_json, bunny_publish_bindings) + + @subject.publish(payload, bunny_publish_bindings) + logger.debug "BunnyExchange#publish published message: #{payload}" logger.debug "BunnyExchange#publish published message to exchange: #{@subject.name}" end diff --git a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb index 89c07b58..a8cefa10 100644 --- a/lib/event_source/protocols/amqp/bunny_queue_proxy.rb +++ b/lib/event_source/protocols/amqp/bunny_queue_proxy.rb @@ -25,6 +25,7 @@ class BunnyQueueProxy # @return [Bunny::Queue] def initialize(channel_proxy, async_api_channel_item) @channel_proxy = channel_proxy + @async_api_channel_item = async_api_channel_item bindings = async_api_channel_item.bindings @consumers = [] @@ -90,7 +91,7 @@ def spawn_thread(options) end def add_consumer(subscriber_klass, options) - @subject.subscribe(options) do |delivery_info, metadata, payload| + consumer = @subject.subscribe(options) do |delivery_info, metadata, payload| on_receive_message( subscriber_klass, delivery_info, @@ -98,6 +99,7 @@ def add_consumer(subscriber_klass, options) payload ) end + @consumers << consumer if consumer end def convert_to_subscribe_options(options) @@ -123,6 +125,7 @@ def on_receive_message( metadata, payload ) + EventSource.increment_inflight_messages logger.debug '**************************' logger.debug subscriber_klass.inspect logger.debug delivery_info.inspect @@ -135,6 +138,15 @@ def on_receive_message( subscriber = subscriber_klass.new subscriber.channel = @subject.channel + begin + payload = decode_payload(payload) + rescue EventSource::Error::PayloadDecodeError, StandardError => e + logger.error "Payload decoding failed: #{e.message} \n backtrace: #{e.backtrace.join("\n")} \n payload: #{payload}" + # Acknowledge the message so it doesn't block the queue + subscriber.ack(delivery_info.delivery_tag) + return + end + subscription_handler = EventSource::Protocols::Amqp::BunnyConsumerHandler.new( subscriber, @@ -143,12 +155,36 @@ def on_receive_message( payload, &executable ) - subscription_handler.run rescue Bunny::Exception => e logger.error "Bunny Consumer Error \n message: #{e.message} \n backtrace: #{e.backtrace.join("\n")}" ensure subscriber = nil + EventSource.decrement_inflight_messages + end + + # Decodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_subscribe.yml message bindings. + # + # For example, if `contentEncoding` is set to `application/zlib`, the payload will be decompressed using zlib. + # If no `contentEncoding` is provided, the payload will be returned unchanged. + # + # @param payload [String] The payload to be decoded. + # @return [String] The decoded payload, or the original payload if no encoding is specified. + # @raise [EventSource::Error::PayloadDecodeError] if the decoding process fails. + def decode_payload(payload) + async_api_subscribe_operation = @async_api_channel_item.subscribe + return payload unless async_api_subscribe_operation.message + + message_bindings = async_api_subscribe_operation.message['bindings'] + encoding = message_bindings.first[1]['contentEncoding'] if message_bindings + return payload unless encoding + + output = EventSource::Operations::MimeDecode.new.call(encoding, payload) + if output.success? + output.value! + else + raise EventSource::Error::PayloadDecodeError, output.failure + end end def find_executable(subscriber_klass, delivery_info) @@ -165,6 +201,18 @@ def method_missing(name, *args) @subject.send(name, *args) end + # Cancel all registered consumers for this queue + def cancel_consumers! + @consumers.each do |consumer| + begin + consumer.cancel + rescue StandardError => e + logger.info "Consumer cancellation error: #{e.message}" + end + end + @consumers.clear + end + private def subscriber_klass_name_to_suffix(subscriber_klass) diff --git a/lib/event_source/protocols/amqp_protocol.rb b/lib/event_source/protocols/amqp_protocol.rb index c40594fe..5717a421 100644 --- a/lib/event_source/protocols/amqp_protocol.rb +++ b/lib/event_source/protocols/amqp_protocol.rb @@ -15,7 +15,7 @@ require_relative 'amqp/contracts/contract' Gem - .find_files('event_source/protocols/amqp/contracts/**/*.rb') + .find_files('event_source/protocols/amqp/contracts/**/*.rb', false) .sort .each { |f| require(f) } diff --git a/lib/event_source/protocols/http/faraday_queue_proxy.rb b/lib/event_source/protocols/http/faraday_queue_proxy.rb index 6489bd7a..9a53923b 100644 --- a/lib/event_source/protocols/http/faraday_queue_proxy.rb +++ b/lib/event_source/protocols/http/faraday_queue_proxy.rb @@ -49,12 +49,12 @@ def actions # @param [Object] subscriber_klass Subscriber class # @return [Queue] Queue instance def subscribe(subscriber_klass, _options) + subscription_key = [app_name, formatted_exchange_name].join(delimiter) subscriber_suffix = subscriber_klass.name.downcase.gsub('::', '_') - unique_key = [app_name, formatted_exchange_name].join(delimiter) + "_#{subscriber_suffix}" - logger.debug "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}" - logger.debug "FaradayQueueProxy#register_subscription Unique_key #{unique_key}" - executable = subscriber_klass.executable_for(unique_key) - @subject.actions.push(executable) + unique_key = subscription_key + "_#{subscriber_suffix}" + logger.info "FaradayQueueProxy#register_subscription Subscriber Class #{subscriber_klass}" + logger.info "FaradayQueueProxy#register_subscription Unique_key #{unique_key}" + @subject.register_action(subscriber_klass, unique_key) end def consumer_proxy_for(operation_bindings) diff --git a/lib/event_source/publish_operation.rb b/lib/event_source/publish_operation.rb index e6cd6c05..f8724dea 100644 --- a/lib/event_source/publish_operation.rb +++ b/lib/event_source/publish_operation.rb @@ -3,6 +3,8 @@ module EventSource # Publish {EventSource::Event} messages class PublishOperation + include EventSource::Logging + # @attr_reader [EventSource::Channel] channel the channel instance used by # this PublishOperation # @attr_reader [Object] subject instance of the protocol's publish class @@ -26,11 +28,51 @@ def initialize(channel, publish_proxy, async_api_publish_operation) # @example # #publish("Message", :headers => { }) def call(payload, options = {}) + payload = encode_payload(payload) @subject.publish( payload: payload, publish_bindings: @async_api_publish_operation[:bindings], headers: options[:headers] || {} ) end + + # Encodes the given payload based on the `contentEncoding` specified in the AsyncAPI *_publish.yml message bindings. + # + # For example, if `contentEncoding` is set to `application/zlib`, the payload will be compressed using zlib. + # If no `contentEncoding` is provided, the payload will be returned as-is without modification. + # + # Note: + # - Encoding is not needed for the HTTP protocol, as encoding is handled at the server level. + # - For other protocols like AMQP, encoding is supported to ensure proper message transmission. + # + # @param payload [String, Hash] The payload to be encoded. + # @return [String] The encoded payload, or the original payload if no encoding is specified. + # @raise [EventSource::Error::PayloadEncodeError] if the encoding process fails. + def encode_payload(payload) + encoding = determine_encoding + return payload unless encoding + + output = EventSource::Operations::MimeEncode.new.call(encoding, payload) + if output.success? + output.value! + else + logger.error "Failed to encode message \n due to: #{output.failure}" + raise EventSource::Error::PayloadEncodeError, output.failure + end + end + + # Determines the encoding for the payload based on message bindings or protocol defaults. + # - If message bindings are present, uses the 'contentEncoding' value from the bindings. + # - If no message bindings are present and the protocol is AMQP, uses the default encoding for the AMQP protocol. Other protocols return nil. + def determine_encoding + message_bindings = @async_api_publish_operation.message&.dig('bindings') + return message_bindings.first[1]['contentEncoding'] if message_bindings.present? + + amqp_protocol? ? "#{subject.class}::DefaultMimeType".constantize : nil + end + + def amqp_protocol? + subject.is_a?(EventSource::Protocols::Amqp::BunnyExchangeProxy) + end end end diff --git a/lib/event_source/publisher.rb b/lib/event_source/publisher.rb index d5aa9b3c..fc47e110 100644 --- a/lib/event_source/publisher.rb +++ b/lib/event_source/publisher.rb @@ -26,6 +26,16 @@ def self.publisher_container @publisher_container ||= Concurrent::Map.new end + def self.initialization_registry + @initialization_registry ||= Concurrent::Array.new + end + + def self.initialize_publishers + self.initialization_registry.each do |pub| + pub.validate + end + end + def self.[](exchange_ref) # TODO: validate publisher already exists # raise EventSource::Error::PublisherAlreadyRegisteredError.new(id) if registry.key?(id) @@ -46,12 +56,7 @@ def included(base) } base.extend(ClassMethods) - TracePoint.trace(:end) do |t| - if base == t.self - base.validate - t.disable - end - end + EventSource.register_publisher(base) end # methods to register events diff --git a/lib/event_source/queue.rb b/lib/event_source/queue.rb index 21dc7caa..bafeba03 100644 --- a/lib/event_source/queue.rb +++ b/lib/event_source/queue.rb @@ -6,15 +6,14 @@ class Queue # @attr_reader [Object] queue_proxy the protocol-specific class supporting this DSL # @attr_reader [String] name # @attr_reader [Hash] bindings - # @attr_reader [Hash] actions - attr_reader :queue_proxy, :name, :bindings, :actions + attr_reader :queue_proxy, :name, :bindings def initialize(queue_proxy, name, bindings = {}) @queue_proxy = queue_proxy @name = name @bindings = bindings @subject = ::Queue.new - @actions = [] + @registered_actions = [] end # def subscribe(subscriber_klass, &block) @@ -49,5 +48,16 @@ def close def closed? @subject.closed? end + + # Register an action to be performed, with a resolver class and key. + def register_action(resolver, key) + @registered_actions << [resolver, key] + end + + def actions + @registered_actions.map do |ra| + ra.first.executable_for(ra.last) + end + end end end diff --git a/lib/event_source/railtie.rb b/lib/event_source/railtie.rb index ea971274..3b0e7254 100644 --- a/lib/event_source/railtie.rb +++ b/lib/event_source/railtie.rb @@ -1,8 +1,42 @@ # frozen_string_literal: true +require 'logger' + module EventSource # :nodoc: - class Railtie < Rails::Railtie - + module Railtie + Rails::Application::Finisher.initializer "event_source.boot", after: :finisher_hook do + logger = Logger.new($stdout) + logger.progname = 'EventSource graceful shutdown' + timeouts = EventSource.config.shutdown_timeouts || {} + amqp_timeout = timeouts[:amqp_drain] || 5 + + # Perform shutdown work outside of trap/at_exit context to avoid + # ThreadError from mutex operations within Bunny (AMQP client). + shutdown = lambda do |reason| + Thread.new do + begin + logger.info "#{reason}, starting graceful shutdown" + logger.info "AMQP inflight handlers at shutdown start: #{EventSource.inflight_messages_count}" + cm = EventSource::ConnectionManager.instance + + # Stop consuming and allow in-flight handlers to drain briefly + cm.cancel_consumers_for(:amqp, timeout: amqp_timeout) + cm.drop_connections_for(:amqp) + rescue => e + logger.error "graceful shutdown error: #{e.class}: #{e.message}" + end + end.join + end + + if EventSource.config.auto_shutdown + at_exit { shutdown.call('at_exit received') } + + %w[TERM INT].each do |sig| + Signal.trap(sig) { shutdown.call("signal=#{sig} received") } + end + end + EventSource.initialize! + end end -end \ No newline at end of file +end diff --git a/lib/event_source/subscriber.rb b/lib/event_source/subscriber.rb index d62257a6..937f2aba 100644 --- a/lib/event_source/subscriber.rb +++ b/lib/event_source/subscriber.rb @@ -49,12 +49,7 @@ def included(base) base.extend ClassMethods base.include InstanceMethods - TracePoint.trace(:end) do |t| - if base == t.self - base.create_subscription - t.disable - end - end + EventSource.register_subscriber(base) end module InstanceMethods diff --git a/spec/event_source/channel_spec.rb b/spec/event_source/channel_spec.rb index 3f1020cc..ada8f37e 100644 --- a/spec/event_source/channel_spec.rb +++ b/spec/event_source/channel_spec.rb @@ -92,5 +92,23 @@ expect(audit_log_queue.pop.last).to eq 'test message from enterprise events!!' expect(audit_log_queue.pop.last).to eq 'test message from enrollment events!!' end + + context '.cancel_consumers' do + before do + subject.subscribe_operations.values.each do |sub_op| + subscriber_klass = double('SubscriberKlass') + sub_op.subscribe(subscriber_klass) + end + end + + it 'cancels consumers via channel' do + channel = subject + expect(channel.channel_proxy.any_consumers?).to be_truthy + expect(channel.subscribe_operations.values.first.subject.consumers.count).to be > 0 + channel.cancel_consumers + expect(channel.channel_proxy.any_consumers?).to be_falsey + expect(channel.subscribe_operations.values.first.subject.consumers.count).to eq 0 + end + end end end diff --git a/spec/event_source/command_spec.rb b/spec/event_source/command_spec.rb index 7ad0f985..b44d0a6e 100644 --- a/spec/event_source/command_spec.rb +++ b/spec/event_source/command_spec.rb @@ -22,7 +22,12 @@ def call # binding.pry end + before :each do + EventSource.initialize!(true) + end + context '.event' do + let(:organization_params) do { hbx_id: '553234', diff --git a/spec/event_source/connection_manager_spec.rb b/spec/event_source/connection_manager_spec.rb index da9d30cf..fb24a997 100644 --- a/spec/event_source/connection_manager_spec.rb +++ b/spec/event_source/connection_manager_spec.rb @@ -89,6 +89,70 @@ ).to eq Hash.new end end + + context '.cancel_consumers_for' do + let(:connection) { connection_manager.connections[connection_url] } + + let(:async_api_file) do + Pathname.pwd.join('spec', 'support', 'asyncapi', 'polypress_amqp.yml') + end + + let(:async_api_channels) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: async_api_file) + .success + .channels + end + + let(:channel) do + connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] + end + + before do + connection.start unless connection.active? + connection.add_channels(async_api_channels) + channel.subscribe_operations.values.each do |sub_op| + subscriber_klass = double('SubscriberKlass') + sub_op.subscribe(subscriber_klass) + end + end + + after { connection.disconnect if connection.active? } + + context 'when inflight messages are present' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(5) + end + + it 'waits for timeout' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end + end + + context 'when inflight messages are draining' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(1, 0) + end + + it 'waits for drain' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 5) + end + end + + context 'when no inflight messages are present' do + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + it 'cancels AMQP consumers on each channel without waiting' do + expect(channel).to receive(:cancel_consumers).and_call_original + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end + end + end end end end @@ -160,5 +224,31 @@ end end end + + context 'when no connections are present + - .cancel_consumers_for' do + let(:protocol) { :amqp } + let(:connection) { instance_double('EventSource::Connection', protocol: protocol, channels: { default: channel }) } + let(:channel) { instance_double('EventSource::Channel') } + + before do + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + context 'does not raise error' do + before do + allow(connection_manager).to receive(:connections_for).with(protocol).and_return([]) + allow(connection_manager).to receive(:wait_for_connections_to_drain) + end + + it 'and still calls drain helper' do + expect do + connection_manager.cancel_consumers_for(protocol, timeout: 1) + end.not_to raise_error + + expect(connection_manager).to have_received(:wait_for_connections_to_drain).with([], 1) + end + end + end end end diff --git a/spec/event_source/operations/mime_decode_spec.rb b/spec/event_source/operations/mime_decode_spec.rb new file mode 100644 index 00000000..65039570 --- /dev/null +++ b/spec/event_source/operations/mime_decode_spec.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +RSpec.describe EventSource::Operations::MimeDecode do + subject { described_class.new } + + describe "#call" do + context "when the payload and mime type are valid" do + let(:payload) { { message: "Hello, World!" } } + let(:compressed_payload) { Zlib.deflate(payload.to_json) } + let(:mime_type) { "application/zlib" } + + it "successfully decodes the payload" do + result = subject.call(mime_type, compressed_payload) + + expect(result).to be_success + expect(result.value!).to eq(payload.to_json) + end + end + + context "when the payload is not binary for application/zlib" do + let(:invalid_payload) { "Not binary data" } + let(:mime_type) { "application/zlib" } + + it "returns a failure" do + result = subject.call(mime_type, invalid_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + + context "when the mime type is invalid" do + let(:payload) { { message: "Hello, World!" }.to_json } + let(:mime_type) { "text/plain" } + + it "returns a failure" do + result = subject.call(mime_type, payload) + + expect(result).to be_failure + expect(result.failure).to eq("Invalid MIME type 'text/plain'. Supported types are: application/zlib, application/json.") + end + end + + context "when decoding fails" do + let(:invalid_compressed_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + it "returns a failure with an error message" do + result = subject.call(mime_type, invalid_compressed_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + + context "when the mime_type is 'application/zlib'" do + context "and the payload is a JSON string but not binary" do + let(:json_string) { "Invalid compressed data".to_json } + let(:mime_type) { "application/zlib" } + + it "passes validation" do + result = subject.call(mime_type, json_string) + + expect(result).to be_success + expect(result.value!).to eq(json_string) + end + end + + context "and the payload is neither binary nor valid JSON" do + let(:non_json_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + it "returns a failure with a validation error message" do + result = subject.call(mime_type, non_json_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + + context "and the payload is not binary and raises an error when parsed as JSON" do + let(:corrupted_json_payload) { "Invalid compressed data" } + let(:mime_type) { "application/zlib" } + + before do + allow(JSON).to receive(:parse).with(corrupted_json_payload).and_raise(JSON::ParserError) + end + + it "returns a failure with a validation error message" do + result = subject.call(mime_type, corrupted_json_payload) + + expect(result).to be_failure + expect(result.failure).to eq("Payload must be binary-encoded for MIME type 'application/zlib'.") + end + end + + context 'when Zlib.inflate raises an exception' do + let(:payload) { { message: "Hello, World!" } } + let(:invalid_compressed_payload) { Zlib.deflate(payload.to_json) } + + it 'returns the original payload wrapped in Success' do + allow(Zlib).to receive(:inflate).and_raise(Zlib::DataError, "invalid compressed data") + + result = subject.call('application/zlib', invalid_compressed_payload) + + expect(result).to be_a(Dry::Monads::Success) + expect(result.value!).to eq(invalid_compressed_payload) + end + end + end + end +end diff --git a/spec/event_source/operations/mime_encode_spec.rb b/spec/event_source/operations/mime_encode_spec.rb new file mode 100644 index 00000000..b2475d00 --- /dev/null +++ b/spec/event_source/operations/mime_encode_spec.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +RSpec.describe EventSource::Operations::MimeEncode do + subject { described_class.new } + + describe "#call" do + + let(:valid_payload) { { key: 'value' } } + let(:invalid_payload) { -> {} } + + context "when MIME type is application/zlib" do + let(:payload) { { message: "Hello, World!" } } + let(:mime_type) { "application/zlib" } + + it "compresses the payload using Zlib" do + result = subject.call(mime_type, payload) + + expect(result).to be_success + expect(Zlib.inflate(result.value!)).to eq(payload.to_json) + end + end + + context "when MIME type is application/json" do + let(:payload) { "Hello, World!" } + let(:mime_type) { "application/json" } + + it "returns the payload as JSON" do + result = subject.call(mime_type, payload) + + expect(result).to be_success + expect(result.value!).to eq(payload.to_json) + end + end + + context "when the mime type is invalid" do + let(:payload) { { message: "Hello, World!" } } + let(:mime_type) { "text/plain" } + + it "returns a failure" do + result = subject.call(mime_type, payload) + + expect(result).to be_failure + expect(result.failure).to eq("Invalid MIME type 'text/plain'. Supported types are: application/zlib, application/json.") + end + end + + context 'when payload cannot be converted to JSON' do + before do + allow(invalid_payload).to receive(:to_json).and_raise(JSON::GeneratorError) + end + + it 'returns a failure with JSON::GeneratorError' do + result = subject.call('application/json', invalid_payload) + + expect(result).to be_failure + expect(result.failure).to match(/Failed to encode payload to JSON:/) + end + end + + context 'when Zlib compression fails' do + before do + allow(Zlib).to receive(:deflate).and_raise(Zlib::Error, 'Compression failed') + end + + it 'returns a failure with Zlib::Error' do + result = subject.call('application/zlib', valid_payload) + + expect(result).to be_failure + expect(result.failure).to eq('Failed to compress payload using Zlib: Compression failed') + end + end + + context 'when an unexpected error occurs' do + before do + allow(valid_payload).to receive(:to_json).and_raise(StandardError, 'something went wrong') + end + + it 'returns a failure with StandardError' do + result = subject.call('application/json', valid_payload) + + expect(result).to be_failure + expect(result.failure).to eq('Unexpected error during encoding: something went wrong') + end + end + end +end diff --git a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb index 5b289934..5cee63b4 100644 --- a/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_exchange_proxy_spec.rb @@ -4,5 +4,57 @@ require 'config_helper' RSpec.describe EventSource::Protocols::Amqp::BunnyExchangeProxy do - it 'the publish method should include all bindings, including message_id, persistance & message level bindings' + let(:channel_proxy) { instance_double('BunnyChannelProxy', subject: double) } + let(:exchange_bindings) do + { + type: :direct, + name: 'test_exchange', + durable: true, + auto_delete: false, + vhost: 'test_vhost' + } + end + let(:bunny_exchange) { instance_double('Bunny::Exchange', name: 'test_exchange') } + let(:payload) { { message: 'test message' } } + let(:publish_bindings) { { routing_key: 'test.key', persistent: true } } + let(:headers) { { correlation_id: '12345', custom_header: 'test_value' } } + + subject { described_class.new(channel_proxy, exchange_bindings) } + + before do + allow_any_instance_of(described_class).to receive(:bunny_exchange_for).and_return(bunny_exchange) + allow(bunny_exchange).to receive(:publish) + end + + describe '#publish' do + it 'publishes the payload with the correct bindings and headers' do + subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers) + + expect(bunny_exchange).to have_received(:publish).with(payload.to_json, { + correlation_id: '12345', + headers: { custom_header: 'test_value' } + }) + end + + it 'logs the publishing process' do + expect(subject.logger).to receive(:debug).with(/publishing message with bindings:/) + expect(subject.logger).to receive(:debug).with(/published message:/) + expect(subject.logger).to receive(:debug).with(/published message to exchange:/) + + subject.publish(payload: payload.to_json, publish_bindings: publish_bindings, headers: headers) + end + + context 'when the payload is binary' do + let(:binary_payload) { Zlib.deflate("binary data") } + + it 'does not convert the payload to JSON' do + subject.publish(payload: binary_payload, publish_bindings: publish_bindings, headers: headers) + + expect(bunny_exchange).to have_received(:publish).with(binary_payload, { + correlation_id: '12345', + headers: { custom_header: 'test_value' } + }) + end + end + end end diff --git a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb index a1758b38..0880f92e 100644 --- a/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb +++ b/spec/event_source/protocols/amqp/bunny_queue_proxy_spec.rb @@ -321,4 +321,98 @@ end end end + + describe '#decode_payload' do + let(:payload) { 'test_payload' } + let(:mime_decode_operation) { instance_double(EventSource::Operations::MimeDecode) } + let(:channel_proxy) { instance_double('ChannelProxy', subject: double) } + let(:async_api_channel_item) { instance_double('AsyncApiChannelItem', bindings: channel_bindings) } + + let(:subscribe_operation) do + EventSource::AsyncApi::SubscribeOperation.new( + operationId: 'subscribe_message', + bindings: { amqp: { key: 'value' } }, + message: { + 'bindings' => { + 'amqp' => { + 'contentEncoding' => 'application/zlib' + } + } + } + ) + end + + let(:instance) { described_class.new(channel_proxy, async_api_channel_item) } + let(:channel_bindings) do + { + amqp: { + is: :queue, + queue: { + name: 'on_event_source.test_queue', + durable: true, + exclusive: false, + auto_delete: false, + vhost: 'event_source' + } + } + } + end + + let(:bunny_queue) { instance_double('BunnyQueue') } + + before do + allow_any_instance_of(described_class).to receive(:bunny_queue_for).and_return(bunny_queue) + allow_any_instance_of(described_class).to receive(:bind_exchange).and_return(true) + allow(async_api_channel_item).to receive(:subscribe).and_return(subscribe_operation) + allow(EventSource::Operations::MimeDecode).to receive(:new).and_return(mime_decode_operation) + end + + context 'when there is no message in the subscribe operation' do + let(:subscribe_operation) { EventSource::AsyncApi::SubscribeOperation.new(operationId: 'subscribe_message' ) } + + it 'returns the original payload' do + expect(instance.decode_payload(payload)).to eq(payload) + end + end + + context 'when there is no contentEncoding in the message bindings' do + let(:subscribe_operation) { EventSource::AsyncApi::SubscribeOperation.new(operationId: 'subscribe_message', message: { }) } + + it 'returns the original payload' do + expect(instance.decode_payload(payload)).to eq(payload) + end + end + + context 'when contentEncoding is provided' do + context 'when decoding is successful' do + let(:decoded_payload) { 'decoded_payload' } + + before do + allow(mime_decode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Success(decoded_payload)) + end + + it 'returns the decoded payload' do + expect(instance.decode_payload(payload)).to eq(decoded_payload) + end + end + + context 'when decoding fails' do + let(:failure_message) { 'Decoding error' } + + before do + allow(mime_decode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Failure(failure_message)) + end + + it 'logs the error and raises a PayloadDecodeError' do + expect do + instance.decode_payload(payload) + end.to raise_error(EventSource::Error::PayloadDecodeError, failure_message) + end + end + end + end end diff --git a/spec/event_source/publish_operation_spec.rb b/spec/event_source/publish_operation_spec.rb index 19f2d535..25560506 100644 --- a/spec/event_source/publish_operation_spec.rb +++ b/spec/event_source/publish_operation_spec.rb @@ -120,6 +120,83 @@ it 'should forward the message to a queue bound to the exchange' do end end + + describe '#encode_payload' do + + let(:channel) { instance_double('EventSource::Channel') } + let(:publish_proxy) { instance_double('PublishProxy', publish: true) } + let(:async_api_publish_operation) do + EventSource::AsyncApi::PublishOperation.new({ + operationId: 'publish_message', + bindings: { amqp: { key: 'value' } }, + message: { + 'bindings' => { 'amqp' => { 'contentEncoding' => 'application/zlib' } } + } + }) + end + + let(:instance) { described_class.new(channel, publish_proxy, async_api_publish_operation) } + + describe '#encode_payload' do + let(:payload) { 'test_payload' } + let(:mime_encode_operation) { instance_double(EventSource::Operations::MimeEncode) } + + before do + allow(EventSource::Operations::MimeEncode).to receive(:new).and_return(mime_encode_operation) + end + + context 'when there is no message in the async API publish operation' do + let(:async_api_publish_operation) { EventSource::AsyncApi::PublishOperation.new({ operationId: 'publish_message', bindings: {} }) } + + it 'returns the original payload' do + expect(instance.encode_payload(payload)).to eq(payload) + end + end + + context 'when there is no contentEncoding in the message bindings' do + let(:async_api_publish_operation) do + EventSource::AsyncApi::PublishOperation.new({ operationId: 'publish_message', message: { } }) + end + + it 'returns the original payload' do + expect(instance.encode_payload(payload)).to eq(payload) + end + end + + context 'when contentEncoding is provided' do + context 'when encoding is successful' do + let(:encoded_payload) { 'encoded_payload' } + + before do + allow(mime_encode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Success(encoded_payload)) + end + + it 'returns the encoded payload' do + expect(instance.encode_payload(payload)).to eq(encoded_payload) + end + end + + context 'when encoding fails' do + let(:failure_message) { 'Encoding error' } + + before do + allow(mime_encode_operation).to receive(:call) + .with('application/zlib', payload) + .and_return(Dry::Monads::Failure(failure_message)) + end + + it 'logs the error and raises a PayloadEncodeError' do + expect(instance.logger).to receive(:error).with("Failed to encode message \n due to: #{failure_message}") + expect do + instance.encode_payload(payload) + end.to raise_error(EventSource::Error::PayloadEncodeError, failure_message) + end + end + end + end + end end # RSpec.describe EventSource::PublishOperation do diff --git a/spec/rails_app/app/event_source/events/determinations/eval.rb b/spec/rails_app/app/event_source/events/determinations/eval.rb new file mode 100644 index 00000000..0cd6b90d --- /dev/null +++ b/spec/rails_app/app/event_source/events/determinations/eval.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +module Events + module Determinations + # Eval will register event publisher for MiTC + class Eval < EventSource::Event + publisher_path 'publishers.mitc_publisher' + + end + end +end \ No newline at end of file diff --git a/spec/rails_app/app/event_source/publishers/mitc_publisher.rb b/spec/rails_app/app/event_source/publishers/mitc_publisher.rb new file mode 100644 index 00000000..5fc3ed43 --- /dev/null +++ b/spec/rails_app/app/event_source/publishers/mitc_publisher.rb @@ -0,0 +1,9 @@ +# frozen_string_literal: true + +module Publishers + class MitcPublisher + # Publisher will send request payload to MiTC for determinations + include ::EventSource::Publisher[http: '/determinations/eval'] + register_event '/determinations/eval' + end +end diff --git a/spec/rails_app/app/event_source/publishers/parties/mitc_publisher.rb b/spec/rails_app/app/event_source/publishers/parties/mitc_publisher.rb deleted file mode 100644 index 3e007812..00000000 --- a/spec/rails_app/app/event_source/publishers/parties/mitc_publisher.rb +++ /dev/null @@ -1,12 +0,0 @@ -# frozen_string_literal: true - -module Parties - class MitcPublisher - # include ::EventSource::Publisher[http: '/determinations/eval'] - - # # register_event '/determinations/eval' - end -end - - - diff --git a/spec/rails_app/app/event_source/subscribers/mitc_response_subscriber.rb b/spec/rails_app/app/event_source/subscribers/mitc_response_subscriber.rb new file mode 100644 index 00000000..d9d253eb --- /dev/null +++ b/spec/rails_app/app/event_source/subscribers/mitc_response_subscriber.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +module Subscribers + class MitcResponseSubscriber + include ::EventSource::Subscriber[http: '/determinations/eval'] + extend EventSource::Logging + + subscribe(:on_determinations_eval) do |body, status, headers| + $GLOBAL_TEST_FLAG = true + end + end +end diff --git a/spec/rails_app/app/event_source/subscribers/parties/mitc_subscriber.rb b/spec/rails_app/app/event_source/subscribers/parties/mitc_subscriber.rb deleted file mode 100644 index a6a404f9..00000000 --- a/spec/rails_app/app/event_source/subscribers/parties/mitc_subscriber.rb +++ /dev/null @@ -1,14 +0,0 @@ -# frozen_string_literal: true - -module MagiMedicaid - class EligbilityDeterminationsSubscriber - # include ::EventSource::Subscriber[http: '/determinations/eval'] - - # # # from: MagiMedicaidEngine of EA after Application's submission - # # # { event: magi_medicaid_application_submitted, payload: :magi_medicaid_application } - # subscribe(:on_determinations_eval) do |headers, payload| - # puts "block headers------#{headers}" - # puts "block payload-----#{payload}" - # end - end -end diff --git a/spec/rails_app/app/event_source/subscribers/parties/organization_subscriber.rb b/spec/rails_app/app/event_source/subscribers/parties/organization_subscriber.rb deleted file mode 100644 index b4d0786a..00000000 --- a/spec/rails_app/app/event_source/subscribers/parties/organization_subscriber.rb +++ /dev/null @@ -1,21 +0,0 @@ -# frozen_string_literal: true - -module Parties - class OrganizationSubscriber - # include ::EventSource::Subscriber[amqp: 'enroll.parties.organizations.fein_corrected'] - - # # subscribe(:on_enroll_parties_organizations_fein_corrected) do |delivery_info, metadata, payload| - # # # Sequence of steps that are executed as single operation - # # puts "triggered --> on_enroll_parties_organizations_fein_corrected block -- #{delivery_info} -- #{metadata} -- #{payload}" - # # end - - # subscribe("on_faa.enroll.parties.organizations") do |delivery_info, metadata, payload| - # # Sequence of steps that are executed as single operation - # puts "triggered --> on_enroll_parties_organizations_fein_corrected block -- #{delivery_info} -- #{metadata} -- #{payload}" - # end - # # def on_enroll_parties_organizations_fein_corrected(payload) - # # # Set of independent reactors for the given event that execute asynchronously - # # puts "triggered --> on_enroll_parties_organizations_fein_corrected method --#{payload}" - # # end - end -end \ No newline at end of file diff --git a/spec/rails_app/app/events/parties/organization/created.rb b/spec/rails_app/app/events/parties/organization/created.rb index 42866e23..49e6c4ff 100644 --- a/spec/rails_app/app/events/parties/organization/created.rb +++ b/spec/rails_app/app/events/parties/organization/created.rb @@ -6,7 +6,7 @@ class Created < EventSource::Event publisher_path 'parties.organization_publisher' # Schema used to validaate Event payload - contract_class 'Parties::Organization::CreateContract' + # contract_class 'Parties::Organization::CreateContract' attribute_keys :hbx_id, :legal_name, :fein, :entity_kind end end diff --git a/spec/rails_app/app/models/organizations/organization_model.rb b/spec/rails_app/app/models/organizations/organization_model.rb deleted file mode 100644 index 233218fc..00000000 --- a/spec/rails_app/app/models/organizations/organization_model.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -module Organizations - class OrganizationModel - include Mongoid::Document - include Mongoid::Timestamps - - field :legal_name, type: String - field :entity_kind, type: Symbol - field :fein, type: String - - # Track Events for this model - has_many :events, as: :event_stream, class_name: 'EventSource::EventStream' - end -end diff --git a/spec/rails_app/config/application.rb b/spec/rails_app/config/application.rb index bfe50a09..80441f98 100644 --- a/spec/rails_app/config/application.rb +++ b/spec/rails_app/config/application.rb @@ -7,8 +7,6 @@ Bundler.require(*Rails.groups) -require "event_source" - module RailsApp class Application < Rails::Application config.root = File.expand_path('../..', __FILE__) diff --git a/spec/rails_app/config/environments/test.rb b/spec/rails_app/config/environments/test.rb index 1ace3c3f..b2d790a8 100644 --- a/spec/rails_app/config/environments/test.rb +++ b/spec/rails_app/config/environments/test.rb @@ -12,7 +12,7 @@ # Do not eager load code on boot. This avoids loading your whole application # just for the purpose of running a single test. If you are using a tool that # preloads Rails for running tests, you may have to set it to true. - config.eager_load = false + config.eager_load = true # Configure static file server for tests with Cache-Control for performance. config.serve_static_files = true diff --git a/spec/rails_app/config/initializers/event_source.rb b/spec/rails_app/config/initializers/event_source.rb index 346d942c..d192c56d 100644 --- a/spec/rails_app/config/initializers/event_source.rb +++ b/spec/rails_app/config/initializers/event_source.rb @@ -57,6 +57,14 @@ rabbitmq.default_content_type = 'application/json' end + server.http do |http| + http.ref = 'http://mitc:3001' + http.host = ENV['MITC_HOST'] || 'http://localhost' + http.port = ENV['MITC_PORT'] || '3000' + http.url = ENV['MITC_URL'] || 'http://localhost:3000' + http.default_content_type = 'application/json' + end + server.http do |http| http.host = "https://api.github.com" http.default_content_type = 'application/json' @@ -119,7 +127,8 @@ # AcaEntities::Operations::AsyncApi::FindResource.new.call(self) end -dir = Pathname.pwd.join('spec', 'support', 'async_api_files') +config_dir = File.dirname(__FILE__) +dir = File.join(config_dir, '..', '..', '..', 'support', 'async_api_files') EventSource.async_api_schemas = ::Dir[::File.join(dir, '**', '*')].reject { |p| ::File.directory? p }.sort.reduce([]) do |memo, file| # read # serialize yaml to hash @@ -127,4 +136,4 @@ memo << EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath.new.call(path: file).success end -EventSource.initialize! +# EventSource.initialize! diff --git a/spec/rails_app/spec/http_service_integration_spec.rb b/spec/rails_app/spec/http_service_integration_spec.rb new file mode 100644 index 00000000..13061b85 --- /dev/null +++ b/spec/rails_app/spec/http_service_integration_spec.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +require_relative 'rails_helper' + +RSpec.describe "with an http subscriber service" do + include EventSource::Command + + it "runs when invoked" do + $GLOBAL_TEST_FLAG = false + + WebMock.stub_request( + :post, + /http:\/\/localhost:3000\/determinations\/eval/ + ).to_return({body: "{}"}) + response = event("events.determinations.eval", attributes: {}).success.publish + expect(response.status).to eq 200 + sleep(0.5) + expect($GLOBAL_TEST_FLAG).to eq(true) + end +end diff --git a/spec/rails_app/spec/rails_helper.rb b/spec/rails_app/spec/rails_helper.rb new file mode 100644 index 00000000..42d5ccf5 --- /dev/null +++ b/spec/rails_app/spec/rails_helper.rb @@ -0,0 +1,5 @@ +ENV['RAILS_ENV'] ||= 'test' +require 'bundler/setup' +require 'webmock/rspec' +require 'rails' +require_relative '../config/environment' diff --git a/spec/rails_app/spec/railtie_spec.rb b/spec/rails_app/spec/railtie_spec.rb new file mode 100644 index 00000000..ce6c8a1f --- /dev/null +++ b/spec/rails_app/spec/railtie_spec.rb @@ -0,0 +1,97 @@ +# frozen_string_literal: true + +require_relative 'rails_helper' + +RSpec.describe EventSource::Railtie do + before do + @original_auto_shutdown = EventSource.config.auto_shutdown + EventSource.config.auto_shutdown = auto_shutdown_enabled + @at_exit_handler = nil + allow_any_instance_of(Object).to receive(:at_exit) do |_, &blk| + @at_exit_handler = blk + end + initializer = Rails.application.initializers.find { |i| i.name == 'event_source.boot' } + initializer.run(Rails.application) + end + + context '.auto_shutdown' do + let(:protocol) { :amqp } + let(:url) { 'amqp://localhost:5672/' } + let(:protocol_version) { '0.9.1' } + let(:description) { 'Development RabbitMQ Server' } + let(:server_config) do + { + ref: url, + url: url, + protocol: protocol, + protocol_version: protocol_version, + description: description + } + end + + let(:connection_manager) { EventSource::ConnectionManager.instance } + let(:connection) do + connection_manager.add_connection(server_config) + connection_manager.connections_for(:amqp).first + end + + let(:async_api_file) do + Pathname.new(__dir__).join('..', '..', 'support', 'asyncapi', 'polypress_amqp.yml').expand_path + end + + let(:async_api_channels) do + EventSource::AsyncApi::Operations::AsyncApiConf::LoadPath + .new + .call(path: async_api_file) + .success + .channels + end + + let(:channel) do + connection.channels[:'on_polypress.magi_medicaid.mitc.eligibilities'] + end + + before do + connection_manager.drop_connections_for(:amqp) + connection_manager.drop_connections_for(:http) + connection.start unless connection.active? + connection.add_channels(async_api_channels) + + channel.subscribe_operations.each_value do |subscribe_operation| + subscriber_klass = double('SubscriberKlass') + subscribe_operation.subscribe(subscriber_klass) + end + + @original_timeouts = EventSource.config.shutdown_timeouts + EventSource.config.shutdown_timeouts = { amqp_drain: 2 } + allow(EventSource).to receive(:inflight_messages_count).and_return(0) + end + + after do + EventSource.config.shutdown_timeouts = @original_timeouts + EventSource.config.auto_shutdown = @original_auto_shutdown + end + + context 'when auto_shutdown is enabled' do + let(:auto_shutdown_enabled) { true } + + it 'cancels AMQP consumers and drops AMQP connections on shutdown' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + expect(connection.channels).not_to be_empty + expect(channel.subscribe_operations.values.first.subject.consumers).not_to be_empty + expect(@at_exit_handler).not_to be_nil + @at_exit_handler.call + + expect(connection_manager.connections_for(:amqp)).to be_empty + end + end + + context 'when auto_shutdown is disabled' do + let(:auto_shutdown_enabled) { false } + + it 'does not register at_exit handler' do + expect(connection_manager.connections_for(:amqp)).not_to be_empty + end + end + end +end diff --git a/spec/support/async_api_files/http_mitc.yml b/spec/support/async_api_files/http_mitc.yml new file mode 100644 index 00000000..e7e2f4b6 --- /dev/null +++ b/spec/support/async_api_files/http_mitc.yml @@ -0,0 +1,58 @@ +asyncapi: "2.0.0" +info: + title: MAGI in the Cloud (MitC) + version: 0.1.0 + description: Configuration for accessing MitC Medicaid and CHIP eligibility determination services + contact: + name: IdeaCrew + url: https://ideacrew.com + email: info@ideacrew.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + production: + url: http://mitc:3001 + protocol: http + protocolVersion: 0.1.0 + description: MitC Development Server + development: + url: http://mitc:3001 + protocol: http + protocolVersion: 0.1.0 + description: MitC Development Server + test: + url: http://mitc:3001 + protocol: http + protocolVersion: 0.1.0 + description: MitC Test Server + +defaultContentType: application/json + +channels: + /determinations/eval: + publish: + operationId: /determinations/eval + description: HTTP endpoint for MitC eligibility determination requests + bindings: + http: + type: request + method: POST + headers: + Content-Type: application/json + Accept: application/json + subscribe: + operationId: /on/determinations/eval + description: EventSource Subscriber that publishes MitC eligibility determination responses + bindings: + http: + type: response + method: GET + headers: + Content-Type: application/json + Accept: application/json + +tags: + - name: linter_tag + description: placeholder that satisfies the linter \ No newline at end of file