Added path request frequency monitoring support to interfaces subsystem

This commit is contained in:
Mark Qvist 2026-05-09 00:51:44 +02:00
parent ef1ecb35e1
commit 8ed31d0dc8
6 changed files with 157 additions and 10 deletions

View file

@ -156,6 +156,32 @@ class BackboneInterface(Interface):
else:
raise SystemError("Insufficient parameters to create listener")
__last_ic_burst_check = 0
__last_ic_burst_state = False
@property
def ic_burst_active(self):
if time.time() > self.__last_ic_burst_check + 2:
self.__last_ic_burst_state = any(i.ic_burst_active for i in self.spawned_interfaces)
return self.__last_ic_burst_state
@ic_burst_active.setter
def ic_burst_active(self, value): pass
__ic_burst_activated_check = 0
__ic_burst_activated = 0
@property
def ic_burst_activated(self):
if time.time() > self.__ic_burst_activated_check + 2:
activated = [i.ic_burst_activated for i in self.spawned_interfaces if i.ic_burst_active]
if activated: self.__ic_burst_activated = min(activated)
return self.__ic_burst_activated
@ic_burst_activated.setter
def ic_burst_activated(self, value): pass
@staticmethod
def start():
if not BackboneInterface._job_active: threading.Thread(target=BackboneInterface.__job, daemon=True).start()
@ -206,7 +232,7 @@ class BackboneInterface(Interface):
@staticmethod
def deregister_fileno(fileno):
if fileno < 0:
RNS.log(f"Attempt to deregister invalid file descriptor {fileno}", RNS.LOG_ERROR)
RNS.log(f"Attempt to deregister invalid file descriptor {fileno}", RNS.LOG_WARNING)
return
try: BackboneInterface.epoll.unregister(fileno)
@ -320,11 +346,17 @@ class BackboneInterface(Interface):
elif fileno in BackboneInterface.listener_filenos:
owner_interface, server_socket = BackboneInterface.listener_filenos[fileno]
if fileno == server_socket.fileno() and (event & select.EPOLLIN):
client_socket, address = server_socket.accept()
client_socket.setblocking(0)
if not owner_interface.incoming_connection(client_socket):
try:
client_socket, address = server_socket.accept()
client_socket.setblocking(0)
if not owner_interface.incoming_connection(client_socket):
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for failed incoming connection: {e}", RNS.LOG_ERROR)
except:
RNS.log(f"Accepting socket failed for incoming connection: {e}", RNS.LOG_WARNING)
try: client_socket.close()
except Exception as e: RNS.log(f"Error while closing socket for failed incoming connection: {e}", RNS.LOG_ERROR)
except Exception as e: RNS.log(f"Error while closing socket for failed incoming socket accept: {e}", RNS.LOG_WARNING)
elif fileno == server_socket.fileno() and (event & select.EPOLLHUP):
try: BackboneInterface.deregister_fileno(fileno)
@ -408,6 +440,12 @@ class BackboneInterface(Interface):
def sent_announce(self, from_spawned=False):
if from_spawned: self.oa_freq_deque.append(time.time())
def received_path_request(self, from_spawned=False):
if from_spawned: self.ip_freq_deque.append(time.time())
def sent_path_request(self, from_spawned=False):
if from_spawned: self.op_freq_deque.append(time.time())
def process_outgoing(self, data):
pass

View file

@ -1003,6 +1003,12 @@ class I2PInterface(Interface):
def sent_announce(self, from_spawned=False):
if from_spawned: self.oa_freq_deque.append(time.time())
def received_path_request(self, from_spawned=False):
if from_spawned: self.ip_freq_deque.append(time.time())
def sent_path_request(self, from_spawned=False):
if from_spawned: self.op_freq_deque.append(time.time())
def detach(self):
RNS.log("Detaching "+str(self), RNS.LOG_DEBUG)
self.i2p.stop()

View file

@ -55,8 +55,15 @@ class Interface:
# How many samples to use for announce
# frequency calculations
IA_FREQ_SAMPLES = 128
OA_FREQ_SAMPLES = 128
IA_FREQ_SAMPLES = 48
OA_FREQ_SAMPLES = 48
IP_FREQ_SAMPLES = 48
OP_FREQ_SAMPLES = 48
AR_MINFREQ_HZ = 0.1
PR_MINFREQ_HZ = 0.1
AR_FREQ_DECAY = 1/AR_MINFREQ_HZ
PR_FREQ_DECAY = 1/PR_MINFREQ_HZ
# Maximum amount of ingress limited announces
# to hold at any given time.
@ -68,10 +75,15 @@ class Interface:
IC_NEW_TIME = 2*60*60
IC_BURST_FREQ_NEW = 3
IC_BURST_FREQ = 10
IC_PR_BURST_FREQ_NEW = 3
IC_PR_BURST_FREQ = 10
IC_BURST_HOLD = 15
IC_BURST_PENALTY = 15
IC_HELD_RELEASE_INTERVAL = 5
IC_DEQUE_MIN_SAMPLE = 32
IC_DEQUE_MIN_SAMPLE = 2
IC_BURST_MIN_SAMPLES = 8
EC_PR_FREQ = 5
EGRESS_CONTROL = False
# Default announce rate targets
DEFAULT_AR_TARGET = 3600
@ -102,18 +114,26 @@ class Interface:
self.ic_burst_active = False
self.ic_burst_activated = 0
self.ic_pr_burst_active = False
self.ic_pr_burst_activated = 0
self.ic_held_release = 0
self.ic_max_held_announces = RNS.Reticulum.get_instance()._default_ic_max_held_announces()
self.ic_burst_hold = RNS.Reticulum.get_instance()._default_ic_burst_hold()
self.ic_burst_freq_new = RNS.Reticulum.get_instance()._default_ic_burst_freq_new()
self.ic_burst_freq = RNS.Reticulum.get_instance()._default_ic_burst_freq()
self.ic_pr_burst_freq_new = RNS.Reticulum.get_instance()._default_ic_pr_burst_freq_new()
self.ic_pr_burst_freq = RNS.Reticulum.get_instance()._default_ic_pr_burst_freq()
self.ic_new_time = RNS.Reticulum.get_instance()._default_ic_new_time()
self.ic_burst_penalty = RNS.Reticulum.get_instance()._default_ic_burst_penalty()
self.ic_held_release_interval = RNS.Reticulum.get_instance()._default_ic_held_release_interval()
self.ec_pr_freq = RNS.Reticulum.get_instance()._default_ec_pr_freq()
self.egress_control = RNS.Reticulum.get_instance()._default_egress_control()
self.held_announces = {}
self.ia_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES)
self.oa_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES)
self.ip_freq_deque = deque(maxlen=Interface.IA_FREQ_SAMPLES)
self.op_freq_deque = deque(maxlen=Interface.OA_FREQ_SAMPLES)
def get_hash(self):
return RNS.Identity.full_hash(str(self).encode("utf-8"))
@ -129,7 +149,7 @@ class Interface:
if self.ic_burst_active:
if ia_freq < freq_threshold and time.time() > self.ic_burst_activated+self.ic_burst_hold:
self.ic_burst_active = False
if len(self.ia_freq_deque) >= self.IC_BURST_MIN_SAMPLES: self.ic_burst_active = False
return True
@ -144,6 +164,37 @@ class Interface:
else: return False
def should_ingress_limit_pr(self):
if self.ingress_control:
freq_threshold = self.ic_pr_burst_freq_new if self.age() < self.ic_new_time else self.ic_pr_burst_freq
ip_freq = self.incoming_pr_frequency()
if self.ic_pr_burst_active:
if ip_freq < freq_threshold and time.time() > self.ic_pr_burst_activated+self.ic_burst_hold:
self.ic_pr_burst_active = False
return True
else:
if ip_freq > freq_threshold:
self.ic_pr_burst_active = True
self.ic_pr_burst_activated = time.time()
return True
else: return False
else: return False
def should_egress_limit_pr(self):
if self.egress_control:
freq_threshold = self.ec_pr_freq
op_freq = self.outgoing_pr_frequency()
if op_freq > freq_threshold:
if len(self.op_freq_deque) >= self.IC_BURST_MIN_SAMPLES: return True
return False
def optimise_mtu(self):
if self.AUTOCONFIGURE_MTU:
if self.bitrate >= 1_000_000_000:
@ -169,7 +220,7 @@ class Interface:
else:
self.HW_MTU = None
RNS.log(f"{self} hardware MTU set to {self.HW_MTU}", RNS.LOG_DEBUG) # TODO: Remove debug
RNS.log(f"{self} hardware MTU set to {self.HW_MTU}", RNS.LOG_DEBUG)
def age(self):
return time.time()-self.created
@ -215,12 +266,23 @@ class Interface:
if hasattr(self, "parent_interface") and self.parent_interface != None:
self.parent_interface.sent_announce(from_spawned=True)
def received_path_request(self, from_spawned=False):
self.ip_freq_deque.append(time.time())
if hasattr(self, "parent_interface") and self.parent_interface != None:
self.parent_interface.received_path_request(from_spawned=True)
def sent_path_request(self, from_spawned=False):
self.op_freq_deque.append(time.time())
if hasattr(self, "parent_interface") and self.parent_interface != None:
self.parent_interface.sent_path_request(from_spawned=True)
def incoming_announce_frequency(self):
n = len(self.ia_freq_deque)
if not n > self.IC_DEQUE_MIN_SAMPLE: return 0
else:
oldest = self.ia_freq_deque[0]
span = time.time() - oldest
if span > self.AR_FREQ_DECAY: self.ia_freq_deque.popleft()
if span <= 0: return 0
hz = n / span
return hz
@ -231,6 +293,29 @@ class Interface:
else:
oldest = self.oa_freq_deque[0]
span = time.time() - oldest
if span > self.AR_FREQ_DECAY: self.oa_freq_deque.popleft()
if span <= 0: return 0
hz = n / span
return hz
def incoming_pr_frequency(self):
n = len(self.ip_freq_deque)
if not n > self.IC_DEQUE_MIN_SAMPLE: return 0
else:
oldest = self.ip_freq_deque[0]
span = time.time() - oldest
if span > self.PR_FREQ_DECAY: self.ip_freq_deque.popleft()
if span <= 0: return 0
hz = n / span
return hz
def outgoing_pr_frequency(self):
n = len(self.op_freq_deque)
if not len(self.op_freq_deque) > 1: return 0
else:
oldest = self.op_freq_deque[0]
span = time.time() - oldest
if span > self.PR_FREQ_DECAY: self.op_freq_deque.popleft()
if span <= 0: return 0
hz = n / span
return hz

View file

@ -488,6 +488,12 @@ class LocalServerInterface(Interface):
def sent_announce(self, from_spawned=False):
if from_spawned: self.oa_freq_deque.append(time.time())
def received_path_request(self, from_spawned=False):
if from_spawned: self.ip_freq_deque.append(time.time())
def sent_path_request(self, from_spawned=False):
if from_spawned: self.op_freq_deque.append(time.time())
def __str__(self):
if self.socket_path: return "Shared Instance["+str(self.socket_path.replace("\0", ""))+"]"
else: return "Shared Instance["+str(self.bind_port)+"]"

View file

@ -549,6 +549,12 @@ class RNodeMultiInterface(Interface):
def sent_announce(self, from_spawned=False):
if from_spawned: self.oa_freq_deque.append(time.time())
def received_path_request(self, from_spawned=False):
if from_spawned: self.ip_freq_deque.append(time.time())
def sent_path_request(self, from_spawned=False):
if from_spawned: self.op_freq_deque.append(time.time())
def readLoop(self):
try:
in_frame = False

View file

@ -634,6 +634,12 @@ class TCPServerInterface(Interface):
def sent_announce(self, from_spawned=False):
if from_spawned: self.oa_freq_deque.append(time.time())
def received_path_request(self, from_spawned=False):
if from_spawned: self.ip_freq_deque.append(time.time())
def sent_path_request(self, from_spawned=False):
if from_spawned: self.op_freq_deque.append(time.time())
def process_outgoing(self, data):
pass