From 0bb04ca8f039dbe6e1ff8b69dc3d9b4e36018890 Mon Sep 17 00:00:00 2001 From: pajowu Date: Fri, 10 Apr 2026 10:55:07 +0200 Subject: [PATCH] Email: Check custom SMTP IP at usage time --- src/pretix/base/email.py | 80 ++++++++++++++++++++++++ src/pretix/settings.py | 3 +- src/tests/base/test_mail.py | 117 ++++++++++++++++++++++++++++++++++++ 3 files changed, 199 insertions(+), 1 deletion(-) diff --git a/src/pretix/base/email.py b/src/pretix/base/email.py index 155b296573..9be1d204c3 100644 --- a/src/pretix/base/email.py +++ b/src/pretix/base/email.py @@ -19,7 +19,10 @@ # You should have received a copy of the GNU Affero General Public License along with this program. If not, see # . # +import ipaddress import logging +import smtplib +import socket from itertools import groupby from smtplib import SMTPResponseException from typing import TypeVar @@ -237,3 +240,80 @@ def base_renderers(sender, **kwargs): def get_email_context(**kwargs): return PlaceholderContext(**kwargs).render_all() + + +def create_connection(address, timeout=socket.getdefaulttimeout(), + source_address=None, *, all_errors=False): + # Taken from the python stdlib, extended with a check for local ips + + host, port = address + exceptions = [] + for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + + if not settings.get("MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS", False): + ip_addr = ipaddress.ip_address(sa[0]) + if ip_addr.is_multicast: + raise socket.error(f"Request to multicast address {sa[0]} blocked") + if ip_addr.is_loopback or ip_addr.is_link_local: + raise socket.error(f"Request to local address {sa[0]} blocked") + if ip_addr.is_private: + raise socket.error(f"Request to private address {sa[0]} blocked") + + sock = None + try: + sock = socket.socket(af, socktype, proto) + if timeout is not socket.getdefaulttimeout(): + sock.settimeout(timeout) + if source_address: + sock.bind(source_address) + sock.connect(sa) + # Break explicitly a reference cycle + exceptions.clear() + return sock + + except socket.error as exc: + if not all_errors: + exceptions.clear() # raise only the last error + exceptions.append(exc) + if sock is not None: + sock.close() + + if len(exceptions): + try: + if not all_errors: + raise exceptions[0] + raise ExceptionGroup("create_connection failed", exceptions) + finally: + # Break explicitly a reference cycle + exceptions.clear() + else: + raise socket.error("getaddrinfo returns an empty list") + + +class CheckPrivateNetworkMixin: + # _get_socket taken 1:1 from smtplib, just with a call to our own create_connection + def _get_socket(self, host, port, timeout): + # This makes it simpler for SMTP_SSL to use the SMTP connect code + # and just alter the socket connection bit. + if timeout is not None and not timeout: + raise ValueError('Non-blocking socket (timeout=0) is not supported') + if self.debuglevel > 0: + self._print_debug('connect: to', (host, port), self.source_address) + return create_connection((host, port), timeout, self.source_address) + + +class SMTP(CheckPrivateNetworkMixin, smtplib.SMTP): + pass + + +# SMTP used here instead of mixin, because smtp.SMTP_SSL._get_socket calls super()._get_socket and then wraps this socket +# super()._get_socket needs to be our version from the mixin +class SMTP_SSL(smtplib.SMTP_SSL, SMTP): # noqa: N801 + pass + + +class CheckPrivateNetworkSmtpBackend(EmailBackend): + @property + def connection_class(self): + return SMTP_SSL if self.use_ssl else SMTP diff --git a/src/pretix/settings.py b/src/pretix/settings.py index d5ec121589..a798c55d33 100644 --- a/src/pretix/settings.py +++ b/src/pretix/settings.py @@ -264,7 +264,8 @@ EMAIL_HOST_PASSWORD = config.get('mail', 'password', fallback='') EMAIL_USE_TLS = config.getboolean('mail', 'tls', fallback=False) EMAIL_USE_SSL = config.getboolean('mail', 'ssl', fallback=False) EMAIL_SUBJECT_PREFIX = '[pretix] ' -EMAIL_BACKEND = EMAIL_CUSTOM_SMTP_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' +EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' +EMAIL_CUSTOM_SMTP_BACKEND = 'pretixbase.email.CheckPrivateNetworkSmtpBackend' EMAIL_TIMEOUT = 60 ADMINS = [('Admin', n) for n in config.get('mail', 'admins', fallback='').split(",") if n] diff --git a/src/tests/base/test_mail.py b/src/tests/base/test_mail.py index 8ccf0965bf..9f1f971e53 100644 --- a/src/tests/base/test_mail.py +++ b/src/tests/base/test_mail.py @@ -35,8 +35,11 @@ import datetime import os import re +import socket +from contextlib import contextmanager from decimal import Decimal from email.mime.text import MIMEText +from unittest import mock import pytest from django.conf import settings @@ -591,3 +594,117 @@ def test_attached_ical_localization(env, order): assert len(djmail.outbox) == 1 assert len(djmail.outbox[0].attachments) == 1 assert description in djmail.outbox[0].attachments[0][1] + + +PRIVATE_IPS_RES = [ + [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('10.0.0.3', 443))], + [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('0.0.0.0', 443))], + [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.1.1.1', 443))], + [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.5.3', 443))], + [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('224.0.0.1', 443))], + [(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::1', 443, 0, 0))], + [(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))], + [(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))], + [(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))], +] + + +@contextmanager +def test_mail_connection(res, should_connect, use_ssl): + with ( + mock.patch('socket.socket') as mock_socket, + mock.patch('socket.getaddrinfo', return_value=res), + mock.patch('smtplib.SMTP.getreply', return_value=(220, "")), + mock.patch('smtplib.SMTP.sendmail'), + mock.patch('ssl.SSLContext.wrap_socket') as mock_ssl + ): + yield + + if should_connect: + mock_socket.assert_called_once() + mock_socket.return_value.connect.assert_called_once_with(res[0][-1]) + if use_ssl: + mock_ssl.assert_called_once() + else: + mock_socket.assert_not_called() + mock_socket.return_value.connect.assert_not_called() + mock_ssl.assert_not_called() + + +@pytest.mark.parametrize("res", PRIVATE_IPS_RES) +@pytest.mark.parametrize("use_ssl", [ + True, False +]) +def test_private_smtp_ip(res, use_ssl, settings): + settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend' + settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = False + with test_mail_connection(res=res, should_connect=False, use_ssl=use_ssl), pytest.raises(match="Request to .* blocked"): + connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND, + host="localhost", + use_ssl=use_ssl) + connection.open() + + settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = True + with test_mail_connection(res=res, should_connect=True, use_ssl=use_ssl): + connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND, + host="localhost", + use_ssl=use_ssl) + connection.open() + + +@pytest.mark.parametrize("use_ssl", [ + True, False +]) +@pytest.mark.parametrize("allow_private", [ + True, False +]) +def test_public_smtp_ip(use_ssl, allow_private, settings): + settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend' + settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private + + with test_mail_connection(res=[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 443))], should_connect=True, use_ssl=use_ssl): + connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND, + host="localhost", + use_ssl=use_ssl) + connection.open() + + +@pytest.mark.django_db +@pytest.mark.parametrize("use_ssl", [ + True, False +]) +@pytest.mark.parametrize("allow_private_networks", [ + True, False +]) +@pytest.mark.parametrize("res", PRIVATE_IPS_RES) +def test_send_mail_private_ip(res, use_ssl, allow_private_networks, env): + settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend' + settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private_networks + + event, user, organizer = env + event.settings.smtp_use_custom = True + event.settings.smtp_host = "example.com" + event.settings.smtp_use_ssl = use_ssl + event.settings.smtp_use_tls = False + + def send_mail(): + m = OutgoingMail.objects.create( + to=['recipient@example.com'], + subject='Test', + body_plain='Test', + sender='sender@example.com', + event=event + ) + assert m.status == OutgoingMail.STATUS_QUEUED + mail_send_task.apply(kwargs={ + 'outgoing_mail': m.pk, + }, max_retries=0) + m.refresh_from_db() + return m + + with test_mail_connection(res=res, should_connect=allow_private_networks, use_ssl=use_ssl): + m = send_mail() + if allow_private_networks: + assert m.status == OutgoingMail.STATUS_SENT + else: + assert m.status == OutgoingMail.STATUS_FAILED