feat: add partial broadcast for cached values#3218
Conversation
| end | ||
|
|
||
| defp maybe_broadcast_cached(cache, cache_key, value) do | ||
| Logflare.Utils.Tasks.start_child(fn -> |
There was a problem hiding this comment.
thinking that instead of creating a task for each broadcast, we can have a dedicated genserver for this to avoid the overhead of ephemeral task procs
There was a problem hiding this comment.
I can add a benchmark to compare the two but I'd rather avoid a single process handling cross-node broadcasts. AFAIK :erpc.multicast still awaits on spawn_request reply, so if one of the nodes connection is slow, it could potentially block all of the cache broadcast from sending and grow the process's message queue.
I looked into it a bit more and it seems like Tasks are unnecessary since multicast uses spawn_request with reply=no which means it should be returning immediately.
# since node=nil doesn't exist, it probably means we are not waiting for `send/2` to finish
iex> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no)
#==> #Reference<0.2294785123.1844445189.249014>
# and it's pretty much instant
iex> :timer.tc fn -> :erlang.spawn_request(_bad_node_name = nil, :erpc, :execute_cast, [Function, :identity, [1]], reply: :no) end
#==> {12, #Reference<0.2294785123.1844445189.249084>}Side-note: maybe we can add global cache broadcast rate limiting similar to https://github.com/getsentry/sentry-elixir/blob/master/lib/sentry/logger_handler/rate_limiter.ex to avoid sending out too many messages, just as a pre-caution.
29cd132 to
40d61bc
Compare
40d61bc to
b8c1ed6
Compare
|
@Ziinc I think it's ready for the initial review! |
| "LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float() | ||
|
|
||
| cache_broadcast_max_nodes = | ||
| "LOGFLARE_CACHE_BROADCAST_MAX_NODES" |> System.get_env("5") |> String.to_integer() |
There was a problem hiding this comment.
I'm not sure how it should be configured. For now I went with "global" env vars that configure all context caches with the same values.
There was a problem hiding this comment.
lets default to 3 nodes.
LOGFLARE_CACHE_GOSSIP_MAX_NODES
There was a problem hiding this comment.
I wonder if we really want to name it gossip. In the current implementation it's not really a gossip, it's just a one-hop partial broadcast. And from the discussion on Slack we don't actually want real gossiping since it would result in more network load due to redundant messaging (at least in my current understanding).
There was a problem hiding this comment.
for initial implementation we're looking at 1 hop, but if network messaging does not go through the roof then multi-hop is ideal due to benefits of the cluster size being large.
having the hop count, hop probability, and the max message throughput as env var configs would be good so that we can experiment with this.
| Enum.map(list_caches_with_metrics(), fn {cache, _} -> cache end) | ||
| end | ||
|
|
||
| defp wal_tombstone_specs do |
There was a problem hiding this comment.
I didn't want to create another module for it since it's not like a normal ContextCache. So I wrote the spec here. This "cache" is used to remember recently deleted records so that they are dropped from "cache broadcasts" to avoid race conditions (record fetch miss, lookup -> cache broadcast -> record deleted -> wal broadcast -> wal broadcast arrives faster -> cache broadcast arrives later).
There was a problem hiding this comment.
a separate ContextCache.WalTombstoneCache would probably be better, we have a few ephemeral caches like these as you have already identified so it is fine.
They're differentiated by the .Cache suffix.
| end | ||
| end | ||
|
|
||
| describe "broadcasts" do |
There was a problem hiding this comment.
I can also add a more realistic test similar to Phoenix PubSub distributed tests where extra nodes are started and connected and "cache broadcast" side-effects are tested on them. Or I can use mocks.
There was a problem hiding this comment.
a distributed test harness would be good for this and other broadcasting tests.
i had tried to use LocalCluster before for this but couldn't get a nice testing flow.
There was a problem hiding this comment.
would prefer avoiding mocks where possible
There was a problem hiding this comment.
I've added Logflare.ContextCache.GossipClusterTest, it's a bit clunky but seems to work for this use case.
| System.get_env("LOGFLARE_CACHE_BROADCAST_ENABLED", default_cache_broadcast_enabled) == "true" | ||
|
|
||
| cache_broadcast_ratio = | ||
| "LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float() |
There was a problem hiding this comment.
lets hardcode this value in the logic and let the user set the max broadcast nodes
There was a problem hiding this comment.
default of 0.05 would be better (5% of cluster)
so in a 100 node cluster, at most 5 would receive an update (assuming user sets a high max)
There was a problem hiding this comment.
I've updated the defaults and renamed the env vars.
| "LOGFLARE_CACHE_BROADCAST_RATIO" |> System.get_env("0.1") |> String.to_float() | ||
|
|
||
| cache_broadcast_max_nodes = | ||
| "LOGFLARE_CACHE_BROADCAST_MAX_NODES" |> System.get_env("5") |> String.to_integer() |
There was a problem hiding this comment.
lets default to 3 nodes.
LOGFLARE_CACHE_GOSSIP_MAX_NODES
| # allows excluding heavy caches via: LOGFLARE_CACHE_BROADCAST_EXCLUDE="auth,saved_searches" | ||
| excluded_caches = | ||
| System.get_env("LOGFLARE_CACHE_BROADCAST_EXCLUDE", "") | ||
| |> String.split(",", trim: true) | ||
| |> Enum.map(&String.trim/1) | ||
| |> MapSet.new() |
There was a problem hiding this comment.
i don't think having an explicit exclusion is a good approach, would be too much configuration for this which should be a transparent optimization.
| cache_broadcasts = | ||
| Map.new(known_caches, fn {short_name, name} -> | ||
| config = %{ | ||
| ratio: cache_broadcast_ratio, | ||
| max_nodes: cache_broadcast_max_nodes, | ||
| enabled: cache_broadcast_enabled? and not MapSet.member?(excluded_caches, short_name) | ||
| } | ||
|
|
||
| {name, config} | ||
| end) | ||
|
|
||
| config :logflare, :cache_broadcasts, cache_broadcasts |
There was a problem hiding this comment.
storing the ratio and the max_nodes on app env would be enough, this seems unnecessary. all caches should be broadcasted.
There was a problem hiding this comment.
Now it's
config :logflare, :context_cache_gossip, %{
enabled: cache_gossip_enabled?,
ratio: cache_gossip_ratio,
max_nodes: cache_gossip_max_nodes
}| Enum.map(list_caches_with_metrics(), fn {cache, _} -> cache end) | ||
| end | ||
|
|
||
| defp wal_tombstone_specs do |
There was a problem hiding this comment.
a separate ContextCache.WalTombstoneCache would probably be better, we have a few ephemeral caches like these as you have already identified so it is fine.
They're differentiated by the .Cache suffix.
| def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__) | ||
|
|
||
| context_caches_with_metrics = Logflare.ContextCache.Supervisor.list_caches_with_metrics() | ||
| wal_tombstones = Logflare.ContextCache.wal_tombstones_cache_name() |
There was a problem hiding this comment.
separate module name would make this easier.
There was a problem hiding this comment.
I've added Logflare.ContextCache.Tombstones.Cache
| counter("logflare.context_cache.broadcast.count", | ||
| event_name: "logflare.context_cache.broadcast.stop", | ||
| tags: [:cache, :enabled], | ||
| description: "Total cache broadcast attempts" | ||
| ), | ||
| distribution("logflare.context_cache.broadcast.stop.duration", | ||
| tags: [:cache, :enabled], | ||
| unit: {:native, :millisecond}, | ||
| description: "Latency of dispatching the cache broadcast" | ||
| ), | ||
| counter("logflare.context_cache.receive_broadcast.count", | ||
| event_name: "logflare.context_cache.receive_broadcast.stop", | ||
| tags: [:cache, :action], | ||
| description: "Total cache broadcasts received and their outcome (cached or dropped)" | ||
| ), | ||
| distribution("logflare.context_cache.receive_broadcast.stop.duration", | ||
| tags: [:cache, :action], | ||
| unit: {:native, :millisecond}, | ||
| description: "Latency of processing an incoming cache broadcast" |
There was a problem hiding this comment.
should make clear in event_name that it is for the local gossip mechanism of the ContextCache on fetching nodes, and not necessarily from the node with TransactionBroadcaster that does broadcasting of wal updates.
clarification is needed due to upcoming changes in here where the TransactionBroadcaster will be broadcasting updated values with the wal.
There was a problem hiding this comment.
The metrics now have context_cache_gossip in their name
"logflare.context_cache_gossip.multicast.count"
"logflare.context_cache_gossip.multicast.stop.duration"
"logflare.context_cache_gossip.receive.count"
"logflare.context_cache_gossip.receive.stop.duration"| end | ||
| end | ||
|
|
||
| describe "broadcasts" do |
There was a problem hiding this comment.
a distributed test harness would be good for this and other broadcasting tests.
i had tried to use LocalCluster before for this but couldn't get a nice testing flow.
| end | ||
| end | ||
|
|
||
| describe "broadcasts" do |
There was a problem hiding this comment.
would prefer avoiding mocks where possible
|
@ruslandoga just to clarify, we do want a full gossip protocol. But with safe initial defaults reduced to minimize risks since this is experimental. We want to make all inputs to the propagation configurable so that we can experiment with it. |
|
I am thinking about renaming Tombstones to RecentlyBusted or something like that since it's not just for deleted records but also for updated ones. After #3184 however it might be that Tombstones/RecentlyBusted would be just for deleted records, but I am not sure yet. |
|
|
||
| # Explicitly ignore high-volume/ephemeral caches | ||
| def maybe_gossip(Logflare.Logs.LogEvents.Cache, _key, _value), do: :ok | ||
| def maybe_gossip(Logflare.Auth.Cache, _key, _value), do: :ok |
There was a problem hiding this comment.
I've added auth cache here as well, but not sure about it. For one thing it has unusual return types like {:ok, value, value}, and it's scary to mis-cache those.
| defp extract_pkeys(%{id: id}), do: [id] | ||
|
|
||
| defp extract_pkeys(v) do | ||
| Logger.warning("Unable to extract primary key from gossip value: #{inspect(v)}") |
There was a problem hiding this comment.
Some values I've noticed that we can't extract primary keys for (and therefore can't tombstone/prevent race conditions):
[{"event_message", [{{:string_contains, "testing"}, {:route, {1312, 0}}}]}]from Rules cache at key{:rules_tree_by_source_id, [12104]}-- what's the rule id?{:error, :token_not_found}for Auth cache at key{:verify_access_token, ["vIby58vR7h", []]}-- looks like a memoized call, should those be gossiped at all?{:ok, %Logflare.OauthAccessTokens.OauthAccessToken{}, %Logflare.User{}}for Auth cache at key{:verify_access_token, ["940e5d63954fa0e99f2dc2ae0fa1d032f5d07cda655459b4b2f2fc4df6d255d4", ["private"]]}-- is tombstone for user or token?
I'll prevent Auth and Rules caches from gossiping for now since it's unclear how best to extract pkeys for tombstoning.
|
|
||
| @telemetry_handler_id "context-cache-gossip-logger" | ||
|
|
||
| def attach_logger do |
There was a problem hiding this comment.
This can aid troubleshooting or just checking in with the node. The logs I'm adding there are probably too noisy to be on full-time. These functions can be removed later.
| end | ||
| end | ||
|
|
||
| def multicast(Cachex.Spec.cache(name: cache), key, value) do |
| @doc false | ||
| def record_tombstones(context_pkeys) when is_list(context_pkeys) do | ||
| # Writes a short-lived marker for a primary key indicating it was recently updated or deleted. | ||
| # Incoming cache multicasts check this tombstone cache to determine if their payload could be stale. |
| import Logflare.Factory | ||
|
|
||
| alias Ecto.Adapters.SQL.Sandbox, as: EctoSandbox | ||
| alias Logflare.{ContextCache, Users} |
There was a problem hiding this comment.
| alias Logflare.{ContextCache, Users} | |
| alias Logflare.ContextCache | |
| alias Logflare.Users |
| if not Node.alive?() do | ||
| case :net_kernel.start(:"test@127.0.0.1", %{}) do | ||
| {:ok, _} -> | ||
| :ok | ||
|
|
||
| {:error, reason} -> | ||
| raise """ | ||
| ============================================= | ||
| Failed to start distributed Erlang for tests. | ||
|
|
||
| Please make sure `epmd` is running: | ||
|
|
||
| epmd -daemon | ||
|
|
||
| ============================================= | ||
| Underlying error: #{inspect(reason)} | ||
| """ | ||
| end |
There was a problem hiding this comment.
i think we should teardown :net_kernel to avoid funky behaviour in other tests. it is fine for now, but would not be the case if we were to introduce more similar distributed testing
There was a problem hiding this comment.
also, tearing it down will slightly improve ci test speed
* add partial cache broadcast on misses * credo * credo again * naming * comment * comments * naming * continue * credo * wording * continue * fewer changes * add refresh * begin tests * seems to work * add key to telemetry info * add key to telemetry info * doc * fix credo warning * extract gossip functions into own module * fix mime error * continue * move epmd setup to ci * use ; instead of && * seems to work * cleanup * cleanup again * add big error if epmd is not running * cleanup * format error * improve float parsing (allow 0 and 1) * add warnings * pipe * comment on epmd * move comment * move unboxed runs to setup from setup_all * cache user * continue * cleanup * continue * more logs * wording * comment * naming * eh * notice --------- Co-authored-by: Ziinc <Ziinc@users.noreply.github.com>
* feat: initial implementation of bigquery iam provisioning * chore: remove stripe-mock and add in price id to seeds for paid plan testing * chore: add stripe helper * docs: add docs on additional bigquery projects * chore: version bump * chore: instance template scripts (#3369) * feat: LQL timestamp support for unix timestamps and ISO8601 (#3327) * feat: LQL timestamp support for unix timestamps and ISO8601 * fix: don't apply local timezone correction to absolute timestamps * feat: add partial broadcast for cached values (#3218) * add partial cache broadcast on misses * credo * credo again * naming * comment * comments * naming * continue * credo * wording * continue * fewer changes * add refresh * begin tests * seems to work * add key to telemetry info * add key to telemetry info * doc * fix credo warning * extract gossip functions into own module * fix mime error * continue * move epmd setup to ci * use ; instead of && * seems to work * cleanup * cleanup again * add big error if epmd is not running * cleanup * format error * improve float parsing (allow 0 and 1) * add warnings * pipe * comment on epmd * move comment * move unboxed runs to setup from setup_all * cache user * continue * cleanup * continue * more logs * wording * comment * naming * eh * notice --------- Co-authored-by: Ziinc <Ziinc@users.noreply.github.com> * chore: add telegraf --------- Co-authored-by: Adam Mokan <amokan@gmail.com> Co-authored-by: Matt Stubbs <22266+msmithstubbs@users.noreply.github.com> Co-authored-by: ruslandoga <ruslandoga+gh@icloud.com>
ANL-1352