diff --git a/.travis.yml b/.travis.yml index 1c1155f..b6b4787 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,5 +8,6 @@ script: - ./rebar3 compile - ./rebar3 xref - ./rebar3 eunit + - ./rebar3 ct - ./rebar3 dialyzer - ./rebar3 proper diff --git a/rebar.config b/rebar.config index 90157b1..7eef606 100644 --- a/rebar.config +++ b/rebar.config @@ -39,6 +39,9 @@ }, {test, [{deps, - [{proper, "1.3.0"}]} + [{proper, "1.3.0"}]}, + {ct_opts, [{sys_config, ["./test/test-sys.config"]}]}, + {relx, + [{sys_config, "./test/test-sys.config"}]} ]}] }. diff --git a/src/mtp_obfuscated.erl b/src/mtp_obfuscated.erl index 05829ce..eb40d25 100644 --- a/src/mtp_obfuscated.erl +++ b/src/mtp_obfuscated.erl @@ -49,6 +49,8 @@ client_create(Secret, Protocol, DcId) -> DecKey :: binary(), DecIv :: binary(), CliCodec :: codec(). +client_create(Seed, HexSecret, Protocol, DcId) when byte_size(HexSecret) == 32 -> + client_create(Seed, mtp_handler:unhex(HexSecret), Protocol, DcId); client_create(Seed, Secret, Protocol, DcId) when byte_size(Seed) == 58, byte_size(Secret) == 16, DcId > -10, diff --git a/test/mtp_test_client.erl b/test/mtp_test_client.erl new file mode 100644 index 0000000..4256740 --- /dev/null +++ b/test/mtp_test_client.erl @@ -0,0 +1,100 @@ +-module(mtp_test_client). + +-export([connect/5, + send/2, + recv_packet/2, + recv_all/2, + close/1]). +-export_type([client/0]). + +-record(client, + {sock, + codec}). + +-opaque client() :: #client{}. +-type tcp_error() :: inet:posix() | closed. % | timeout. + +connect(Host, Port, Secret, DcId, Protocol) -> + Opts = [{packet, raw}, + {mode, binary}, + {active, false}, + {send_timeout, 5000}], + {ok, Sock} = gen_tcp:connect(Host, Port, Opts, 1000), + {Header, _, _, CryptoLayer} = mtp_obfuscated:client_create(Secret, Protocol, DcId), + ok = gen_tcp:send(Sock, Header), + PacketLayer = Protocol:new(), + Codec = mtp_codec:new(mtp_obfuscated, CryptoLayer, + Protocol, PacketLayer), + #client{sock = Sock, + codec = Codec}. + +send(Data, #client{sock = Sock, codec = Codec} = Client) -> + {Enc, Codec1} = mtp_codec:encode_packet(Data, Codec), + ok = gen_tcp:send(Sock, Enc), + Client#client{codec = Codec1}. + +-spec recv_packet(client(), timeout()) -> {ok, iodata(), client()} | {error, tcp_error() | timeout}. +recv_packet(#client{codec = Codec} = Client, Timeout) -> + case mtp_codec:try_decode_packet(<<>>, Codec) of + {ok, Data, Codec1} -> + %% We already had some data in codec's buffers + {ok, Data, Client#client{codec = Codec1}}; + {incomplete, Codec1} -> + recv_packet_inner(Client#client{codec = Codec1}, Timeout) + end. + +recv_packet_inner(#client{sock = Sock, codec = Codec0} = Client, Timeout) -> + case gen_tcp:recv(Sock, 0, Timeout) of + {ok, Stream} -> + io:format("~p: ~p~n", [byte_size(Stream), Stream]), + case mtp_codec:try_decode_packet(Stream, Codec0) of + {ok, Data, Codec} -> + {ok, Data, Client#client{codec = Codec}}; + {incomplete, Codec} -> + %% recurse + recv_packet_inner(Client#client{codec = Codec}, Timeout) + end; + Err -> + Err + end. + +-spec recv_all(client(), timeout()) -> {ok, [iodata()], client()} | {error, tcp_error()}. +recv_all(#client{sock = Sock, codec = Codec0} = Client, Timeout) -> + case tcp_recv_all(Sock, Timeout) of + {ok, Stream} -> + io:format("~p: ~p~n", [byte_size(Stream), Stream]), + {ok, Packets, Codec} = + mtp_codec:fold_packets( + fun(Packet, Acc, Codec) -> + {[Packet | Acc], Codec} + end, + [], Stream, Codec0), + {ok, lists:reverse(Packets), + Client#client{codec = Codec}}; + {error, timeout} -> + {ok, [], Client}; + Err -> + Err + end. + +tcp_recv_all(Sock, Timeout) -> + io:format("Sock: ~p; Timeout: ~p~n~n~n", [Sock, Timeout]), + case gen_tcp:recv(Sock, 0, Timeout) of + {ok, Stream} -> + tcp_recv_all_inner(Sock, Stream); + Err -> + Err + end. + +tcp_recv_all_inner(Sock, Acc) -> + case gen_tcp:recv(Sock, 0, 0) of + {ok, Stream} -> + tcp_recv_all_inner(Sock, <>); + {error, timeout} -> + {ok, Acc}; + Other -> + Other + end. + +close(#client{sock = Sock}) -> + ok = gen_tcp:close(Sock). diff --git a/test/mtp_test_metric.erl b/test/mtp_test_metric.erl new file mode 100644 index 0000000..1881f4d --- /dev/null +++ b/test/mtp_test_metric.erl @@ -0,0 +1,87 @@ +-module(mtp_test_metric). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([notify/4]). +-export([get/2, + get/3, + get_tags/3]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(state, {count = #{}, + gauge = #{}, + histogram = #{}}). + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +notify(Type, Name, Value, Extra) -> + try gen_server:call(?MODULE, {notify, Type, Name, Value, Extra}) + catch _:Reason -> + {error, Reason} + end. + +get(Type, Name) -> + get(Type, Name, #{}). + +get(Type, Name, Extra) -> + gen_server:call(?MODULE, {get, Type, Name, Extra}). + +get_tags(Type, Name, Tags) -> + get(Type, Name, #{labels => Tags}). + + +init([]) -> + {ok, #state{}}. + +handle_call({notify, count, Name, Value, Extra}, _From, #state{count = C} = State) -> + K = {Name, Extra}, + V1 = + case maps:find(K, C) of + {ok, V0} -> + V0 + Value; + error -> + Value + end, + {reply, ok, State#state{count = C#{K => V1}}}; +handle_call({notify, gauge, Name, Value, Extra}, _From, #state{gauge = G} = State) -> + K = {Name, Extra}, + {reply, ok, State#state{gauge = G#{K => Value}}}; +handle_call({notify, histogram, Name, Value, Extra}, _From, #state{histogram = H} = State) -> + K = {Name, Extra}, + V1 = + case maps:find(K, H) of + {ok, {Count, Total, Min, Max}} -> + {Count + 1, + Total + Value, + erlang:min(Min, Value), + erlang:max(Max, Value)}; + error -> + {1, + Value, + Value, + Value} + end, + {reply, ok, State#state{histogram = H#{K => V1}}}; +handle_call({get, Type, Name, Extra}, _From, State) -> + K = {Name, Extra}, + Tab = case Type of + count -> State#state.count; + gauge -> State#state.gauge; + histogram -> State#state.histogram + end, + {reply, maps:get(K, Tab, not_found), State}. + +handle_cast(_Msg, State) -> + {noreply, State}. +handle_info(_Info, State) -> + {noreply, State}. +terminate(_Reason, _State) -> + ok. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/test/single_dc_SUITE.erl b/test/single_dc_SUITE.erl new file mode 100644 index 0000000..548c285 --- /dev/null +++ b/test/single_dc_SUITE.erl @@ -0,0 +1,142 @@ +%% Basic tests with only one telegram DC +-module(single_dc_SUITE). + +-export([all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2]). + +-export([echo_secure_case/1, + echo_abridged_many_packets_case/1]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +all() -> + %% All exported functions of arity 1 whose name ends with "_case" + Exports = ?MODULE:module_info(exports), + [F + || {F, A} <- Exports, + A == 1, + case lists:reverse(atom_to_list(F)) of + "esac_" ++ _ -> true; + _ -> false + end]. + +init_per_suite(Cfg) -> + {ok, _} = application:ensure_all_started(inets), + {ok, _} = application:ensure_all_started(ranch), + Cfg. + +end_per_suite(Cfg) -> + Cfg. + +init_per_testcase(Name, Cfg) -> + ?MODULE:Name({pre, Cfg}). + +end_per_testcase(Name, Cfg) -> + ?MODULE:Name({post, Cfg}). + +echo_secure_case({pre, Cfg}) -> + setup_single(?FUNCTION_NAME, ?LINE, Cfg); +echo_secure_case({post, Cfg}) -> + stop_single(Cfg); +echo_secure_case(Cfg) when is_list(Cfg) -> + DcId = ?config(dc_id, Cfg), + Port = ?config(mtp_port, Cfg), + Secret = ?config(mtp_secret, Cfg), + Cli = mtp_test_client:connect({127, 0, 0, 1}, Port, Secret, DcId, mtp_secure), + Data = crypto:strong_rand_bytes(64), + Cli1 = mtp_test_client:send(Data, Cli), + {ok, Packet, Cli2} = mtp_test_client:recv_packet(Cli1, 1000), + ok = mtp_test_client:close(Cli2), + ?assertEqual(Data, Packet), + ?assertEqual(1, mtp_test_metric:get_tags( + count, [mtproto_proxy,in_connection,total], [?FUNCTION_NAME])), + %% race-condition + %% ?assertEqual(1, mtp_test_metric:get_tags( + %% count, [mtproto_proxy,in_connection_closed,total], [?FUNCTION_NAME])), + ?assertEqual({1, 64, 64, 64}, + mtp_test_metric:get_tags( + histogram, [mtproto_proxy,tg_packet_size,bytes], + [upstream_to_downstream])), + ?assertMatch({1, _, _, _}, % larger because of RPC headers + mtp_test_metric:get_tags( + histogram, [mtproto_proxy,tg_packet_size,bytes], + [downstream_to_upstream])). + +echo_abridged_many_packets_case({pre, Cfg}) -> + setup_single(?FUNCTION_NAME, ?LINE, Cfg); +echo_abridged_many_packets_case({post, Cfg}) -> + stop_single(Cfg); +echo_abridged_many_packets_case(Cfg) when is_list(Cfg) -> + DcId = ?config(dc_id, Cfg), + Port = ?config(mtp_port, Cfg), + Secret = ?config(mtp_secret, Cfg), + Cli0 = mtp_test_client:connect({127, 0, 0, 1}, Port, Secret, DcId, mtp_secure), + Packets = + [crypto:strong_rand_bytes(4 * rand:uniform(50)) + || _ <- lists:seq(1, 15)], + Cli2 = lists:foldl(fun mtp_test_client:send/2, Cli0, Packets), + timer:sleep(10), % TODO: some hook in proxy to find when sent + {ok, RecvPackets, Cli} = mtp_test_client:recv_all(Cli2, 1000), + ok = mtp_test_client:close(Cli), + ?assertEqual(Packets, RecvPackets). + +%% Helpers + +setup_single(Name, Offset, Cfg) -> + {ok, Pid} = mtp_test_metric:start_link(), + PubKey = crypto:strong_rand_bytes(128), + DcId = 1, + DcConf = [{DcId, {127, 0, 0, 1}, 10000 + Offset}], + MtpPort = 10000 + Offset + 1, + Secret = mtp_handler:hex(crypto:strong_rand_bytes(16)), + Listeners = [#{name => Name, + port => MtpPort, + listen_ip => "127.0.0.1", + secret => Secret, + tag => <<"dcbe8f1493fa4cd9ab300891c0b5b326">>}], + application:load(mtproto_proxy), + Cfg1 = set_env([{ports, Listeners}], Cfg), + {ok, DcCfg} = mtp_test_middle_server:start_dc(PubKey, DcConf, #{}), + application:load(mtproto_proxy), + {ok, _} = application:ensure_all_started(mtproto_proxy), + [{dc_id, DcId}, + {mtp_port, MtpPort}, + {mtp_secret, Secret}, + {dc_conf, DcCfg}, + {metric, Pid}| Cfg1]. + +stop_single(Cfg) -> + DcCfg = ?config(dc_conf, Cfg), + MetricPid = ?config(metric, Cfg), + ok = application:stop(mtproto_proxy), + {ok, _} = mtp_test_middle_server:stop_dc(DcCfg), + gen_server:stop(MetricPid), + Cfg. + + +set_env(Env, Cfg) -> + OldEnv = + [begin + %% OldV is undefined | {ok, V} + OldV = application:get_env(mtproto_proxy, K), + case V of + undefined -> application:unset_env(mtproto_proxy, K); + _ -> + application:set_env(mtproto_proxy, K, V) + end, + {K, OldV} + end || {K, V} <- Env], + [{mtp_env, OldEnv} | Cfg]. + +%% reset_env(Cfg) -> +%% OldEnv = ?config(mtp_env, Cfg), +%% [case V of +%% undefined -> +%% application:unset_env(mtproto_proxy, K); +%% {ok, Val} -> +%% application:set_env(mtproto_proxy, K, Val) +%% end || {K, V} <- OldEnv]. diff --git a/test/test-sys.config b/test/test-sys.config new file mode 100644 index 0000000..0cb904b --- /dev/null +++ b/test/test-sys.config @@ -0,0 +1,33 @@ +%% -*- mode: erlang -*- +[ + {mtproto_proxy, + [ + {ports, []}, + {external_ip, "127.0.0.1"}, + {listen_ip, "127.0.0.1"}, + {num_acceptors, 2}, + {init_dc_connections, 1}, + {metric_backend, mtp_test_metric} + ]}, + + %% Logging config + {lager, + [{log_root, "log"}, + {crash_log, "crash.log"}, + {handlers, + [ + {lager_console_backend, + [{level, critical}]}, + + {lager_file_backend, + [{file, "application.log"}, + {level, warning}, + + %% Do fsync only on critical messages + {sync_on, critical} + ]} + ]}]}, + + {sasl, + [{errlog_type, error}]} +].