From b2793c5799a73c36b19422628ca81e373a65a831 Mon Sep 17 00:00:00 2001 From: Gabor Pali Date: Thu, 19 Mar 2026 08:19:48 +0100 Subject: [PATCH 1/4] dreyfus: change how indexer processes are linked to Clouseau Dreyfus and Clouseau maintain a shared LRU cache of actively managed indexes, which may fall apart if the rate of eviction gets too high. Such situations may happen when there are too many indexes to manage in a relatively short period of time, so that Clouseau is forced to switch between them very rapidly and therefore becomes subject to thrashing. Thrashing may cause the process termination messages of indexers being closed (by the unexpected eviction) not to be processed in time, so subsequent search requests on the Dreyfus side start failing with `noproc` errors because the backing indexer is gone. This immediately translates to HTTP 500 errors, which are visible to the user. Maintaining a shared state joined by Erlang process identifiers as keys is an inherently fragile idea due to their volatile nature when one of the sides is emulated in Java. That is why we shall find a more stable key, the path of the index file, which is less prone to change. Indexes would still be managed by dedicated indexer processes as before, but the communication to them would happen solely through a third-party process on the Clouseau side. This process would then keep track of the states and availability of each indexer and handle the message forwarding for them properly. The change is implemented in a backward-compatible way by putting the new-style communication method behind a toggle. This would make it possible for Dreyfus to work with Clouseau instances that have not been upgraded to support this protocol. 
--- src/dreyfus/src/clouseau_rpc.erl | 69 ++++++---- src/dreyfus/src/dreyfus_index.erl | 154 +++++++++++----------- src/dreyfus/src/dreyfus_index_manager.erl | 2 +- src/dreyfus/src/dreyfus_index_updater.erl | 40 +++--- src/dreyfus/src/dreyfus_rpc.erl | 8 +- src/dreyfus/src/dreyfus_util.erl | 4 +- 6 files changed, 146 insertions(+), 131 deletions(-) diff --git a/src/dreyfus/src/clouseau_rpc.erl b/src/dreyfus/src/clouseau_rpc.erl index c036a5a9af0..8ed91f29a54 100644 --- a/src/dreyfus/src/clouseau_rpc.erl +++ b/src/dreyfus/src/clouseau_rpc.erl @@ -16,6 +16,7 @@ -include("dreyfus.hrl"). +-export([index_key_from_pid/1, index_key_from_path/2, get_index_pid_from_key/1]). -export([open_index/3]). -export([await/2, commit/2, get_update_seq/1, info/1, search/2]). -export([group1/7, group2/2]). @@ -47,7 +48,21 @@ -type analyzer_fields() :: [{field_name(), analyzer_name() | [analyzer_name()]}]. --type indexer_pid() :: pid(). +-type indexer_key() :: {path, string_as_binary(_), pid()} | {pid, pid()}. + +-spec index_key_from_pid(pid()) -> indexer_key(). +index_key_from_pid(Pid) -> + {pid, Pid}. + +-spec index_key_from_path(string_as_binary(_), pid()) -> indexer_key(). +index_key_from_path(Path, Pid) -> + {path, Path, Pid}. + +-spec get_index_pid_from_key(indexer_key()) -> pid(). +get_index_pid_from_key({path, _, Pid}) -> + Pid; +get_index_pid_from_key({pid, Pid}) -> + Pid. %% Example of the message %% {[ @@ -69,7 +84,7 @@ -define(SEARCH_SERVICE_TIMEOUT, 2000). -spec open_index(Peer :: pid(), Path :: shard(), Analyzer :: analyzer()) -> - {ok, indexer_pid()} | error(). + {ok, indexer_key()} | error(). open_index(Peer, Path, Analyzer) -> rpc({main, clouseau()}, {open, Peer, Path, Analyzer}). @@ -86,14 +101,14 @@ get_root_dir() -> rpc({main, clouseau()}, {get_root_dir}). %% not used ??? --spec await(Ref :: indexer_pid(), MinSeq :: commit_seq()) -> +-spec await(Ref :: indexer_key(), MinSeq :: commit_seq()) -> ok | error(). await(Ref, MinSeq) -> rpc(Ref, {await, MinSeq}). 
%% deprecated --spec commit(Ref :: indexer_pid(), NewCommitSeq :: commit_seq()) -> +-spec commit(Ref :: indexer_key(), NewCommitSeq :: commit_seq()) -> ok | error(). commit(Ref, NewCommitSeq) -> @@ -107,25 +122,25 @@ commit(Ref, NewCommitSeq) -> | {committed_seq, committed_seq()} | {purge_seq, purge_seq()}. --spec info(Ref :: indexer_pid()) -> +-spec info(Ref :: indexer_key()) -> {ok, [info_result_item()]} | error(). info(Ref) -> rpc(Ref, info). --spec get_update_seq(Ref :: indexer_pid()) -> +-spec get_update_seq(Ref :: indexer_key()) -> {ok, update_seq()} | error(). get_update_seq(Ref) -> rpc(Ref, get_update_seq). --spec set_purge_seq(Ref :: indexer_pid(), Seq :: purge_seq()) -> +-spec set_purge_seq(Ref :: indexer_key(), Seq :: purge_seq()) -> ok | error(). set_purge_seq(Ref, Seq) -> rpc(Ref, {set_purge_seq, Seq}). --spec get_purge_seq(Ref :: indexer_pid()) -> +-spec get_purge_seq(Ref :: indexer_key()) -> {ok, purge_seq()} | error(). get_purge_seq(Ref) -> @@ -157,11 +172,11 @@ get_purge_seq(Ref) -> | {highlight_size, pos_integer()} | {legacy, boolean()}. --spec search(Ref :: indexer_pid(), Args :: [search_arg()]) -> +-spec search(Key :: indexer_key(), Args :: [search_arg()]) -> {ok, #top_docs{}} | error(). -search(Ref, Args) -> - case rpc(Ref, {search, Args}) of +search(Key, Args) -> + case rpc(Key, {search, Args}) of {ok, Response} when is_list(Response) -> {ok, #top_docs{ update_seq = couch_util:get_value(update_seq, Response), @@ -181,7 +196,7 @@ search(Ref, Args) -> | [field_name()]. -spec group1( - Ref :: indexer_pid(), + Key :: indexer_key(), Query :: query(), GroupBy :: field_name(), Refresh :: boolean(), @@ -190,8 +205,8 @@ search(Ref, Args) -> Limit :: limit() ) -> {ok, [{field_name(), sort_values()}]} | error(). -group1(Ref, Query, GroupBy, Refresh, Sort, Offset, Limit) -> - rpc(Ref, {group1, Query, GroupBy, Refresh, Sort, Offset, Limit}). 
+group1(Key, Query, GroupBy, Refresh, Sort, Offset, Limit) -> + rpc(Key, {group1, Query, GroupBy, Refresh, Sort, Offset, Limit}). -type group_name() :: string_as_binary(_) | null. -type sort_values() :: [string_as_binary(_) | null]. @@ -213,17 +228,17 @@ group1(Ref, Query, GroupBy, Refresh, Sort, Offset, Limit) -> | {highlight_size, pos_integer()}. -type grouped_results() :: [{field_name(), TotalHits :: non_neg_integer(), [#hit{}]}]. --spec group2(Ref :: indexer_pid(), Args :: [query_arg()]) -> +-spec group2(Key :: indexer_key(), Args :: [query_arg()]) -> {ok, {TotalHits :: non_neg_integer(), TotalGroupedHits :: non_neg_integer(), grouped_results()}}. -group2(Ref, Args) -> - rpc(Ref, {group2, Args}). +group2(Key, Args) -> + rpc(Key, {group2, Args}). --spec delete(Ref :: indexer_pid(), Id :: docid()) -> +-spec delete(Key :: indexer_key(), Id :: docid()) -> ok. -delete(Ref, Id) -> - rpc(Ref, {delete, couch_util:to_binary(Id)}). +delete(Key, Id) -> + rpc(Key, {delete, couch_util:to_binary(Id)}). -type docid() :: string_as_binary(_). @@ -242,11 +257,11 @@ delete(Ref, Id) -> -type yes_or_no() :: string_as_binary(yes) | string_as_binary(no). -spec update( - Ref :: indexer_pid(), Id :: docid(), Fields :: [{field_name(), field_value(), [field_option()]}] + Key :: indexer_key(), Id :: docid(), Fields :: [{field_name(), field_value(), [field_option()]}] ) -> ok. -update(Ref, Id, Fields) -> - rpc(Ref, {update, Id, Fields}). +update(Key, Id, Fields) -> + rpc(Key, {update, Id, Fields}). -spec cleanup(DbName :: string_as_binary(_)) -> ok. cleanup(DbName) -> @@ -337,9 +352,15 @@ connected() -> end end. -rpc(Ref, Msg) -> +rpc(Key, Message) -> + { Ref, Msg } = case Key of + {pid, Pid} -> { Pid, Message }; + {path, Path} -> { main, {forward, Path, Message} }; + Other -> Other + end, ioq:call_search(Ref, Msg, erlang:get(io_priority)). + clouseau() -> list_to_atom(config:get("dreyfus", "name", "clouseau@127.0.0.1")). 
diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl index d7540500151..6fbcb3146f7 100644 --- a/src/dreyfus/src/dreyfus_index.erl +++ b/src/dreyfus/src/dreyfus_index.erl @@ -44,7 +44,7 @@ dbname, index, updater_pid = nil, - index_pid = nil, + index_key = nil, waiting_list = [] }). @@ -55,36 +55,39 @@ start_link(DbName, Index) -> proc_lib:start_link(?MODULE, init, [{DbName, Index}]). -await(Pid, MinSeq) -> - MFA = {gen_server, call, [Pid, {await, MinSeq}, infinity]}, +await(Key, MinSeq) -> + MFA = {gen_server, call, [Key, {await, MinSeq}, infinity]}, dreyfus_util:time([index, await], MFA). -search(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, search_int, [Pid, QueryArgs]}, +search(Key0, QueryArgs) -> + Key = to_index_key(Key0), + MFA = {?MODULE, search_int, [Key, QueryArgs]}, dreyfus_util:time([index, search], MFA). -group1(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, group1_int, [Pid, QueryArgs]}, +group1(Key0, QueryArgs) -> + Key = to_index_key(Key0), + MFA = {?MODULE, group1_int, [Key, QueryArgs]}, dreyfus_util:time([index, group1], MFA). -group2(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, group2_int, [Pid, QueryArgs]}, +group2(Key0, QueryArgs) -> + Key = to_index_key(Key0), + MFA = {?MODULE, group2_int, [Key, QueryArgs]}, dreyfus_util:time([index, group2], MFA). -info(Pid0) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, info_int, [Pid]}, +info(Key0) -> + Key = to_index_key(Key0), + MFA = {?MODULE, info_int, [Key]}, dreyfus_util:time([index, info], MFA). %% We either have a dreyfus_index gen_server pid or the remote %% clouseau pid. -to_index_pid(Pid) -> +to_index_key(Key) -> + Pid = clouseau_rpc:get_index_pid_from_key(Key), case node(Pid) == node() of - true -> gen_server:call(Pid, get_index_pid, infinity); - false -> Pid + true -> + IndexPid = gen_server:call(Pid, get_index_pid, infinity), + clouseau_rpc:index_key_from_pid(IndexPid); + false -> Key end. 
design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) -> @@ -110,17 +113,17 @@ design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) -> init({DbName, Index}) -> process_flag(trap_exit, true), case open_index(DbName, Index) of - {ok, Pid, Seq} -> + {ok, Key, Seq} -> State = #state{ dbname = DbName, index = Index#index{current_seq = Seq, dbname = DbName}, - index_pid = Pid + index_key = Key }, case couch_db:open_int(DbName, []) of {ok, Db} -> try couch_db:monitor(Db), - dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index) + dreyfus_util:maybe_create_local_purge_doc(Db, Key, Index) after couch_db:close(Db) end, @@ -139,7 +142,7 @@ handle_call( #state{ index = #index{dbname = DbName, name = IdxName, ddoc_id = DDocId, current_seq = Seq} = Index, - index_pid = IndexPid, + index_key = IndexKey, updater_pid = nil, waiting_list = WaitList } = State @@ -150,7 +153,7 @@ handle_call( case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of false -> UpPid = spawn_link(fun() -> - dreyfus_index_updater:update(IndexPid, Index) + dreyfus_index_updater:update(IndexKey, Index) end), State#state{ updater_pid = UpPid, @@ -170,29 +173,32 @@ handle_call( _From, #state{index = #index{current_seq = Seq}} = State ) when RequestSeq =< Seq -> - {reply, {ok, State#state.index_pid, Seq}, State}; + {reply, {ok, State#state.index_key, Seq}, State}; handle_call({await, RequestSeq}, From, #state{waiting_list = WaitList} = State) -> {noreply, State#state{ waiting_list = [{From, RequestSeq} | WaitList] }}; % upgrade +handle_call(get_index_key, _From, State) -> + {reply, State#state.index_key, State}; +% upgrade handle_call(get_index_pid, _From, State) -> - {reply, State#state.index_pid, State}; + {reply, clouseau_rpc:get_index_pid_from_key(State#state.index_key), State}; % obsolete handle_call({search, QueryArgs0}, _From, State) -> - Reply = search_int(State#state.index_pid, QueryArgs0), + Reply = search_int(State#state.index_key, QueryArgs0), {reply, Reply, State}; % obsolete 
handle_call({group1, QueryArgs0}, _From, State) -> - Reply = group1_int(State#state.index_pid, QueryArgs0), + Reply = group1_int(State#state.index_key, QueryArgs0), {reply, Reply, State}; % obsolete handle_call({group2, QueryArgs0}, _From, State) -> - Reply = group2_int(State#state.index_pid, QueryArgs0), + Reply = group2_int(State#state.index_key, QueryArgs0), {reply, Reply, State}; % obsolete handle_call(info, _From, State) -> - Reply = info_int(State#state.index_pid), + Reply = info_int(State#state.index_key), {reply, Reply, State}. handle_cast(_Msg, State) -> @@ -202,13 +208,13 @@ handle_info( {'EXIT', FromPid, {updated, NewSeq}}, #state{ index = #index{dbname = DbName, name = IdxName, ddoc_id = DDocId} = Index0, - index_pid = IndexPid, + index_key = IndexKey, updater_pid = UpPid, waiting_list = WaitList } = State ) when UpPid == FromPid -> Index = Index0#index{current_seq = NewSeq}, - case reply_with_index(IndexPid, Index, WaitList) of + case reply_with_index(IndexKey, Index, WaitList) of [] -> {noreply, State#state{ index = Index, @@ -229,7 +235,7 @@ handle_info( nil; false -> spawn_link(fun() -> - dreyfus_index_updater:update(IndexPid, Index) + dreyfus_index_updater:update(IndexKey, Index) end) end, {noreply, State#state{ @@ -240,36 +246,23 @@ handle_info( end; handle_info({'EXIT', _, {updated, _}}, State) -> {noreply, State}; -handle_info( - {'EXIT', FromPid, Reason}, - #state{ - index = Index, - index_pid = IndexPid, - waiting_list = WaitList - } = State -) when FromPid == IndexPid -> - couch_log:notice( - "index for ~p closed with reason ~p", [index_name(Index), Reason] - ), - [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList], - {stop, normal, State}; -handle_info( - {'EXIT', FromPid, Reason}, - #state{ - index = Index, - updater_pid = UpPid, - waiting_list = WaitList - } = State -) when FromPid == UpPid -> - couch_log:info( - "Shutting down index server ~p, updater ~p closing w/ reason ~w", - [index_name(Index), UpPid, Reason] - ), - 
[gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList], - {stop, normal, State}; -handle_info({'EXIT', Pid, Reason}, State) -> - % probably dreyfus_index_manager. - couch_log:notice("Unknown pid ~p closed with reason ~p", [Pid, Reason]), +handle_info({'EXIT', FromPid, Reason}, #state{index = Index, index_key = IndexKey, updater_pid = UpdaterPid, waiting_list = WaitList} = State) -> + case clouseau_rpc:get_index_pid_from_key(IndexKey) of + IndexerPid when FromPid == IndexerPid -> + couch_log:notice( + "index for ~p closed with reason ~p", [index_name(Index), Reason] + ), + [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; + _ when FromPid == UpdaterPid -> + couch_log:info( + "Shutting down index server ~p, updater ~p closing w/ reason ~w", + [index_name(Index), UpdaterPid, Reason] + ), + [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; + _ -> + % probably dreyfus_index_manager. + couch_log:notice("Unknown pid ~p closed with reason ~p", [FromPid, Reason]) + end, {stop, normal, State}; handle_info( {'DOWN', _, _, Pid, Reason}, @@ -291,9 +284,10 @@ open_index(DbName, #index{analyzer = Analyzer, sig = Sig}) -> Path = <>, case clouseau_rpc:open_index(self(), Path, Analyzer) of {ok, Pid} -> - case clouseau_rpc:get_update_seq(Pid) of + Key = clouseau_rpc:index_key_from_pid(Pid), + case clouseau_rpc:get_update_seq(Key) of {ok, Seq} -> - {ok, Pid, Seq}; + {ok, Key, Seq}; Error -> Error end; @@ -336,18 +330,18 @@ design_doc_to_index(DbName, #doc{id = Id, body = {Fields}}, IndexName) -> {error, InvalidDDocError} end. -reply_with_index(IndexPid, Index, WaitList) -> - reply_with_index(IndexPid, Index, WaitList, []). +reply_with_index(IndexKey, Index, WaitList) -> + reply_with_index(IndexKey, Index, WaitList, []). 
-reply_with_index(_IndexPid, _Index, [], Acc) -> +reply_with_index(_IndexKey, _Index, [], Acc) -> Acc; -reply_with_index(IndexPid, #index{current_seq = IndexSeq} = Index, [{Pid, Seq} | Rest], Acc) when +reply_with_index(IndexKey, #index{current_seq = IndexSeq} = Index, [{Pid, Seq} | Rest], Acc) when Seq =< IndexSeq -> - gen_server:reply(Pid, {ok, IndexPid, IndexSeq}), - reply_with_index(IndexPid, Index, Rest, Acc); -reply_with_index(IndexPid, Index, [{Pid, Seq} | Rest], Acc) -> - reply_with_index(IndexPid, Index, Rest, [{Pid, Seq} | Acc]). + gen_server:reply(Pid, {ok, IndexKey, IndexSeq}), + reply_with_index(IndexKey, Index, Rest, Acc); +reply_with_index(IndexKey, Index, [{Pid, Seq} | Rest], Acc) -> + reply_with_index(IndexKey, Index, Rest, [{Pid, Seq} | Acc]). index_name(#index{dbname = DbName, ddoc_id = DDocId, name = IndexName}) -> <>. @@ -388,12 +382,12 @@ args_to_proplist2(#index_query_args{} = Args) -> {highlight_size, Args#index_query_args.highlight_size} ]. -search_int(Pid, QueryArgs0) -> +search_int(Key, QueryArgs0) -> QueryArgs = dreyfus_util:upgrade(QueryArgs0), Props = args_to_proplist(QueryArgs), - clouseau_rpc:search(Pid, Props). + clouseau_rpc:search(Key, Props). -group1_int(Pid, QueryArgs0) -> +group1_int(Key, QueryArgs0) -> QueryArgs = dreyfus_util:upgrade(QueryArgs0), #index_query_args{ q = Query, @@ -406,7 +400,7 @@ group1_int(Pid, QueryArgs0) -> } } = QueryArgs, clouseau_rpc:group1( - Pid, + Key, Query, GroupBy, Stale =:= false, @@ -415,10 +409,10 @@ group1_int(Pid, QueryArgs0) -> Limit ). -group2_int(Pid, QueryArgs0) -> +group2_int(Key, QueryArgs0) -> QueryArgs = dreyfus_util:upgrade(QueryArgs0), Props = args_to_proplist2(QueryArgs), - clouseau_rpc:group2(Pid, Props). + clouseau_rpc:group2(Key, Props). -info_int(Pid) -> - clouseau_rpc:info(Pid). +info_int(Key) -> + clouseau_rpc:info(Key). 
diff --git a/src/dreyfus/src/dreyfus_index_manager.erl b/src/dreyfus/src/dreyfus_index_manager.erl index a54db5e15bc..7921e839e72 100644 --- a/src/dreyfus/src/dreyfus_index_manager.erl +++ b/src/dreyfus/src/dreyfus_index_manager.erl @@ -64,7 +64,7 @@ handle_call({get_index, DbName, #index{sig = Sig} = Index}, From, State) -> ets:insert(?BY_SIG, {{DbName, Sig}, [From | WaitList]}), {noreply, State}; [{_, ExistingPid}] -> - {reply, {ok, ExistingPid}, State} + {reply, {ok, {pid, ExistingPid}}, State} end; handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) -> link(NewPid), diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl index 387ab09e247..21b18367146 100644 --- a/src/dreyfus/src/dreyfus_index_updater.erl +++ b/src/dreyfus/src/dreyfus_index_updater.erl @@ -20,7 +20,7 @@ -import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]). -update(IndexPid, Index) -> +update(IndexKey, Index) -> #index{ current_seq = CurSeq, dbname = DbName, @@ -31,7 +31,7 @@ update(IndexPid, Index) -> {ok, Db} = couch_db:open_int(DbName, []), try TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq), - TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid), + TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexKey), TotalChanges = TotalUpdateChanges + TotalPurgeChanges, couch_task_status:add_task([ @@ -49,7 +49,7 @@ update(IndexPid, Index) -> %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...] %The Rev is the final Rev, not purged Rev. - {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index), + {ok, ExcludeIdRevs} = purge_index(Db, IndexKey, Index), %% compute on all docs modified since we last computed. 
NewCurSeq = couch_db:get_update_seq(Db), @@ -58,9 +58,9 @@ update(IndexPid, Index) -> true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), EnumFun = fun ?MODULE:load_docs/2, [Changes] = couch_task_status:get([changes_done]), - Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs}, + Acc0 = {Changes, IndexKey, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs}, {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []), - ok = clouseau_rpc:commit(IndexPid, NewCurSeq) + ok = clouseau_rpc:commit(IndexKey, NewCurSeq) after ret_os_process(Proc) end, @@ -69,33 +69,33 @@ update(IndexPid, Index) -> couch_db:close(Db) end. -load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = Acc) -> +load_docs(FDI, {I, IndexKey, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = Acc) -> couch_task_status:update([{changes_done, I}, {progress, (I * 100) div Total}]), DI = couch_doc:to_doc_info(FDI), #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{rev = Rev} | _]} = DI, %check if it is processed in purge_index to avoid update the index again. case lists:member({Id, Rev}, ExcludeIdRevs) of true -> ok; - false -> update_or_delete_index(IndexPid, Db, DI, Proc) + false -> update_or_delete_index(IndexKey, Db, DI, Proc) end, %% Force a commit every minute case timer:now_diff(Now = erlang:timestamp(), LastCommitTime) >= 60000000 of true -> - ok = clouseau_rpc:commit(IndexPid, Seq), - {ok, {I + 1, IndexPid, Db, Proc, Total, Now, ExcludeIdRevs}}; + ok = clouseau_rpc:commit(IndexKey, Seq), + {ok, {I + 1, IndexKey, Db, Proc, Total, Now, ExcludeIdRevs}}; false -> {ok, setelement(1, Acc, I + 1)} end. 
-purge_index(Db, IndexPid, Index) -> - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), +purge_index(Db, IndexKey, Index) -> + {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexKey), Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), FoldFun = fun({_PurgeSeq, _UUID, Id, _Revs}, Acc) -> case couch_db:get_full_doc_info(Db, Id) of not_found -> - ok = clouseau_rpc:delete(IndexPid, Id), + ok = clouseau_rpc:delete(IndexKey, Id), update_task(1), {ok, Acc}; FDI -> @@ -105,7 +105,7 @@ purge_index(Db, IndexPid, Index) -> true -> {ok, Acc}; false -> - update_or_delete_index(IndexPid, Db, DI, Proc), + update_or_delete_index(IndexKey, Db, DI, Proc), update_task(1), {ok, [{Id, Rev} | Acc]} end @@ -113,23 +113,23 @@ purge_index(Db, IndexPid, Index) -> end, {ok, ExcludeList} = couch_db:fold_purge_infos(Db, IdxPurgeSeq, FoldFun, []), NewPurgeSeq = couch_db:get_purge_seq(Db), - ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq), + ok = clouseau_rpc:set_purge_seq(IndexKey, NewPurgeSeq), update_local_doc(Db, Index, NewPurgeSeq), {ok, ExcludeList} after ret_os_process(Proc) end. -count_pending_purged_docs_since(Db, IndexPid) -> +count_pending_purged_docs_since(Db, IndexKey) -> DbPurgeSeq = couch_db:get_purge_seq(Db), - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), + {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexKey), DbPurgeSeq - IdxPurgeSeq. 
-update_or_delete_index(IndexPid, Db, DI, Proc) -> +update_or_delete_index(IndexKey, Db, DI, Proc) -> #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - ok = clouseau_rpc:delete(IndexPid, Id); + ok = clouseau_rpc:delete(IndexKey, Id); false -> case maybe_skip_doc(Db, Id) of true -> @@ -141,8 +141,8 @@ update_or_delete_index(IndexPid, Db, DI, Proc) -> Fields1 = [list_to_tuple(Field) || Field <- Fields], Fields2 = maybe_add_partition(Db, Id, Fields1), case Fields2 of - [] -> ok = clouseau_rpc:delete(IndexPid, Id); - _ -> ok = clouseau_rpc:update(IndexPid, Id, Fields2) + [] -> ok = clouseau_rpc:delete(IndexKey, Id); + _ -> ok = clouseau_rpc:update(IndexKey, Id, Fields2) end end end. diff --git a/src/dreyfus/src/dreyfus_rpc.erl b/src/dreyfus/src/dreyfus_rpc.erl index 5fbe194dcae..2a931c39ada 100644 --- a/src/dreyfus/src/dreyfus_rpc.erl +++ b/src/dreyfus/src/dreyfus_rpc.erl @@ -62,10 +62,10 @@ call(Fun, DbName, DDoc, IndexName, QueryArgs0) -> index_call(Fun, DbName, Index, QueryArgs, MinSeq) -> case dreyfus_index_manager:get_index(DbName, Index) of - {ok, Pid} -> - case dreyfus_index:await(Pid, MinSeq) of - {ok, IndexPid, _Seq} -> - dreyfus_index:Fun(IndexPid, QueryArgs); + {ok, Key} -> + case dreyfus_index:await(Key, MinSeq) of + {ok, IndexKey, _Seq} -> + dreyfus_index:Fun(IndexKey, QueryArgs); Error -> Error end; diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl index d651da9bd05..8dc4cdf63e4 100644 --- a/src/dreyfus/src/dreyfus_util.erl +++ b/src/dreyfus/src/dreyfus_util.erl @@ -345,12 +345,12 @@ maybe_create_local_purge_doc(Db, Index) -> ok end. 
-maybe_create_local_purge_doc(Db, IndexPid, Index) -> +maybe_create_local_purge_doc(Db, IndexKey, Index) -> DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig), case couch_db:open_doc(Db, DocId) of {not_found, _} -> DbPurgeSeq = couch_db:get_purge_seq(Db), - ok = clouseau_rpc:set_purge_seq(IndexPid, DbPurgeSeq), + ok = clouseau_rpc:set_purge_seq(IndexKey, DbPurgeSeq), DocContent = dreyfus_util:get_local_purge_doc_body( Db, DocId, DbPurgeSeq, Index ), From 8cbb5cd0c778893ca8ff333ec2f95ab224408d7c Mon Sep 17 00:00:00 2001 From: Gabor Pali Date: Thu, 19 Mar 2026 18:26:48 +0100 Subject: [PATCH 2/4] !fixup: apply `erlfmt-format` --- src/dreyfus/src/clouseau_rpc.erl | 12 ++++----- src/dreyfus/src/dreyfus_index.erl | 45 +++++++++++++++++-------------- 2 files changed, 31 insertions(+), 26 deletions(-) diff --git a/src/dreyfus/src/clouseau_rpc.erl b/src/dreyfus/src/clouseau_rpc.erl index 8ed91f29a54..d2a5cf52831 100644 --- a/src/dreyfus/src/clouseau_rpc.erl +++ b/src/dreyfus/src/clouseau_rpc.erl @@ -353,14 +353,14 @@ connected() -> end. rpc(Key, Message) -> - { Ref, Msg } = case Key of - {pid, Pid} -> { Pid, Message }; - {path, Path} -> { main, {forward, Path, Message} }; - Other -> Other - end, + {Ref, Msg} = + case Key of + {pid, Pid} -> {Pid, Message}; + {path, Path} -> {main, {forward, Path, Message}}; + Other -> Other + end, ioq:call_search(Ref, Msg, erlang:get(io_priority)). - clouseau() -> list_to_atom(config:get("dreyfus", "name", "clouseau@127.0.0.1")). 
diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl index 6fbcb3146f7..05c36e51b74 100644 --- a/src/dreyfus/src/dreyfus_index.erl +++ b/src/dreyfus/src/dreyfus_index.erl @@ -84,10 +84,11 @@ info(Key0) -> to_index_key(Key) -> Pid = clouseau_rpc:get_index_pid_from_key(Key), case node(Pid) == node() of - true -> - IndexPid = gen_server:call(Pid, get_index_pid, infinity), - clouseau_rpc:index_key_from_pid(IndexPid); - false -> Key + true -> + IndexPid = gen_server:call(Pid, get_index_pid, infinity), + clouseau_rpc:index_key_from_pid(IndexPid); + false -> + Key end. design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) -> @@ -246,22 +247,26 @@ handle_info( end; handle_info({'EXIT', _, {updated, _}}, State) -> {noreply, State}; -handle_info({'EXIT', FromPid, Reason}, #state{index = Index, index_key = IndexKey, updater_pid = UpdaterPid, waiting_list = WaitList} = State) -> +handle_info( + {'EXIT', FromPid, Reason}, + #state{index = Index, index_key = IndexKey, updater_pid = UpdaterPid, waiting_list = WaitList} = + State +) -> case clouseau_rpc:get_index_pid_from_key(IndexKey) of - IndexerPid when FromPid == IndexerPid -> - couch_log:notice( - "index for ~p closed with reason ~p", [index_name(Index), Reason] - ), - [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; - _ when FromPid == UpdaterPid -> - couch_log:info( - "Shutting down index server ~p, updater ~p closing w/ reason ~w", - [index_name(Index), UpdaterPid, Reason] - ), - [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; - _ -> - % probably dreyfus_index_manager. 
- couch_log:notice("Unknown pid ~p closed with reason ~p", [FromPid, Reason]) + IndexerPid when FromPid == IndexerPid -> + couch_log:notice( + "index for ~p closed with reason ~p", [index_name(Index), Reason] + ), + [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; + _ when FromPid == UpdaterPid -> + couch_log:info( + "Shutting down index server ~p, updater ~p closing w/ reason ~w", + [index_name(Index), UpdaterPid, Reason] + ), + [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; + _ -> + % probably dreyfus_index_manager. + couch_log:notice("Unknown pid ~p closed with reason ~p", [FromPid, Reason]) end, {stop, normal, State}; handle_info( @@ -284,7 +289,7 @@ open_index(DbName, #index{analyzer = Analyzer, sig = Sig}) -> Path = <>, case clouseau_rpc:open_index(self(), Path, Analyzer) of {ok, Pid} -> - Key = clouseau_rpc:index_key_from_pid(Pid), + Key = clouseau_rpc:index_key_from_pid(Pid), case clouseau_rpc:get_update_seq(Key) of {ok, Seq} -> {ok, Key, Seq}; From 11d97365aab64bc91005f2da33e34c1cdc4e6f03 Mon Sep 17 00:00:00 2001 From: Gabor Pali Date: Sat, 21 Mar 2026 00:34:12 +0100 Subject: [PATCH 3/4] !fixup: minor fixes --- src/dreyfus/src/clouseau_rpc.erl | 10 +++++----- src/dreyfus/src/dreyfus_index.erl | 3 ++- src/dreyfus/src/dreyfus_index_manager.erl | 4 ++-- src/dreyfus/src/dreyfus_rpc.erl | 4 ++-- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/src/dreyfus/src/clouseau_rpc.erl b/src/dreyfus/src/clouseau_rpc.erl index d2a5cf52831..f2d43939f9e 100644 --- a/src/dreyfus/src/clouseau_rpc.erl +++ b/src/dreyfus/src/clouseau_rpc.erl @@ -51,11 +51,11 @@ -type indexer_key() :: {path, string_as_binary(_), pid()} | {pid, pid()}. -spec index_key_from_pid(pid()) -> indexer_key(). -index_key_from_pid(Pid) -> +index_key_from_pid(Pid) when is_pid(Pid) -> {pid, Pid}. -spec index_key_from_path(string_as_binary(_), pid()) -> indexer_key(). 
-index_key_from_path(Path, Pid) -> +index_key_from_path(Path, Pid) when is_pid(Pid) -> {path, Path, Pid}. -spec get_index_pid_from_key(indexer_key()) -> pid(). @@ -101,7 +101,7 @@ get_root_dir() -> rpc({main, clouseau()}, {get_root_dir}). %% not used ??? --spec await(Ref :: indexer_key(), MinSeq :: commit_seq()) -> +-spec await(Key :: indexer_key(), MinSeq :: commit_seq()) -> ok | error(). await(Ref, MinSeq) -> @@ -356,8 +356,8 @@ rpc(Key, Message) -> {Ref, Msg} = case Key of {pid, Pid} -> {Pid, Message}; - {path, Path} -> {main, {forward, Path, Message}}; - Other -> Other + {path, Path, _Pid} -> {{main, clouseau()}, {forward, Path, Message}}; + Other -> {Other, Message} end, ioq:call_search(Ref, Msg, erlang:get(io_priority)). diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl index 05c36e51b74..dceb6794c4f 100644 --- a/src/dreyfus/src/dreyfus_index.erl +++ b/src/dreyfus/src/dreyfus_index.erl @@ -56,7 +56,8 @@ start_link(DbName, Index) -> proc_lib:start_link(?MODULE, init, [{DbName, Index}]). await(Key, MinSeq) -> - MFA = {gen_server, call, [Key, {await, MinSeq}, infinity]}, + Pid = clouseau_rpc:get_index_pid_from_key(Key), + MFA = {gen_server, call, [Pid, {await, MinSeq}, infinity]}, dreyfus_util:time([index, await], MFA). 
search(Key0, QueryArgs) -> diff --git a/src/dreyfus/src/dreyfus_index_manager.erl b/src/dreyfus/src/dreyfus_index_manager.erl index 7921e839e72..6aa93d9608b 100644 --- a/src/dreyfus/src/dreyfus_index_manager.erl +++ b/src/dreyfus/src/dreyfus_index_manager.erl @@ -64,12 +64,12 @@ handle_call({get_index, DbName, #index{sig = Sig} = Index}, From, State) -> ets:insert(?BY_SIG, {{DbName, Sig}, [From | WaitList]}), {noreply, State}; [{_, ExistingPid}] -> - {reply, {ok, {pid, ExistingPid}}, State} + {reply, {ok, clouseau_rpc:index_key_from_pid(ExistingPid)}, State} end; handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) -> link(NewPid), [{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}), - [gen_server:reply(From, {ok, NewPid}) || From <- WaitList], + [gen_server:reply(From, {ok, clouseau_rpc:index_key_from_pid(NewPid)}) || From <- WaitList], ets:delete(?BY_PID, OpenerPid), add_to_ets(NewPid, DbName, Sig), {reply, ok, State}; diff --git a/src/dreyfus/src/dreyfus_rpc.erl b/src/dreyfus/src/dreyfus_rpc.erl index 2a931c39ada..1055ad85ea8 100644 --- a/src/dreyfus/src/dreyfus_rpc.erl +++ b/src/dreyfus/src/dreyfus_rpc.erl @@ -83,8 +83,8 @@ info_int(DbName, DDoc, IndexName) -> case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of {ok, Index} -> case dreyfus_index_manager:get_index(DbName, Index) of - {ok, Pid} -> - case dreyfus_index:info(Pid) of + {ok, Key} -> + case dreyfus_index:info(Key) of {ok, Fields} -> Info = [{signature, Index#index.sig} | Fields], rexi:reply({ok, Info}); From 8c20f4075dc218f231255b3638f1fb4a1a8541bf Mon Sep 17 00:00:00 2001 From: Gabor Pali Date: Sat, 21 Mar 2026 01:02:51 +0100 Subject: [PATCH 4/4] !fixup: add toggle --- rel/overlay/etc/default.ini | 1 + src/dreyfus/src/clouseau_rpc.erl | 8 ++++---- src/dreyfus/src/dreyfus_index.erl | 6 +++++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 558542fed4d..302d2ad246b 100644 --- 
a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -960,6 +960,7 @@ state_dir = {{state_dir}} ; enable Search functionality. ;name = clouseau@127.0.0.1 name = {{clouseau_name}} +path_keys = false ; CouchDB will try to re-connect to Clouseau using a bounded ; exponential backoff with the following number of iterations. diff --git a/src/dreyfus/src/clouseau_rpc.erl b/src/dreyfus/src/clouseau_rpc.erl index f2d43939f9e..a59ce2c570d 100644 --- a/src/dreyfus/src/clouseau_rpc.erl +++ b/src/dreyfus/src/clouseau_rpc.erl @@ -16,7 +16,7 @@ -include("dreyfus.hrl"). --export([index_key_from_pid/1, index_key_from_path/2, get_index_pid_from_key/1]). +-export([index_key_from_pid/1, index_key_with_path/2, get_index_pid_from_key/1]). -export([open_index/3]). -export([await/2, commit/2, get_update_seq/1, info/1, search/2]). -export([group1/7, group2/2]). @@ -54,12 +54,12 @@ index_key_from_pid(Pid) when is_pid(Pid) -> {pid, Pid}. --spec index_key_from_path(string_as_binary(_), pid()) -> indexer_key(). -index_key_from_path(Path, Pid) when is_pid(Pid) -> +-spec index_key_with_path(string_as_binary(_), pid()) -> indexer_key(). +index_key_with_path(Path, Pid) when is_pid(Pid) -> {path, Path, Pid}. -spec get_index_pid_from_key(indexer_key()) -> pid(). -get_index_pid_from_key({path, _, Pid}) -> +get_index_pid_from_key({path, _Path, Pid}) -> Pid; get_index_pid_from_key({pid, Pid}) -> Pid. 
diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl index dceb6794c4f..46587355a16 100644 --- a/src/dreyfus/src/dreyfus_index.erl +++ b/src/dreyfus/src/dreyfus_index.erl @@ -290,7 +290,11 @@ open_index(DbName, #index{analyzer = Analyzer, sig = Sig}) -> Path = <>, case clouseau_rpc:open_index(self(), Path, Analyzer) of {ok, Pid} -> - Key = clouseau_rpc:index_key_from_pid(Pid), + Key = + case config:get_boolean("dreyfus", "path_keys", false) of + true -> clouseau_rpc:index_key_with_path(Path, Pid); + false -> clouseau_rpc:index_key_from_pid(Pid) + end, case clouseau_rpc:get_update_seq(Key) of {ok, Seq} -> {ok, Key, Seq};