diff --git a/src/mtp_handler.erl b/src/mtp_handler.erl index ec25c20..209eceb 100644 --- a/src/mtp_handler.erl +++ b/src/mtp_handler.erl @@ -28,6 +28,8 @@ -define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb -define(HEALTH_CHECK_INTERVAL, 5000). +% telegram server responds with "l\xfe\xff\xff" if client packet MTProto is invalid +-define(SRV_ERROR, <<108, 254, 255, 255>>). -define(APP, mtproto_proxy). @@ -51,7 +53,8 @@ started_at :: pos_integer(), timer_state = init :: init | hibernate | stop, timer :: gen_timeout:tout(), - last_queue_check :: integer()}). + last_queue_check :: integer(), + srv_error_filter :: first | on | off}). -type transport() :: module(). -type stage() :: init | tunnel. @@ -99,6 +102,7 @@ init({Socket, Transport, [Name, Secret, Tag]}) -> {TimeoutKey, TimeoutDefault} = state_timeout(init), Timer = gen_timeout:new( #{timeout => {env, ?APP, TimeoutKey, TimeoutDefault}}), + Filter = application:get_env(?APP, replay_check_server_error_filter, off), NowMs = erlang:system_time(millisecond), State = #state{sock = Socket, secret = unhex(Secret), @@ -108,7 +112,8 @@ init({Socket, Transport, [Name, Secret, Tag]}) -> addr = {Ip, Port}, started_at = NowMs, timer = Timer, - last_queue_check = NowMs}, + last_queue_check = NowMs, + srv_error_filter = Filter}, {ok, State}; {error, Reason} -> mtp_metric:count_inc([?APP, in_connection_closed, total], 1, #{labels => [Name]}), @@ -120,11 +125,35 @@ handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. -handle_cast({proxy_ans, Down, Data}, #state{down = Down} = S) -> +handle_cast({proxy_ans, Down, Data}, #state{down = Down, srv_error_filter = off} = S) -> %% telegram server -> proxy + %% srv_error_filter is 'off' {ok, S1} = up_send(Data, S), ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)), maybe_check_health(bump_timer(S1)); +handle_cast({proxy_ans, Down, ?SRV_ERROR = Data}, #state{down = Down, srv_error_filter = Filter, + addr = {Ip, _}} = S) when Filter =/= off -> + %% telegram server -> proxy + %% Server replied with server error; it might be another kind of replay attack; + %% Don't send this packet to client so proxy won't be fingerprinted + ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)), + ?log(warning, "~s: protocol_error srv_error_filtered", [inet:ntoa(Ip)]), + mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [srv_error_filtered]}), + case Filter of + first -> S#state{srv_error_filter = off}; + on -> S + end; +handle_cast({proxy_ans, Down, Data}, #state{down = Down, srv_error_filter = Filter} = S) when Filter =/= off -> + %% telegram server -> proxy + %% Normal data packet + %% srv_error_filter is 'on' or srv_error_filter is 'first' and it's 1st server packet + {ok, S1} = up_send(Data, S), + ok = mtp_down_conn:ack(Down, 1, iolist_size(Data)), + S2 = case Filter of + first -> S1#state{srv_error_filter = off}; + on -> S1 + end, + maybe_check_health(bump_timer(S2)); handle_cast({close_ext, Down}, #state{down = Down, sock = USock, transport = UTrans} = S) -> ?log(debug, "asked to close connection by downstream"), ok = UTrans:close(USock), @@ -137,7 +166,7 @@ handle_cast(Other, State) -> {noreply, State}. handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, - listener = Listener} = S) -> + listener = Listener, addr = {Ip, _}} = S) -> %% client -> proxy Size = byte_size(Data), mtp_metric:count_inc([?APP, received, upstream, bytes], Size, #{labels => [Listener]}), @@ -152,7 +181,7 @@ handle_info({tcp, Sock, Data}, #state{sock = Sock, transport = Transport, {stop, normal, S} catch error:{protocol_error, Type, Extra} -> mtp_metric:count_inc([?APP, protocol_error, total], 1, #{labels => [Type]}), - ?log(warning, "protocol_error ~p ~p", [Type, Extra]), + ?log(warning, "~s: protocol_error ~p ~p", [inet:ntoa(Ip), Type, Extra]), {stop, normal, maybe_close_down(S)} end; handle_info({tcp_closed, Sock}, #state{sock = Sock} = S) -> @@ -268,11 +297,11 @@ handle_upstream_data(Bin, #state{stage = init, stage_state = Buf} = S) -> maybe_check_replay(Packet) -> %% Check for session replay attack: attempt to connect with the same 1st 64byte packet - case lists:member(mtp_session_storage, application:get_env(?APP, replay_checks_enabled, [])) of - true -> + case application:get_env(?APP, replay_check_session_storage, off) of + on -> (new == mtp_session_storage:check_add(Packet)) orelse error({protocol_error, replay_session_detected, Packet}); - false -> + off -> ok end. diff --git a/src/mtproto_proxy.app.src b/src/mtproto_proxy.app.src index 10801a0..e850945 100644 --- a/src/mtproto_proxy.app.src +++ b/src/mtproto_proxy.app.src @@ -63,7 +63,18 @@ %% List of enabled replay-attack checks. See %% https://habr.com/ru/post/452144/ - {replay_checks_enabled, [mtp_session_storage]}, + %% session_storage - store last used 1st client packets in special + %% storage, drop connections with same 1st packet + %% Values: on/off + %% Default: off + {replay_check_session_storage, on}, + %% server_error_filter - drop server error responses. + %% Values: + %% first - drop server error only if it's 1st server packet + %% on - drop all server error packets + %% off - don't drop server errors + %% Default: off + {replay_check_server_error_filter, first}, %% Options for `mtp_session_storage` replay attack check %% Those settings are not precise! They are checked not in realtime, but diff --git a/test/single_dc_SUITE.erl b/test/single_dc_SUITE.erl index 596f3d3..490120b 100644 --- a/test/single_dc_SUITE.erl +++ b/test/single_dc_SUITE.erl @@ -13,7 +13,8 @@ downstream_size_backpressure_case/1, downstream_qlen_backpressure_case/1, config_change_case/1, - replay_attack_case/1 + replay_attack_case/1, + replay_attack_server_error_case/1 ]). -export([set_env/2, @@ -346,6 +347,32 @@ replay_attack_case(Cfg) when is_list(Cfg) -> ?assertEqual(1, ErrCount()), ?assertEqual({error, closed}, mtp_test_client:recv_packet(Cli2, 1000)). +%% @doc test replay attack protection. +%% Server error responses are not proxied +replay_attack_server_error_case({pre, Cfg}) -> + setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg); +replay_attack_server_error_case({post, Cfg}) -> + stop_single(Cfg); +replay_attack_server_error_case(Cfg) when is_list(Cfg) -> + DcId = ?config(dc_id, Cfg), + Host = ?config(mtp_host, Cfg), + Port = ?config(mtp_port, Cfg), + Secret = ?config(mtp_secret, Cfg), + ErrCount = fun() -> + mtp_test_metric:get_tags( + count, [?APP, protocol_error, total], [srv_error_filtered]) + end, + ?assertEqual(not_found, ErrCount()), + Cli1 = mtp_test_client:connect(Host, Port, Secret, DcId, mtp_secure), + %% Let TG server echo error packet back, but packet will be filtered + _Cli2 = mtp_test_client:send(<<108, 254, 255, 255>>, Cli1), + ?assertEqual( + ok, mtp_test_metric:wait_for_value( + count, [?APP, protocol_error, total], [srv_error_filtered], 1, 5000), + {mtp_session_storage:status(), + sys:get_state(mtp_test_metric)}), + ?assertEqual(1, ErrCount()). + %% TODO: send a lot, not read, and then close - assert connection IDs are cleaned up %% Helpers