diff --git a/shard.yml b/shard.yml index 4fabd2c5..ee8349b4 100644 --- a/shard.yml +++ b/shard.yml @@ -1,5 +1,5 @@ name: placeos-driver -version: 7.17.4 +version: 7.17.5 dependencies: action-controller: diff --git a/src/placeos-driver/protocol.cr b/src/placeos-driver/protocol.cr index 77ea910c..2b1aff7a 100644 --- a/src/placeos-driver/protocol.cr +++ b/src/placeos-driver/protocol.cr @@ -6,14 +6,9 @@ require "tokenizer" require "./protocol/request" -STDIN.blocking = false STDIN.sync = false - STDERR.flush_on_newline = false -STDERR.blocking = false -STDERR.sync = true # we mark this as false if in use for protocol comms - -STDOUT.blocking = false +STDERR.sync = true STDOUT.sync = true # :nodoc: diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index adfaf731..6ab85765 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -119,11 +119,16 @@ class PlaceOS::Driver end private def monitor_changes + monitor_count = 0 + SimpleRetry.try_to( base_interval: 1.second, max_interval: 5.seconds, randomise: 500.milliseconds ) do + return if terminated? + monitor_count += 1 + subscribe_count = monitor_count wait = Channel(Nil).new begin # This will run on redis reconnect @@ -134,10 +139,12 @@ class PlaceOS::Driver # re-subscribe to existing subscriptions here # NOTE:: sending an empty array errors keys = mutex.synchronize { subscriptions.keys } - redis.subscribe(keys) if keys.size > 0 + if keys.size > 0 + redis.subscribe(keys) + end spawn(same_thread: true) { - # re-check indirect subscriptions + # re-check indirect subscriptions, these are registered by the subscriptions above redirections.each_key do |system_id| remap_indirect(system_id) end @@ -145,28 +152,18 @@ class PlaceOS::Driver @running = true - # TODO:: check for any value changes - # disconnect might have been a network partition and an update may - # have occurred during this time gap - while details = subscription_channel.receive? sub, chan = details begin - SimpleRetry.try_to( - max_attempts: 4, - base_interval: 20.milliseconds, - max_interval: 1.seconds, - randomise: 80.milliseconds - ) do - if sub - redis.subscribe [chan] - else - redis.unsubscribe [chan] - end + if sub + redis.subscribe [chan] + else + redis.unsubscribe [chan] end rescue error Log.fatal(exception: error) { "redis subscription failed... some components may not function correctly" } + redis.close end end } @@ -175,8 +172,19 @@ class PlaceOS::Driver # requires a block and subsequent ones throw an error with a block. # NOTE:: this version of subscribe only supports splat arguments redis.subscribe(SYSTEM_ORDER_UPDATE) do |on| + raise "redis reconnect detected" if subscribe_count != monitor_count + subscribe_count += 1 + on.message { |c, m| on_message(c, m) } - spawn(same_thread: true) { wait.close } + spawn(same_thread: true) do + instance = monitor_count + wait.close + loop do + sleep 1.second + break if instance != monitor_count + subscription_channel.send({true, SYSTEM_ORDER_UPDATE}) + end + end end raise "no subscriptions, restarting loop" unless terminated? @@ -193,8 +201,15 @@ class PlaceOS::Driver @running = false - # We need to re-create the subscribe object for our sanity - handle_disconnect unless terminated? + case client = @redis + in ::Redis::Cluster::Client + client.close! + in Redis + client.close rescue nil + in Nil + end + @redis_cluster = nil + @redis = nil end end end @@ -211,14 +226,16 @@ class PlaceOS::Driver new_channel = sub.subscribe_to # Unsubscribe if channel changed - if current_channel && current_channel != new_channel - perform_unsubscribe(sub, current_channel) - else - subscribed = true + if current_channel + if current_channel != new_channel + perform_unsubscribe(sub, current_channel) + else + subscribed = true + end end end - perform_subscribe(sub) if !subscribed + perform_subscribe(sub) unless subscribed end end end @@ -271,15 +288,15 @@ class PlaceOS::Driver @redis : Redis? = nil protected def self.new_clustered_redis - Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379") + Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379", reconnect: false) end private def redis_cluster - @redis_cluster_client ||= Subscriptions.new_clustered_redis + @redis_cluster ||= Subscriptions.new_clustered_redis end protected def self.new_redis(cluster : Redis::Client = new_clustered_redis) : Redis - client = new_clustered_redis.connect! + client = cluster.connect! if client.is_a?(::Redis::Cluster::Client) client.cluster_info.each_nodes do |node_info| @@ -293,14 +310,9 @@ class PlaceOS::Driver # Could not connect to any nodes in cluster raise Redis::ConnectionError.new else - cluster.close! - cluster.connect!.as(Redis) + client.as(Redis) end end - - private def handle_disconnect - @redis = Subscriptions.new_redis(redis_cluster) - end end end diff --git a/src/placeos-driver/transport/http.cr b/src/placeos-driver/transport/http.cr index f86dd9a2..9f62b48a 100644 --- a/src/placeos-driver/transport/http.cr +++ b/src/placeos-driver/transport/http.cr @@ -130,7 +130,7 @@ class PlaceOS::Driver max_requests = @settings.get { setting?(Int32, :http_max_requests) } || 1000 @max_requests = max_requests - @client_idle = Time.monotonic + @client_idle = Time.instant @client_requests = 0 @tls = __is_https? ? new_tls_context : nil @@ -139,7 +139,7 @@ class PlaceOS::Driver @params_base : URI::Params @client : ConnectProxy::HTTPClient - @client_idle : Time::Span + @client_idle : Time::Instant @keep_alive : Time::Span @max_requests : Int32 @ip : String @@ -179,7 +179,7 @@ class PlaceOS::Driver protected def with_shared_client(&) @http_client_mutex.synchronize do - now = Time.monotonic + now = Time.instant idle_for = now - @client_idle __new_http_client if @client.__place_socket_invalid? || idle_for >= @keep_alive || @client_requests >= @max_requests @client_idle = now