From 66f074655b1dc34fe49f7d743e0f1f0b32978614 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=9F=D1=80=D0=BE?= =?UTF-8?q?=D1=85=D0=BE=D1=80=D0=BE=D0=B2?= Date: Fri, 8 Mar 2019 00:44:35 +0100 Subject: [PATCH] Separate metrics for different backpressure types; test tweaks --- src/mtp_down_conn.erl | 30 +++++++++++++++++------------- src/mtp_metric.erl | 2 +- test/single_dc_SUITE.erl | 12 ++++++------ 3 files changed, 24 insertions(+), 20 deletions(-) diff --git a/src/mtp_down_conn.erl b/src/mtp_down_conn.erl index 6c55c30..343f545 100644 --- a/src/mtp_down_conn.erl +++ b/src/mtp_down_conn.erl @@ -293,8 +293,12 @@ non_ack_bump(Upstream, Size, #state{non_ack_count = Cnt, UpsOct + Size}}}). %% Do we have too much unconfirmed packets? -is_overflow(#state{non_ack_count = Cnt, non_ack_bytes = Oct}) -> - (Cnt > ?MAX_NON_ACK_COUNT) orelse (Oct > ?MAX_NON_ACK_BYTES). +is_overflow(#state{non_ack_count = Cnt}) when Cnt > ?MAX_NON_ACK_COUNT -> + count; +is_overflow(#state{non_ack_bytes = Oct}) when Oct > ?MAX_NON_ACK_BYTES -> + bytes; +is_overflow(_) -> + false. %% If we are not overflown and socket is passive, activate it activate_if_no_overflow(#state{overflow_passive = false, sock = Sock}) -> @@ -323,14 +327,14 @@ handle_ack(Upstream, Count, Size, #state{non_ack_count = Cnt, maybe_deactivate(#state{overflow_passive = false, dc_id = Dc} = St) -> case is_overflow(St) of - true -> - %% Was not overflow, now overflowed - mtp_metric:count_inc([?APP, down_backpressure, total], 1, - #{labels => [Dc, true]}), - St#state{overflow_passive = true}; false -> %% Was not overflow and still not - St + St; + Type -> + %% Was not overflow, now overflowed + mtp_metric:count_inc([?APP, down_backpressure, total], 1, + #{labels => [Dc, Type]}), + St#state{overflow_passive = true} end; maybe_deactivate(St) -> St. @@ -338,15 +342,15 @@ maybe_deactivate(St) -> %% Activate socket if we changed state from overflow to ok maybe_activate(#state{overflow_passive = true, sock = Sock, dc_id = Dc} = St) -> case is_overflow(St) of - true -> - %% Still overflow - St; false -> %% Was overflow, but now resolved ok = inet:setopts(Sock, [{active, once}]), mtp_metric:count_inc([?APP, down_backpressure, total], 1, - #{labels => [Dc, false]}), - St#state{overflow_passive = false} + #{labels => [Dc, off]}), + St#state{overflow_passive = false}; + _ -> + %% Still overflow + St end; maybe_activate(#state{} = St) -> St. diff --git a/src/mtp_metric.erl b/src/mtp_metric.erl index 1a6cf63..2019045 100644 --- a/src/mtp_metric.erl +++ b/src/mtp_metric.erl @@ -155,7 +155,7 @@ active_metrics() -> {count, [?APP, down_backpressure, total], "Times downstream backpressure state was changed", - #{labels => [dc_id, enabled]}}, + #{labels => [dc_id, state]}}, {histogram, [?APP, upstream_send_duration, seconds], "Duration of tcp send calls to upstream", #{duration_unit => seconds, diff --git a/test/single_dc_SUITE.erl b/test/single_dc_SUITE.erl index b8ac75e..732546e 100644 --- a/test/single_dc_SUITE.erl +++ b/test/single_dc_SUITE.erl @@ -126,7 +126,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> %% Backpressure by size limit is defined in mtp_down_conn.erl:?MAX_NON_ACK_BYTES BPressureThreshold = ?MB(6), PacketSize = ?KB(400), - NPackets = 2 * BPressureThreshold div PacketSize, + NPackets = 4 * BPressureThreshold div PacketSize, Packet = crypto:strong_rand_bytes(PacketSize), Req = mtp_test_cmd_rpc:call(?MODULE, gen_rpc_replies, #{packet => Packet, n => NPackets}), @@ -135,7 +135,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> %% Wait for backpressure-in ?assertEqual( ok, mtp_test_metric:wait_for_value( - count, [?APP, down_backpressure, total], [DcId, true], 1, 5000)), + count, [?APP, down_backpressure, total], [DcId, bytes], 1, 5000)), %% Upstream healthcheck should be disabled, otherwise it can interfere ?assertEqual(not_found, mtp_test_metric:get_tags( @@ -143,7 +143,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> %% No backpressure-out, because we don't read any data ?assertEqual(not_found, mtp_test_metric:get_tags( - count, [?APP, down_backpressure, total], [DcId, false])), + count, [?APP, down_backpressure, total], [DcId, off])), %% Amount of bytes received by proxy will be bigger than amount sent to upstreams TgToProxy = mtp_test_metric:get_tags( @@ -156,7 +156,7 @@ downstream_size_backpressure_case(Cfg) when is_list(Cfg) -> {ok, _RecvPackets, Cli2} = mtp_test_client:recv_all(Cli1, 1000), ?assertEqual( ok, mtp_test_metric:wait_for( - count, [?APP, down_backpressure, total], [DcId, true], + count, [?APP, down_backpressure, total], [DcId, bytes], fun(V) -> is_integer(V) and (V > 0) end, 5000)), ok = mtp_test_client:close(Cli2), %% ct:pal("t->p ~p; p->c ~p; diff ~p", @@ -198,7 +198,7 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) -> %% Wait for backpressure-in ?assertEqual( ok, mtp_test_metric:wait_for_value( - count, [?APP, down_backpressure, total], [DcId, true], 1, 5000)), + count, [?APP, down_backpressure, total], [DcId, count], 1, 5000)), %% Close connection to release backpressure ok = mtp_test_client:close(Cli1), ?assertEqual( @@ -206,7 +206,7 @@ downstream_qlen_backpressure_case(Cfg) when is_list(Cfg) -> count, [?APP, in_connection_closed, total], [?FUNCTION_NAME], 1, 5000)), ?assertEqual( ok, mtp_test_metric:wait_for( - count, [?APP, down_backpressure, total], [DcId, true], + count, [?APP, down_backpressure, total], [DcId, off], fun(V) -> is_integer(V) and (V > 0) end, 5000)), %% [{_, Pid, _, _}] = supervisor:which_children(mtp_down_conn_sup), %% ct:pal("Down conn state: ~p", [sys:get_state(Pid)]),