Email: Check custom SMTP IP at usage time

This commit is contained in:
pajowu
2026-04-10 10:55:07 +02:00
committed by Raphael Michel
parent f50548cd02
commit 0bb04ca8f0
3 changed files with 199 additions and 1 deletions

View File

@@ -19,7 +19,10 @@
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see # You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>. # <https://www.gnu.org/licenses/>.
# #
import ipaddress
import logging import logging
import smtplib
import socket
from itertools import groupby from itertools import groupby
from smtplib import SMTPResponseException from smtplib import SMTPResponseException
from typing import TypeVar from typing import TypeVar
@@ -237,3 +240,80 @@ def base_renderers(sender, **kwargs):
def get_email_context(**kwargs): def get_email_context(**kwargs):
return PlaceholderContext(**kwargs).render_all() return PlaceholderContext(**kwargs).render_all()
def create_connection(address, timeout=socket.getdefaulttimeout(),
source_address=None, *, all_errors=False):
# Taken from the python stdlib, extended with a check for local ips
host, port = address
exceptions = []
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
if not settings.get("MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS", False):
ip_addr = ipaddress.ip_address(sa[0])
if ip_addr.is_multicast:
raise socket.error(f"Request to multicast address {sa[0]} blocked")
if ip_addr.is_loopback or ip_addr.is_link_local:
raise socket.error(f"Request to local address {sa[0]} blocked")
if ip_addr.is_private:
raise socket.error(f"Request to private address {sa[0]} blocked")
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not socket.getdefaulttimeout():
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
# Break explicitly a reference cycle
exceptions.clear()
return sock
except socket.error as exc:
if not all_errors:
exceptions.clear() # raise only the last error
exceptions.append(exc)
if sock is not None:
sock.close()
if len(exceptions):
try:
if not all_errors:
raise exceptions[0]
raise ExceptionGroup("create_connection failed", exceptions)
finally:
# Break explicitly a reference cycle
exceptions.clear()
else:
raise socket.error("getaddrinfo returns an empty list")
class CheckPrivateNetworkMixin:
# _get_socket taken 1:1 from smtplib, just with a call to our own create_connection
def _get_socket(self, host, port, timeout):
# This makes it simpler for SMTP_SSL to use the SMTP connect code
# and just alter the socket connection bit.
if timeout is not None and not timeout:
raise ValueError('Non-blocking socket (timeout=0) is not supported')
if self.debuglevel > 0:
self._print_debug('connect: to', (host, port), self.source_address)
return create_connection((host, port), timeout, self.source_address)
class SMTP(CheckPrivateNetworkMixin, smtplib.SMTP):
pass
# SMTP used here instead of mixin, because smtp.SMTP_SSL._get_socket calls super()._get_socket and then wraps this socket
# super()._get_socket needs to be our version from the mixin
class SMTP_SSL(smtplib.SMTP_SSL, SMTP): # noqa: N801
pass
class CheckPrivateNetworkSmtpBackend(EmailBackend):
@property
def connection_class(self):
return SMTP_SSL if self.use_ssl else SMTP

View File

@@ -264,7 +264,8 @@ EMAIL_HOST_PASSWORD = config.get('mail', 'password', fallback='')
EMAIL_USE_TLS = config.getboolean('mail', 'tls', fallback=False) EMAIL_USE_TLS = config.getboolean('mail', 'tls', fallback=False)
EMAIL_USE_SSL = config.getboolean('mail', 'ssl', fallback=False) EMAIL_USE_SSL = config.getboolean('mail', 'ssl', fallback=False)
EMAIL_SUBJECT_PREFIX = '[pretix] ' EMAIL_SUBJECT_PREFIX = '[pretix] '
EMAIL_BACKEND = EMAIL_CUSTOM_SMTP_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_CUSTOM_SMTP_BACKEND = 'pretixbase.email.CheckPrivateNetworkSmtpBackend'
EMAIL_TIMEOUT = 60 EMAIL_TIMEOUT = 60
ADMINS = [('Admin', n) for n in config.get('mail', 'admins', fallback='').split(",") if n] ADMINS = [('Admin', n) for n in config.get('mail', 'admins', fallback='').split(",") if n]

View File

@@ -35,8 +35,11 @@
import datetime import datetime
import os import os
import re import re
import socket
from contextlib import contextmanager
from decimal import Decimal from decimal import Decimal
from email.mime.text import MIMEText from email.mime.text import MIMEText
from unittest import mock
import pytest import pytest
from django.conf import settings from django.conf import settings
@@ -591,3 +594,117 @@ def test_attached_ical_localization(env, order):
assert len(djmail.outbox) == 1 assert len(djmail.outbox) == 1
assert len(djmail.outbox[0].attachments) == 1 assert len(djmail.outbox[0].attachments) == 1
assert description in djmail.outbox[0].attachments[0][1] assert description in djmail.outbox[0].attachments[0][1]
PRIVATE_IPS_RES = [
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('10.0.0.3', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('0.0.0.0', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
]
@contextmanager
def test_mail_connection(res, should_connect, use_ssl):
with (
mock.patch('socket.socket') as mock_socket,
mock.patch('socket.getaddrinfo', return_value=res),
mock.patch('smtplib.SMTP.getreply', return_value=(220, "")),
mock.patch('smtplib.SMTP.sendmail'),
mock.patch('ssl.SSLContext.wrap_socket') as mock_ssl
):
yield
if should_connect:
mock_socket.assert_called_once()
mock_socket.return_value.connect.assert_called_once_with(res[0][-1])
if use_ssl:
mock_ssl.assert_called_once()
else:
mock_socket.assert_not_called()
mock_socket.return_value.connect.assert_not_called()
mock_ssl.assert_not_called()
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
@pytest.mark.parametrize("use_ssl", [
True, False
])
def test_private_smtp_ip(res, use_ssl, settings):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = False
with test_mail_connection(res=res, should_connect=False, use_ssl=use_ssl), pytest.raises(match="Request to .* blocked"):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = True
with test_mail_connection(res=res, should_connect=True, use_ssl=use_ssl):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
@pytest.mark.parametrize("use_ssl", [
True, False
])
@pytest.mark.parametrize("allow_private", [
True, False
])
def test_public_smtp_ip(use_ssl, allow_private, settings):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private
with test_mail_connection(res=[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 443))], should_connect=True, use_ssl=use_ssl):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
@pytest.mark.django_db
@pytest.mark.parametrize("use_ssl", [
True, False
])
@pytest.mark.parametrize("allow_private_networks", [
True, False
])
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
def test_send_mail_private_ip(res, use_ssl, allow_private_networks, env):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private_networks
event, user, organizer = env
event.settings.smtp_use_custom = True
event.settings.smtp_host = "example.com"
event.settings.smtp_use_ssl = use_ssl
event.settings.smtp_use_tls = False
def send_mail():
m = OutgoingMail.objects.create(
to=['recipient@example.com'],
subject='Test',
body_plain='Test',
sender='sender@example.com',
event=event
)
assert m.status == OutgoingMail.STATUS_QUEUED
mail_send_task.apply(kwargs={
'outgoing_mail': m.pk,
}, max_retries=0)
m.refresh_from_db()
return m
with test_mail_connection(res=res, should_connect=allow_private_networks, use_ssl=use_ssl):
m = send_mail()
if allow_private_networks:
assert m.status == OutgoingMail.STATUS_SENT
else:
assert m.status == OutgoingMail.STATUS_FAILED