CF domains balancer

This commit is contained in:
Flowseal 2026-04-16 17:07:59 +03:00
parent b44d79a933
commit e56ada1a34
5 changed files with 59 additions and 28 deletions

42
proxy/balancer.py Normal file
View file

@ -0,0 +1,42 @@
import random
from collections import Counter
from typing import Dict, List, Iterator
class _Balancer:
def __init__(self):
self.domains: List[str] = []
self._dc_to_domain: Dict[int, str] = {}
def update_domains_list(self, domains_list: List[str]) -> None:
if Counter(self.domains) == Counter(domains_list):
return
self.domains = domains_list[:]
self._dc_to_domain: Dict[int, str] = {
dc_id: random.choice(self.domains)
for dc_id in (1, 2, 3, 4, 5, 203)
}
def update_domain_for_dc(self, dc_id: int, domain: str) -> bool:
if self._dc_to_domain.get(dc_id) == domain:
return False
self._dc_to_domain[dc_id] = domain
return True
def get_domains_for_dc(self, dc_id: int) -> Iterator[str]:
current_domain = self._dc_to_domain.get(dc_id)
yield current_domain
shuffled_domains = self.domains[:]
random.shuffle(shuffled_domains)
for domain in shuffled_domains:
if domain != current_domain:
yield domain
balancer = _Balancer()

View file

@ -7,6 +7,7 @@ from typing import Dict, List, Optional
from .utils import *
from .stats import stats
from .balancer import balancer
from .config import proxy_config
from .raw_websocket import RawWebSocket
@ -160,17 +161,13 @@ async def _cfproxy_fallback(reader, writer, relay_init, label,
dc=None, is_media=False,
ctx: CryptoCtx = None, splitter=None):
media_tag = ' media' if is_media else ''
active = proxy_config.active_cfproxy_domain
others = [d for d in proxy_config.cfproxy_domains if d != active]
ws = None
chosen_domain = None
log.info("[%s] DC%d%s -> trying CF proxy",
label, dc, media_tag)
for base_domain in ([active] + others):
for base_domain in balancer.get_domains_for_dc(dc):
domain = f'kws{dc}.{base_domain}'
try:
ws = await RawWebSocket.connect(domain, domain, timeout=10.0)
@ -183,9 +180,8 @@ async def _cfproxy_fallback(reader, writer, relay_init, label,
if ws is None:
return False
if chosen_domain and chosen_domain != proxy_config.active_cfproxy_domain:
log.info("[%s] Switching active CF domain", label)
proxy_config.active_cfproxy_domain = chosen_domain
if chosen_domain and balancer.update_domain_for_dc(dc, chosen_domain):
log.info("[%s] Switched active CF domain", label)
stats.connections_cfproxy += 1
await ws.send(relay_init)

View file

@ -9,6 +9,8 @@ from dataclasses import dataclass, field
from typing import Dict, List
from urllib.request import Request, urlopen
from .balancer import balancer
log = logging.getLogger('tg-mtproto-proxy')
CFPROXY_DOMAINS_URL = (
@ -45,8 +47,6 @@ class ProxyConfig:
fallback_cfproxy: bool = True
fallback_cfproxy_priority: bool = True
cfproxy_user_domain: str = ''
cfproxy_domains: List[str] = field(default_factory=lambda: list(CFPROXY_DEFAULT_DOMAINS))
active_cfproxy_domain: str = field(default_factory=lambda: random.choice(CFPROXY_DEFAULT_DOMAINS))
fake_tls_domain: str = ''
proxy_protocol: bool = False
@ -79,12 +79,8 @@ def refresh_cfproxy_domains() -> None:
if fetched:
seen = set()
pool = [d for d in fetched if not (d in seen or seen.add(d))]
balancer.update_domains_list(pool)
log.info("CF proxy domain pool updated from GitHub (%d domains)", len(pool))
else:
pool = list(proxy_config.cfproxy_domains) or list(CFPROXY_DEFAULT_DOMAINS)
proxy_config.cfproxy_domains = pool
proxy_config.active_cfproxy_domain = random.choice(pool)
_refresh_stop: threading.Event = threading.Event()
@ -96,6 +92,8 @@ def start_cfproxy_domain_refresh() -> None:
_refresh_stop = threading.Event()
stop = _refresh_stop
balancer.update_domains_list(CFPROXY_DEFAULT_DOMAINS)
def _loop():
refresh_cfproxy_domains()
while not stop.wait(timeout=3600):

View file

@ -4,7 +4,6 @@ import os
import sys
import time
import struct
import random
import asyncio
import hashlib
import argparse
@ -25,10 +24,11 @@ if __name__ == '__main__' and (__package__ is None or __package__ == ''):
from .utils import *
from .stats import stats
from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh, CFPROXY_DEFAULT_DOMAINS
from .config import proxy_config, parse_dc_ip_list, start_cfproxy_domain_refresh
from .bridge import MsgSplitter, CryptoCtx, do_fallback, bridge_ws_reencrypt
from .raw_websocket import RawWebSocket, WsHandshakeError, set_sock_opts
from .fake_tls import proxy_to_masking_domain, verify_client_hello, build_server_hello, FakeTlsStream, TLS_RECORD_HANDSHAKE
from .balancer import balancer
log = logging.getLogger('tg-mtproto-proxy')
@ -535,11 +535,8 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
if proxy_config.fallback_cfproxy:
user = proxy_config.cfproxy_user_domain
if user:
proxy_config.cfproxy_domains = [user]
proxy_config.active_cfproxy_domain = user
balancer.update_domains_list([user])
else:
proxy_config.cfproxy_domains = list(CFPROXY_DEFAULT_DOMAINS)
proxy_config.active_cfproxy_domain = random.choice(CFPROXY_DEFAULT_DOMAINS)
start_cfproxy_domain_refresh()
secret_bytes = bytes.fromhex(proxy_config.secret)
@ -585,12 +582,11 @@ async def _run(stop_event: Optional[asyncio.Event] = None):
user_domain = "user" if proxy_config.cfproxy_user_domain else "auto"
log.info(" CF proxy: enabled (%s | %s)", prio, user_domain)
log.info("=" * 60)
log.info(" Connect links:")
log.info(" Connect:")
if ftls:
log.info(" ee (Fake TLS): %s", ee_link)
log.info(" %s", ee_link)
else:
log.info(" (standard): %s", proxy_config.secret)
log.info(" dd (random padding): %s", dd_link)
log.info(" %s", dd_link)
log.info("=" * 60)
async def log_stats():
@ -716,7 +712,6 @@ def main():
proxy_config.pool_size = max(0, args.pool_size)
proxy_config.fallback_cfproxy = not args.no_cfproxy
proxy_config.fallback_cfproxy_priority = args.cfproxy_priority
proxy_config.cfproxy_user_domain = args.cfproxy_domain
proxy_config.fake_tls_domain = args.fake_tls_domain.strip()
proxy_config.proxy_protocol = args.proxy_protocol

View file

@ -6,7 +6,7 @@ from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
from proxy import __version__, get_link_host, parse_dc_ip_list
from proxy.config import proxy_config
from proxy.balancer import balancer
from utils.update_check import RELEASES_PAGE_URL, get_status
@ -451,7 +451,7 @@ def install_tray_config_form(
_threading.Thread(target=_worker, daemon=True).start()
else:
def _worker_auto():
ok_domain, res = _run_cfproxy_auto_test(proxy_config.cfproxy_domains)
ok_domain, res = _run_cfproxy_auto_test(balancer.domains)
if btn:
btn.after(0, lambda: btn.configure(text="Тест", state="normal"))
btn.after(0, lambda: _cfproxy_show_auto_test_results(ok_domain, res))