diff --git a/config/sys.config.back.example b/config/sys.config.back.example new file mode 100644 index 0000000..b3a3b1b --- /dev/null +++ b/config/sys.config.back.example @@ -0,0 +1,54 @@ +%% -*- mode: erlang -*- +%% +%% sys.config for the BACK node (foreign server). +%% +%% The back node runs DC pool connections to Telegram. +%% It does NOT listen for client connections — that is done by the front node(s). +%% +%% Multiple front nodes can share one back node. +%% Start the back node first so pools are ready when fronts connect. +%% +[ + {mtproto_proxy, + [ + %% This node is a back-end only. + {node_role, back}, + + %% No `ports' section — the back node does not accept client connections. + + %% Telegram proxy secret (fetched from core.telegram.org). + %% Defaults are fine; override only if you need a custom URL. + %% {proxy_secret_url, "https://core.telegram.org/getProxySecret"}, + %% {proxy_config_url, "https://core.telegram.org/getProxyConfig"}, + + %% The back node must know its external IP for Telegram's encryption. + %% Either set it explicitly or let ip_lookup_services resolve it. + %% {external_ip, "YOUR.BACK.NODE.EXTERNAL.IP"}, + {ip_lookup_services, + ["http://ipv4.seriyps.com/", + "http://v4.ident.me/", + "http://ipv4.icanhazip.com/", + "https://digitalresistance.dog/myIp"]}, + + %% Multiplexing tuning (back node creates DC connections). + %% {init_dc_connections, 2}, + %% {clients_per_dc_connection, 300} + ]}, + + {kernel, + [{logger_level, info}, + {logger, + [{handler, default, logger_std_h, + #{level => info, + config => #{type => file, + file => "/var/log/mtproto-proxy/back.log", + max_no_bytes => 104857600, + max_no_files => 10, + filesync_repeat_interval => no_repeat}}}, + {handler, console, logger_std_h, + #{level => critical, + config => #{type => standard_io}}} + ]}]}, + {sasl, + [{errlog_type, error}]} +]. diff --git a/config/sys.config.front.example b/config/sys.config.front.example new file mode 100644 index 0000000..4ea3804 --- /dev/null +++ b/config/sys.config.front.example @@ -0,0 +1,59 @@ +%% -*- mode: erlang -*- +%% +%% sys.config for the FRONT node (domestic server). +%% +%% The front node runs Ranch listeners and handles client connections. +%% It does NOT connect to Telegram directly — all DC pool work is delegated +%% to the back node via Erlang distribution. +%% +%% Start the BACK node first, then the front node. +%% +[ + {mtproto_proxy, + [ + %% This node is a front-end only. + {node_role, front}, + + %% Name of the back node. Must match -name in the back node's vm.args. + {back_node, 'back@10.0.0.2'}, + + %% Listener ports (same as single-node setup). + {ports, + [#{name => mtp_handler_1, + listen_ip => "0.0.0.0", + port => 443, + secret => <<"d0d6e111bada5511fcce9584deadbeef">>, + tag => <<"dcbe8f1493fa4cd9ab300891c0b5b326">>} + ]}, + + %% Session / replay protection lives on the front node. + {replay_check_session_storage, on}, + + %% Allowed protocols (front node enforces this during handshake). + {allowed_protocols, [mtp_fake_tls, mtp_secure]}, + + %% Connection policies (optional, evaluated on front). + %% {policy, [{max_connections, [client_ipv4], 3}]}, + + {init_timeout_sec, 60}, + {hibernate_timeout_sec, 60}, + {ready_timeout_sec, 1200} + ]}, + + {kernel, + [{logger_level, info}, + {logger, + [{handler, default, logger_std_h, + #{level => info, + config => #{type => file, + file => "/var/log/mtproto-proxy/front.log", + max_no_bytes => 104857600, + max_no_files => 10, + filesync_repeat_interval => no_repeat}}}, + {handler, console, logger_std_h, + #{level => critical, + config => #{type => standard_io}}} + ]}]}, + {sasl, + [{errlog_type, error}]} +]. diff --git a/config/vm.args.back.example b/config/vm.args.back.example new file mode 100644 index 0000000..ceee2ac --- /dev/null +++ b/config/vm.args.back.example @@ -0,0 +1,44 @@ +## vm.args for the BACK node (foreign server — connects to Telegram). +## +## Replace 10.0.0.2 with the actual IP address of this (back) server. +-name back@10.0.0.2 + +## Cookie must be identical on both front and back nodes. +## Generate with: openssl rand -hex 32 +-setcookie mtproto-proxy-cookie + +## Essential for >500 connections ++K true ++A 2 ++SDio 2 + +## ----------------------------------------------------------------------- +## Inter-server link security +## +## The front and back nodes communicate over Erlang distribution. +## You MUST protect this link — it allows arbitrary code execution. +## Choose ONE of the following options: +## +## Option A: Censorship-resistant tunnel (recommended if front is in Russia) +## - Russia blocks WireGuard and OpenVPN by DPI. Use a tunnel that blends +## in with normal traffic: Shadowsocks, VLESS/XRay, Hysteria2, etc. +## - Set up the tunnel between front and back servers and use the tunnel +## interface IP in -name above (e.g., back@10.8.0.2). +## - No extra Erlang config needed once the tunnel is up. +## - If front is NOT in a censored region, WireGuard/IPsec work fine too. +## +## Option B: TLS distribution (no tunnel required) +## - Generate certificates: scripts/gen_dist_certs.sh init /etc/mtproto-proxy/dist +## - Place ca.pem, back.pem, back.key in /etc/mtproto-proxy/dist/ +## - Deploy ssl_dist.back.conf as /etc/mtproto-proxy/dist/ssl_dist.conf +## - On each front node run: scripts/gen_dist_certs.sh add-node /etc/mtproto-proxy/dist front +## +## -proto_dist inet_tls +## -ssl_dist_optfile /etc/mtproto-proxy/dist/ssl_dist.conf +## +## Firewall: allow TCP on the distribution port (9199 below) only between +## the front and back servers, never to the public internet. +## +## -kernel inet_dist_listen_min 9199 +## -kernel inet_dist_listen_max 9199 +## ----------------------------------------------------------------------- diff --git a/config/vm.args.front.example b/config/vm.args.front.example new file mode 100644 index 0000000..278eba8 --- /dev/null +++ b/config/vm.args.front.example @@ -0,0 +1,44 @@ +## vm.args for the FRONT node (domestic server — accepts client connections). +## +## Replace 10.0.0.1 with the actual IP address of this (front) server. +-name front@10.0.0.1 + +## Cookie must be identical on both front and back nodes. +## Generate with: openssl rand -hex 32 +-setcookie mtproto-proxy-cookie + +## Essential for >500 connections ++K true ++A 2 ++SDio 2 + +## ----------------------------------------------------------------------- +## Inter-server link security +## +## The front and back nodes communicate over Erlang distribution. +## You MUST protect this link — it allows arbitrary code execution. +## Choose ONE of the following options: +## +## Option A: Censorship-resistant tunnel (recommended if front is in Russia) +## - Russia blocks WireGuard and OpenVPN by DPI. Use a tunnel that blends +## in with normal traffic: Shadowsocks, VLESS/XRay, Hysteria2, etc. +## - Set up the tunnel between front and back servers and use the tunnel +## interface IP in -name above (e.g., front@10.8.0.1). +## - No extra Erlang config needed once the tunnel is up. +## - If front is NOT in a censored region, WireGuard/IPsec work fine too. +## +## Option B: TLS distribution (no tunnel required) +## - On back server: scripts/gen_dist_certs.sh init /etc/mtproto-proxy/dist +## - Run per front: scripts/gen_dist_certs.sh add-node /etc/mtproto-proxy/dist front +## - Place ca.pem, front.pem, front.key in /etc/mtproto-proxy/dist/ here. +## - Deploy ssl_dist.front.conf as /etc/mtproto-proxy/dist/ssl_dist.conf +## +## -proto_dist inet_tls +## -ssl_dist_optfile /etc/mtproto-proxy/dist/ssl_dist.conf +## +## Firewall: allow TCP on the distribution port (9199 below) only between +## the front and back servers, never to the public internet. +## +## -kernel inet_dist_listen_min 9199 +## -kernel inet_dist_listen_max 9199 +## ----------------------------------------------------------------------- diff --git a/src/mtp_config.erl b/src/mtp_config.erl index f1aeb00..5d3e605 100644 --- a/src/mtp_config.erl +++ b/src/mtp_config.erl @@ -21,7 +21,8 @@ get_secret/0, get_default_dc/0, status/0, - update/0]). + update/0, + backend_node/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -74,9 +75,20 @@ get_downstream_safe(DcId, Opts) -> end. get_downstream_pool(DcId) -> - try whereis(mtp_dc_pool:dc_to_pool_name(DcId)) of - undefined -> not_found; - Pid when is_pid(Pid) -> {ok, Pid} + try mtp_dc_pool:dc_to_pool_name(DcId) of + PoolName -> + case backend_node() of + local -> + case whereis(PoolName) of + undefined -> not_found; + Pid when is_pid(Pid) -> {ok, Pid} + end; + {remote, BackNode} -> + case erpc:call(BackNode, erlang, whereis, [PoolName]) of + undefined -> not_found; + Pid when is_pid(Pid) -> {ok, {PoolName, BackNode}} + end + end catch error:invalid_dc_id -> not_found end. @@ -101,9 +113,14 @@ get_secret() -> -spec get_default_dc() -> dc_id() | undefined. get_default_dc() -> - case ets:lookup(?TAB, ?DEFAULT_DC_KEY) of - [{?DEFAULT_DC_KEY, DcId}] -> DcId; - [] -> undefined + case backend_node() of + local -> + case ets:lookup(?TAB, ?DEFAULT_DC_KEY) of + [{?DEFAULT_DC_KEY, DcId}] -> DcId; + [] -> undefined + end; + {remote, BackNode} -> + gen_server:call({?MODULE, BackNode}, get_default_dc) end. -spec status() -> [mtp_dc_pool:status()]. @@ -120,6 +137,16 @@ status() -> update() -> gen_server:cast(?MODULE, update). +%% @doc Returns `local' when all pools run on this node (both/back role), +%% or `{remote, Node}' when this is a front node pointing at a back node. +-spec backend_node() -> local | {remote, node()}. +backend_node() -> + case application:get_env(?APP, back_node, undefined) of + undefined -> local; + Node when Node =:= node() -> local; + Node -> {remote, Node} + end. + %%%=================================================================== %%% gen_server callbacks @@ -138,6 +165,12 @@ init([]) -> {ok, State}. %%-------------------------------------------------------------------- +handle_call(get_default_dc, _From, #state{tab = Tab} = State) -> + DcId = case ets:lookup(Tab, ?DEFAULT_DC_KEY) of + [{?DEFAULT_DC_KEY, Id}] -> Id; + [] -> undefined + end, + {reply, DcId, State}; handle_call(_Request, _From, State) -> Reply = ok, {reply, Reply, State}. diff --git a/src/mtp_metric.erl b/src/mtp_metric.erl index d346ca9..6b60f3f 100644 --- a/src/mtp_metric.erl +++ b/src/mtp_metric.erl @@ -77,29 +77,40 @@ get_backend() -> Labels :: #{atom() => binary() | atom()}, Value :: integer() | float(). passive_metrics() -> - DownStatus = mtp_config:status(), - [{gauge, [?APP, dc_num_downstreams], - "Count of connections to downstream", - [{#{dc => DcId}, NDowns} - || #{n_downstreams := NDowns, dc_id := DcId} <- DownStatus]}, - {gauge, [?APP, dc_num_upstreams], - "Count of upstreams connected to DC", - [{#{dc => DcId}, NUps} - || #{n_upstreams := NUps, dc_id := DcId} <- DownStatus]}, - {gauge, [?APP, dc_upstreams_per_downstream], - "Count of upstreams connected to DC", - lists:flatmap( - fun(#{min := Min, - max := Max, - dc_id := DcId}) -> - [{#{dc => DcId, meter => min}, Min}, - {#{dc => DcId, meter => max}, Max}] - end, DownStatus)} - | - [{gauge, [?APP, connections, count], - "Count of ranch connections", - [{#{listener => H}, maps:get(all_connections, P)} - || {H, P} <- mtproto_proxy_app:mtp_listeners()]}] ]. + Role = application:get_env(?APP, node_role, both), + DcGauges = + case Role of + front -> []; + _ -> + DownStatus = mtp_config:status(), + [{gauge, [?APP, dc_num_downstreams], + "Count of connections to downstream", + [{#{dc => DcId}, NDowns} + || #{n_downstreams := NDowns, dc_id := DcId} <- DownStatus]}, + {gauge, [?APP, dc_num_upstreams], + "Count of upstreams connected to DC", + [{#{dc => DcId}, NUps} + || #{n_upstreams := NUps, dc_id := DcId} <- DownStatus]}, + {gauge, [?APP, dc_upstreams_per_downstream], + "Count of upstreams connected to DC", + lists:flatmap( + fun(#{min := Min, + max := Max, + dc_id := DcId}) -> + [{#{dc => DcId, meter => min}, Min}, + {#{dc => DcId, meter => max}, Max}] + end, DownStatus)}] + end, + RanchGauge = + case Role of + back -> []; + _ -> + [{gauge, [?APP, connections, count], + "Count of ranch connections", + [{#{listener => H}, maps:get(all_connections, P)} + || {H, P} <- mtproto_proxy_app:mtp_listeners()]}] + end, + DcGauges ++ RanchGauge. -spec active_metrics() -> [{metric_type(), metric_name(), metric_doc(), Opts}] when diff --git a/src/mtproto_proxy_app.erl b/src/mtproto_proxy_app.erl index 46b2dea..773a14d 100644 --- a/src/mtproto_proxy_app.erl +++ b/src/mtproto_proxy_app.erl @@ -35,7 +35,21 @@ start(_StartType, _StartArgs) -> report("+++++++++++++++++++++++++++++++++++++++~n" "🇺🇦 Stand with Ukraine! Glory to the heroes! 🇺🇦~n" "Erlang MTProto proxy by @seriyps https://github.com/seriyps/mtproto_proxy~n", []), - [start_proxy(Where) || Where <- application:get_env(?APP, ports, [])], + Role = node_role(), + case Role of + front -> + case application:get_env(?APP, back_node) of + {ok, BackNode} -> + net_kernel:connect_node(BackNode); + undefined -> + ?LOG_WARNING("node_role=front but back_node is not configured", []) + end; + _ -> ok + end, + case Role of + back -> ok; + _ -> [start_proxy(Where) || Where <- application:get_env(?APP, ports, [])] + end, Res. @@ -50,9 +64,10 @@ stop(_State) -> config_change(Changed, New, Removed) -> %% app's env is already updated when this callback is called - ok = lists:foreach(fun(K) -> config_changed(removed, K, []) end, Removed), - ok = lists:foreach(fun({K, V}) -> config_changed(changed, K, V) end, Changed), - ok = lists:foreach(fun({K, V}) -> config_changed(new, K, V) end, New). + Role = node_role(), + ok = lists:foreach(fun(K) -> config_changed(removed, K, [], Role) end, Removed), + ok = lists:foreach(fun({K, V}) -> config_changed(changed, K, V, Role) end, Changed), + ok = lists:foreach(fun({K, V}) -> config_changed(new, K, V, Role) end, New). %%-------------------------------------------------------------------- %% Other APIs @@ -166,29 +181,36 @@ start_proxy(#{name := Name, port := Port, secret := Secret, tag := Tag} = P) -> stop_proxy(#{name := Name}) -> ranch:stop_listener(Name). -config_changed(_, ip_lookup_services, _) -> +config_changed(_, ip_lookup_services, _, front) -> ok; +config_changed(_, ip_lookup_services, _, _) -> mtp_config:update(); -config_changed(_, proxy_secret_url, _) -> +config_changed(_, proxy_secret_url, _, front) -> ok; +config_changed(_, proxy_secret_url, _, _) -> mtp_config:update(); -config_changed(_, proxy_config_url, _) -> +config_changed(_, proxy_config_url, _, front) -> ok; +config_changed(_, proxy_config_url, _, _) -> mtp_config:update(); -config_changed(Action, max_connections, N) when Action == new; Action == changed -> +config_changed(Action, max_connections, _, back) when Action == new; Action == changed -> ok; +config_changed(Action, max_connections, N, _) when Action == new; Action == changed -> (is_integer(N) and (N >= 0)) orelse error({"max_connections should be non_neg_integer", N}), lists:foreach(fun({Name, _}) -> ranch:set_max_connections(Name, N) end, mtp_listeners()); -config_changed(Action, downstream_socket_buffer_size, N) when Action == new; Action == changed -> +config_changed(Action, downstream_socket_buffer_size, _, front) when Action == new; Action == changed -> ok; +config_changed(Action, downstream_socket_buffer_size, N, _) when Action == new; Action == changed -> [{ok, _} = mtp_down_conn:set_config(Pid, downstream_socket_buffer_size, N) || Pid <- downstream_connections()], ok; -config_changed(Action, downstream_backpressure, BpOpts) when Action == new; Action == changed -> +config_changed(Action, downstream_backpressure, _, front) when Action == new; Action == changed -> ok; +config_changed(Action, downstream_backpressure, BpOpts, _) when Action == new; Action == changed -> is_map(BpOpts) orelse error(invalid_downstream_backpressure), [{ok, _} = mtp_down_conn:set_config(Pid, downstream_backpressure, BpOpts) || Pid <- downstream_connections()], ok; %% Since upstream connections are mostly short-lived, live-update doesn't make much difference -%% config_changed(Action, upstream_socket_buffer_size, N) when Action == new; Action == changed -> -config_changed(Action, ports, Ports) when Action == new; Action == changed -> +%% config_changed(Action, upstream_socket_buffer_size, N, _) when Action == new; Action == changed -> +config_changed(Action, ports, _, back) when Action == new; Action == changed -> ok; +config_changed(Action, ports, Ports, _) when Action == new; Action == changed -> %% TODO: update secret or ad_tag without disconnect RanchPorts = ordsets:from_list(running_ports()), DefaultListenIp = #{listen_ip => application:get_env(?APP, listen_ip, "0.0.0.0")}, @@ -199,7 +221,7 @@ config_changed(Action, ports, Ports) when Action == new; Action == changed -> lists:foreach(fun stop_proxy/1, ToStop), [{ok, _} = start_proxy(Conf) || Conf <- ToStart], ok; -config_changed(Action, K, V) -> +config_changed(Action, K, V, _) -> %% Most of the other config options are applied automatically without extra work ?LOG_INFO("Config ~p ~p to ~p ignored", [K, Action, V]), ok. @@ -207,6 +229,9 @@ config_changed(Action, K, V) -> downstream_connections() -> [Pid || {_, Pid, worker, [mtp_down_conn]} <- supervisor:which_children(mtp_down_conn_sup)]. +node_role() -> + application:get_env(?APP, node_role, both). + build_urls(Host, Port, Secret, Protocols) -> MkUrl = fun(ProtoSecret) -> diff --git a/src/mtproto_proxy_sup.erl b/src/mtproto_proxy_sup.erl index 5baedfe..ec4a7bd 100644 --- a/src/mtproto_proxy_sup.erl +++ b/src/mtproto_proxy_sup.erl @@ -2,6 +2,7 @@ %% @doc mtproto_proxy top level supervisor. %% @end %%
+%% In `both' (default) or `back' role: %% dc_pool_sup (simple_one_for_one) %% dc_pool_1 [conn1, conn3, conn4, ..] %% dc_pool_-1 [conn2, ..] @@ -9,12 +10,10 @@ %% dc_pool_-2 [conn6, conn8, ..] %% ... %% down_conn_sup (simple_one_for_one) -%% conn1 -%% conn2 -%% conn3 -%% conn4 -%% ... -%% connN +%% conn1..connN +%% +%% In `front' role only session/policy children are started; +%% DC pools live on the remote back node. %%%%%------------------------------------------------------------------- @@ -45,19 +44,28 @@ init([]) -> SupFlags = #{strategy => one_for_all, %TODO: maybe change strategy intensity => 50, period => 5}, - Childs = [#{id => mtp_down_conn_sup, - type => supervisor, - start => {mtp_down_conn_sup, start_link, []}}, - #{id => mtp_dc_pool_sup, - type => supervisor, - start => {mtp_dc_pool_sup, start_link, []}}, - #{id => mtp_config, - start => {mtp_config, start_link, []}}, - #{id => mtp_session_storage, - start => {mtp_session_storage, start_link, []}}, - #{id => mtp_policy_table, - start => {mtp_policy_table, start_link, []}}, - #{id => mtp_policy_counter, - start => {mtp_policy_counter, start_link, []}} - ], - {ok, {SupFlags, Childs}}. + Role = application:get_env(mtproto_proxy, node_role, both), + {ok, {SupFlags, children(Role)}}. + +%%==================================================================== +%% Internal functions +%%==================================================================== + +children(front) -> + [#{id => mtp_session_storage, + start => {mtp_session_storage, start_link, []}}, + #{id => mtp_policy_table, + start => {mtp_policy_table, start_link, []}}, + #{id => mtp_policy_counter, + start => {mtp_policy_counter, start_link, []}}]; +children(back) -> + [#{id => mtp_down_conn_sup, + type => supervisor, + start => {mtp_down_conn_sup, start_link, []}}, + #{id => mtp_dc_pool_sup, + type => supervisor, + start => {mtp_dc_pool_sup, start_link, []}}, + #{id => mtp_config, + start => {mtp_config, start_link, []}}]; +children(both) -> + children(back) ++ children(front). diff --git a/test/split_dc_SUITE.erl b/test/split_dc_SUITE.erl new file mode 100644 index 0000000..774cf47 --- /dev/null +++ b/test/split_dc_SUITE.erl @@ -0,0 +1,229 @@ +%% @doc Tests for split-mode (front/back node) distributed setup. +%% +%% The CT node acts as the front node (Ranch listener + mtp_handler). +%% A peer node started with the `peer' module acts as the back node +%% (mtp_config + DC pool + mtp_down_conn). +%% +%% Both nodes run on the same host so 127.0.0.1 is reachable from both sides. +%% The fake telegram datacenter (HTTP config server + middle server) runs on +%% the CT node, which is fine because the back node can reach 127.0.0.1. +-module(split_dc_SUITE). + +-export([all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2]). + +-export([echo_split_case/1, + migration_split_case/1]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(APP, mtproto_proxy). + +all() -> + %% All exported functions of arity 1 whose name ends with "_case" + Exports = ?MODULE:module_info(exports), + [F + || {F, A} <- Exports, + A == 1, + case lists:reverse(atom_to_list(F)) of + "esac_" ++ _ -> true; + _ -> false + end]. + +init_per_suite(Cfg) -> + {ok, _} = application:ensure_all_started(inets), + {ok, _} = application:ensure_all_started(ranch), + %% peer:start_link requires the current node to be distributed. + %% Start distribution if rebar3 ct didn't already do so. + Distributed = + case net_kernel:start([split_dc_test, shortnames]) of + {ok, _} -> true; + {error, {already_started, _}} -> false + end, + [{started_distribution, Distributed} | Cfg]. + +end_per_suite(Cfg) -> + case ?config(started_distribution, Cfg) of + true -> net_kernel:stop(); + false -> ok + end, + Cfg. + +init_per_testcase(Name, Cfg) -> + ?MODULE:Name({pre, Cfg}). + +end_per_testcase(Name, Cfg) -> + ?MODULE:Name({post, Cfg}). + +%%==================================================================== +%% Test cases +%%==================================================================== + +%% @doc Full echo through a split front/back setup using mtp_secure protocol. +%% Verifies that data flows end-to-end across the distributed nodes and that +%% the front-node metrics are recorded correctly. +echo_split_case({pre, Cfg}) -> + setup_split(?FUNCTION_NAME, 13000 + ?LINE, #{}, Cfg); +echo_split_case({post, Cfg}) -> + stop_split(Cfg); +echo_split_case(Cfg) when is_list(Cfg) -> + DcId = ?config(dc_id, Cfg), + Host = ?config(mtp_host, Cfg), + Port = ?config(mtp_port, Cfg), + Secret = ?config(mtp_secret, Cfg), + Cli = mtp_test_client:connect(Host, Port, Secret, DcId, mtp_secure), + Cli2 = ping(Cli), + %% Front-node metrics: protocol negotiated and connection counted. + ?assertEqual( + 1, mtp_test_metric:get_tags( + count, [?APP, protocol_ok, total], [?FUNCTION_NAME, mtp_secure])), + ok = mtp_test_client:close(Cli2), + ok = mtp_test_metric:wait_for_value( + count, [?APP, in_connection_closed, total], [?FUNCTION_NAME], 1, 5000), + ok. + +%% @doc Client survives a DC connection rotation in split mode. +%% mtp_handler (front node) migrates the client to a surviving DC connection +%% on the back node. The migration metric is emitted by mtp_handler, so it +%% is checked locally on the CT (front) node. +migration_split_case({pre, Cfg}) -> + setup_split(?FUNCTION_NAME, 13000 + ?LINE, + #{init_dc_connections => 2, rpc_handler => mtp_test_reporter_rpc}, Cfg); +migration_split_case({post, Cfg}) -> + stop_split(Cfg); +migration_split_case(Cfg) when is_list(Cfg) -> + DcId = ?config(dc_id, Cfg), + Host = ?config(mtp_host, Cfg), + Port = ?config(mtp_port, Cfg), + Secret = ?config(mtp_secret, Cfg), + Pool = mtp_dc_pool:dc_to_pool_name(DcId), + BackNode = ?config(back_node, Cfg), + register(mtp_test_rpc_sink, self()), + try + Cli = mtp_test_client:connect(Host, Port, Secret, DcId, mtp_secure), + Cli1 = ping(Cli), + %% mtp_test_reporter_rpc runs inside the middle server (CT node), so + %% {rpc_from, ServerPid, ConnId, Data} arrives in our mailbox directly. + ServerPid = receive {rpc_from, Pid, _, _} -> Pid end, + ok = mtp_test_middle_server:close_connection(ServerPid), + %% mtp_handler lives on the front (CT) node — migration metric is local. + ok = mtp_test_metric:wait_for_value( + count, [?APP, downstream_migration, total], + [?FUNCTION_NAME, DcId, ok], 1, 5000), + Cli2 = ping(Cli1), + %% Back-node pool must still track our one upstream. + ?assertMatch(#{n_upstreams := 1}, gen_server:call({Pool, BackNode}, status)), + ok = mtp_test_client:close(Cli2) + after + unregister(mtp_test_rpc_sink) + end. + +%%==================================================================== +%% Setup / teardown helpers +%%==================================================================== + +setup_split(Name, MtpPort, DcCfg0, Cfg) -> + PubKey = crypto:strong_rand_bytes(128), + DcId = 1, + DcConf = [{DcId, {127, 0, 0, 1}, MtpPort + 10}], + + %% Start the fake DC (HTTP config server + middle server) on the CT node. + %% This also sets proxy_secret_url / proxy_config_url / external_ip in the + %% local app env (they are used by the back node, not the front). + {ok, DcCfg} = mtp_test_datacenter:start_dc(PubKey, DcConf, DcCfg0), + {ok, ProxySecretUrl} = application:get_env(?APP, proxy_secret_url), + {ok, ProxyConfigUrl} = application:get_env(?APP, proxy_config_url), + + %% Start the metric store for the front (CT) node. + {ok, FrontMetricPid} = mtp_test_metric:start_link(), + + try + %% ---- Start back peer node ---- + %% Use the same code paths as the CT node so all modules are available. + PeerName = list_to_atom("back_" ++ atom_to_list(Name)), + {ok, BackPeer, BackNode} = peer:start_link( + #{name => PeerName, + args => ["-pa" | code:get_path()]}), + try + %% Configure the back node before starting the application. + ok = rpc:call(BackNode, application, load, [?APP]), + %% Unset ip_lookup_services so mtp_config doesn't fetch the real external + %% IP and overwrite the "127.0.0.1" we set for external_ip below. + ok = rpc:call(BackNode, application, unset_env, [?APP, ip_lookup_services]), + BackEnv0 = [{node_role, back}, + {proxy_secret_url, ProxySecretUrl}, + {proxy_config_url, ProxyConfigUrl}, + {external_ip, "127.0.0.1"}, + {metric_backend, mtp_test_metric}], + BackEnv = case maps:find(init_dc_connections, DcCfg0) of + {ok, N} -> [{init_dc_connections, N} | BackEnv0]; + error -> BackEnv0 + end, + [ok = rpc:call(BackNode, application, set_env, [?APP, K, V]) || {K, V} <- BackEnv], + {ok, BackMetricPid} = rpc:call(BackNode, mtp_test_metric, start_link, []), + {ok, _} = rpc:call(BackNode, application, ensure_all_started, [?APP]), + + %% ---- Configure and start front (CT) node ---- + Secret = mtp_handler:hex(crypto:strong_rand_bytes(16)), + Listeners = [#{name => Name, + port => MtpPort, + listen_ip => "127.0.0.1", + secret => Secret, + tag => <<"dcbe8f1493fa4cd9ab300891c0b5b326">>}], + %% node_role and back_node override whatever test-sys.config set. + application:set_env(?APP, node_role, front), + application:set_env(?APP, back_node, BackNode), + application:set_env(?APP, ports, Listeners), + {ok, _} = application:ensure_all_started(?APP), + + {ok, MtpIp} = inet:parse_address("127.0.0.1"), + [{dc_id, DcId}, + {mtp_host, MtpIp}, + {mtp_port, MtpPort}, + {mtp_secret, Secret}, + {dc_conf, DcCfg}, + {back_node, BackNode}, + {back_peer, BackPeer}, + {back_metric_pid, BackMetricPid}, + {front_metric_pid, FrontMetricPid} | Cfg] + catch E1:R1:ST1 -> + peer:stop(BackPeer), + erlang:raise(E1, R1, ST1) + end + catch E2:R2:ST2 -> + gen_server:stop(FrontMetricPid), + {ok, _} = mtp_test_datacenter:stop_dc(DcCfg), + erlang:raise(E2, R2, ST2) + end. + +stop_split(Cfg) -> + DcCfg = ?config(dc_conf, Cfg), + BackNode = ?config(back_node, Cfg), + BackPeer = ?config(back_peer, Cfg), + BackMetricPid = ?config(back_metric_pid, Cfg), + FrontMetricPid = ?config(front_metric_pid, Cfg), + ok = application:stop(?APP), + ok = application:unload(?APP), + ok = rpc:call(BackNode, application, stop, [?APP]), + ok = rpc:call(BackNode, application, unload, [?APP]), + %% Stop metric on back node before the peer process disappears. + rpc:call(BackNode, gen_server, stop, [BackMetricPid]), + ok = peer:stop(BackPeer), + {ok, _} = mtp_test_datacenter:stop_dc(DcCfg), + gen_server:stop(FrontMetricPid), + Cfg. + +%%==================================================================== +%% Internal helpers +%%==================================================================== + +ping(Cli0) -> + Data = crypto:strong_rand_bytes(64), + Cli1 = mtp_test_client:send(Data, Cli0), + {ok, Packet, Cli2} = mtp_test_client:recv_packet(Cli1, 3000), + ?assertEqual(Data, Packet), + Cli2.