diff --git a/rel/overlay/etc/default.ini b/rel/overlay/etc/default.ini index 558542fed4..302d2ad246 100644 --- a/rel/overlay/etc/default.ini +++ b/rel/overlay/etc/default.ini @@ -960,6 +960,7 @@ state_dir = {{state_dir}} ; enable Search functionality. ;name = clouseau@127.0.0.1 name = {{clouseau_name}} +path_keys = false ; CouchDB will try to re-connect to Clouseau using a bounded ; exponential backoff with the following number of iterations. diff --git a/src/dreyfus/src/clouseau_rpc.erl b/src/dreyfus/src/clouseau_rpc.erl index c036a5a9af..a59ce2c570 100644 --- a/src/dreyfus/src/clouseau_rpc.erl +++ b/src/dreyfus/src/clouseau_rpc.erl @@ -16,6 +16,7 @@ -include("dreyfus.hrl"). +-export([index_key_from_pid/1, index_key_with_path/2, get_index_pid_from_key/1]). -export([open_index/3]). -export([await/2, commit/2, get_update_seq/1, info/1, search/2]). -export([group1/7, group2/2]). @@ -47,7 +48,21 @@ -type analyzer_fields() :: [{field_name(), analyzer_name() | [analyzer_name()]}]. --type indexer_pid() :: pid(). +-type indexer_key() :: {path, string_as_binary(_), pid()} | {pid, pid()}. + +-spec index_key_from_pid(pid()) -> indexer_key(). +index_key_from_pid(Pid) when is_pid(Pid) -> + {pid, Pid}. + +-spec index_key_with_path(string_as_binary(_), pid()) -> indexer_key(). +index_key_with_path(Path, Pid) when is_pid(Pid) -> + {path, Path, Pid}. + +-spec get_index_pid_from_key(indexer_key()) -> pid(). +get_index_pid_from_key({path, _Path, Pid}) -> + Pid; +get_index_pid_from_key({pid, Pid}) -> + Pid. %% Example of the message %% {[ @@ -69,7 +84,7 @@ -define(SEARCH_SERVICE_TIMEOUT, 2000). -spec open_index(Peer :: pid(), Path :: shard(), Analyzer :: analyzer()) -> - {ok, indexer_pid()} | error(). + {ok, indexer_key()} | error(). open_index(Peer, Path, Analyzer) -> rpc({main, clouseau()}, {open, Peer, Path, Analyzer}). @@ -86,14 +101,14 @@ get_root_dir() -> rpc({main, clouseau()}, {get_root_dir}). %% not used ??? --spec await(Ref :: indexer_pid(), MinSeq :: commit_seq()) -> +-spec await(Key :: indexer_key(), MinSeq :: commit_seq()) -> ok | error(). await(Ref, MinSeq) -> rpc(Ref, {await, MinSeq}). %% deprecated --spec commit(Ref :: indexer_pid(), NewCommitSeq :: commit_seq()) -> +-spec commit(Ref :: indexer_key(), NewCommitSeq :: commit_seq()) -> ok | error(). commit(Ref, NewCommitSeq) -> @@ -107,25 +122,25 @@ commit(Ref, NewCommitSeq) -> | {committed_seq, committed_seq()} | {purge_seq, purge_seq()}. --spec info(Ref :: indexer_pid()) -> +-spec info(Ref :: indexer_key()) -> {ok, [info_result_item()]} | error(). info(Ref) -> rpc(Ref, info). --spec get_update_seq(Ref :: indexer_pid()) -> +-spec get_update_seq(Ref :: indexer_key()) -> {ok, update_seq()} | error(). get_update_seq(Ref) -> rpc(Ref, get_update_seq). --spec set_purge_seq(Ref :: indexer_pid(), Seq :: purge_seq()) -> +-spec set_purge_seq(Ref :: indexer_key(), Seq :: purge_seq()) -> ok | error(). set_purge_seq(Ref, Seq) -> rpc(Ref, {set_purge_seq, Seq}). --spec get_purge_seq(Ref :: indexer_pid()) -> +-spec get_purge_seq(Ref :: indexer_key()) -> {ok, purge_seq()} | error(). get_purge_seq(Ref) -> @@ -157,11 +172,11 @@ get_purge_seq(Ref) -> | {highlight_size, pos_integer()} | {legacy, boolean()}. --spec search(Ref :: indexer_pid(), Args :: [search_arg()]) -> +-spec search(Key :: indexer_key(), Args :: [search_arg()]) -> {ok, #top_docs{}} | error(). -search(Ref, Args) -> - case rpc(Ref, {search, Args}) of +search(Key, Args) -> + case rpc(Key, {search, Args}) of {ok, Response} when is_list(Response) -> {ok, #top_docs{ update_seq = couch_util:get_value(update_seq, Response), @@ -181,7 +196,7 @@ search(Ref, Args) -> | [field_name()]. -spec group1( - Ref :: indexer_pid(), + Key :: indexer_key(), Query :: query(), GroupBy :: field_name(), Refresh :: boolean(), @@ -190,8 +205,8 @@ search(Ref, Args) -> Limit :: limit() ) -> {ok, [{field_name(), sort_values()}]} | error(). -group1(Ref, Query, GroupBy, Refresh, Sort, Offset, Limit) -> - rpc(Ref, {group1, Query, GroupBy, Refresh, Sort, Offset, Limit}). +group1(Key, Query, GroupBy, Refresh, Sort, Offset, Limit) -> + rpc(Key, {group1, Query, GroupBy, Refresh, Sort, Offset, Limit}). -type group_name() :: string_as_binary(_) | null. -type sort_values() :: [string_as_binary(_) | null]. @@ -213,17 +228,17 @@ group1(Ref, Query, GroupBy, Refresh, Sort, Offset, Limit) -> | {highlight_size, pos_integer()}. -type grouped_results() :: [{field_name(), TotalHits :: non_neg_integer(), [#hit{}]}]. --spec group2(Ref :: indexer_pid(), Args :: [query_arg()]) -> +-spec group2(Key :: indexer_key(), Args :: [query_arg()]) -> {ok, {TotalHits :: non_neg_integer(), TotalGroupedHits :: non_neg_integer(), grouped_results()}}. -group2(Ref, Args) -> - rpc(Ref, {group2, Args}). +group2(Key, Args) -> + rpc(Key, {group2, Args}). --spec delete(Ref :: indexer_pid(), Id :: docid()) -> +-spec delete(Key :: indexer_key(), Id :: docid()) -> ok. -delete(Ref, Id) -> - rpc(Ref, {delete, couch_util:to_binary(Id)}). +delete(Key, Id) -> + rpc(Key, {delete, couch_util:to_binary(Id)}). -type docid() :: string_as_binary(_). @@ -242,11 +257,11 @@ delete(Ref, Id) -> -type yes_or_no() :: string_as_binary(yes) | string_as_binary(no). -spec update( - Ref :: indexer_pid(), Id :: docid(), Fields :: [{field_name(), field_value(), [field_option()]}] + Key :: indexer_key(), Id :: docid(), Fields :: [{field_name(), field_value(), [field_option()]}] ) -> ok. -update(Ref, Id, Fields) -> - rpc(Ref, {update, Id, Fields}). +update(Key, Id, Fields) -> + rpc(Key, {update, Id, Fields}). -spec cleanup(DbName :: string_as_binary(_)) -> ok. cleanup(DbName) -> @@ -337,7 +352,13 @@ connected() -> end end. -rpc(Ref, Msg) -> +rpc(Key, Message) -> + {Ref, Msg} = + case Key of + {pid, Pid} -> {Pid, Message}; + {path, Path, _Pid} -> {{main, clouseau()}, {forward, Path, Message}}; + Other -> {Other, Message} + end, ioq:call_search(Ref, Msg, erlang:get(io_priority)). clouseau() -> diff --git a/src/dreyfus/src/dreyfus_index.erl b/src/dreyfus/src/dreyfus_index.erl index d754050015..46587355a1 100644 --- a/src/dreyfus/src/dreyfus_index.erl +++ b/src/dreyfus/src/dreyfus_index.erl @@ -44,7 +44,7 @@ dbname, index, updater_pid = nil, - index_pid = nil, + index_key = nil, waiting_list = [] }). @@ -55,36 +55,41 @@ start_link(DbName, Index) -> proc_lib:start_link(?MODULE, init, [{DbName, Index}]). -await(Pid, MinSeq) -> +await(Key, MinSeq) -> + Pid = clouseau_rpc:get_index_pid_from_key(Key), MFA = {gen_server, call, [Pid, {await, MinSeq}, infinity]}, dreyfus_util:time([index, await], MFA). -search(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, search_int, [Pid, QueryArgs]}, +search(Key0, QueryArgs) -> + Key = to_index_key(Key0), + MFA = {?MODULE, search_int, [Key, QueryArgs]}, dreyfus_util:time([index, search], MFA). -group1(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, group1_int, [Pid, QueryArgs]}, +group1(Key0, QueryArgs) -> + Key = to_index_key(Key0), + MFA = {?MODULE, group1_int, [Key, QueryArgs]}, dreyfus_util:time([index, group1], MFA). -group2(Pid0, QueryArgs) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, group2_int, [Pid, QueryArgs]}, +group2(Key0, QueryArgs) -> + Key = to_index_key(Key0), + MFA = {?MODULE, group2_int, [Key, QueryArgs]}, dreyfus_util:time([index, group2], MFA). -info(Pid0) -> - Pid = to_index_pid(Pid0), - MFA = {?MODULE, info_int, [Pid]}, +info(Key0) -> + Key = to_index_key(Key0), + MFA = {?MODULE, info_int, [Key]}, dreyfus_util:time([index, info], MFA). %% We either have a dreyfus_index gen_server pid or the remote %% clouseau pid. -to_index_pid(Pid) -> +to_index_key(Key) -> + Pid = clouseau_rpc:get_index_pid_from_key(Key), case node(Pid) == node() of - true -> gen_server:call(Pid, get_index_pid, infinity); - false -> Pid + true -> + IndexPid = gen_server:call(Pid, get_index_pid, infinity), + clouseau_rpc:index_key_from_pid(IndexPid); + false -> + Key end. design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) -> @@ -110,17 +115,17 @@ design_doc_to_indexes(DbName, #doc{body = {Fields}} = Doc) -> init({DbName, Index}) -> process_flag(trap_exit, true), case open_index(DbName, Index) of - {ok, Pid, Seq} -> + {ok, Key, Seq} -> State = #state{ dbname = DbName, index = Index#index{current_seq = Seq, dbname = DbName}, - index_pid = Pid + index_key = Key }, case couch_db:open_int(DbName, []) of {ok, Db} -> try couch_db:monitor(Db), - dreyfus_util:maybe_create_local_purge_doc(Db, Pid, Index) + dreyfus_util:maybe_create_local_purge_doc(Db, Key, Index) after couch_db:close(Db) end, @@ -139,7 +144,7 @@ handle_call( #state{ index = #index{dbname = DbName, name = IdxName, ddoc_id = DDocId, current_seq = Seq} = Index, - index_pid = IndexPid, + index_key = IndexKey, updater_pid = nil, waiting_list = WaitList } = State @@ -150,7 +155,7 @@ handle_call( case dreyfus_util:in_black_list(DbName2, GroupId, IdxName) of false -> UpPid = spawn_link(fun() -> - dreyfus_index_updater:update(IndexPid, Index) + dreyfus_index_updater:update(IndexKey, Index) end), State#state{ updater_pid = UpPid, @@ -170,29 +175,32 @@ handle_call( _From, #state{index = #index{current_seq = Seq}} = State ) when RequestSeq =< Seq -> - {reply, {ok, State#state.index_pid, Seq}, State}; + {reply, {ok, State#state.index_key, Seq}, State}; handle_call({await, RequestSeq}, From, #state{waiting_list = WaitList} = State) -> {noreply, State#state{ waiting_list = [{From, RequestSeq} | WaitList] }}; % upgrade +handle_call(get_index_key, _From, State) -> + {reply, State#state.index_key, State}; +% upgrade handle_call(get_index_pid, _From, State) -> - {reply, State#state.index_pid, State}; + {reply, clouseau_rpc:get_index_pid_from_key(State#state.index_key), State}; % obsolete handle_call({search, QueryArgs0}, _From, State) -> - Reply = search_int(State#state.index_pid, QueryArgs0), + Reply = search_int(State#state.index_key, QueryArgs0), {reply, Reply, State}; % obsolete handle_call({group1, QueryArgs0}, _From, State) -> - Reply = group1_int(State#state.index_pid, QueryArgs0), + Reply = group1_int(State#state.index_key, QueryArgs0), {reply, Reply, State}; % obsolete handle_call({group2, QueryArgs0}, _From, State) -> - Reply = group2_int(State#state.index_pid, QueryArgs0), + Reply = group2_int(State#state.index_key, QueryArgs0), {reply, Reply, State}; % obsolete handle_call(info, _From, State) -> - Reply = info_int(State#state.index_pid), + Reply = info_int(State#state.index_key), {reply, Reply, State}. handle_cast(_Msg, State) -> @@ -202,13 +210,13 @@ handle_info( {'EXIT', FromPid, {updated, NewSeq}}, #state{ index = #index{dbname = DbName, name = IdxName, ddoc_id = DDocId} = Index0, - index_pid = IndexPid, + index_key = IndexKey, updater_pid = UpPid, waiting_list = WaitList } = State ) when UpPid == FromPid -> Index = Index0#index{current_seq = NewSeq}, - case reply_with_index(IndexPid, Index, WaitList) of + case reply_with_index(IndexKey, Index, WaitList) of [] -> {noreply, State#state{ index = Index, @@ -229,7 +237,7 @@ handle_info( nil; false -> spawn_link(fun() -> - dreyfus_index_updater:update(IndexPid, Index) + dreyfus_index_updater:update(IndexKey, Index) end) end, {noreply, State#state{ @@ -242,34 +250,25 @@ handle_info({'EXIT', _, {updated, _}}, State) -> {noreply, State}; handle_info( {'EXIT', FromPid, Reason}, - #state{ - index = Index, - index_pid = IndexPid, - waiting_list = WaitList - } = State -) when FromPid == IndexPid -> - couch_log:notice( - "index for ~p closed with reason ~p", [index_name(Index), Reason] - ), - [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList], - {stop, normal, State}; -handle_info( - {'EXIT', FromPid, Reason}, - #state{ - index = Index, - updater_pid = UpPid, - waiting_list = WaitList - } = State -) when FromPid == UpPid -> - couch_log:info( - "Shutting down index server ~p, updater ~p closing w/ reason ~w", - [index_name(Index), UpPid, Reason] - ), - [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList], - {stop, normal, State}; -handle_info({'EXIT', Pid, Reason}, State) -> - % probably dreyfus_index_manager. - couch_log:notice("Unknown pid ~p closed with reason ~p", [Pid, Reason]), + #state{index = Index, index_key = IndexKey, updater_pid = UpdaterPid, waiting_list = WaitList} = + State +) -> + case clouseau_rpc:get_index_pid_from_key(IndexKey) of + IndexerPid when FromPid == IndexerPid -> + couch_log:notice( + "index for ~p closed with reason ~p", [index_name(Index), Reason] + ), + [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; + _ when FromPid == UpdaterPid -> + couch_log:info( + "Shutting down index server ~p, updater ~p closing w/ reason ~w", + [index_name(Index), UpdaterPid, Reason] + ), + [gen_server:reply(Pid, {error, Reason}) || {Pid, _} <- WaitList]; + _ -> + % probably dreyfus_index_manager. + couch_log:notice("Unknown pid ~p closed with reason ~p", [FromPid, Reason]) + end, {stop, normal, State}; handle_info( {'DOWN', _, _, Pid, Reason}, @@ -291,9 +290,14 @@ open_index(DbName, #index{analyzer = Analyzer, sig = Sig}) -> Path = <>, case clouseau_rpc:open_index(self(), Path, Analyzer) of {ok, Pid} -> - case clouseau_rpc:get_update_seq(Pid) of + Key = + case config:get_boolean("dreyfus", "path_keys", false) of + true -> clouseau_rpc:index_key_with_path(Path, Pid); + false -> clouseau_rpc:index_key_from_pid(Pid) + end, + case clouseau_rpc:get_update_seq(Key) of {ok, Seq} -> - {ok, Pid, Seq}; + {ok, Key, Seq}; Error -> Error end; @@ -336,18 +340,18 @@ design_doc_to_index(DbName, #doc{id = Id, body = {Fields}}, IndexName) -> {error, InvalidDDocError} end. -reply_with_index(IndexPid, Index, WaitList) -> - reply_with_index(IndexPid, Index, WaitList, []). +reply_with_index(IndexKey, Index, WaitList) -> + reply_with_index(IndexKey, Index, WaitList, []). -reply_with_index(_IndexPid, _Index, [], Acc) -> +reply_with_index(_IndexKey, _Index, [], Acc) -> Acc; -reply_with_index(IndexPid, #index{current_seq = IndexSeq} = Index, [{Pid, Seq} | Rest], Acc) when +reply_with_index(IndexKey, #index{current_seq = IndexSeq} = Index, [{Pid, Seq} | Rest], Acc) when Seq =< IndexSeq -> - gen_server:reply(Pid, {ok, IndexPid, IndexSeq}), - reply_with_index(IndexPid, Index, Rest, Acc); -reply_with_index(IndexPid, Index, [{Pid, Seq} | Rest], Acc) -> - reply_with_index(IndexPid, Index, Rest, [{Pid, Seq} | Acc]). + gen_server:reply(Pid, {ok, IndexKey, IndexSeq}), + reply_with_index(IndexKey, Index, Rest, Acc); +reply_with_index(IndexKey, Index, [{Pid, Seq} | Rest], Acc) -> + reply_with_index(IndexKey, Index, Rest, [{Pid, Seq} | Acc]). index_name(#index{dbname = DbName, ddoc_id = DDocId, name = IndexName}) -> <>. @@ -388,12 +392,12 @@ args_to_proplist2(#index_query_args{} = Args) -> {highlight_size, Args#index_query_args.highlight_size} ]. -search_int(Pid, QueryArgs0) -> +search_int(Key, QueryArgs0) -> QueryArgs = dreyfus_util:upgrade(QueryArgs0), Props = args_to_proplist(QueryArgs), - clouseau_rpc:search(Pid, Props). + clouseau_rpc:search(Key, Props). -group1_int(Pid, QueryArgs0) -> +group1_int(Key, QueryArgs0) -> QueryArgs = dreyfus_util:upgrade(QueryArgs0), #index_query_args{ q = Query, @@ -406,7 +410,7 @@ group1_int(Pid, QueryArgs0) -> } } = QueryArgs, clouseau_rpc:group1( - Pid, + Key, Query, GroupBy, Stale =:= false, @@ -415,10 +419,10 @@ group1_int(Pid, QueryArgs0) -> Limit ). -group2_int(Pid, QueryArgs0) -> +group2_int(Key, QueryArgs0) -> QueryArgs = dreyfus_util:upgrade(QueryArgs0), Props = args_to_proplist2(QueryArgs), - clouseau_rpc:group2(Pid, Props). + clouseau_rpc:group2(Key, Props). -info_int(Pid) -> - clouseau_rpc:info(Pid). +info_int(Key) -> + clouseau_rpc:info(Key). diff --git a/src/dreyfus/src/dreyfus_index_manager.erl b/src/dreyfus/src/dreyfus_index_manager.erl index a54db5e15b..6aa93d9608 100644 --- a/src/dreyfus/src/dreyfus_index_manager.erl +++ b/src/dreyfus/src/dreyfus_index_manager.erl @@ -64,12 +64,12 @@ handle_call({get_index, DbName, #index{sig = Sig} = Index}, From, State) -> ets:insert(?BY_SIG, {{DbName, Sig}, [From | WaitList]}), {noreply, State}; [{_, ExistingPid}] -> - {reply, {ok, ExistingPid}, State} + {reply, {ok, clouseau_rpc:index_key_from_pid(ExistingPid)}, State} end; handle_call({open_ok, DbName, Sig, NewPid}, {OpenerPid, _}, State) -> link(NewPid), [{_, WaitList}] = ets:lookup(?BY_SIG, {DbName, Sig}), - [gen_server:reply(From, {ok, NewPid}) || From <- WaitList], + [gen_server:reply(From, {ok, clouseau_rpc:index_key_from_pid(NewPid)}) || From <- WaitList], ets:delete(?BY_PID, OpenerPid), add_to_ets(NewPid, DbName, Sig), {reply, ok, State}; diff --git a/src/dreyfus/src/dreyfus_index_updater.erl b/src/dreyfus/src/dreyfus_index_updater.erl index 387ab09e24..21b1836714 100644 --- a/src/dreyfus/src/dreyfus_index_updater.erl +++ b/src/dreyfus/src/dreyfus_index_updater.erl @@ -20,7 +20,7 @@ -import(couch_query_servers, [get_os_process/1, ret_os_process/1, proc_prompt/2]). -update(IndexPid, Index) -> +update(IndexKey, Index) -> #index{ current_seq = CurSeq, dbname = DbName, @@ -31,7 +31,7 @@ update(IndexPid, Index) -> {ok, Db} = couch_db:open_int(DbName, []), try TotalUpdateChanges = couch_db:count_changes_since(Db, CurSeq), - TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexPid), + TotalPurgeChanges = count_pending_purged_docs_since(Db, IndexKey), TotalChanges = TotalUpdateChanges + TotalPurgeChanges, couch_task_status:add_task([ @@ -49,7 +49,7 @@ update(IndexPid, Index) -> %ExcludeIdRevs is [{Id1, Rev1}, {Id2, Rev2}, ...] %The Rev is the final Rev, not purged Rev. - {ok, ExcludeIdRevs} = purge_index(Db, IndexPid, Index), + {ok, ExcludeIdRevs} = purge_index(Db, IndexKey, Index), %% compute on all docs modified since we last computed. NewCurSeq = couch_db:get_update_seq(Db), @@ -58,9 +58,9 @@ update(IndexPid, Index) -> true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), EnumFun = fun ?MODULE:load_docs/2, [Changes] = couch_task_status:get([changes_done]), - Acc0 = {Changes, IndexPid, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs}, + Acc0 = {Changes, IndexKey, Db, Proc, TotalChanges, erlang:timestamp(), ExcludeIdRevs}, {ok, _} = couch_db:fold_changes(Db, CurSeq, EnumFun, Acc0, []), - ok = clouseau_rpc:commit(IndexPid, NewCurSeq) + ok = clouseau_rpc:commit(IndexKey, NewCurSeq) after ret_os_process(Proc) end, @@ -69,33 +69,33 @@ update(IndexPid, Index) -> couch_db:close(Db) end. -load_docs(FDI, {I, IndexPid, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = Acc) -> +load_docs(FDI, {I, IndexKey, Db, Proc, Total, LastCommitTime, ExcludeIdRevs} = Acc) -> couch_task_status:update([{changes_done, I}, {progress, (I * 100) div Total}]), DI = couch_doc:to_doc_info(FDI), #doc_info{id = Id, high_seq = Seq, revs = [#rev_info{rev = Rev} | _]} = DI, %check if it is processed in purge_index to avoid update the index again. case lists:member({Id, Rev}, ExcludeIdRevs) of true -> ok; - false -> update_or_delete_index(IndexPid, Db, DI, Proc) + false -> update_or_delete_index(IndexKey, Db, DI, Proc) end, %% Force a commit every minute case timer:now_diff(Now = erlang:timestamp(), LastCommitTime) >= 60000000 of true -> - ok = clouseau_rpc:commit(IndexPid, Seq), - {ok, {I + 1, IndexPid, Db, Proc, Total, Now, ExcludeIdRevs}}; + ok = clouseau_rpc:commit(IndexKey, Seq), + {ok, {I + 1, IndexKey, Db, Proc, Total, Now, ExcludeIdRevs}}; false -> {ok, setelement(1, Acc, I + 1)} end. -purge_index(Db, IndexPid, Index) -> - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), +purge_index(Db, IndexKey, Index) -> + {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexKey), Proc = get_os_process(Index#index.def_lang), try true = proc_prompt(Proc, [<<"add_fun">>, Index#index.def]), FoldFun = fun({_PurgeSeq, _UUID, Id, _Revs}, Acc) -> case couch_db:get_full_doc_info(Db, Id) of not_found -> - ok = clouseau_rpc:delete(IndexPid, Id), + ok = clouseau_rpc:delete(IndexKey, Id), update_task(1), {ok, Acc}; FDI -> @@ -105,7 +105,7 @@ purge_index(Db, IndexPid, Index) -> true -> {ok, Acc}; false -> - update_or_delete_index(IndexPid, Db, DI, Proc), + update_or_delete_index(IndexKey, Db, DI, Proc), update_task(1), {ok, [{Id, Rev} | Acc]} end @@ -113,23 +113,23 @@ purge_index(Db, IndexPid, Index) -> end, {ok, ExcludeList} = couch_db:fold_purge_infos(Db, IdxPurgeSeq, FoldFun, []), NewPurgeSeq = couch_db:get_purge_seq(Db), - ok = clouseau_rpc:set_purge_seq(IndexPid, NewPurgeSeq), + ok = clouseau_rpc:set_purge_seq(IndexKey, NewPurgeSeq), update_local_doc(Db, Index, NewPurgeSeq), {ok, ExcludeList} after ret_os_process(Proc) end. -count_pending_purged_docs_since(Db, IndexPid) -> +count_pending_purged_docs_since(Db, IndexKey) -> DbPurgeSeq = couch_db:get_purge_seq(Db), - {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexPid), + {ok, IdxPurgeSeq} = clouseau_rpc:get_purge_seq(IndexKey), DbPurgeSeq - IdxPurgeSeq. -update_or_delete_index(IndexPid, Db, DI, Proc) -> +update_or_delete_index(IndexKey, Db, DI, Proc) -> #doc_info{id = Id, revs = [#rev_info{deleted = Del} | _]} = DI, case Del of true -> - ok = clouseau_rpc:delete(IndexPid, Id); + ok = clouseau_rpc:delete(IndexKey, Id); false -> case maybe_skip_doc(Db, Id) of true -> @@ -141,8 +141,8 @@ update_or_delete_index(IndexPid, Db, DI, Proc) -> Fields1 = [list_to_tuple(Field) || Field <- Fields], Fields2 = maybe_add_partition(Db, Id, Fields1), case Fields2 of - [] -> ok = clouseau_rpc:delete(IndexPid, Id); - _ -> ok = clouseau_rpc:update(IndexPid, Id, Fields2) + [] -> ok = clouseau_rpc:delete(IndexKey, Id); + _ -> ok = clouseau_rpc:update(IndexKey, Id, Fields2) end end end. diff --git a/src/dreyfus/src/dreyfus_rpc.erl b/src/dreyfus/src/dreyfus_rpc.erl index 5fbe194dca..1055ad85ea 100644 --- a/src/dreyfus/src/dreyfus_rpc.erl +++ b/src/dreyfus/src/dreyfus_rpc.erl @@ -62,10 +62,10 @@ call(Fun, DbName, DDoc, IndexName, QueryArgs0) -> index_call(Fun, DbName, Index, QueryArgs, MinSeq) -> case dreyfus_index_manager:get_index(DbName, Index) of - {ok, Pid} -> - case dreyfus_index:await(Pid, MinSeq) of - {ok, IndexPid, _Seq} -> - dreyfus_index:Fun(IndexPid, QueryArgs); + {ok, Key} -> + case dreyfus_index:await(Key, MinSeq) of + {ok, IndexKey, _Seq} -> + dreyfus_index:Fun(IndexKey, QueryArgs); Error -> Error end; @@ -83,8 +83,8 @@ info_int(DbName, DDoc, IndexName) -> case dreyfus_index:design_doc_to_index(DbName, DDoc, IndexName) of {ok, Index} -> case dreyfus_index_manager:get_index(DbName, Index) of - {ok, Pid} -> - case dreyfus_index:info(Pid) of + {ok, Key} -> + case dreyfus_index:info(Key) of {ok, Fields} -> Info = [{signature, Index#index.sig} | Fields], rexi:reply({ok, Info}); diff --git a/src/dreyfus/src/dreyfus_util.erl b/src/dreyfus/src/dreyfus_util.erl index d651da9bd0..8dc4cdf63e 100644 --- a/src/dreyfus/src/dreyfus_util.erl +++ b/src/dreyfus/src/dreyfus_util.erl @@ -345,12 +345,12 @@ maybe_create_local_purge_doc(Db, Index) -> ok end. -maybe_create_local_purge_doc(Db, IndexPid, Index) -> +maybe_create_local_purge_doc(Db, IndexKey, Index) -> DocId = dreyfus_util:get_local_purge_doc_id(Index#index.sig), case couch_db:open_doc(Db, DocId) of {not_found, _} -> DbPurgeSeq = couch_db:get_purge_seq(Db), - ok = clouseau_rpc:set_purge_seq(IndexPid, DbPurgeSeq), + ok = clouseau_rpc:set_purge_seq(IndexKey, DbPurgeSeq), DocContent = dreyfus_util:get_local_purge_doc_body( Db, DocId, DbPurgeSeq, Index ),