From e56ada1a34543bbf732ec9a6c17623316f0cc836 Mon Sep 17 00:00:00 2001 From: Flowseal Date: Thu, 16 Apr 2026 17:07:59 +0300 Subject: [PATCH] CF domains balancer --- proxy/balancer.py | 42 ++++++++++++++++++++++++++++++++++++++++++ proxy/bridge.py | 12 ++++-------- proxy/config.py | 12 +++++------- proxy/tg_ws_proxy.py | 17 ++++++----------- ui/ctk_tray_ui.py | 4 ++-- 5 files changed, 59 insertions(+), 28 deletions(-) create mode 100644 proxy/balancer.py diff --git a/proxy/balancer.py b/proxy/balancer.py new file mode 100644 index 0000000..31ef347 --- /dev/null +++ b/proxy/balancer.py @@ -0,0 +1,42 @@ +import random +from collections import Counter + +from typing import Dict, List, Iterator + + +class _Balancer: + def __init__(self): + self.domains: List[str] = [] + self._dc_to_domain: Dict[int, str] = {} + + def update_domains_list(self, domains_list: List[str]) -> None: + if Counter(self.domains) == Counter(domains_list): + return + + self.domains = domains_list[:] + + self._dc_to_domain: Dict[int, str] = { + dc_id: random.choice(self.domains) + for dc_id in (1, 2, 3, 4, 5, 203) + } + + def update_domain_for_dc(self, dc_id: int, domain: str) -> bool: + if self._dc_to_domain.get(dc_id) == domain: + return False + + self._dc_to_domain[dc_id] = domain + return True + + def get_domains_for_dc(self, dc_id: int) -> Iterator[str]: + current_domain = self._dc_to_domain.get(dc_id) + yield current_domain + + shuffled_domains = self.domains[:] + random.shuffle(shuffled_domains) + + for domain in shuffled_domains: + if domain != current_domain: + yield domain + + +balancer = _Balancer() diff --git a/proxy/bridge.py b/proxy/bridge.py index ed934c9..1b981d7 100644 --- a/proxy/bridge.py +++ b/proxy/bridge.py @@ -7,6 +7,7 @@ from typing import Dict, List, Optional from .utils import * from .stats import stats +from .balancer import balancer from .config import proxy_config from .raw_websocket import RawWebSocket @@ -160,17 +161,13 @@ async def _cfproxy_fallback(reader, writer, relay_init, label, dc=None, is_media=False, ctx: CryptoCtx = None, splitter=None): media_tag = ' media' if is_media else '' - - active = proxy_config.active_cfproxy_domain - others = [d for d in proxy_config.cfproxy_domains if d != active] - ws = None chosen_domain = None log.info("[%s] DC%d%s -> trying CF proxy", label, dc, media_tag) - for base_domain in ([active] + others): + for base_domain in balancer.get_domains_for_dc(dc): domain = f'kws{dc}.{base_domain}' try: ws = await RawWebSocket.connect(domain, domain, timeout=10.0) @@ -183,9 +180,8 @@ async def _cfproxy_fallback(reader, writer, relay_init, label, if ws is None: return False - if chosen_domain and chosen_domain != proxy_config.active_cfproxy_domain: - log.info("[%s] Switching active CF domain", label) - proxy_config.active_cfproxy_domain = chosen_domain + if chosen_domain and balancer.update_domain_for_dc(dc, chosen_domain): + log.info("[%s] Switched active CF domain", label) stats.connections_cfproxy += 1 await ws.send(relay_init) diff --git a/proxy/config.py b/proxy/config.py index aa4b414..b8458cc 100644 --- a/proxy/config.py +++ b/proxy/config.py @@ -9,6 +9,8 @@ from dataclasses import dataclass, field from typing import Dict, List from urllib.request import Request, urlopen +from .balancer import balancer + log = logging.getLogger('tg-mtproto-proxy') CFPROXY_DOMAINS_URL = ( @@ -45,8 +47,6 @@ class ProxyConfig: fallback_cfproxy: bool = True fallback_cfproxy_priority: bool = True cfproxy_user_domain: str = '' - cfproxy_domains: List[str] = field(default_factory=lambda: list(CFPROXY_DEFAULT_DOMAINS)) - active_cfproxy_domain: str = field(default_factory=lambda: random.choice(CFPROXY_DEFAULT_DOMAINS)) fake_tls_domain: str = '' proxy_protocol: bool = False @@ -79,12 +79,8 @@ def refresh_cfproxy_domains() -> None: if fetched: seen = set() pool = [d for d in fetched if not (d in seen or seen.add(d))] + balancer.update_domains_list(pool) log.info("CF proxy domain pool updated from GitHub (%d domains)", len(pool)) - else: - pool = list(proxy_config.cfproxy_domains) or list(CFPROXY_DEFAULT_DOMAINS) - - proxy_config.cfproxy_domains = pool - proxy_config.active_cfproxy_domain = random.choice(pool) _refresh_stop: threading.Event = threading.Event() @@ -96,6 +92,8 @@ def start_cfproxy_domain_refresh() -> None: _refresh_stop = threading.Event() stop = _refresh_stop + balancer.update_domains_list(CFPROXY_DEFAULT_DOMAINS) + def _loop(): refresh_cfproxy_domains() while not stop.wait(timeout=3600): diff --git a/proxy/tg_ws_proxy.py b/proxy/tg_ws_proxy.py index 900fc16..37d17d3 100644 --- a/proxy/tg_ws_proxy.py +++ b/proxy/tg_ws_proxy.py @@ -4,7 +4,6 @@ import os import sys import time import struct -import random import asyncio import hashlib import argparse @@ -25,10 +24,11 @@ if __name__ == '__main__' and (__package__ is None or __package__ == ''): from .utils import * from .stats import stats -from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh, CFPROXY_DEFAULT_DOMAINS +from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh from .bridge import MsgSplitter, CryptoCtx, do_fallback, bridge_ws_reencrypt from .raw_websocket import RawWebSocket, WsHandshakeError, set_sock_opts from .fake_tls import proxy_to_masking_domain, verify_client_hello, build_server_hello, FakeTlsStream, TLS_RECORD_HANDSHAKE +from .balancer import balancer log = logging.getLogger('tg-mtproto-proxy') @@ -535,11 +535,8 @@ async def _run(stop_event: Optional[asyncio.Event] = None): if proxy_config.fallback_cfproxy: user = proxy_config.cfproxy_user_domain if user: - proxy_config.cfproxy_domains = [user] - proxy_config.active_cfproxy_domain = user + balancer.update_domains_list([user]) else: - proxy_config.cfproxy_domains = list(CFPROXY_DEFAULT_DOMAINS) - proxy_config.active_cfproxy_domain = random.choice(CFPROXY_DEFAULT_DOMAINS) start_cfproxy_domain_refresh() secret_bytes = bytes.fromhex(proxy_config.secret) @@ -585,12 +582,11 @@ async def _run(stop_event: Optional[asyncio.Event] = None): user_domain = "user" if proxy_config.cfproxy_user_domain else "auto" log.info(" CF proxy: enabled (%s | %s)", prio, user_domain) log.info("=" * 60) - log.info(" Connect links:") + log.info(" Connect:") if ftls: - log.info(" ee (Fake TLS): %s", ee_link) + log.info(" %s", ee_link) else: - log.info(" (standard): %s", proxy_config.secret) - log.info(" dd (random padding): %s", dd_link) + log.info(" %s", dd_link) log.info("=" * 60) async def log_stats(): @@ -716,7 +712,6 @@ def main(): proxy_config.pool_size = max(0, args.pool_size) proxy_config.fallback_cfproxy = not args.no_cfproxy proxy_config.fallback_cfproxy_priority = args.cfproxy_priority - proxy_config.cfproxy_user_domain = args.cfproxy_domain proxy_config.fake_tls_domain = args.fake_tls_domain.strip() proxy_config.proxy_protocol = args.proxy_protocol diff --git a/ui/ctk_tray_ui.py b/ui/ctk_tray_ui.py index 3d3e6e1..87fdecb 100644 --- a/ui/ctk_tray_ui.py +++ b/ui/ctk_tray_ui.py @@ -6,7 +6,7 @@ from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional, Tuple, Union from proxy import __version__, get_link_host, parse_dc_ip_list -from proxy.config import proxy_config +from proxy.balancer import balancer from utils.update_check import RELEASES_PAGE_URL, get_status @@ -451,7 +451,7 @@ def install_tray_config_form( _threading.Thread(target=_worker, daemon=True).start() else: def _worker_auto(): - ok_domain, res = _run_cfproxy_auto_test(proxy_config.cfproxy_domains) + ok_domain, res = _run_cfproxy_auto_test(balancer.domains) if btn: btn.after(0, lambda: btn.configure(text="Тест", state="normal")) btn.after(0, lambda: _cfproxy_show_auto_test_results(ok_domain, res))