Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion shard.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: placeos-driver
version: 7.17.4
version: 7.17.5

dependencies:
action-controller:
Expand Down
7 changes: 1 addition & 6 deletions src/placeos-driver/protocol.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,9 @@ require "tokenizer"

require "./protocol/request"

STDIN.blocking = false
STDIN.sync = false

STDERR.flush_on_newline = false
STDERR.blocking = false
STDERR.sync = true # we mark this as false if in use for protocol comms

STDOUT.blocking = false
STDERR.sync = true
STDOUT.sync = true

# :nodoc:
Expand Down
80 changes: 46 additions & 34 deletions src/placeos-driver/subscriptions.cr
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,16 @@ class PlaceOS::Driver
end

private def monitor_changes
monitor_count = 0

SimpleRetry.try_to(
base_interval: 1.second,
max_interval: 5.seconds,
randomise: 500.milliseconds
) do
return if terminated?
monitor_count += 1
subscribe_count = monitor_count
wait = Channel(Nil).new
begin
# This will run on redis reconnect
Expand All @@ -134,39 +139,31 @@ class PlaceOS::Driver
# re-subscribe to existing subscriptions here
# NOTE:: sending an empty array errors
keys = mutex.synchronize { subscriptions.keys }
redis.subscribe(keys) if keys.size > 0
if keys.size > 0
redis.subscribe(keys)
end

spawn(same_thread: true) {
# re-check indirect subscriptions
# re-check indirect subscriptions, these are registered by the subscriptions above
redirections.each_key do |system_id|
remap_indirect(system_id)
end
}

@running = true

# TODO:: check for any value changes
# disconnect might have been a network partition and an update may
# have occurred during this time gap

while details = subscription_channel.receive?
sub, chan = details

begin
SimpleRetry.try_to(
max_attempts: 4,
base_interval: 20.milliseconds,
max_interval: 1.seconds,
randomise: 80.milliseconds
) do
if sub
redis.subscribe [chan]
else
redis.unsubscribe [chan]
end
if sub
redis.subscribe [chan]
else
redis.unsubscribe [chan]
end
rescue error
Log.fatal(exception: error) { "redis subscription failed... some components may not function correctly" }
redis.close
end
end
}
Expand All @@ -175,8 +172,19 @@ class PlaceOS::Driver
# requires a block and subsequent ones throw an error with a block.
# NOTE:: this version of subscribe only supports splat arguments
redis.subscribe(SYSTEM_ORDER_UPDATE) do |on|
raise "redis reconnect detected" if subscribe_count != monitor_count
subscribe_count += 1

on.message { |c, m| on_message(c, m) }
spawn(same_thread: true) { wait.close }
spawn(same_thread: true) do
instance = monitor_count
wait.close
loop do
sleep 1.second
break if instance != monitor_count
subscription_channel.send({true, SYSTEM_ORDER_UPDATE})
end
end
end

raise "no subscriptions, restarting loop" unless terminated?
Expand All @@ -193,8 +201,15 @@ class PlaceOS::Driver

@running = false

# We need to re-create the subscribe object for our sanity
handle_disconnect unless terminated?
case client = @redis
in ::Redis::Cluster::Client
client.close!
in Redis
client.close rescue nil
in Nil
end
@redis_cluster = nil
@redis = nil
end
end
end
Expand All @@ -211,14 +226,16 @@ class PlaceOS::Driver
new_channel = sub.subscribe_to

# Unsubscribe if channel changed
if current_channel && current_channel != new_channel
perform_unsubscribe(sub, current_channel)
else
subscribed = true
if current_channel
if current_channel != new_channel
perform_unsubscribe(sub, current_channel)
else
subscribed = true
end
end
end

perform_subscribe(sub) if !subscribed
perform_subscribe(sub) unless subscribed
end
end
end
Expand Down Expand Up @@ -271,15 +288,15 @@ class PlaceOS::Driver
@redis : Redis? = nil

protected def self.new_clustered_redis
Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379")
Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379", reconnect: false)
end

private def redis_cluster
@redis_cluster_client ||= Subscriptions.new_clustered_redis
@redis_cluster ||= Subscriptions.new_clustered_redis
end

protected def self.new_redis(cluster : Redis::Client = new_clustered_redis) : Redis
client = new_clustered_redis.connect!
client = cluster.connect!

if client.is_a?(::Redis::Cluster::Client)
client.cluster_info.each_nodes do |node_info|
Expand All @@ -293,14 +310,9 @@ class PlaceOS::Driver
# Could not connect to any nodes in cluster
raise Redis::ConnectionError.new
else
cluster.close!
cluster.connect!.as(Redis)
client.as(Redis)
end
end

private def handle_disconnect
@redis = Subscriptions.new_redis(redis_cluster)
end
end
end

Expand Down
6 changes: 3 additions & 3 deletions src/placeos-driver/transport/http.cr
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class PlaceOS::Driver

max_requests = @settings.get { setting?(Int32, :http_max_requests) } || 1000
@max_requests = max_requests
@client_idle = Time.monotonic
@client_idle = Time.instant
@client_requests = 0

@tls = __is_https? ? new_tls_context : nil
Expand All @@ -139,7 +139,7 @@ class PlaceOS::Driver

@params_base : URI::Params
@client : ConnectProxy::HTTPClient
@client_idle : Time::Span
@client_idle : Time::Instant
@keep_alive : Time::Span
@max_requests : Int32
@ip : String
Expand Down Expand Up @@ -179,7 +179,7 @@ class PlaceOS::Driver

protected def with_shared_client(&)
@http_client_mutex.synchronize do
now = Time.monotonic
now = Time.instant
idle_for = now - @client_idle
__new_http_client if @client.__place_socket_invalid? || idle_for >= @keep_alive || @client_requests >= @max_requests
@client_idle = now
Expand Down
Loading