From c47e0cf50ec35c7a39b0ce10ee9b0b7e3b2d2f92 Mon Sep 17 00:00:00 2001 From: zpeng01 Date: Wed, 16 Apr 2025 15:27:32 +0800 Subject: [PATCH 1/3] add ssl connection to eredis_sub --- README.md | 13 +++ include/eredis_sub.hrl | 7 +- src/eredis_sub.erl | 50 +++++++++- src/eredis_sub_client.erl | 200 ++++++++++++++++++++++++++++++-------- 4 files changed, 226 insertions(+), 44 deletions(-) diff --git a/README.md b/README.md index 0f4276e3..91bfe812 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,7 @@ Supported Redis features: * Pipelining * Authentication & multiple dbs * Pubsub + * SSL connections ## Example @@ -27,6 +28,10 @@ To connect to a Redis instance listening on a Unix domain socket: {ok, C1} = eredis:start_link({local, "/var/run/redis.sock"}, 0). +To connect to a Redis instance with SSL enabled: + + {ok, C2} = eredis:start_link("127.0.0.1", 6379, 0, "", 100, 5000, true). + MSET and MGET: ```erlang @@ -91,6 +96,13 @@ ok 3> ``` +SSL PubSub: + +```erl +1> {ok, Sub} = eredis_sub:start_link("127.0.0.1", 6379, "", 100, infinity, drop, true). +2> Receiver = spawn_link(fun() -> eredis_sub:controlling_process(Sub), eredis_sub:subscribe(Sub, [<<"foo">>]), eredis_sub:receiver(Sub) end). +``` + EUnit tests: ```console @@ -125,6 +137,7 @@ the following arguments: * Reconnect sleep, integer of milliseconds to sleep between reconnect attempts * Connect timeout, timeout value in milliseconds to use in `gen_tcp:connect`, default is 5000 * Socket options, proplist of options to be sent to `gen_tcp:connect`, default is `?SOCKET_OPTS` +* IsSSL, boolean flag for SSL connection, default is false ## Reconnecting on Redis down / network failure / timeout / etc diff --git a/include/eredis_sub.hrl b/include/eredis_sub.hrl index b86a167f..1c7ee3f1 100644 --- a/include/eredis_sub.hrl +++ b/include/eredis_sub.hrl @@ -1,3 +1,5 @@ +-include("eredis.hrl"). + %% State in eredis_sub_client -record(state, { host :: string() | undefined, @@ -29,5 +31,8 @@ msg_state = need_ack :: ready | need_ack, interval :: integer() | undefined, - tref :: reference() | undefined + tref :: reference() | undefined, + + %% Whether to use SSL connection + is_ssl = false :: boolean() }). diff --git a/src/eredis_sub.erl b/src/eredis_sub.erl index 044c818f..49ba4b3b 100644 --- a/src/eredis_sub.erl +++ b/src/eredis_sub.erl @@ -3,20 +3,21 @@ %% -module(eredis_sub). -include("eredis.hrl"). +-include("eredis_sub.hrl"). %% Default timeout for calls to the client gen_server %% Specified in http://www.erlang.org/doc/man/gen_server.html#call-3 -define(TIMEOUT, 5000). --export([start_link/0, start_link/1, start_link/3, start_link/6, stop/1, +-export([start_link/0, start_link/1, start_link/3, start_link/6, start_link/7, stop/1, controlling_process/1, controlling_process/2, controlling_process/3, ack_message/1, subscribe/2, unsubscribe/2, channels/1]). -export([psubscribe/2,punsubscribe/2]). --export([receiver/1, sub_example/0, pub_example/0]). +-export([receiver/1, sub_example/0, pub_example/0, ssl_sub_example/0]). --export([psub_example/0,ppub_example/0]). +-export([psub_example/0,ppub_example/0, ssl_psub_example/0]). %% %% PUBLIC API @@ -27,6 +28,10 @@ start_link() -> start_link(Host, Port, Password) -> start_link(Host, Port, Password, 100, infinity, drop). + start_link([]). + +start_link(Host, Port, Password, IsSSL) -> + start_link(Host, Port, Password, 100, infinity, drop, IsSSL). start_link(Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour) @@ -38,7 +43,21 @@ start_link(Host, Port, Password, ReconnectSleep, (QueueBehaviour =:= drop orelse QueueBehaviour =:= exit) -> eredis_sub_client:start_link(Host, Port, Password, ReconnectSleep, - MaxQueueSize, QueueBehaviour). + MaxQueueSize, QueueBehaviour, false). + +%% @doc: Start link with SSL support +start_link(Host, Port, Password, ReconnectSleep, + MaxQueueSize, QueueBehaviour, IsSSL) + when is_list(Host) andalso + is_integer(Port) andalso + is_list(Password) andalso + (is_integer(ReconnectSleep) orelse ReconnectSleep =:= no_reconnect) andalso + (is_integer(MaxQueueSize) orelse MaxQueueSize =:= infinity) andalso + (QueueBehaviour =:= drop orelse QueueBehaviour =:= exit) andalso + is_boolean(IsSSL) -> + + eredis_sub_client:start_link(Host, Port, Password, ReconnectSleep, + MaxQueueSize, QueueBehaviour, IsSSL). %% @doc: Callback for starting from poolboy @@ -50,8 +69,9 @@ start_link(Args) -> ReconnectSleep = proplists:get_value(reconnect_sleep, Args, 100), MaxQueueSize = proplists:get_value(max_queue_size, Args, infinity), QueueBehaviour = proplists:get_value(queue_behaviour, Args, drop), + IsSSL = proplists:get_value(is_ssl, Args, false), start_link(Host, Port, Password, ReconnectSleep, - MaxQueueSize, QueueBehaviour). + MaxQueueSize, QueueBehaviour, IsSSL). stop(Pid) -> eredis_sub_client:stop(Pid). @@ -183,4 +203,24 @@ ppub_example() -> eredis:q(P, ["PUBLISH", "foo123", "bar"]), eredis_client:stop(P). +%% @doc: Example of SSL subscription +ssl_sub_example() -> + {ok, Sub} = start_link("127.0.0.1", 6379, "", 100, infinity, drop, true), + Receiver = spawn_link(fun () -> + controlling_process(Sub), + subscribe(Sub, [<<"foo">>]), + receiver(Sub) + end), + {Sub, Receiver}. + +%% @doc: Example of SSL pattern subscription +ssl_psub_example() -> + {ok, Sub} = start_link("127.0.0.1", 6379, "", 100, infinity, drop, true), + Receiver = spawn_link(fun () -> + controlling_process(Sub), + psubscribe(Sub, [<<"foo*">>]), + receiver(Sub) + end), + {Sub, Receiver}. + diff --git a/src/eredis_sub_client.erl b/src/eredis_sub_client.erl index 18f31af6..b9ae34a9 100644 --- a/src/eredis_sub_client.erl +++ b/src/eredis_sub_client.erl @@ -14,7 +14,7 @@ %% API --export([start_link/6, stop/1]). +-export([start_link/6, start_link/7, stop/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -32,7 +32,19 @@ QueueBehaviour::drop | exit) -> {ok, Pid::pid()} | {error, Reason::term()}. start_link(Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour) -> - Args = [Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour], + Args = [Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour, false], + gen_server:start_link(?MODULE, Args, []). + +-spec start_link(Host::list(), + Port::integer(), + Password::string(), + ReconnectSleep::reconnect_sleep(), + MaxQueueSize::integer() | infinity, + QueueBehaviour::drop | exit, + IsSSL::boolean()) -> + {ok, Pid::pid()} | {error, Reason::term()}. +start_link(Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour, IsSSL) -> + Args = [Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour, IsSSL], gen_server:start_link(?MODULE, Args, []). @@ -43,7 +55,7 @@ stop(Pid) -> %% gen_server callbacks %%==================================================================== -init([Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour]) -> +init([Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour, IsSSL]) -> State = #state{host = Host, port = Port, password = list_to_binary(Password), @@ -52,11 +64,17 @@ init([Host, Port, Password, ReconnectSleep, MaxQueueSize, QueueBehaviour]) -> parser_state = eredis_parser:init(), msg_queue = queue:new(), max_queue_size = MaxQueueSize, - queue_behaviour = QueueBehaviour}, + queue_behaviour = QueueBehaviour, + is_ssl = IsSSL}, case connect(State) of {ok, NewState} -> - ok = inet:setopts(NewState#state.socket, [{active, once}]), + case NewState#state.is_ssl of + true -> + ok = ssl:setopts(NewState#state.socket, [{active, once}]); + false -> + ok = inet:setopts(NewState#state.socket, [{active, once}]) + end, {ok, New_TimerRef} = timer:send_after(1000 * 60, timeout), {ok, NewState#state{interval=60,tref=New_TimerRef}}; {error, Reason} -> @@ -108,14 +126,24 @@ handle_cast({ack_message, Pid}, handle_cast({subscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) -> Command = eredis:create_multibulk(["SUBSCRIBE" | Channels]), - ok = gen_tcp:send(State#state.socket, Command), + case State#state.is_ssl of + true -> + ok = ssl:send(State#state.socket, Command); + false -> + ok = gen_tcp:send(State#state.socket, Command) + end, NewChannels = add_channels(Channels, State#state.channels), {noreply, State#state{channels = NewChannels}}; handle_cast({psubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) -> Command = eredis:create_multibulk(["PSUBSCRIBE" | Channels]), - ok = gen_tcp:send(State#state.socket, Command), + case State#state.is_ssl of + true -> + ok = ssl:send(State#state.socket, Command); + false -> + ok = gen_tcp:send(State#state.socket, Command) + end, NewChannels = add_channels(Channels, State#state.channels), {noreply, State#state{channels = NewChannels}}; @@ -123,7 +151,12 @@ handle_cast({psubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} handle_cast({unsubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) -> Command = eredis:create_multibulk(["UNSUBSCRIBE" | Channels]), - ok = gen_tcp:send(State#state.socket, Command), + case State#state.is_ssl of + true -> + ok = ssl:send(State#state.socket, Command); + false -> + ok = gen_tcp:send(State#state.socket, Command) + end, NewChannels = remove_channels(Channels, State#state.channels), {noreply, State#state{channels = NewChannels}}; @@ -131,7 +164,12 @@ handle_cast({unsubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} handle_cast({punsubscribe, Pid, Channels}, #state{controlling_process = {_, Pid}} = State) -> Command = eredis:create_multibulk(["PUNSUBSCRIBE" | Channels]), - ok = gen_tcp:send(State#state.socket, Command), + case State#state.is_ssl of + true -> + ok = ssl:send(State#state.socket, Command); + false -> + ok = gen_tcp:send(State#state.socket, Command) + end, NewChannels = remove_channels(Channels, State#state.channels), {noreply, State#state{channels = NewChannels}}; @@ -147,33 +185,20 @@ handle_cast(_Msg, State) -> %% Receive data from socket, see handle_response/2 handle_info({tcp, _Socket, Bs}, State) -> ok = inet:setopts(State#state.socket, [{active, once}]), + process_response(Bs, State); - NewState = handle_response(Bs, State), - case NewState#state.max_queue_size of - infinity -> - {noreply, NewState}; - MaxQueueSize -> - case (MsgQueueLen = queue:len(NewState#state.msg_queue)) > MaxQueueSize of - true -> - case State#state.queue_behaviour of - drop -> - Msg = {dropped, MsgQueueLen}, - send_to_controller(Msg, NewState), - {noreply, NewState#state{msg_queue = queue:new()}}; - exit -> - Msg = {exited, MsgQueueLen}, - send_to_controller(Msg, NewState), - {stop, max_queue_size, State} - end; - _ -> - {noreply, NewState} - end - end; +handle_info({ssl, _Socket, Bs}, State) -> + ok = ssl:setopts(State#state.socket, [{active, once}]), + process_response(Bs, State); handle_info({tcp_error, _Socket, _Reason}, State) -> %% This will be followed by a close {noreply, State}; +handle_info({ssl_error, _Socket, _Reason}, State) -> + %% This will be followed by a close + {noreply, State}; + %% Socket got closed, for example by Redis terminating idle %% clients. If desired, spawn of a new process which will try to reconnect and %% notify us when Redis is ready. In the meantime, we can respond with @@ -182,6 +207,10 @@ handle_info({tcp_closed, _Socket}, #state{reconnect_sleep = no_reconnect} = Stat %% If we aren't going to reconnect, then there is nothing else for this process to do. {stop, normal, State#state{socket = undefined}}; +handle_info({ssl_closed, _Socket}, #state{reconnect_sleep = no_reconnect} = State) -> + %% If we aren't going to reconnect, then there is nothing else for this process to do. + {stop, normal, State#state{socket = undefined}}; + handle_info({tcp_closed, _Socket}, State) -> Self = self(), send_to_controller({eredis_disconnected, Self}, State), @@ -191,6 +220,15 @@ handle_info({tcp_closed, _Socket}, State) -> %% signal we are "down"; discard possibly patrially parsed data {noreply, State#state{socket = undefined, parser_state = eredis_parser:init()}}; +handle_info({ssl_closed, _Socket}, State) -> + Self = self(), + send_to_controller({eredis_disconnected, Self}, State), + spawn(fun() -> reconnect_loop(Self, State) end), + + %% Throw away the socket. The absence of a socket is used to + %% signal we are "down"; discard possibly patrially parsed data + {noreply, State#state{socket = undefined, parser_state = eredis_parser:init()}}; + %% Controller might want to be notified about every reconnect attempt handle_info(reconnect_attempt, State) -> send_to_controller({eredis_reconnect_attempt, self()}, State), @@ -206,7 +244,12 @@ handle_info({reconnect_failed, Reason}, State) -> %% already connected and authenticated. handle_info({connection_ready, Socket}, #state{socket = undefined} = State) -> send_to_controller({eredis_connected, self()}, State), - ok = inet:setopts(Socket, [{active, once}]), + case State#state.is_ssl of + true -> + ok = ssl:setopts(Socket, [{active, once}]); + false -> + ok = inet:setopts(Socket, [{active, once}]) + end, {noreply, State#state{socket = Socket}}; @@ -239,7 +282,11 @@ terminate(_Reason, #state{tref=TimerRef} = State) -> _ = timer:cancel(TimerRef), case State#state.socket of undefined -> ok; - Socket -> gen_tcp:close(Socket) + Socket -> + case State#state.is_ssl of + true -> ssl:close(Socket); + false -> gen_tcp:close(Socket) + end end, ok. @@ -265,6 +312,30 @@ add_channels(Channels, OldChannels) -> end end, OldChannels, Channels). +%% Helper function to process incoming responses +process_response(Bs, State) -> + NewState = handle_response(Bs, State), + case NewState#state.max_queue_size of + infinity -> + {noreply, NewState}; + MaxQueueSize -> + case (MsgQueueLen = queue:len(NewState#state.msg_queue)) > MaxQueueSize of + true -> + case State#state.queue_behaviour of + drop -> + Msg = {dropped, MsgQueueLen}, + send_to_controller(Msg, NewState), + {noreply, NewState#state{msg_queue = queue:new()}}; + exit -> + Msg = {exited, MsgQueueLen}, + send_to_controller(Msg, NewState), + {stop, max_queue_size, State} + end; + _ -> + {noreply, NewState} + end + end. + -spec handle_response(Data::binary(), State::#state{}) -> NewState::#state{}. %% @doc: Handle the response coming from Redis. This should only be %% channel messages that we should forward to the controlling process @@ -329,9 +400,17 @@ queue_or_send(Msg, State) -> %% synchronous and if Redis returns something we don't expect, we %% crash. Returns {ok, State} or {error, Reason}. connect(State) -> + case State#state.is_ssl of + true -> + ssl_connect(State); + false -> + normal_connect(State) + end. + +normal_connect(State) -> case gen_tcp:connect(State#state.host, State#state.port, [?SOCKET_MODE | ?SOCKET_OPTS]) of {ok, Socket} -> - case authenticate(Socket, State#state.password) of + case authenticate(Socket, State#state.password, false) of ok -> {ok, State#state{socket = Socket}}; {error, Reason} -> @@ -341,12 +420,52 @@ connect(State) -> {error, {connection_error, Reason}} end. +ssl_connect(State) -> + case ssl:connect(State#state.host, State#state.port, [?SOCKET_MODE | ?SOCKET_OPTS]) of + {ok, Socket} -> + case authenticate(Socket, State#state.password, true) of + ok -> + {ok, State#state{socket = Socket}}; + {error, Reason} -> + {error, {authentication_error, Reason}} + end; + {error, Reason} -> + {error, {connection_error, Reason}} + end. -authenticate(_Socket, <<>>) -> - ok; -authenticate(Socket, Password) -> - eredis_client:do_sync_command(Socket, ["AUTH", " \"", Password, "\"\r\n"]). +authenticate(_Socket, <<>>, _IsSSL) -> + ok; +authenticate(Socket, Password, IsSSL) -> + Command = ["AUTH", " \"", Password, "\"\r\n"], + do_sync_command(Socket, Command, IsSSL). + +%% Helper function for synchronous command execution +do_sync_command(Socket, Command, true) -> + case ssl:send(Socket, Command) of + ok -> + case ssl_recv(Socket, 0, ?RECV_TIMEOUT) of + {ok, <<"+OK\r\n">>} -> + ok; + {ok, <<"-", Error/binary>>} -> + {error, Error}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end; +do_sync_command(Socket, Command, false) -> + eredis_client:do_sync_command(Socket, Command). + +%% Helper function to receive data from an SSL socket +ssl_recv(Socket, Length, Timeout) -> + case ssl:recv(Socket, Length, Timeout) of + {ok, Data} -> + {ok, Data}; + {error, Reason} -> + {error, Reason} + end. %% @doc: Loop until a connection can be established, this includes %% successfully issuing the auth and select calls. When we have a @@ -355,7 +474,12 @@ reconnect_loop(Client, #state{reconnect_sleep=ReconnectSleep}=State) -> Client ! reconnect_attempt, case catch(connect(State)) of {ok, #state{socket = Socket}} -> - gen_tcp:controlling_process(Socket, Client), + case State#state.is_ssl of + true -> + ssl:controlling_process(Socket, Client); + false -> + gen_tcp:controlling_process(Socket, Client) + end, Client ! {connection_ready, Socket}; {error, Reason} -> Client ! {reconnect_failed, Reason}, From 332b450654e66054365a0b1c5dc85484b7c40e57 Mon Sep 17 00:00:00 2001 From: zpeng01 Date: Wed, 16 Apr 2025 15:30:18 +0800 Subject: [PATCH 2/3] update --- include/eredis_sub.hrl | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/eredis_sub.hrl b/include/eredis_sub.hrl index 1c7ee3f1..84afb873 100644 --- a/include/eredis_sub.hrl +++ b/include/eredis_sub.hrl @@ -1,5 +1,3 @@ --include("eredis.hrl"). - %% State in eredis_sub_client -record(state, { host :: string() | undefined, From 3afd6fe2e4638060c053cbce429575903235b7b6 Mon Sep 17 00:00:00 2001 From: zpeng01 Date: Wed, 16 Apr 2025 15:31:51 +0800 Subject: [PATCH 3/3] update --- src/eredis_sub.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/eredis_sub.erl b/src/eredis_sub.erl index 49ba4b3b..8967e9b7 100644 --- a/src/eredis_sub.erl +++ b/src/eredis_sub.erl @@ -9,7 +9,7 @@ %% Specified in http://www.erlang.org/doc/man/gen_server.html#call-3 -define(TIMEOUT, 5000). --export([start_link/0, start_link/1, start_link/3, start_link/6, start_link/7, stop/1, +-export([start_link/0, start_link/1, start_link/3, start_link/4, start_link/6, start_link/7, stop/1, controlling_process/1, controlling_process/2, controlling_process/3, ack_message/1, subscribe/2, unsubscribe/2, channels/1]).