Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ finished. Application completion is signalled by a `CHANNEL_EXECUTE_COMPLETE`
event:

```
Listener → FS: sendmsg
Listener → FS: sendmsg <uuid>
call-command: execute
execute-app-name: playback
execute-app-arg: welcome.wav
Expand All @@ -270,18 +270,19 @@ being processed until the current application completes.

### Two fibers per connection

Librevox runs two fibers for each outbound connection:
Librevox runs two fibers for each connection:

- **Session fiber** (`run_session`) — runs the setup sequence and then
`session_initiated`. Each `send_message` or `application` call blocks the fiber
until the reply arrives.
- **Read fiber** (`read_loop`) — reads messages from the socket and dispatches
them to `Async::Queue` instances, waking the session fiber.

An `Async::Semaphore(1)` mutex on `send_message` ensures only one command is
in-flight at a time, so replies are always delivered to the correct caller.
This also serializes commands issued by event hooks (which run in their own
fibers) with the main session flow.
`session_initiated`. Each `send_message` or `application` call creates an
`Async::Promise`, pushes it onto an array, and blocks the fiber until the
promise is resolved.
- **Read fiber** (`read_loop`) — reads messages from the socket and resolves
promises in FIFO order, waking the session fiber.

No mutex is needed — Ruby's cooperative fiber scheduling guarantees that the
promise push happens before the I/O yield point (the socket write), so
interleaving from concurrent event-hook fibers is safe. When a connection
drops, pending promises are rejected with `ConnectionError`.

## API Documentation

Expand Down
1 change: 1 addition & 0 deletions lib/librevox.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class ConnectionError < StandardError; end

autoload :Client, 'librevox/client'
autoload :CommandSocket, 'librevox/command_socket'
autoload :CommandDelegate, 'librevox/command_delegate'
autoload :Commands, 'librevox/commands'
autoload :Applications, 'librevox/applications'
autoload :Runner, 'librevox/runner'
Expand Down
18 changes: 13 additions & 5 deletions lib/librevox/client.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# frozen_string_literal: true

require 'io/stream'
require 'io/endpoint/host_endpoint'

module Librevox
class Client
def self.start(handler, host: "localhost", port: 8021, **options)
endpoint = IO::Endpoint.tcp(host, port)
new(handler, endpoint, **options).run
end

def initialize(handler, endpoint, **options)
@handler = handler
@endpoint = endpoint
Expand All @@ -21,7 +27,7 @@ def connect(socket)
def run
loop do
@endpoint.connect(&method(:connect))
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError => e
rescue IOError, Errno::ECONNREFUSED, Errno::ECONNRESET, ConnectionError, ResponseError => e
Librevox.logger.error "Connection lost: #{e.message}. Reconnecting in 1s."
sleep 1
end
Expand All @@ -35,6 +41,8 @@ def handle_session(connection, listener)
listener.run_session

read_task.wait
rescue ConnectionError
# Expected when the connection drops
ensure
read_task&.stop
connection.close
Expand All @@ -44,17 +52,17 @@ def start_read_loop(connection, listener)
Async do
read_messages(connection, listener)
ensure
# Close queues here (not in handle_session's ensure) so that
# a connection drop unblocks listener.run_session via nil dequeue.
# Reject pending promises here (not in handle_session's ensure) so
# that a connection drop unblocks listener.run_session via rejection.
# handle_session's ensure can't run until run_session returns,
# creating a deadlock if queues aren't closed from this fiber.
# creating a deadlock if promises aren't rejected from this fiber.
listener.connection_closed
end
end

def read_messages(connection, listener)
connection.read_loop do |msg|
listener.receive_message(msg)
listener.receive_data(msg)
end
end
end
Expand Down
19 changes: 19 additions & 0 deletions lib/librevox/command_delegate.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# frozen_string_literal: true

module Librevox
# In some cases there are both applications and commands with the same
# name, e.g. fifo. But we can't have two `fifo`-methods, so we include
# commands in CommandDelegate, and expose all commands through the `api`
# method, which wraps a CommandDelegate instance.
class CommandDelegate
include Librevox::Commands

def initialize(listener)
@listener = listener
end

def command(*args)
@listener.send_message(super(*args))
end
end
end
12 changes: 6 additions & 6 deletions lib/librevox/command_socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def connect
end

def send_message(msg)
@connection.send_message(msg)
@connection.send_data(msg)
read_response
end

Expand All @@ -32,19 +32,19 @@ def command(*args)
end

def read_response
while msg = @connection.read_message
while msg = @connection.receive_data
return msg if msg.command_reply? || msg.api_response?
end
end

def application(uuid, app, args = nil, **params)
headers = params
.merge(
def application(app, uuid, args = nil, **params)
headers = {
event_lock: true,
call_command: "execute",
execute_app_name: app,
execute_app_arg: args,
)
}
.merge(params)
.map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" }

send_message "sendmsg #{uuid}\n#{headers.join("\n")}"
Expand Down
104 changes: 58 additions & 46 deletions lib/librevox/listener/base.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
# frozen_string_literal: true

require 'async/queue'
require 'async/semaphore'
require 'async/barrier'
require 'securerandom'

module Librevox
module Listener
class Base
def initialize(connection)
@connection = connection
@reply_queue = Async::Queue.new
@command_mutex = Async::Semaphore.new(1)
@event_barrier = Async::Barrier.new
end

class << self
def hooks
@hooks ||= Hash.new {|hash, key| hash[key] = []}
Expand All @@ -24,20 +16,11 @@ def event(event, &block)
end
end

# In some cases there are both applications and commands with the same
# name, e.g. fifo. But we can't have two `fifo`-methods, so we include
# commands in CommandDelegate, and expose all commands through the `api`
# method, which wraps a CommandDelegate instance.
class CommandDelegate
include Librevox::Commands

def initialize(listener)
@listener = listener
end

def command(*args)
@listener.send_message(super(*args))
end
def initialize(connection)
@connection = connection
@reply_promises = []
@app_promises = {}
@event_barrier = Async::Barrier.new
end

# Exposes an instance of {CommandDelegate}, which includes {Librevox::Commands}.
Expand All @@ -51,47 +34,70 @@ def api
end

def send_message(msg)
@command_mutex.acquire do
@connection.send_message(msg)
reply = @reply_queue.dequeue
raise ConnectionError, "Connection closed" if reply.nil?
raise ResponseError, reply.headers[:reply_text] if reply.error?
reply
end
promise = Async::Promise.new

@reply_promises << promise

@connection.send_data(msg)

reply = promise.wait
raise ResponseError, reply.headers[:reply_text] if reply.error?

reply
end

attr_accessor :response
def execute_app(app, uuid, args = nil, **params)
event_uuid = SecureRandom.uuid

headers = {
event_lock: true,
call_command: "execute",
execute_app_name: app,
execute_app_arg: args,
event_uuid: event_uuid,
}
.merge(params)
.map { |key, value| "#{key.to_s.tr('_', '-')}: #{value}" }

def receive_message(response)
@response = response
handle_response
promise = Async::Promise.new
@app_promises[event_uuid] = promise

send_message "sendmsg #{uuid}\n#{headers.join("\n")}"

promise.wait
end

def handle_response
def receive_data(response)
if response.reply?
@reply_queue.push(response)
@reply_promises.shift&.resolve(response)
return
end

if response.event?
resp = response
if response.event == "CHANNEL_EXECUTE_COMPLETE"
app_uuid = response.content[:application_uuid]
@app_promises.delete(app_uuid)&.resolve(response)
end

@event_barrier.async do
on_event(resp)
invoke_event_hooks(resp)
on_event(response)
invoke_event_hooks(response)
end
end
end

# override
def on_event(event)
end
def connection_closed
error = ConnectionError.new("Connection closed")

def run_session
end
@reply_promises.each { |p| p.reject(error) }
@app_promises.each_value { |p| p.reject(error) }

@reply_promises.clear
@app_promises.clear

def connection_closed
@reply_queue.close
@event_barrier.wait
rescue ConnectionError
# Expected — event hooks may have been mid-command when disconnected
end

def disconnect
Expand All @@ -100,6 +106,12 @@ def disconnect

private

def on_event(event)
end

def run_session
end

def invoke_event_hooks(resp)
event_name = resp.event.downcase.to_sym
hooks = self.class.hooks[event_name]
Expand Down
8 changes: 2 additions & 6 deletions lib/librevox/listener/inbound.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
# frozen_string_literal: true

require 'io/endpoint/host_endpoint'

module Librevox
module Listener
class Inbound < Base
Expand All @@ -18,10 +16,8 @@ def filters(filters)
end
end

def self.run(barrier, host: "localhost", port: 8021, **options)
endpoint = IO::Endpoint.tcp(host, port)
client = Client.new(self, endpoint, **options)
barrier.async { client.run }
def self.start(...)
Client.start(self, ...)
Comment on lines +19 to +20
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Den syntaks kendte jeg ikke, men nice

end

def initialize(connection, args = {})
Expand Down
Loading
Loading