diff --git a/src/mtp_handler.erl b/src/mtp_handler.erl index 71a48ba..7aa5073 100644 --- a/src/mtp_handler.erl +++ b/src/mtp_handler.erl @@ -26,9 +26,6 @@ -define(MAX_UP_INIT_BUF_SIZE, 1024 * 1024). %1mb -define(HEALTH_CHECK_INTERVAL, 5000). --define(HEALTH_CHECK_MAX_QLEN, 300). --define(HEALTH_CHECK_GC, 400 * 1024). %400kb --define(HEALTH_CHECK_MAX_MEM, 3 * 1024 * 1024). %3mb -define(APP, mtproto_proxy). diff --git a/src/mtproto_proxy_app.erl b/src/mtproto_proxy_app.erl index bf615f0..7515c9a 100644 --- a/src/mtproto_proxy_app.erl +++ b/src/mtproto_proxy_app.erl @@ -16,9 +16,9 @@ %%==================================================================== start(_StartType, _StartArgs) -> Res = {ok, _} = mtproto_proxy_sup:start_link(), - io:format("+++++++++++++++++++++++++++++++++++++++~n" - "Erlang MTProto proxy by @seriyps https://github.com/seriyps/mtproto_proxy~n" - "Sponsored by and powers @socksy_bot~n"), + report("+++++++++++++++++++++++++++++++++++++++~n" + "Erlang MTProto proxy by @seriyps https://github.com/seriyps/mtproto_proxy~n" + "Sponsored by and powers @socksy_bot~n", []), [start_proxy(Where) || Where <- application:get_env(?APP, ports, [])], Res. @@ -52,9 +52,13 @@ start_proxy(#{name := Name, port := Port, secret := Secret, tag := Tag} = P) -> "https://t.me/proxy?server=~s&port=~w&secret=~s", [application:get_env(?APP, external_ip, ListenIpStr), Port, Secret]), - io:format("Proxy started on ~s:~p with secret: ~s, tag: ~s~nUrl: ~s~n", - [ListenIpStr, Port, Secret, Tag, Url]), + report("Proxy started on ~s:~p with secret: ~s, tag: ~s~nUrl: ~s~n", + [ListenIpStr, Port, Secret, Tag, Url]), Res. stop_proxy(#{name := Name}) -> ranch:stop_listener(Name). + +report(Fmt, Args) -> + io:format(Fmt, Args), + lager:info(Fmt, Args). diff --git a/test/mtp_prop_gen.erl b/test/mtp_prop_gen.erl index 67fd009..631e03b 100644 --- a/test/mtp_prop_gen.erl +++ b/test/mtp_prop_gen.erl @@ -51,6 +51,4 @@ dc_id() -> codec() -> Protocols = [mtp_abridged, mtp_intermediate, mtp_secure], - proper_types:oneof( - [proper_types:exactly(P) - || P <- Protocols]). + proper_types:oneof(Protocols). diff --git a/test/mtp_test_client.erl b/test/mtp_test_client.erl index 5563198..c3807c8 100644 --- a/test/mtp_test_client.erl +++ b/test/mtp_test_client.erl @@ -48,7 +48,7 @@ recv_packet(#client{codec = Codec} = Client, Timeout) -> recv_packet_inner(#client{sock = Sock, codec = Codec0} = Client, Timeout) -> case gen_tcp:recv(Sock, 0, Timeout) of {ok, Stream} -> - io:format("~p: ~p~n", [byte_size(Stream), Stream]), + %% io:format("~p: ~p~n", [byte_size(Stream), Stream]), case mtp_codec:try_decode_packet(Stream, Codec0) of {ok, Data, Codec} -> {ok, Data, Client#client{codec = Codec}}; diff --git a/test/mtp_test_cmd_rpc.erl b/test/mtp_test_cmd_rpc.erl index ef3db61..ef90c3b 100644 --- a/test/mtp_test_cmd_rpc.erl +++ b/test/mtp_test_cmd_rpc.erl @@ -30,11 +30,15 @@ handle_rpc({data, ConnId, Req}, St) -> case M:F(Opts, ConnId, St) of {reply, Resp, St1} -> {rpc, {proxy_ans, ConnId, term_to_packet(Resp)}, St1}; + {close, St1} -> + {rpc, {close_ext, ConnId}, tombstone(ConnId, St1)}; {return, What} -> What end; handle_rpc({remote_closed, ConnId}, St) -> - is_integer(maps:get(ConnId, St, undefined)) - orelse error({unexpected_closed, ConnId}), - {noreply, St#{ConnId := tombstone}}. + {noreply, tombstone(ConnId, St)}. +tombstone(ConnId, St) -> + ({ok, tombstone} =/= maps:find(ConnId, St)) + orelse error({already_closed, ConnId}), + St#{ConnId => tombstone}. diff --git a/test/mtp_test_datacenter.erl b/test/mtp_test_datacenter.erl index 1a89e3c..c2343be 100644 --- a/test/mtp_test_datacenter.erl +++ b/test/mtp_test_datacenter.erl @@ -8,6 +8,7 @@ stop_dc/1, start_config_server/5, stop_config_server/1]). +-export([middle_connections/1]). -export([dc_list_to_config/1]). -export([do/1]). @@ -47,6 +48,10 @@ stop_dc(#{srv_ids := Ids} = Acc) -> ok = lists:foreach(fun mtp_test_middle_server:stop/1, Ids), {ok, maps:without([srv_ids], Acc1)}. +middle_connections(#{srv_ids := Ids}) -> + lists:flatten([ranch:procs(Id, connections) + || Id <- Ids]). + %% %% Inets HTTPD to use as a mock for https://core.telegram.org %% diff --git a/test/mtp_test_middle_server.erl b/test/mtp_test_middle_server.erl index 1c8770c..1fc8d92 100644 --- a/test/mtp_test_middle_server.erl +++ b/test/mtp_test_middle_server.erl @@ -4,7 +4,8 @@ -behaviour(gen_statem). -export([start/2, - stop/1]). + stop/1, + get_rpc_handler_state/1]). -export([start_link/4, ranch_init/1]). -export([init/1, @@ -57,6 +58,9 @@ start(Id, #{port := _, secret := _} = Opts) -> stop(Id) -> ranch:stop_listener(Id). +get_rpc_handler_state(Pid) -> + gen_statem:call(Pid, get_rpc_handler_state). + %% Callbacks start_link(Ref, _, Transport, Opts) -> @@ -155,6 +159,8 @@ on_tunnel(info, {tcp, _Sock, TcpData}, #t_state{codec = Codec0} = S) -> {S2, S2#t_state.codec} end, S, TcpData, Codec0), {keep_state, activate(S2#t_state{codec = Codec1})}; +on_tunnel({call, From}, get_rpc_handler_state, #t_state{rpc_handler_state = HSt}) -> + {keep_state_and_data, [{reply, From, HSt}]}; on_tunnel(Type, Event, S) -> handle_event(Type, Event, ?FUNCTION_NAME, S). diff --git a/test/prop_mtp_statefull.erl b/test/prop_mtp_statefull.erl new file mode 100644 index 0000000..5a19dba --- /dev/null +++ b/test/prop_mtp_statefull.erl @@ -0,0 +1,311 @@ +%% @doc Statefull property-based tests +-module(prop_mtp_statefull). +-export([prop_check_pooling/0, + prop_check_pooling/1, + initial_state/0, + command/1, + precondition/2, + postcondition/3, + next_state/3]). +-export([connect/2, + echo_packet/2, + ask_for_close/1, + close/1]). +-export([gen_rpc_echo/3, + gen_rpc_close/3]). + +-include_lib("proper/include/proper.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-record(st, { + ever_opened = 0, + open = [], + closed = [], + ask_for_close = [], + n_packets = #{} + }). + +-define(PORT, 10800). +-define(SECRET, <<"d0d6e111bada5511fcce9584deadbeef">>). +-define(HOST, {127, 0, 0, 1}). +-define(DC_ID, 1). +-define(APP, mtproto_proxy). + +prop_check_pooling(doc) -> + "Check that connections and packets are 'accounted' correctly". + +prop_check_pooling() -> + ?FORALL(Cmds, commands(?MODULE), aggregate(command_names(Cmds), run_cmds(Cmds))). + +initial_state() -> + #st{}. + +command(#st{open = [], ever_opened = EO}) -> + {call, ?MODULE, connect, [EO, mtp_prop_gen:codec()]}; +command(#st{open = L, ever_opened = EO}) -> + proper_types:frequency( + [ + {1, {call, ?MODULE, connect, [EO, mtp_prop_gen:codec()]}}, + {5, {call, ?MODULE, echo_packet, [proper_types:oneof(L), proper_types:binary()]}}, + {2, {call, ?MODULE, close, [proper_types:oneof(L)]}}, + {2, {call, ?MODULE, ask_for_close, [proper_types:oneof(L)]}} + ]). + +precondition(#st{open = L}, {call, ?MODULE, close, _}) -> + length(L) > 0; +precondition(#st{open = L}, {call, ?MODULE, echo_packet, _}) -> + length(L) > 0; +precondition(#st{open = L}, {call, ?MODULE, ask_for_close, _}) -> + length(L) > 0; +precondition(_St, {call, _Mod, _Fun, _Args}) -> + true. + +%% Given the state `State' *prior* to the call `{call, Mod, Fun, Args}', +%% determine whether the result `Res' (coming from the actual system) +%% makes sense. +postcondition(_State, {call, ?MODULE, connect, _Args}, _Res) -> + true; +postcondition(_State, {call, ?MODULE, close, _Args}, _Res) -> + true; +postcondition(_State, {call, ?MODULE, ask_for_close, _Args}, _Res) -> + true; +postcondition(_State, {call, ?MODULE, echo_packet, [_Conn, SendBin]}, RecvBin) -> + ?assertEqual(SendBin, RecvBin), + true; +postcondition(_State, {call, _Mod, _Fun, _Args}, _Res) -> + false. + +%% Assuming the postcondition for a call was true, update the model +%% accordingly for the test to proceed. +next_state(#st{open = L, ever_opened = EO} = St, _Res, + {call, ?MODULE, connect, [ConnId, _Proto]}) -> + St#st{open = [ConnId | L], + ever_opened = EO + 1}; +next_state(#st{open = L, closed = Cl} = St, _Res, {call, ?MODULE, close, [ConnId]}) -> + St#st{open = lists:delete(ConnId, L), + closed = [ConnId | Cl]}; +next_state(#st{open = L, closed = Cl, ask_for_close = NA} = St, _Res, + {call, ?MODULE, ask_for_close, [ConnId]}) -> + St#st{open = lists:delete(ConnId, L), + closed = [ConnId | Cl], + ask_for_close = [ConnId | NA]}; +next_state(#st{n_packets = N} = St, _Res, {call, ?MODULE, echo_packet, [ConnId, _]}) -> + NForConn = maps:get(ConnId, N, 0), + St#st{n_packets = N#{ConnId => NForConn + 1}}; +next_state(State, _Res, {call, ?MODULE, _, _}) -> + State. + +run_cmds(Cmds) -> + Cfg = setup(#{rpc_handler => mtp_test_cmd_rpc}), + {History, State, Result} = run_commands(?MODULE, Cmds), + %% Validate final states of proxy and "middle server" + timer:sleep(100), + ServerState = collect_server_state(Cfg), + Metrics = collect_metrics(Cfg), + ShimDump = shim_dump(), + stop(Cfg), + ?WHENFAIL(io:format("History: ~p\n" + "State: ~w\n" + "ServerState: ~p\n" + "Result: ~p\n", + [History, State, ServerState, Result]), + proper:conjunction( + [{state_ok, check_state(State, ServerState, Metrics, ShimDump)}, + {result_ok, Result =:= ok}])). + +%% Post-run checks. Assert that model's final state matches proxy and middle-server state +collect_server_state(Cfg) -> + DcCfg = ?config(dc_conf, Cfg), + Pids = mtp_test_datacenter:middle_connections(DcCfg), + States = [mtp_test_middle_server:get_rpc_handler_state(Pid) || Pid <- Pids], + %% io:format("~p~n", [States]), + %% Can use just maps:merge/2 because connection IDs in different states will not overlap + lists:foldl(fun maps:merge/2, #{}, States). + +collect_metrics(_Cfg) -> + GetTags = fun(Type, Name, Tags) -> + case mtp_test_metric:get_tags(Type, Name, Tags) of + not_found when Type == histogram -> {0, 0, 0, 0}; + not_found -> 0; + Val -> Val + end + end, + #{in_connections => GetTags(count, [?APP, in_connection, total], [?MODULE]), + closed_connections => GetTags(count, [?APP, in_connection_closed, total], [?MODULE]), + tg_in_packet_size => GetTags( + histogram, [?APP, tg_packet_size, bytes], [upstream_to_downstream]), + tg_out_packet_size => GetTags( + histogram, [?APP, tg_packet_size, bytes], [downstream_to_upstream]) + }. + + +check_state(#st{closed = ModClosed, n_packets = ModPackets, ask_for_close = ModAskClose, + open = ModClients, ever_opened = ModOpened} = _St, + SrvState, Metrics, ShimDump) -> + %% io:format("~n~w~n~p~n~p~n~p~n", [St, SrvState, Metrics, ShimDump]), + %% Assert shim is correct + ?assertEqual(length(ModClients), map_size(ShimDump)), + + %% Total number of packets + ModTotalPackets = maps:fold(fun(_K, N, Acc) -> Acc + N end, 0, ModPackets), + SrvTotalPackets = maps:fold(fun({n_packets, _}, N, Acc) -> Acc + N; + (_, _, Acc) -> Acc + end, 0, SrvState), + ?assertEqual(ModTotalPackets, SrvTotalPackets), + + %% Number of connections that ever sent data RPC + SrvConnsWithPackets = maps:fold(fun({n_packets, _}, _, Acc) -> Acc + 1; + (_, _, Acc) -> Acc + end, 0, SrvState), + ?assertEqual(map_size(ModPackets), SrvConnsWithPackets), + + %% Number of sent data RPC per-connection + ModSentPerConn = maps:values(ModPackets), + SrvSentPerConn = maps:fold(fun({n_packets, _}, N, Acc) -> [N | Acc]; + (_, _, Acc) -> Acc + end, [], SrvState), + ?assertEqual(lists:sort(ModSentPerConn), lists:sort(SrvSentPerConn)), + + %% Number of telegram packets send from client to server + ModTgPackets = length(ModAskClose) + ModTotalPackets, + ?assertMatch({ModTgPackets, _, _, _}, maps:get(tg_in_packet_size, Metrics)), + + %% Number of connections that were ever open + %% Can be only asserted by metrics + ?assertEqual(ModOpened, maps:get(in_connections, Metrics)), + + %% Number of connections that were closed + SrvClosed = maps:fold(fun(_, tombstone, Acc) -> Acc + 1; + (_, _, Acc) -> Acc + end, 0, SrvState), + ?assertEqual(length(ModClosed), SrvClosed), + ?assertEqual(length(ModClosed), maps:get(closed_connections, Metrics)), + + %% Number of still open connections + %% On middleproxy side, connection only started to be tracked if it sent any data. + %% So, if we opened a connection and haven't sent anything, middle will not know about it + MAlive = length(ordsets:intersection( + ordsets:from_list(ModClients), + ordsets:from_list(maps:keys(ModPackets)))), + SrvAlive = maps:fold(fun(Id, Num, Acc) when is_integer(Id), is_integer(Num) -> Acc + 1; + (_, _, Acc) -> Acc + end, 0, SrvState), + ?assertEqual(MAlive, SrvAlive), + true. + +%% Connect to proxy +connect(Id, Protocol) -> + Conn = mtp_test_client:connect(?HOST, ?PORT, ?SECRET, ?DC_ID, Protocol), + shim_add(Id, Conn), + ok. + +%% Send and receive back some binary data +echo_packet(Id, RandBin) -> + Cli0 = shim_pop(Id), + Req = mtp_test_cmd_rpc:call(?MODULE, gen_rpc_echo, RandBin), + Cli1 = mtp_test_client:send(Req, Cli0), + {ok, Res, Cli2} = mtp_test_client:recv_packet(Cli1, 1000), + shim_add(Id, Cli2), + mtp_test_cmd_rpc:packet_to_term(Res). + +gen_rpc_echo(RandBin, ConnId, St) -> + Key = {n_packets, ConnId}, + NPackets = maps:get(Key, St, 0), + {reply, RandBin, St#{ConnId => 1, + Key => NPackets + 1}}. + +%% Close from client-side +close(Id) -> + Conn = shim_pop(Id), + mtp_test_client:close(Conn). + +%% Close from telegram-server side +ask_for_close(Id) -> + Cli0 = shim_pop(Id), + Req = mtp_test_cmd_rpc:call(?MODULE, gen_rpc_close, []), + Cli1 = mtp_test_client:send(Req, Cli0), + {error, closed} = mtp_test_client:recv_packet(Cli1, 1000), + ok. + +gen_rpc_close([], _ConnId, St) -> + {close, St}. + + +%% Setup / teardown +setup(DcCfg0) -> + application:ensure_all_started(lager), + lager:set_loglevel(lager_console_backend, critical), + {ok, Pid} = mtp_test_metric:start_link(), + PubKey = crypto:strong_rand_bytes(128), + DcId = ?DC_ID, + Ip = ?HOST, + DcConf = [{DcId, Ip, ?PORT + 5}], + Secret = ?SECRET, + Listeners = [#{name => ?MODULE, + port => ?PORT, + listen_ip => inet:ntoa(Ip), + secret => Secret, + tag => <<"dcbe8f1493fa4cd9ab300891c0b5b326">>}], + application:load(mtproto_proxy), + Cfg1 = single_dc_SUITE:set_env([{ports, Listeners}, + {metric_backend, mtp_test_metric}], []), + {ok, DcCfg} = mtp_test_datacenter:start_dc(PubKey, DcConf, DcCfg0), + application:load(mtproto_proxy), + {ok, _} = application:ensure_all_started(mtproto_proxy), + shim_start(), + [{dc_conf, DcCfg}, {metric, Pid} | Cfg1]. + +stop(Cfg) -> + DcCfg = ?config(dc_conf, Cfg), + MetricPid = ?config(metric, Cfg), + ok = application:stop(mtproto_proxy), + {ok, _} = mtp_test_datacenter:stop_dc(DcCfg), + single_dc_SUITE:reset_env(Cfg), + gen_server:stop(MetricPid), + shim_stop(), + Cfg. + + +%% Proces - wrapper holding client connections and states +shim_add(Id, Conn) -> + ?MODULE ! {add, Id, Conn}. + +shim_pop(Id) -> + ?MODULE ! {pop, self(), Id}, + receive {conn, Conn} -> + Conn + end. + +shim_dump() -> + ?MODULE ! {dump, self()}, + receive {dump, Conns} -> + Conns + end. + +shim_start() -> + Pid = proc_lib:spawn_link(fun loop/0), + register(?MODULE, Pid). + +shim_stop() -> + Pid = whereis(?MODULE), + unregister(?MODULE), + exit(Pid, normal). + +loop() -> + loop(#{}). + + +loop(Acc) -> + receive + {dump, From} -> + From ! {dump, Acc}, + loop(Acc); + {add, Id, Conn} -> + false = maps:is_key(Id, Acc), + loop(Acc#{Id => Conn}); + {pop, From, Id} -> + {Conn, Acc1} = maps:take(Id, Acc), + From ! {conn, Conn}, + loop(Acc1) + end. diff --git a/test/single_dc_SUITE.erl b/test/single_dc_SUITE.erl index 732546e..20b0f65 100644 --- a/test/single_dc_SUITE.erl +++ b/test/single_dc_SUITE.erl @@ -13,6 +13,9 @@ downstream_qlen_backpressure_case/1 ]). +-export([set_env/2, + reset_env/1]). + -export([gen_rpc_replies/3]). -include_lib("common_test/include/ct.hrl"). @@ -49,7 +52,7 @@ end_per_testcase(Name, Cfg) -> %% @doc Send single packet and receive it back echo_secure_case({pre, Cfg}) -> - setup_single(?FUNCTION_NAME, ?LINE, #{}, Cfg); + setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg); echo_secure_case({post, Cfg}) -> stop_single(Cfg); echo_secure_case(Cfg) when is_list(Cfg) -> @@ -78,7 +81,7 @@ echo_secure_case(Cfg) when is_list(Cfg) -> %% @doc Send many packets and receive them back echo_abridged_many_packets_case({pre, Cfg}) -> - setup_single(?FUNCTION_NAME, ?LINE, #{}, Cfg); + setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{}, Cfg); echo_abridged_many_packets_case({post, Cfg}) -> stop_single(Cfg); echo_abridged_many_packets_case(Cfg) when is_list(Cfg) -> @@ -111,11 +114,11 @@ echo_abridged_many_packets_case(Cfg) when is_list(Cfg) -> %% @doc test downstream backpressure when size of non-acknowledged packets grows above threshold downstream_size_backpressure_case({pre, Cfg}) -> - Cfg1 = setup_single(?FUNCTION_NAME, ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg), + Cfg1 = setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg), %% Disable upstream healthchecks - application:set_env(?APP, upstream_healthchecks, []), - Cfg1; + set_env([{upstream_healthchecks, []}], Cfg1); downstream_size_backpressure_case({post, Cfg}) -> + reset_env(Cfg), stop_single(Cfg); downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> DcId = ?config(dc_id, Cfg), @@ -171,13 +174,13 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> downstream_qlen_backpressure_case({pre, Cfg}) -> application:load(mtproto_proxy), %% Reducing downstream socket buffer size. Otherwise we can get queue overflow from just single - %% socket data packet - application:set_env(mtproto_proxy, downstream_socket_buffer_size, 1024), - Cfg1 = setup_single(?FUNCTION_NAME, ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg), + %% socket data packet; %% Disable upstream healthchecks - application:set_env(?APP, upstream_healthchecks, []), - Cfg1; + Cfg1 = set_env([{downstream_socket_buffer_size, 1024}, + {upstream_healthchecks, []}], Cfg), + setup_single(?FUNCTION_NAME, 10000 + ?LINE, #{rpc_handler => mtp_test_cmd_rpc}, Cfg1); downstream_qlen_backpressure_case({post, Cfg}) -> + reset_env(Cfg), stop_single(Cfg); downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) -> DcId = ?config(dc_id, Cfg), @@ -222,13 +225,12 @@ gen_rpc_replies(#{packet := Packet, n := N}, ConnId, St) -> %% Helpers -setup_single(Name, Offset, DcCfg0, Cfg) -> +setup_single(Name, MtpPort, DcCfg0, Cfg) -> {ok, Pid} = mtp_test_metric:start_link(), PubKey = crypto:strong_rand_bytes(128), DcId = 1, Ip = {127, 0, 0, 1}, - DcConf = [{DcId, Ip, 10000 + Offset}], - MtpPort = 10000 + Offset + 1, + DcConf = [{DcId, Ip, MtpPort + 10}], Secret = mtp_handler:hex(crypto:strong_rand_bytes(16)), Listeners = [#{name => Name, port => MtpPort, @@ -270,11 +272,11 @@ set_env(Env, Cfg) -> end || {K, V} <- Env], [{mtp_env, OldEnv} | Cfg]. -%% reset_env(Cfg) -> -%% OldEnv = ?config(mtp_env, Cfg), -%% [case V of -%% undefined -> -%% application:unset_env(mtproto_proxy, K); -%% {ok, Val} -> -%% application:set_env(mtproto_proxy, K, Val) -%% end || {K, V} <- OldEnv]. +reset_env(Cfg) -> + OldEnv = ?config(mtp_env, Cfg), + [case V of + undefined -> + application:unset_env(mtproto_proxy, K); + {ok, Val} -> + application:set_env(mtproto_proxy, K, Val) + end || {K, V} <- OldEnv].