From 42d66c1f2725e9c462a2bd7bf95a0faed2e27590 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 5 Feb 2026 18:27:17 +1100 Subject: [PATCH 1/9] feat(subscriptions): refactor for clarity and debugging --- src/placeos-driver/protocol.cr | 7 +---- src/placeos-driver/subscriptions.cr | 47 +++++++++++++++------------- src/placeos-driver/transport/http.cr | 6 ++-- 3 files changed, 30 insertions(+), 30 deletions(-) diff --git a/src/placeos-driver/protocol.cr b/src/placeos-driver/protocol.cr index 77ea910c..2b1aff7a 100644 --- a/src/placeos-driver/protocol.cr +++ b/src/placeos-driver/protocol.cr @@ -6,14 +6,9 @@ require "tokenizer" require "./protocol/request" -STDIN.blocking = false STDIN.sync = false - STDERR.flush_on_newline = false -STDERR.blocking = false -STDERR.sync = true # we mark this as false if in use for protocol comms - -STDOUT.blocking = false +STDERR.sync = true STDOUT.sync = true # :nodoc: diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index adfaf731..6318e95d 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -124,6 +124,8 @@ class PlaceOS::Driver max_interval: 5.seconds, randomise: 500.milliseconds ) do + puts "\n\nSTART SUBSCRIPTION MONITORING\n\n" + return if terminated? wait = Channel(Nil).new begin # This will run on redis reconnect @@ -134,10 +136,13 @@ class PlaceOS::Driver # re-subscribe to existing subscriptions here # NOTE:: sending an empty array errors keys = mutex.synchronize { subscriptions.keys } - redis.subscribe(keys) if keys.size > 0 + if keys.size > 0 + puts "\n\nFOUND #{keys} EXISTING SUBSCRIPTIONS\n\n" + redis.subscribe(keys) + end spawn(same_thread: true) { - # re-check indirect subscriptions + # re-check indirect subscriptions, these are registered by the subscriptions above redirections.each_key do |system_id| remap_indirect(system_id) end @@ -145,10 +150,6 @@ class PlaceOS::Driver @running = true - # TODO:: check for any value changes - # disconnect might have been a network partition and an update may - # have occurred during this time gap - while details = subscription_channel.receive? sub, chan = details @@ -182,6 +183,7 @@ class PlaceOS::Driver raise "no subscriptions, restarting loop" unless terminated? rescue e Log.warn(exception: e) { "redis subscription loop exited" } + puts "\n\nERROR SUBSCRIPTION MONITORING: #{e.message}\n\n" raise e ensure wait.close @@ -193,8 +195,14 @@ class PlaceOS::Driver @running = false - # We need to re-create the subscribe object for our sanity - handle_disconnect unless terminated? + case client = @redis + in ::Redis::Cluster::Client + client.close! + in Redis + client.close rescue nil + in Nil + end + @redis = nil end end end @@ -211,14 +219,16 @@ class PlaceOS::Driver new_channel = sub.subscribe_to # Unsubscribe if channel changed - if current_channel && current_channel != new_channel - perform_unsubscribe(sub, current_channel) - else - subscribed = true + if current_channel + if current_channel != new_channel + perform_unsubscribe(sub, current_channel) + else + subscribed = true + end end end - perform_subscribe(sub) if !subscribed + perform_subscribe(sub) unless subscribed end end end @@ -275,11 +285,11 @@ class PlaceOS::Driver end private def redis_cluster - @redis_cluster_client ||= Subscriptions.new_clustered_redis + @redis_cluster ||= Subscriptions.new_clustered_redis end protected def self.new_redis(cluster : Redis::Client = new_clustered_redis) : Redis - client = new_clustered_redis.connect! + client = cluster.connect! if client.is_a?(::Redis::Cluster::Client) client.cluster_info.each_nodes do |node_info| @@ -293,14 +303,9 @@ class PlaceOS::Driver # Could not connect to any nodes in cluster raise Redis::ConnectionError.new else - cluster.close! - cluster.connect!.as(Redis) + client.as(Redis) end end - - private def handle_disconnect - @redis = Subscriptions.new_redis(redis_cluster) - end end end diff --git a/src/placeos-driver/transport/http.cr b/src/placeos-driver/transport/http.cr index f86dd9a2..9f62b48a 100644 --- a/src/placeos-driver/transport/http.cr +++ b/src/placeos-driver/transport/http.cr @@ -130,7 +130,7 @@ class PlaceOS::Driver max_requests = @settings.get { setting?(Int32, :http_max_requests) } || 1000 @max_requests = max_requests - @client_idle = Time.monotonic + @client_idle = Time.instant @client_requests = 0 @tls = __is_https? ? new_tls_context : nil @@ -139,7 +139,7 @@ class PlaceOS::Driver @params_base : URI::Params @client : ConnectProxy::HTTPClient - @client_idle : Time::Span + @client_idle : Time::Instant @keep_alive : Time::Span @max_requests : Int32 @ip : String @@ -179,7 +179,7 @@ class PlaceOS::Driver protected def with_shared_client(&) @http_client_mutex.synchronize do - now = Time.monotonic + now = Time.instant idle_for = now - @client_idle __new_http_client if @client.__place_socket_invalid? || idle_for >= @keep_alive || @client_requests >= @max_requests @client_idle = now From 86aa54d3ca4cbfb7614cae4b092218753ec5ba8d Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 5 Feb 2026 22:57:21 +1100 Subject: [PATCH 2/9] feat: add ping to subscribe loop --- src/placeos-driver/subscriptions.cr | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index 6318e95d..bfb327a3 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -119,6 +119,8 @@ class PlaceOS::Driver end private def monitor_changes + monitor_count = 0 + SimpleRetry.try_to( base_interval: 1.second, max_interval: 5.seconds, @@ -126,6 +128,7 @@ class PlaceOS::Driver ) do puts "\n\nSTART SUBSCRIPTION MONITORING\n\n" return if terminated? + monitor_count += 1 wait = Channel(Nil).new begin # This will run on redis reconnect @@ -153,6 +156,11 @@ class PlaceOS::Driver while details = subscription_channel.receive? sub, chan = details + if chan == "ping" + redis.ping + next + end + begin SimpleRetry.try_to( max_attempts: 4, @@ -177,7 +185,15 @@ class PlaceOS::Driver # NOTE:: this version of subscribe only supports splat arguments redis.subscribe(SYSTEM_ORDER_UPDATE) do |on| on.message { |c, m| on_message(c, m) } - spawn(same_thread: true) { wait.close } + spawn(same_thread: true) do + instance = monitor_count + wait.close + loop do + sleep 1.second + break if instance != monitor_count + subscription_channel.send({true, "ping"}) rescue nil + end + end end raise "no subscriptions, restarting loop" unless terminated? From 35a93c2667faffe638087dea01fc5d3397e1e464 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 5 Feb 2026 23:43:39 +1100 Subject: [PATCH 3/9] feat: delay ping on subscribe loop --- src/placeos-driver/subscriptions.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index bfb327a3..e90d8cd2 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -188,6 +188,7 @@ class PlaceOS::Driver spawn(same_thread: true) do instance = monitor_count wait.close + sleep 10.seconds loop do sleep 1.second break if instance != monitor_count From b9ec6a1c8b787a102697a474ef449f4031925578 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Fri, 6 Feb 2026 00:05:20 +1100 Subject: [PATCH 4/9] feat: use duplicate subscribe as no-op:wq --- src/placeos-driver/subscriptions.cr | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index e90d8cd2..3222c202 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -156,11 +156,6 @@ class PlaceOS::Driver while details = subscription_channel.receive? sub, chan = details - if chan == "ping" - redis.ping - next - end - begin SimpleRetry.try_to( max_attempts: 4, @@ -188,11 +183,10 @@ class PlaceOS::Driver spawn(same_thread: true) do instance = monitor_count wait.close - sleep 10.seconds loop do sleep 1.second break if instance != monitor_count - subscription_channel.send({true, "ping"}) rescue nil + subscription_channel.send({true, SYSTEM_ORDER_UPDATE}) rescue nil end end end From e5bc8c76bd31b1aa2d72ee49a4de6da264b49583 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Fri, 6 Feb 2026 00:21:05 +1100 Subject: [PATCH 5/9] feat: add more logging --- src/placeos-driver/subscriptions.cr | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index 3222c202..8ff08f3b 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -157,17 +157,10 @@ class PlaceOS::Driver sub, chan = details begin - SimpleRetry.try_to( - max_attempts: 4, - base_interval: 20.milliseconds, - max_interval: 1.seconds, - randomise: 80.milliseconds - ) do - if sub - redis.subscribe [chan] - else - redis.unsubscribe [chan] - end + if sub + redis.subscribe [chan] + else + redis.unsubscribe [chan] end rescue error Log.fatal(exception: error) { "redis subscription failed... some components may not function correctly" } @@ -186,7 +179,10 @@ class PlaceOS::Driver loop do sleep 1.second break if instance != monitor_count - subscription_channel.send({true, SYSTEM_ORDER_UPDATE}) rescue nil + puts "PING SUBSCRIPTION REDIS" + subscription_channel.send({true, SYSTEM_ORDER_UPDATE}) + rescue error + puts "\n\nERROR PINGING REDIS: #{error.inspect_with_backtrace}\n\n" end end end From dde6ea2d49f30a3a761bcb0f2733ad291f893311 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Fri, 6 Feb 2026 00:34:24 +1100 Subject: [PATCH 6/9] fix: close redis on write failure --- src/placeos-driver/subscriptions.cr | 1 + 1 file changed, 1 insertion(+) diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index 8ff08f3b..ea746140 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -164,6 +164,7 @@ class PlaceOS::Driver end rescue error Log.fatal(exception: error) { "redis subscription failed... some components may not function correctly" } + redis.close end end } From b8f6b2c6296bd203d99d82a8bfc0423525b63929 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Fri, 6 Feb 2026 00:37:44 +1100 Subject: [PATCH 7/9] feat: add more logging --- src/placeos-driver/subscriptions.cr | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index ea746140..1439c72c 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -173,14 +173,16 @@ class PlaceOS::Driver # requires a block and subsequent ones throw an error with a block. # NOTE:: this version of subscribe only supports splat arguments redis.subscribe(SYSTEM_ORDER_UPDATE) do |on| + monitor_count += 1 on.message { |c, m| on_message(c, m) } spawn(same_thread: true) do instance = monitor_count wait.close + puts "\n\n\nNEW SUBSCRIPTION LOOP!!!! #{monitor_count}\n\n\n" loop do sleep 1.second + puts "PING SUBSCRIPTION REDIS: #{monitor_count} == #{instance}" break if instance != monitor_count - puts "PING SUBSCRIPTION REDIS" subscription_channel.send({true, SYSTEM_ORDER_UPDATE}) rescue error puts "\n\nERROR PINGING REDIS: #{error.inspect_with_backtrace}\n\n" From 6ec02c89a350fab9f237ced7d5f81ec90e859218 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Fri, 6 Feb 2026 09:47:33 +1100 Subject: [PATCH 8/9] feat: prevent reconnect on subscription loop --- src/placeos-driver/subscriptions.cr | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index 1439c72c..20bb2454 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -129,6 +129,7 @@ class PlaceOS::Driver puts "\n\nSTART SUBSCRIPTION MONITORING\n\n" return if terminated? monitor_count += 1 + subscribe_count = monitor_count wait = Channel(Nil).new begin # This will run on redis reconnect @@ -140,7 +141,7 @@ class PlaceOS::Driver # NOTE:: sending an empty array errors keys = mutex.synchronize { subscriptions.keys } if keys.size > 0 - puts "\n\nFOUND #{keys} EXISTING SUBSCRIPTIONS\n\n" + puts "\n\nFOUND #{keys.size} EXISTING SUBSCRIPTIONS\n\n" redis.subscribe(keys) end @@ -173,7 +174,9 @@ class PlaceOS::Driver # requires a block and subsequent ones throw an error with a block. # NOTE:: this version of subscribe only supports splat arguments redis.subscribe(SYSTEM_ORDER_UPDATE) do |on| - monitor_count += 1 + raise "redis reconnect detected" if subscribe_count != monitor_count + subscribe_count += 1 + on.message { |c, m| on_message(c, m) } spawn(same_thread: true) do instance = monitor_count @@ -212,6 +215,7 @@ class PlaceOS::Driver client.close rescue nil in Nil end + @redis_cluster = nil @redis = nil end end @@ -291,7 +295,7 @@ class PlaceOS::Driver @redis : Redis? = nil protected def self.new_clustered_redis - Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379") + Redis::Client.boot(ENV["REDIS_URL"]? || "redis://localhost:6379", reconnect: false) end private def redis_cluster From f57eadb466e6a1be673d322817ae900030d662a6 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Fri, 6 Feb 2026 10:34:13 +1100 Subject: [PATCH 9/9] chore: remove debugging lines --- shard.yml | 2 +- src/placeos-driver/subscriptions.cr | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/shard.yml b/shard.yml index 4fabd2c5..ee8349b4 100644 --- a/shard.yml +++ b/shard.yml @@ -1,5 +1,5 @@ name: placeos-driver -version: 7.17.4 +version: 7.17.5 dependencies: action-controller: diff --git a/src/placeos-driver/subscriptions.cr b/src/placeos-driver/subscriptions.cr index 20bb2454..6ab85765 100644 --- a/src/placeos-driver/subscriptions.cr +++ b/src/placeos-driver/subscriptions.cr @@ -126,7 +126,6 @@ class PlaceOS::Driver max_interval: 5.seconds, randomise: 500.milliseconds ) do - puts "\n\nSTART SUBSCRIPTION MONITORING\n\n" return if terminated? monitor_count += 1 subscribe_count = monitor_count @@ -141,7 +140,6 @@ class PlaceOS::Driver # NOTE:: sending an empty array errors keys = mutex.synchronize { subscriptions.keys } if keys.size > 0 - puts "\n\nFOUND #{keys.size} EXISTING SUBSCRIPTIONS\n\n" redis.subscribe(keys) end @@ -181,14 +179,10 @@ class PlaceOS::Driver spawn(same_thread: true) do instance = monitor_count wait.close - puts "\n\n\nNEW SUBSCRIPTION LOOP!!!! #{monitor_count}\n\n\n" loop do sleep 1.second - puts "PING SUBSCRIPTION REDIS: #{monitor_count} == #{instance}" break if instance != monitor_count subscription_channel.send({true, SYSTEM_ORDER_UPDATE}) - rescue error - puts "\n\nERROR PINGING REDIS: #{error.inspect_with_backtrace}\n\n" end end end @@ -196,7 +190,6 @@ class PlaceOS::Driver raise "no subscriptions, restarting loop" unless terminated? rescue e Log.warn(exception: e) { "redis subscription loop exited" } - puts "\n\nERROR SUBSCRIPTION MONITORING: #{e.message}\n\n" raise e ensure wait.close