feat: transparent client migration on DC connection death

When Telegram closes a DC TCP connection, instead of dropping all
multiplexed clients, the proxy now remaps them to a surviving (or
freshly spawned) replacement DC connection.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Sergey Prokhorov 2026-04-08 00:46:40 +02:00
parent ed2fa40ee5
commit 492956a598
7 changed files with 341 additions and 13 deletions
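
The remapping itself is bookkeeping: every client that was multiplexed over the dead downstream is re-pointed at the replacement. A minimal, self-contained sketch of that idea, assuming a plain map from client pid to downstream pid (the module and function names here are illustrative, not the ones introduced by this commit):

    %% migration_sketch.erl -- illustrative only; the real proxy keeps this
    %% state inside its handler and pool processes.
    -module(migration_sketch).
    -export([remap/3]).

    -spec remap(pid(), pid(), #{pid() => pid()}) -> #{pid() => pid()}.
    remap(DeadDown, NewDown, Clients) ->
        %% Clients multiplexed on the dead DC connection move to the
        %% replacement; clients on other connections are untouched.
        maps:map(fun(_Client, Down) when Down =:= DeadDown -> NewDown;
                    (_Client, Down) -> Down
                 end, Clients).

For example, remap(D1, D2, #{C1 => D1, C2 => D3}) yields #{C1 => D2, C2 => D3}.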


@@ -5,7 +5,8 @@
 -export([start/2,
          stop/1,
-         get_rpc_handler_state/1]).
+         get_rpc_handler_state/1,
+         close_connection/1]).
 -export([start_link/3,
          ranch_init/1]).
 -export([init/1,
@@ -61,6 +62,10 @@ stop(Id) ->
 get_rpc_handler_state(Pid) ->
     gen_statem:call(Pid, get_rpc_handler_state).
 
+%% Close the server-side TCP socket, simulating Telegram rotating the connection.
+close_connection(Pid) ->
+    gen_statem:call(Pid, close_connection).
+
 %% Callbacks
 
 start_link(Ref, Transport, Opts) ->
@@ -159,6 +164,9 @@ on_tunnel(info, {tcp, _Sock, TcpData}, #t_state{codec = Codec0} = S) ->
     {keep_state, activate(S2#t_state{codec = Codec1})};
 on_tunnel({call, From}, get_rpc_handler_state, #t_state{rpc_handler_state = HSt}) ->
     {keep_state_and_data, [{reply, From, HSt}]};
+on_tunnel({call, From}, close_connection, #t_state{sock = Sock, transport = Transport}) ->
+    Transport:close(Sock),
+    {stop_and_reply, normal, [{reply, From, ok}]};
 on_tunnel(Type, Event, S) ->
     handle_event(Type, Event, ?FUNCTION_NAME, S).
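
The SUITE below drives this call to simulate a rotation; the typical pattern (the same lines appear in downstream_migration_empty_pool_case) is:

    [Conn] = mtp_test_datacenter:middle_connections(DcCfg),
    ok = mtp_test_middle_server:close_connection(Conn),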


@@ -0,0 +1,17 @@
+%% @doc rpc_handler for mtp_test_middle_server that echoes packets and reports
+%% each one to a registered process named `mtp_test_rpc_sink'.
+%% The report message is `{rpc_from, self(), ConnId, Data}', where `self()' is
+%% the mtp_test_middle_server Ranch connection pid, useful for tests that need
+%% to identify which DC connection a client is multiplexed on and close it.
+-module(mtp_test_reporter_rpc).
+-export([init/1,
+         handle_rpc/2]).
+
+init(_) ->
+    #{}.
+
+handle_rpc({data, ConnId, Data}, St) ->
+    mtp_test_rpc_sink ! {rpc_from, self(), ConnId, Data},
+    {rpc, {proxy_ans, ConnId, Data}, St};
+handle_rpc({remote_closed, ConnId}, St) ->
+    {noreply, St#{ConnId => closed}}.
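
A test consumes the reports by registering itself as the sink, exactly as the new SUITE cases below do:

    register(mtp_test_rpc_sink, self()),
    %% ... after driving some traffic through the proxy:
    ServerPid = receive {rpc_from, Pid, _ConnId, _Data} -> Pid end,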


@@ -27,7 +27,10 @@
          domain_fronting_replay_case/1,
          per_sni_secrets_on_case/1,
          per_sni_secrets_wrong_secret_case/1,
-         malformed_tls_hello_decode_error_case/1
+         malformed_tls_hello_decode_error_case/1,
+         downstream_migration_case/1,
+         downstream_migration_multi_case/1,
+         downstream_migration_empty_pool_case/1
         ]).
 -export([set_env/2,
-export([set_env/2,
@@ -781,6 +784,103 @@ malformed_tls_hello_decode_error_case(Cfg) when is_list(Cfg) ->
            1, mtp_test_metric:get_tags(
                count, [?APP, protocol_error, total], [?FUNCTION_NAME, tls_bad_client_hello])).
 
+%% @doc Client survives a DC connection rotation (1 client, 2 DC connections available).
+downstream_migration_case({pre, Cfg}) ->
+    setup_single(?FUNCTION_NAME, 10000 + ?LINE,
+                 #{init_dc_connections => 2, rpc_handler => mtp_test_reporter_rpc}, Cfg);
+downstream_migration_case({post, Cfg}) ->
+    stop_single(Cfg);
+downstream_migration_case(Cfg) when is_list(Cfg) ->
+    DcId = ?config(dc_id, Cfg),
+    Host = ?config(mtp_host, Cfg),
+    Port = ?config(mtp_port, Cfg),
+    Secret = ?config(mtp_secret, Cfg),
+    Pool = mtp_dc_pool:dc_to_pool_name(DcId),
+    register(mtp_test_rpc_sink, self()),
+    try
+        Cli = mtp_test_client:connect(Host, Port, Secret, DcId, mtp_secure),
+        Cli1 = ping(Cli),
+        %% Receive the reporter notification to learn which Ranch/middle-server
+        %% pid this client's DC connection tunnels through.
+        ServerPid = receive {rpc_from, Pid, _, _} -> Pid end,
+        ok = mtp_test_middle_server:close_connection(ServerPid),
+        %% Wait until the handler has successfully migrated to the surviving downstream.
+        ok = mtp_test_metric:wait_for_value(
+               count, [?APP, downstream_migration, total],
+               [?FUNCTION_NAME, ok], 1, 5000),
+        %% Client must still work after migration.
+        Cli2 = ping(Cli1),
+        %% Pool tracking must be clean: exactly 1 upstream registered.
+        ?assertMatch(#{n_upstreams := 1}, mtp_dc_pool:status(Pool)),
+        ok = mtp_test_client:close(Cli2)
+    after
+        unregister(mtp_test_rpc_sink)
+    end.
+
+%% @doc All clients survive when one of two DC connections is rotated.
+downstream_migration_multi_case({pre, Cfg}) ->
+    setup_single(?FUNCTION_NAME, 10000 + ?LINE,
+                 #{init_dc_connections => 2, rpc_handler => mtp_test_reporter_rpc}, Cfg);
+downstream_migration_multi_case({post, Cfg}) ->
+    stop_single(Cfg);
+downstream_migration_multi_case(Cfg) when is_list(Cfg) ->
+    DcId = ?config(dc_id, Cfg),
+    Host = ?config(mtp_host, Cfg),
+    Port = ?config(mtp_port, Cfg),
+    Secret = ?config(mtp_secret, Cfg),
+    Pool = mtp_dc_pool:dc_to_pool_name(DcId),
+    N = 3,
+    register(mtp_test_rpc_sink, self()),
+    try
+        Clients = [mtp_test_client:connect(Host, Port, Secret, DcId, mtp_secure)
+                   || _ <- lists:seq(1, N)],
+        Clients1 = [ping(C) || C <- Clients],
+        %% Drain all N {rpc_from, ...} messages, grouping by ServerPid to find
+        %% which DC connection each client landed on.
+        Groups = lists:foldl(
+                   fun(_, Acc) ->
+                           receive {rpc_from, Pid, _, _} ->
+                                   maps:update_with(Pid, fun(C) -> C + 1 end, 1, Acc)
+                           end
+                   end, #{}, lists:seq(1, N)),
+        %% Close the DC connection carrying the most clients.
+        {ServerPid, NOnServer} = hd(lists:reverse(lists:keysort(2, maps:to_list(Groups)))),
+        ok = mtp_test_middle_server:close_connection(ServerPid),
+        %% Wait until exactly NOnServer clients have successfully migrated.
+        ok = mtp_test_metric:wait_for_value(
+               count, [?APP, downstream_migration, total],
+               [?FUNCTION_NAME, ok], NOnServer, 5000),
+        Clients2 = [ping(C) || C <- Clients1],
+        ?assertMatch(#{n_upstreams := N}, mtp_dc_pool:status(Pool)),
+        [ok = mtp_test_client:close(C) || C <- Clients2]
+    after
+        unregister(mtp_test_rpc_sink)
+    end,
+    ok.
+
+%% @doc When the pool is empty after a DC rotation, the client is closed gracefully.
+downstream_migration_empty_pool_case({pre, Cfg}) ->
+    setup_single(?FUNCTION_NAME, 10000 + ?LINE,
+                 #{init_dc_connections => 0}, Cfg);
+downstream_migration_empty_pool_case({post, Cfg}) ->
+    stop_single(Cfg);
+downstream_migration_empty_pool_case(Cfg) when is_list(Cfg) ->
+    DcId = ?config(dc_id, Cfg),
+    Host = ?config(mtp_host, Cfg),
+    Port = ?config(mtp_port, Cfg),
+    Secret = ?config(mtp_secret, Cfg),
+    DcCfg = ?config(dc_conf, Cfg),
+    Pool = mtp_dc_pool:dc_to_pool_name(DcId),
+    %% Manually add one connection and wait for it to become ready.
+    ok = mtp_dc_pool:add_connection(Pool),
+    ok = wait_for_pool_status(Pool, fun(S) -> maps:get(n_downstreams, S) >= 1 end, 5000),
+    Cli = mtp_test_client:connect(Host, Port, Secret, DcId, mtp_secure),
+    Cli1 = ping(Cli),
+    [Conn] = mtp_test_datacenter:middle_connections(DcCfg),
+    ok = mtp_test_middle_server:close_connection(Conn),
+    %% The pool stays empty (init_dc_connections=0, so no replacement is
+    %% spawned) and the client must be closed gracefully.
+    ?assertEqual({error, closed}, mtp_test_client:recv_packet(Cli1, 2000)).
+
 setup_single(Name, MtpPort, DcCfg0, Cfg) ->
     setup_single(Name, "127.0.0.1", MtpPort, DcCfg0, Cfg).
@@ -796,7 +896,11 @@ setup_single(Name, MtpIpStr, MtpPort, DcCfg0, Cfg) ->
                   secret => Secret,
                   tag => <<"dcbe8f1493fa4cd9ab300891c0b5b326">>}],
     application:load(mtproto_proxy),
-    Cfg1 = set_env([{ports, Listeners}], Cfg),
+    AppEnv = case maps:find(init_dc_connections, DcCfg0) of
+                 {ok, N} -> [{init_dc_connections, N}];
+                 error -> []
+             end,
+    Cfg1 = set_env([{ports, Listeners}] ++ AppEnv, Cfg),
     {ok, DcCfg} = mtp_test_datacenter:start_dc(PubKey, DcConf, DcCfg0),
     {ok, _} = application:ensure_all_started(mtproto_proxy),
     {ok, MtpIp} = inet:parse_address(MtpIpStr),
@@ -851,3 +955,23 @@ ping(Cli0) ->
     {ok, Packet, Cli2} = mtp_test_client:recv_packet(Cli1, 1000),
     ?assertEqual(Data, Packet),
     Cli2.
+
+wait_for_pool_status(Pool, Pred, Timeout) ->
+    Deadline = erlang:monotonic_time(millisecond) + Timeout,
+    wait_for_pool_status_loop(Pool, Pred, Deadline).
+
+wait_for_pool_status_loop(Pool, Pred, Deadline) ->
+    Status = mtp_dc_pool:status(Pool),
+    case Pred(Status) of
+        true ->
+            ok;
+        false ->
+            Remaining = Deadline - erlang:monotonic_time(millisecond),
+            case Remaining > 0 of
+                true ->
+                    timer:sleep(50),
+                    wait_for_pool_status_loop(Pool, Pred, Deadline);
+                false ->
+                    {error, {timeout, Status}}
+            end
+    end.