This commit is contained in:
BMO 2026-05-11 13:16:30 +02:00 committed by GitHub
commit a9f54bcf64
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -19,6 +19,7 @@
import socket
import smtplib
import textwrap
import email.policy
from email.message import EmailMessage
from email.utils import formatdate, formataddr
@ -71,172 +72,215 @@ Matches for %(ip)s for jail %(jailname)s:
class SMTPAction(ActionBase):
"""Fail2Ban action which sends emails to inform on jail starting,
stopping and bans.
"""
"""Fail2Ban action which sends emails to inform on jail starting,
stopping and bans.
"""
def __init__(
self, jail, name, host="localhost", ssl=False, user=None, password=None,
sendername="Fail2Ban", sender="fail2ban", dest="root", matches=None):
"""Initialise action.
def __init__(
self, jail, name, host="localhost", ssl=False, user=None, password=None,
sendername="Fail2Ban", sender="fail2ban", dest="root", matches=None):
"""Initialise action.
Parameters
----------
jail : Jail
The jail which the action belongs to.
name : str
Named assigned to the action.
host : str, optional
SMTP host, of host:port format. Default host "localhost" and
port "25"
ssl : bool, optional
Whether to use TLS for the SMTP connection or not. Default False.
user : str, optional
Username used for authentication with SMTP server.
password : str, optional
Password used for authentication with SMTP server.
sendername : str, optional
Name to use for from address in email. Default "Fail2Ban".
sender : str, optional
Email address to use for from address in email.
Default "fail2ban".
dest : str, optional
Email addresses of intended recipient(s) in comma space ", "
delimited format. Default "root".
matches : str, optional
Type of matches to be included from ban in email. Can be one
of "matches", "ipmatches" or "ipjailmatches". Default None
(see man jail.conf.5).
"""
Parameters
----------
jail : Jail
The jail which the action belongs to.
name : str
Named assigned to the action.
host : str, optional
SMTP host, of host:port format. Default host "localhost" and
port "25"
ssl : bool, optional
Whether to use TLS for the SMTP connection or not. Default False.
user : str, optional
Username used for authentication with SMTP server.
password : str, optional
Password used for authentication with SMTP server.
sendername : str, optional
Name to use for from address in email. Default "Fail2Ban".
sender : str, optional
Email address to use for from address in email.
Default "fail2ban".
dest : str, optional
Email addresses of intended recipient(s) in comma space ", "
delimited format. Default "root".
matches : str, optional
Type of matches to be included from ban in email. Can be one
of "matches", "ipmatches" or "ipjailmatches". Default None
(see man jail.conf.5).
"""
super(SMTPAction, self).__init__(jail, name)
super(SMTPAction, self).__init__(jail, name)
self.host = host
self.ssl = ssl
self.host = host
self.ssl = ssl
self.user = user
self.password =password
self.user = user
self.password =password
self.fromname = sendername
self.fromaddr = sender
self.toaddr = dest
self.fromname = sendername
self.fromaddr = sender
self.toaddr = dest
self.matches = matches
self.matches = matches
self.message_values = CallingMap(
jailname = self._jail.name,
hostname = socket.gethostname,
bantime = lambda: self._jail.actions.getBanTime(),
)
self.message_values = CallingMap(
jailname = self._jail.name,
hostname = socket.gethostname,
bantime = lambda: self._jail.actions.getBanTime(),
)
# bypass ban/unban for restored tickets
self.norestored = 1
# bypass ban/unban for restored tickets
self.norestored = 1
def _sendMessage(self, subject, text):
"""Sends message based on arguments and instance's properties.
@staticmethod
def _wrap_text(text, width=78):
"""Wraps long lines in text to comply with RFC 5321 / RFC 5322.
Parameters
----------
subject : str
Subject of the email.
text : str
Body of the email.
RFC 5321 requires that lines MUST NOT exceed 998 characters
(excluding the CRLF). RFC 5322 recommends wrapping at 78
characters for readability.
Raises
------
SMTPConnectionError
Error on connecting to host.
SMTPAuthenticationError
Error authenticating with SMTP server.
SMTPException
See Python `smtplib` for full list of other possible
exceptions.
"""
msg = EmailMessage(policy=email.policy.SMTP)
msg.set_content(text)
msg['Subject'] = subject
msg['From'] = formataddr((self.fromname, self.fromaddr))
msg['To'] = self.toaddr
msg['Date'] = formatdate()
This method wraps each line that exceeds `width` characters
while preserving existing paragraph breaks and blank lines.
smtp_host, smtp_port = self.host.split(':')
smtp = smtplib.SMTP(host=smtp_host, port=smtp_port)
try:
r = smtp.connect(host=smtp_host, port=smtp_port)
self._logSys.debug("Connected to SMTP '%s', response: %i: %s",
self.host, *r)
Parameters
----------
text : str
The text to wrap.
width : int, optional
Maximum line width. Default 78 per RFC 5322.
if self.ssl: # pragma: no cover
r = smtp.starttls()[0];
if r != 220: # pragma: no cover
raise Exception("Failed to starttls() on '%s': %s" % (self.host, r))
Returns
-------
str
The wrapped text.
"""
wrapper = textwrap.TextWrapper(
width=width,
replace_whitespace=False,
drop_whitespace=False,
expand_tabs=False,
)
wrapped_lines = []
for line in text.splitlines(True):
# Preserve blank lines and short lines as-is
if len(line.rstrip('\r\n')) <= width:
wrapped_lines.append(line)
else:
# Wrap long lines, preserving the original line ending
line_ending = '\n'
stripped = line.rstrip('\r\n')
wrapped = wrapper.fill(stripped)
wrapped_lines.append(wrapped + line_ending)
return ''.join(wrapped_lines)
if self.user and self.password: # pragma: no cover (ATM no tests covering that)
smtp.login(self.user, self.password)
failed_recipients = smtp.sendmail(
self.fromaddr, self.toaddr.split(", "), msg.as_string())
except smtplib.SMTPConnectError: # pragma: no cover
self._logSys.error("Error connecting to host '%s'", self.host)
raise
except smtplib.SMTPAuthenticationError: # pragma: no cover
self._logSys.error(
"Failed to authenticate with host '%s' user '%s'",
self.host, self.user)
raise
except smtplib.SMTPException: # pragma: no cover
self._logSys.error(
"Error sending mail to host '%s' from '%s' to '%s'",
self.host, self.fromaddr, self.toaddr)
raise
else:
if failed_recipients: # pragma: no cover
self._logSys.warning(
"Email to '%s' failed to following recipients: %r",
self.toaddr, failed_recipients)
self._logSys.debug("Email '%s' successfully sent", subject)
finally:
try:
self._logSys.debug("Disconnected from '%s', response %i: %s",
self.host, *smtp.quit())
except smtplib.SMTPServerDisconnected: # pragma: no cover
pass # Not connected
def _sendMessage(self, subject, text):
"""Sends message based on arguments and instance's properties.
def start(self):
"""Sends email to recipients informing that the jail has started.
"""
self._sendMessage(
"[Fail2Ban] %(jailname)s: started on %(hostname)s" %
self.message_values,
messages['start'] % self.message_values)
Parameters
----------
subject : str
Subject of the email.
text : str
Body of the email.
def stop(self):
"""Sends email to recipients informing that the jail has stopped.
"""
self._sendMessage(
"[Fail2Ban] %(jailname)s: stopped on %(hostname)s" %
self.message_values,
messages['stop'] % self.message_values)
Raises
------
SMTPConnectionError
Error on connecting to host.
SMTPAuthenticationError
Error authenticating with SMTP server.
SMTPException
See Python `smtplib` for full list of other possible
exceptions.
"""
text = self._wrap_text(text)
msg = EmailMessage(policy=email.policy.SMTP)
msg.set_content(text)
msg['Subject'] = subject
msg['From'] = formataddr((self.fromname, self.fromaddr))
msg['To'] = self.toaddr
msg['Date'] = formatdate()
def ban(self, aInfo):
"""Sends email to recipients informing that ban has occurred.
smtp_host, smtp_port = self.host.split(':')
smtp = smtplib.SMTP(host=smtp_host, port=smtp_port)
try:
r = smtp.connect(host=smtp_host, port=smtp_port)
self._logSys.debug("Connected to SMTP '%s', response: %i: %s",
self.host, *r)
Parameters
----------
aInfo : dict
Dictionary which includes information in relation to
the ban.
"""
if aInfo.get('restored'):
return
aInfo.update(self.message_values)
message = "".join([
messages['ban']['head'],
messages['ban'].get(self.matches, ""),
messages['ban']['tail']
])
self._sendMessage(
"[Fail2Ban] %(jailname)s: banned %(ip)s from %(hostname)s" %
aInfo,
message % aInfo)
if self.ssl: # pragma: no cover
r = smtp.starttls()[0];
if r != 220: # pragma: no cover
raise Exception("Failed to starttls() on '%s': %s" % (self.host, r))
if self.user and self.password: # pragma: no cover (ATM no tests covering that)
smtp.login(self.user, self.password)
failed_recipients = smtp.sendmail(
self.fromaddr, self.toaddr.split(", "), msg.as_string())
except smtplib.SMTPConnectError: # pragma: no cover
self._logSys.error("Error connecting to host '%s'", self.host)
raise
except smtplib.SMTPAuthenticationError: # pragma: no cover
self._logSys.error(
"Failed to authenticate with host '%s' user '%s'",
self.host, self.user)
raise
except smtplib.SMTPException: # pragma: no cover
self._logSys.error(
"Error sending mail to host '%s' from '%s' to '%s'",
self.host, self.fromaddr, self.toaddr)
raise
else:
if failed_recipients: # pragma: no cover
self._logSys.warning(
"Email to '%s' failed to following recipients: %r",
self.toaddr, failed_recipients)
self._logSys.debug("Email '%s' successfully sent", subject)
finally:
try:
self._logSys.debug("Disconnected from '%s', response %i: %s",
self.host, *smtp.quit())
except smtplib.SMTPServerDisconnected: # pragma: no cover
pass # Not connected
def start(self):
"""Sends email to recipients informing that the jail has started.
"""
self._sendMessage(
"[Fail2Ban] %(jailname)s: started on %(hostname)s" %
self.message_values,
messages['start'] % self.message_values)
def stop(self):
"""Sends email to recipients informing that the jail has stopped.
"""
self._sendMessage(
"[Fail2Ban] %(jailname)s: stopped on %(hostname)s" %
self.message_values,
messages['stop'] % self.message_values)
def ban(self, aInfo):
"""Sends email to recipients informing that ban has occurred.
Parameters
----------
aInfo : dict
Dictionary which includes information in relation to
the ban.
"""
if aInfo.get('restored'):
return
aInfo.update(self.message_values)
message = "".join([
messages['ban']['head'],
messages['ban'].get(self.matches, ""),
messages['ban']['tail']
])
self._sendMessage(
"[Fail2Ban] %(jailname)s: banned %(ip)s from %(hostname)s" %
aInfo,
message % aInfo)
Action = SMTPAction