Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/rspec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby_version: ['2.6.3', '2.7.5', '3.0.5', '3.1.4', '3.2.2']
ruby_version: ['2.7.5', '3.0.5', '3.1.4', '3.2.2']
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
Expand Down
1 change: 1 addition & 0 deletions .rspec
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
--format documentation
--color
--require spec_helper
--exclude-pattern "spec/rails_app/**/*"
3 changes: 3 additions & 0 deletions .rspec_rails_specs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
--format documentation
--color
--exclude-pattern "**/*"
1 change: 1 addition & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ gemspec
group :development, :test do
gem "rails", '>= 6.1.4'
gem "rspec-rails"
gem "parallel_tests"
gem "pry", platform: :mri, require: false
gem "pry-byebug", platform: :mri, require: false
gem 'rubocop'
Expand Down
51 changes: 46 additions & 5 deletions lib/event_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@
require 'event_source/event'
require 'event_source/subscriber'
require 'event_source/operations/codec64'
require 'event_source/operations/mime_encode'
require 'event_source/operations/mime_decode'
require 'event_source/operations/create_message'
require 'event_source/operations/fetch_session'
require 'event_source/operations/build_message_options'
require 'event_source/operations/build_message'
require 'event_source/boot_registry'

# Event source provides ability to compose, publish and subscribe to events
module EventSource
Expand All @@ -65,14 +68,23 @@ class << self
:async_api_schemas=

def configure
@configured = true
yield(config)
end

def initialize!
load_protocols
create_connections
load_async_api_resources
load_components
def initialize!(force = false)
# Don't boot if I was never configured.
return unless @configured
boot_registry.boot!(force) do
load_protocols
create_connections
load_async_api_resources
load_components
end
end

def boot_registry
@boot_registry ||= EventSource::BootRegistry.new
end

def config
Expand All @@ -90,6 +102,35 @@ def build_async_api_resource(resource)
.call(resource)
.success
end

def register_subscriber(subscriber_klass)
boot_registry.register_subscriber(subscriber_klass)
end

def register_publisher(subscriber_klass)
boot_registry.register_publisher(subscriber_klass)
end

def inflight_messages_count
@inflight_mutex ||= Mutex.new
@inflight_mutex.synchronize { @inflight_messages_count ||= 0 }
end

def increment_inflight_messages
@inflight_mutex ||= Mutex.new
@inflight_mutex.synchronize do
@inflight_messages_count ||= 0
@inflight_messages_count += 1
end
end

def decrement_inflight_messages
@inflight_mutex ||= Mutex.new
@inflight_mutex.synchronize do
@inflight_messages_count ||= 0
@inflight_messages_count = [@inflight_messages_count - 1, 0].max
end
end
end

class EventSourceLogger
Expand Down
96 changes: 96 additions & 0 deletions lib/event_source/boot_registry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# frozen_string_literal: true

require 'set'
require 'monitor'

module EventSource
# This class manages correct/loading of subscribers and publishers
# based on the current stage of the EventSource lifecycle.
#
# Depending on both the time the initialization of EventSource is invoked
# and when subscriber/publisher code is loaded, this can become complicated.
# This is largely caused by two confounding factors:
# 1. We want to delay initialization of EventSource until Rails is fully
# 'ready'
# 2. Based on the Rails environment, such as production, development, or
# test (primarily how those different environments treat lazy vs. eager
# loading of classes in a Rails application), subscriber and publisher
# code can be loaded before, after, or sometimes even DURING the
# EventSource boot process - we need to support all models
class BootRegistry
def initialize
@unbooted_publishers = Set.new
@unbooted_subscribers = Set.new
@booted_publishers = Set.new
@booted_subscribers = Set.new
# This is our re-entrant mutex. We're going to use it to make sure that
# registration and boot methods aren't allowed to simultaneously alter
# our state. You'll notice most methods on this class are wrapped in
# synchronize calls against this.
@bootex = Monitor.new
@booted = false
end

def boot!(force = false)
@bootex.synchronize do
return if @booted && !force
yield
boot_publishers!
boot_subscribers!
@booted = true
end
end

# Register a publisher for EventSource.
#
# If the EventSource hasn't been booted, save publisher for later.
# Otherwise, boot it now.
def register_publisher(publisher_klass)
@bootex.synchronize do
if @booted
publisher_klass.validate
@booted_publishers << publisher_klass
else
@unbooted_publishers << publisher_klass
end
end
end

# Register a subscriber for EventSource.
#
# If the EventSource hasn't been booted, save the subscriber for later.
# Otherwise, boot it now.
def register_subscriber(subscriber_klass)
@bootex.synchronize do
if @booted
subscriber_klass.create_subscription
@booted_subscribers << subscriber_klass
else
@unbooted_subscribers << subscriber_klass
end
end
end

# Boot the publishers.
def boot_publishers!
@bootex.synchronize do
@unbooted_publishers.each do |pk|
pk.validate
@booted_publishers << pk
end
@unbooted_publishers = Set.new
end
end

# Boot the subscribers.
def boot_subscribers!
@bootex.synchronize do
@unbooted_subscribers.each do |sk|
sk.create_subscription
@booted_subscribers << sk
end
@unbooted_subscribers = Set.new
end
end
end
end
7 changes: 7 additions & 0 deletions lib/event_source/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,12 @@ def add_subscribe_operation(async_api_channel_item)
)
logger.info " Subscribe Operation Added: #{operation_id}"
end

def cancel_consumers
subscribe_operations.each_value do |sub_op|
subject = sub_op.subject
subject.cancel_consumers! if subject.respond_to?(:cancel_consumers!)
end
end
end
end
3 changes: 2 additions & 1 deletion lib/event_source/configure/config.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ class Config

def initialize
@log_level = :warn
@shutdown_timeouts = { amqp_drain: 5, http_drain: 5 }
end

# TODO: add default for pub_sub_root
attr_writer :pub_sub_root, :protocols, :server_configurations
attr_accessor :app_name, :log_level
attr_accessor :app_name, :log_level, :auto_shutdown, :shutdown_timeouts

def load_protocols
@protocols.each do |protocol|
Expand Down
46 changes: 46 additions & 0 deletions lib/event_source/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,54 @@ def drop_connection(connection_uri)
connections
end

def cancel_consumers_for(protocol, timeout: 5)
protocol_value = protocol.to_s.upcase
logger.info "Cancelling #{protocol_value} consumers prior to shutdown"
connections = connections_for(protocol)
any_consumers = connections.any? { |connection| connection_has_consumers?(connection) }
logger.info "#{protocol_value} inflight handlers (before_cancel): #{EventSource.inflight_messages_count}, any_consumers=#{any_consumers}"

connections.each do |connection|
connection.channels.each_value do |channel|
channel.cancel_consumers
end
end

wait_for_connections_to_drain(connections, timeout)

logger.info "#{protocol_value} consumer cancellation complete"
logger.info "#{protocol_value} inflight handlers (end): #{EventSource.inflight_messages_count}"
end

private

def wait_for_connections_to_drain(connections, timeout)
return if connections.empty?

protocol = connections.first.protocol.to_s.upcase
start = Time.now

loop do
inflight = EventSource.inflight_messages_count
any_consumers = connections.any? { |connection| connection_has_consumers?(connection) }

logger.info "#{protocol} inflight handlers (drain): #{inflight}, any_consumers=#{any_consumers}"
break if inflight.zero? && !any_consumers
break if (Time.now - start) >= timeout
sleep 0.1
end
end

def connection_has_consumers?(connection)
connection.channels.values.any? do |channel|
begin
channel.channel_proxy.subject.any_consumers?
rescue StandardError
false
end
end
end

# Find connection proxy class for given protocol
# @param [Symbol] protocol the protocol name, `:http` or `:amqp`
# @return [Class] Protocol Specific Connection Proxy Class
Expand Down
2 changes: 2 additions & 0 deletions lib/event_source/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,7 @@ class Error < StandardError
ServerConfigurationNotFound = Class.new(Error)
ServerConfigurationInvalid = Class.new(Error)
MessageBuildError = Class.new(Error)
PayloadEncodeError = Class.new(Error)
PayloadDecodeError = Class.new(Error)
end
end
98 changes: 98 additions & 0 deletions lib/event_source/operations/mime_decode.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# frozen_string_literal: true

require "dry/monads"
require "dry/monads/do"

module EventSource
module Operations
# Operation for decoding payloads, including decompression using Zlib.
class MimeDecode
include Dry::Monads[:result, :do]
include EventSource::Logging

# Supported MIME types for decoding.
MIME_TYPES = %w[application/zlib application/json].freeze

# Decodes the payload based on the specified MIME type.
# For example, decompresses the payload using Zlib for 'application/zlib'.
#
# @param mime_type [String] the MIME type of the payload (e.g., 'application/zlib', 'application/json')
# @param payload [String] the encoded payload to decode
#
# @return [Dry::Monads::Success<String>] if decoding is successful
# @return [Dry::Monads::Failure<String>] if an error occurs (e.g., invalid MIME type, decoding failure)
def call(mime_type, payload)
valid_payload, mime_type = yield validate_payload(payload, mime_type.to_s)
decoded_data = yield decode(valid_payload, mime_type)

Success(decoded_data)
end

private

# Validates the payload based on the MIME type.
# Ensures the payload is binary-encoded for 'application/zlib' MIME type.
#
# @param payload [String] the payload to validate
# @param mime_type [String] the MIME type of the payload
#
# @return [Dry::Monads::Success<String>] if the payload is valid
# @return [Dry::Monads::Failure<String>] if the payload is invalid
def validate_payload(payload, mime_type)
unless MIME_TYPES.include?(mime_type)
return Failure("Invalid MIME type '#{mime_type}'. Supported types are: #{MIME_TYPES.join(', ')}.")
end

# Allow JSON string payloads to pass validation to avoid processing failures
# for existing JSON messages in the queue. These messages may have been queued
# with the wrong MIME type ('application/zlib') but are still valid JSON.
if mime_type == 'application/zlib'
return Failure("Payload must be binary-encoded for MIME type 'application/zlib'.") unless binary_payload?(payload) || valid_json_string?(payload)
end

Success([payload, mime_type])
end

# Decodes the payload based on the specified MIME type.
# For 'application/zlib', it attempts to decompress the payload using Zlib.
# If decompression fails due to an error, the original payload is returned unmodified.
#
# @param payload [String] the payload to decode
# @param mime_type [String] the MIME type of the payload
#
# @return [Dry::Monads::Success<String>] if decoding is successful or if the MIME type is not 'application/zlib'.
# @return [Dry::Monads::Success<String>] if decompression fails, returning the original payload without modification.
#
# @note If the MIME type is 'application/zlib' and decompression fails, the original payload is returned as is, and no error is raised.
def decode(payload, mime_type)
return Success(payload) unless mime_type == 'application/zlib'

begin
decoded_data = Zlib.inflate(payload)
Success(decoded_data)
rescue Zlib::Error => e
logger.error "Zlib errored while inflating payload: #{payload} \n with #{e.class}: #{e.message}, \n returning original payload."
Success(payload)
end
end

# Checks whether the payload is binary-encoded.
#
# @param payload [String] the payload to check
#
# @return [Boolean] true if the payload is binary-encoded, false otherwise
def binary_payload?(payload)
return false unless payload.respond_to?(:encoding)

payload.encoding == Encoding::BINARY
end

def valid_json_string?(data)
data.is_a?(String) && JSON.parse(data)
true
rescue JSON::ParserError
false
end
end
end
end
Loading
Loading