This commit is contained in:
m-hau 2026-06-24 08:11:26 +08:00 committed by GitHub
commit 619095dbdd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 52 additions and 9 deletions

View file

@ -28,6 +28,7 @@ ver. 1.1.1-dev-1 (20??/??/??) - development nightly edition
e. g. setting `blocktype="DROP"` via jail for action would now apply for IPv4 and IPv6 chains,
to submit different `blocktype` for IPv4 and IPv6 from jail, one can pass them like in this example:
`banaction = iptables-ipset[blocktype="...", blocktype?family=inet6="..."]`
* fixes restoring bans with custom failure-id
* `jail.conf`:
- default banactions need to be specified in `paths-*.conf` (maintainer level) now
- since stock fail2ban includes `paths-debian.conf` by default, banactions are `nftables`

View file

@ -288,7 +288,7 @@ class Actions(JailThread, Mapping):
if not isinstance(ip, IPAddr):
ipa = IPAddr(ip)
if not ipa.isSingle: # subnet (mask/cidr) or raw (may be dns/hostname):
ips = list(filter(ipa.contains, self.banManager.getBanList()))
ips = list(filter(ipa.contains, self.banManager.getBannedIPs()))
if ips:
return self.removeBannedIP(ips, db, ifexists)
# not found:

View file

@ -115,6 +115,9 @@ class BanManager:
) for t in lst]
return [t[0].getID() for t in lst]
def getBannedIPs(self):
return list(sorted(ticket.getIP() for ticket in self.__banList.values()))
##
# Returns a iterator to ban list (used in reload, so idle).
#

View file

@ -297,7 +297,7 @@ class Jail(object):
# mark ticked was restored from database - does not put it again into db:
ticket.restored = True
#logSys.debug('restored ticket: %s', ticket)
if self.filter._inIgnoreIPList(ticket.getID(), ticket): continue
if self.filter._inIgnoreIPList(ticket.getIP(), ticket): continue
# correct start time / ban time (by the same end of ban):
btm = ticket.getBanTime(forbantime)
diftm = MyTime.time() - ticket.getTime()

View file

@ -24,8 +24,10 @@ __author__ = "Cyril Jaquier"
__copyright__ = "Copyright (c) 2004 Cyril Jaquier"
__license__ = "GPL"
from typing import Union
from ..helpers import getLogger
from .ipdns import IPAddr
from .ipdns import IPAddr, asip
from .mytime import MyTime
# Gets the instance of the logger.
@ -56,8 +58,11 @@ class Ticket(object):
self._data = {'matches': matches or [], 'failures': 0}
if data is not None:
for k,v in data.items():
if v is not None:
self._data[k] = v
if v is None:
continue
if k == 'ip':
v = asip(v)
self._data[k] = v
if ticket:
# ticket available - copy whole information from ticket:
self.update(ticket)
@ -86,17 +91,25 @@ class Ticket(object):
if v is not None:
setattr(self, n, v)
def setID(self, value):
def setID(self, value: Union[str, IPAddr, tuple, None]) -> None:
# guarantee using IPAddr instead of unicode, str for the IP
if isinstance(value, str):
value = IPAddr(value)
if not isinstance(value, (IPAddr, tuple, type(None))):
raise TypeError(f"ID has unsupported type: {type(value).__name__}")
self._id = value
def getID(self):
def getID(self) -> Union[IPAddr, tuple, None]:
if isinstance(self._id, IPAddr):
return str(self._id)
return self._id
def getIP(self):
return self._data.get('ip', self._id)
def getIP(self) -> IPAddr:
if 'ip' in self._data:
return self._data['ip']
if isinstance(self._id, IPAddr):
return self._id
raise ValueError("No IP available")
def setTime(self, value):
self._time = value

View file

@ -26,6 +26,7 @@ from ..server.mytime import MyTime
import unittest
from ..server.ticket import Ticket, FailTicket, BanTicket
from ..server.ipdns import IPAddr
class TicketTests(unittest.TestCase):
@ -39,7 +40,9 @@ class TicketTests(unittest.TestCase):
# Ticket
t = Ticket('193.168.0.128', tm, matches)
self.assertIsInstance(t.getID(), str)
self.assertEqual(t.getID(), '193.168.0.128')
self.assertIsInstance(t.getIP(), IPAddr)
self.assertEqual(t.getIP(), '193.168.0.128')
self.assertEqual(t.getTime(), tm)
self.assertEqual(t.getMatches(), matches2)
@ -61,12 +64,18 @@ class TicketTests(unittest.TestCase):
self.assertFalse(t.isTimedOut(tm + 60 + 1))
t.setBanTime(60)
# wrong id type causes error
with self.assertRaises(TypeError):
Ticket(id=123)
# BanTicket
tm = MyTime.time()
matches = ['first', 'second']
ft = FailTicket('193.168.0.128', tm, matches)
ft.setBanTime(60*60)
self.assertIsInstance(t.getID(), str)
self.assertEqual(ft.getID(), '193.168.0.128')
self.assertIsInstance(t.getIP(), IPAddr)
self.assertEqual(ft.getIP(), '193.168.0.128')
self.assertEqual(ft.getTime(), tm)
self.assertEqual(ft.getMatches(), matches2)
@ -110,6 +119,8 @@ class TicketTests(unittest.TestCase):
# copy all from another ticket:
ft2 = FailTicket(ticket=ft)
self.assertEqual(ft, ft2)
self.assertEqual(ft.getID(), ft2.getID())
self.assertEqual(ft.getIP(), ft2.getIP())
self.assertEqual(ft.getData(), ft2.getData())
self.assertEqual(ft2.getAttempt(), 4)
self.assertEqual(ft2.getRetry(), 7)
@ -122,13 +133,28 @@ class TicketTests(unittest.TestCase):
tm = MyTime.time()
# different ID (string) and IP:
t = Ticket('123-456-678', tm, data={'ip':'192.0.2.1'})
self.assertIsInstance(t.getID(), str)
self.assertEqual(t.getID(), '123-456-678')
self.assertIsInstance(t.getIP(), IPAddr)
self.assertEqual(t.getIP(), '192.0.2.1')
# different ID (tuple) and IP:
t = Ticket(('192.0.2.1', '5000'), tm, data={'ip':'192.0.2.1'})
self.assertIsInstance(t.getID(), tuple)
self.assertIsInstance(t.getID()[0], str)
self.assertIsInstance(t.getID()[1], str)
self.assertEqual(t.getID(), ('192.0.2.1', '5000'))
self.assertIsInstance(t.getIP(), IPAddr)
self.assertEqual(t.getIP(), '192.0.2.1')
# invalid ip type causes an error
with self.assertRaises(TypeError):
Ticket('123-456-789', tm, data={'ip':192021})
# no IPAddr causes an error
t = Ticket(('192.0.2.1', '5000'), tm, data={})
with self.assertRaises(ValueError):
t.getIP()
def testTicketFlags(self):
flags = ('restored', 'banned')
ticket = Ticket('test', 0)