
feat: add partial broadcast for cached values #3218

Merged
Ziinc merged 51 commits into Logflare:main from ruslandoga:rd/cache-partial-broadcast
Apr 15, 2026

Conversation

@ruslandoga
Contributor

ANL-1352

Comment thread lib/logflare/context_cache.ex Outdated
end

defp maybe_broadcast_cached(cache, cache_key, value) do
Logflare.Utils.Tasks.start_child(fn ->
Contributor

Thinking that instead of creating a task for each broadcast, we could have a dedicated GenServer for this, to avoid the overhead of ephemeral task processes.

Contributor Author

@ruslandoga ruslandoga Mar 18, 2026

I can add a benchmark to compare the two, but I'd rather avoid a single process handling cross-node broadcasts. AFAIK :erpc.multicast still awaits the spawn_request reply, so if one of the node connections is slow, it could potentially block all cache broadcasts from being sent and grow the process's message queue.

I looked into it a bit more, and it seems like Tasks are unnecessary since multicast uses spawn_request with reply: :no, which means it returns immediately.

# since node=nil doesn't exist, it probably means we are not waiting for `send/2` to finish
iex> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no)
#==> #Reference<0.2294785123.1844445189.249014>

# and it's pretty much instant
iex> :timer.tc fn -> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no) end
#==> {12, #Reference<0.2294785123.1844445189.249084>}

Side-note: maybe we can add global cache broadcast rate limiting, similar to https://github.com/getsentry/sentry-elixir/blob/master/lib/sentry/logger_handler/rate_limiter.ex, to avoid sending out too many messages, just as a precaution.
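For illustration, a minimal sketch of that windowed rate-limiting idea (module name, window size, and API are hypothetical — this is not the Sentry implementation nor anything in this PR):

```elixir
defmodule GossipRateLimiter do
  # Hypothetical sketch: allow at most `max_per_window` broadcasts
  # per one-second window, dropping the excess.
  use Agent

  def start_link(max_per_window) do
    # State: {limit, count in current window, current window id}
    Agent.start_link(fn -> {max_per_window, 0, window_now()} end, name: __MODULE__)
  end

  # Returns true if a broadcast may still be sent in the given window.
  # The window defaults to the current monotonic second.
  def allow?(window \\ window_now()) do
    Agent.get_and_update(__MODULE__, fn {max, count, last_window} ->
      # Reset the counter when entering a new window.
      count = if window == last_window, do: count, else: 0

      if count < max do
        {true, {max, count + 1, window}}
      else
        {false, {max, count, window}}
      end
    end)
  end

  defp window_now, do: System.monotonic_time(:second)
end
```

With a limit of 2, the first two calls in a window return true and the third returns false; callers could simply skip the multicast when it returns false.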

@ruslandoga ruslandoga force-pushed the rd/cache-partial-broadcast branch 2 times, most recently from 29cd132 to 40d61bc Compare March 18, 2026 19:20
@ruslandoga ruslandoga force-pushed the rd/cache-partial-broadcast branch from 40d61bc to b8c1ed6 Compare March 18, 2026 19:21
@ruslandoga ruslandoga marked this pull request as ready for review March 18, 2026 20:00
@ruslandoga
Contributor Author

@Ziinc I think it's ready for the initial review!

@ruslandoga ruslandoga requested a review from Ziinc March 18, 2026 20:00
Comment thread config/runtime.exs Outdated
"LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float()

cache_broadcast_max_nodes =
"LOGFLARE_CACHE_BROADCAST_MAX_NODES" |> System.get_env("5") |> String.to_integer()
Contributor Author

@ruslandoga ruslandoga Mar 18, 2026

I'm not sure how it should be configured. For now I went with "global" env vars that configure all context caches with the same values.

Contributor

Let's default to 3 nodes.
LOGFLARE_CACHE_GOSSIP_MAX_NODES

Contributor Author

@ruslandoga ruslandoga Mar 20, 2026

I wonder if we really want to name it gossip. In the current implementation it's not really gossip; it's just a one-hop partial broadcast. And from the discussion on Slack, we don't actually want real gossiping, since it would result in more network load due to redundant messaging (at least in my current understanding).

Contributor

@Ziinc Ziinc Mar 23, 2026

For the initial implementation we're looking at 1 hop, but if network messaging does not go through the roof, then multi-hop is ideal due to the benefits at larger cluster sizes.
Having the hop count, hop probability, and the max message throughput as env var configs would be good so that we can experiment with this.

Comment thread lib/logflare/cluster/utils.ex
Enum.map(list_caches_with_metrics(), fn {cache, _} -> cache end)
end

defp wal_tombstone_specs do
Contributor Author

@ruslandoga ruslandoga Mar 18, 2026

I didn't want to create another module for it since it's not like a normal ContextCache, so I wrote the spec here. This "cache" is used to remember recently deleted records so that they are dropped from "cache broadcasts", avoiding a race condition: record fetch miss -> lookup -> cache broadcast -> record deleted -> WAL broadcast -> WAL broadcast arrives first -> stale cache broadcast arrives later.
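The race-avoidance check described here can be sketched as follows (names are hypothetical, not the PR's actual code): an incoming cache broadcast is dropped if a tombstone for its primary key was recorded locally.

```elixir
defmodule TombstoneCheck do
  # Hypothetical sketch: decide whether to cache an incoming broadcast value.
  # `tombstones` stands in for the ephemeral cache of recently deleted pkeys.
  def action(tombstones, context, %{id: id}) do
    if MapSet.member?(tombstones, {context, id}), do: :drop, else: :cache
  end
end

# A record deleted locally leaves a tombstone; a late broadcast for it is dropped.
tombstones = MapSet.new([{:sources, 42}])
TombstoneCheck.action(tombstones, :sources, %{id: 42}) # => :drop
TombstoneCheck.action(tombstones, :sources, %{id: 7})  # => :cache
```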

Contributor

A separate ContextCache.WalTombstoneCache would probably be better. We have a few ephemeral caches like these, as you have already identified, so it is fine.
They're differentiated by the .Cache suffix.

Comment thread test/logflare/context_cache_test.exs Outdated
end
end

describe "broadcasts" do
Contributor Author

@ruslandoga ruslandoga Mar 18, 2026

I can also add a more realistic test, similar to the Phoenix PubSub distributed tests, where extra nodes are started and connected and the "cache broadcast" side effects are asserted on them. Or I can use mocks.

Contributor

A distributed test harness would be good for this and other broadcasting tests.
I had tried to use LocalCluster for this before but couldn't get a nice testing flow.

Contributor

would prefer avoiding mocks where possible

Contributor Author

I've added Logflare.ContextCache.GossipClusterTest; it's a bit clunky but seems to work for this use case.

Comment thread config/runtime.exs Outdated
System.get_env("LOGFLARE_CACHE_BROADCAST_ENABLED", default_cache_broadcast_enabled) == "true"

cache_broadcast_ratio =
"LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float()
Contributor

Let's hardcode this value in the logic and let the user set the max broadcast nodes.

Contributor

A default of 0.05 would be better (5% of the cluster): in a 100-node cluster, at most 5 nodes would receive an update (assuming the user sets a high max).
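That arithmetic can be sketched as a fan-out helper (module and function names are hypothetical; the PR's actual selection logic may differ):

```elixir
defmodule GossipFanout do
  # Hypothetical sketch: number of peer nodes that receive a gossip
  # multicast, given the ratio and max_nodes settings discussed here.
  def fanout(cluster_size, ratio, max_nodes)
      when cluster_size >= 0 and ratio >= 0 and ratio <= 1 do
    (cluster_size * ratio)
    |> ceil()
    |> min(max_nodes)
  end
end

GossipFanout.fanout(100, 0.05, 10) # => 5 in a 100-node cluster
GossipFanout.fanout(100, 0.05, 3)  # => 3, capped by max_nodes
```

Rounding up with ceil/1 means even a tiny cluster gossips to at least one peer whenever ratio is nonzero.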

Contributor

LOGFLARE_CACHE_GOSSIP_RATIO

Contributor Author

I've updated the defaults and renamed the env vars.


Comment thread config/runtime.exs Outdated
Comment on lines +494 to +499
# allows excluding heavy caches via: LOGFLARE_CACHE_BROADCAST_EXCLUDE="auth,saved_searches"
excluded_caches =
System.get_env("LOGFLARE_CACHE_BROADCAST_EXCLUDE", "")
|> String.split(",", trim: true)
|> Enum.map(&String.trim/1)
|> MapSet.new()
Contributor

I don't think an explicit exclusion is a good approach; it would be too much configuration for what should be a transparent optimization.

Contributor Author

Removed.

Comment thread config/runtime.exs Outdated
Comment on lines +511 to +522
cache_broadcasts =
Map.new(known_caches, fn {short_name, name} ->
config = %{
ratio: cache_broadcast_ratio,
max_nodes: cache_broadcast_max_nodes,
enabled: cache_broadcast_enabled? and not MapSet.member?(excluded_caches, short_name)
}

{name, config}
end)

config :logflare, :cache_broadcasts, cache_broadcasts
Contributor

Storing the ratio and the max_nodes in the app env would be enough; this per-cache map seems unnecessary. All caches should be broadcast.

Contributor Author

Now it's

config :logflare, :context_cache_gossip, %{
  enabled: cache_gossip_enabled?,
  ratio: cache_gossip_ratio,
  max_nodes: cache_gossip_max_nodes
}


Comment thread lib/telemetry.ex Outdated
def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

context_caches_with_metrics = Logflare.ContextCache.Supervisor.list_caches_with_metrics()
wal_tombstones = Logflare.ContextCache.wal_tombstones_cache_name()
Contributor

separate module name would make this easier.

Contributor Author

I've added Logflare.ContextCache.Tombstones.Cache

Comment thread lib/telemetry.ex Outdated
Comment on lines +231 to +249
counter("logflare.context_cache.broadcast.count",
event_name: "logflare.context_cache.broadcast.stop",
tags: [:cache, :enabled],
description: "Total cache broadcast attempts"
),
distribution("logflare.context_cache.broadcast.stop.duration",
tags: [:cache, :enabled],
unit: {:native, :millisecond},
description: "Latency of dispatching the cache broadcast"
),
counter("logflare.context_cache.receive_broadcast.count",
event_name: "logflare.context_cache.receive_broadcast.stop",
tags: [:cache, :action],
description: "Total cache broadcasts received and their outcome (cached or dropped)"
),
distribution("logflare.context_cache.receive_broadcast.stop.duration",
tags: [:cache, :action],
unit: {:native, :millisecond},
description: "Latency of processing an incoming cache broadcast"
Contributor

We should make clear in the event_name that it is for the local gossip mechanism of the ContextCache on fetching nodes, and not necessarily from the node whose TransactionBroadcaster broadcasts WAL updates.
Clarification is needed due to upcoming changes here, where the TransactionBroadcaster will be broadcasting updated values with the WAL.

Contributor Author

The metrics now have context_cache_gossip in their name

"logflare.context_cache_gossip.multicast.count"
"logflare.context_cache_gossip.multicast.stop.duration"
"logflare.context_cache_gossip.receive.count"
"logflare.context_cache_gossip.receive.stop.duration"



@Ziinc
Contributor

Ziinc commented Mar 23, 2026

@ruslandoga just to clarify, we do want a full gossip protocol, but with safe initial defaults to minimize risk since this is experimental. We want to make all inputs to the propagation configurable so that we can experiment with it.

@ruslandoga ruslandoga marked this pull request as draft April 2, 2026 14:02
@ruslandoga
Contributor Author

I am thinking about renaming Tombstones to RecentlyBusted or something like that, since it's not just for deleted records but also for updated ones. After #3184, however, it might be that Tombstones/RecentlyBusted would be just for deleted records, but I am not sure yet.

Comment thread lib/logflare/context_cache/gossip.ex Outdated

# Explicitly ignore high-volume/ephemeral caches
def maybe_gossip(Logflare.Logs.LogEvents.Cache, _key, _value), do: :ok
def maybe_gossip(Logflare.Auth.Cache, _key, _value), do: :ok
Contributor Author

I've added the auth cache here as well, but I'm not sure about it. For one thing, it has unusual return types like {:ok, value, value}, and it's scary to mis-cache those.

Comment thread lib/logflare/context_cache/gossip.ex Outdated
defp extract_pkeys(%{id: id}), do: [id]

defp extract_pkeys(v) do
Logger.warning("Unable to extract primary key from gossip value: #{inspect(v)}")
Contributor Author

@ruslandoga ruslandoga Apr 3, 2026

Some values I've noticed we can't extract primary keys from (and therefore can't tombstone to prevent race conditions):

  • [{"event_message", [{{:string_contains, "testing"}, {:route, {1312, 0}}}]}] from Rules cache at key {:rules_tree_by_source_id, [12104]} -- what's the rule id?
  • {:error, :token_not_found} for Auth cache at key {:verify_access_token, ["vIby58vR7h", []]} -- looks like a memoized call, should those be gossiped at all?
  • {:ok, %Logflare.OauthAccessTokens.OauthAccessToken{}, %Logflare.User{}} for Auth cache at key {:verify_access_token, ["940e5d63954fa0e99f2dc2ae0fa1d032f5d07cda655459b4b2f2fc4df6d255d4", ["private"]]} -- is tombstone for user or token?

I'll prevent the Auth and Rules caches from gossiping for now, since it's unclear how best to extract pkeys for tombstoning.
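One defensive shape for this (a hypothetical sketch, not the PR's actual extract_pkeys): return {:ok, keys} only for values we can safely tombstone, and :error for the ambiguous cases listed above so they are excluded from gossip.

```elixir
defmodule PkeyExtract do
  # Hypothetical sketch: pull primary keys out of gossiped values.
  def extract(%{id: id}), do: {:ok, [id]}
  def extract({:ok, %{id: id}}), do: {:ok, [id]}
  # Memoized results like {:error, :token_not_found} or
  # {:ok, token, user} are ambiguous: skip gossip instead of guessing.
  def extract(_other), do: :error
end

PkeyExtract.extract(%{id: 3})                   # => {:ok, [3]}
PkeyExtract.extract({:error, :token_not_found}) # => :error
```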


@telemetry_handler_id "context-cache-gossip-logger"

def attach_logger do
Contributor Author

@ruslandoga ruslandoga Apr 3, 2026

This can aid troubleshooting or just checking in with the node. The logs I'm adding there are probably too noisy to be on full-time. These functions can be removed later.

end
end

def multicast(Cachex.Spec.cache(name: cache), key, value) do
Contributor

needs @doc

Contributor Author

Added in #3375

Comment on lines +163 to +166
@doc false
def record_tombstones(context_pkeys) when is_list(context_pkeys) do
# Writes a short-lived marker for a primary key indicating it was recently updated or deleted.
# Incoming cache multicasts check this tombstone cache to determine if their payload could be stale.
Contributor

inline docs should be @doc

Contributor Author

Moved in #3375

import Logflare.Factory

alias Ecto.Adapters.SQL.Sandbox, as: EctoSandbox
alias Logflare.{ContextCache, Users}
Contributor

Suggested change
alias Logflare.{ContextCache, Users}
alias Logflare.ContextCache
alias Logflare.Users

Comment on lines +11 to +28
if not Node.alive?() do
case :net_kernel.start(:"test@127.0.0.1", %{}) do
{:ok, _} ->
:ok

{:error, reason} ->
raise """
=============================================
Failed to start distributed Erlang for tests.

Please make sure `epmd` is running:

epmd -daemon

=============================================
Underlying error: #{inspect(reason)}
"""
end
Contributor

I think we should tear down :net_kernel to avoid funky behaviour in other tests. It is fine for now, but would not be the case if we were to introduce more similar distributed testing.

Contributor

Also, tearing it down will slightly improve CI test speed.

Contributor Author

Adding teardown in #3376

@Ziinc Ziinc merged commit 921ad2f into Logflare:main Apr 15, 2026
14 checks passed
Ziinc added a commit that referenced this pull request Apr 15, 2026
* add partial cache broadcast on misses

* credo

* credo again

* naming

* comment

* comments

* naming

* continue

* credo

* wording

* continue

* fewer changes

* add refresh

* begin tests

* seems to work

* add key to telemetry info

* add key to telemetry info

* doc

* fix credo warning

* extract gossip functions into own module

* fix mime error

* continue

* move epmd setup to ci

* use ; instead of &&

* seems to work

* cleanup

* cleanup again

* add big error if epmd is not running

* cleanup

* format error

* improve float parsing (allow 0 and 1)

* add warnings

* pipe

* comment on epmd

* move comment

* move unboxed runs to setup from setup_all

* cache user

* continue

* cleanup

* continue

* more logs

* wording

* comment

* naming

* eh

* notice

---------

Co-authored-by: Ziinc <Ziinc@users.noreply.github.com>
Ziinc added a commit that referenced this pull request Apr 15, 2026
* feat: initial implementation of bigquery iam provisioning

* chore: remove stripe-mock and add in price id to seeds for paid plan testing

* chore: add stripe helper

* docs: add docs on additional bigquery projects

* chore: version bump

* chore: instance template scripts (#3369)

* feat: LQL timestamp support for unix timestamps and ISO8601 (#3327)

* feat: LQL timestamp support for unix timestamps and ISO8601

* fix: don't apply local timezone correction to absolute timestamps

* feat: add partial broadcast for cached values (#3218)


* chore: add telegraf

---------

Co-authored-by: Adam Mokan <amokan@gmail.com>
Co-authored-by: Matt Stubbs <22266+msmithstubbs@users.noreply.github.com>
Co-authored-by: ruslandoga <ruslandoga+gh@icloud.com>
@ruslandoga ruslandoga deleted the rd/cache-partial-broadcast branch April 15, 2026 09:31