Compare commits

...

3 Commits

Author SHA1 Message Date
pajowu
f66f97bee1 Check custom smtp ip at usage too (#6073) 2026-04-10 10:55:07 +02:00
Raphael Michel
998a62add2 Fix crash on build 2026-04-10 09:45:47 +02:00
Raphael Michel
614408f5ae Add default protection for SSRF 2026-03-26 14:05:41 +01:00
5 changed files with 425 additions and 1 deletions

View File

@@ -19,7 +19,10 @@
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import ipaddress
import logging
import smtplib
import socket
from itertools import groupby
from smtplib import SMTPResponseException
from typing import TypeVar
@@ -237,3 +240,80 @@ def base_renderers(sender, **kwargs):
def get_email_context(**kwargs):
return PlaceholderContext(**kwargs).render_all()
def create_connection(address, timeout=socket.getdefaulttimeout(),
source_address=None, *, all_errors=False):
# Taken from the python stdlib, extended with a check for local ips
host, port = address
exceptions = []
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
if not settings.get("MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS", False):
ip_addr = ipaddress.ip_address(sa[0])
if ip_addr.is_multicast:
raise socket.error(f"Request to multicast address {sa[0]} blocked")
if ip_addr.is_loopback or ip_addr.is_link_local:
raise socket.error(f"Request to local address {sa[0]} blocked")
if ip_addr.is_private:
raise socket.error(f"Request to private address {sa[0]} blocked")
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not socket.getdefaulttimeout():
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
# Break explicitly a reference cycle
exceptions.clear()
return sock
except socket.error as exc:
if not all_errors:
exceptions.clear() # raise only the last error
exceptions.append(exc)
if sock is not None:
sock.close()
if len(exceptions):
try:
if not all_errors:
raise exceptions[0]
raise ExceptionGroup("create_connection failed", exceptions)
finally:
# Break explicitly a reference cycle
exceptions.clear()
else:
raise socket.error("getaddrinfo returns an empty list")
class CheckPrivateNetworkMixin:
# _get_socket taken 1:1 from smtplib, just with a call to our own create_connection
def _get_socket(self, host, port, timeout):
# This makes it simpler for SMTP_SSL to use the SMTP connect code
# and just alter the socket connection bit.
if timeout is not None and not timeout:
raise ValueError('Non-blocking socket (timeout=0) is not supported')
if self.debuglevel > 0:
self._print_debug('connect: to', (host, port), self.source_address)
return create_connection((host, port), timeout, self.source_address)
class SMTP(CheckPrivateNetworkMixin, smtplib.SMTP):
pass
# SMTP used here instead of mixin, because smtp.SMTP_SSL._get_socket calls super()._get_socket and then wraps this socket
# super()._get_socket needs to be our version from the mixin
class SMTP_SSL(smtplib.SMTP_SSL, SMTP): # noqa: N801
pass
class CheckPrivateNetworkSmtpBackend(EmailBackend):
@property
def connection_class(self):
return SMTP_SSL if self.use_ssl else SMTP

View File

@@ -19,12 +19,26 @@
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import ipaddress
import socket
import sys
import types
from datetime import datetime
from http import cookies
from django.conf import settings
from PIL import Image
from requests.adapters import HTTPAdapter
from urllib3.connection import HTTPConnection, HTTPSConnection
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from urllib3.exceptions import (
ConnectTimeoutError, HTTPError, LocationParseError, NameResolutionError,
NewConnectionError,
)
from urllib3.util.connection import (
_TYPE_SOCKET_OPTIONS, _set_socket_options, allowed_gai_family,
)
from urllib3.util.timeout import _DEFAULT_TIMEOUT
def monkeypatch_vobject_performance():
@@ -89,6 +103,123 @@ def monkeypatch_requests_timeout():
HTTPAdapter.send = httpadapter_send
def monkeypatch_urllib3_ssrf_protection():
"""
pretix allows HTTP requests to untrusted URLs, e.g. through webhooks or external API URLs. This is dangerous since
it can allow access to private networks that should not be reachable by users ("server-side request forgery", SSRF).
Validating URLs at submission is not sufficient, since with DNS rebinding an attacker can make a domain name pass
validation and then resolve to a private IP address on actual execution. Unfortunately, there seems no clean solution
to this in Python land, so we monkeypatch urllib3's connection management to check the IP address to be external
*after* the DNS resolution.
This does not work when a global http(s) proxy is used, but in that scenario the proxy can perform the validation.
"""
if getattr(settings, "ALLOW_HTTP_TO_PRIVATE_NETWORKS", False):
# Settings are not supposed to change during runtime, so we can optimize performance and complexity by skipping
# this if not needed.
return
def create_connection(
address: tuple[str, int],
timeout=_DEFAULT_TIMEOUT,
source_address: tuple[str, int] | None = None,
socket_options: _TYPE_SOCKET_OPTIONS | None = None,
) -> socket.socket:
# This is copied from urllib3.util.connection v2.3.0
host, port = address
if host.startswith("["):
host = host.strip("[]")
err = None
# Using the value from allowed_gai_family() in the context of getaddrinfo lets
# us select whether to work with IPv4 DNS records, IPv6 records, or both.
# The original create_connection function always returns all records.
family = allowed_gai_family()
try:
host.encode("idna")
except UnicodeError:
raise LocationParseError(f"'{host}', label empty or too long") from None
for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
if not getattr(settings, "ALLOW_HTTP_TO_PRIVATE_NETWORKS", False):
ip_addr = ipaddress.ip_address(sa[0])
if ip_addr.is_multicast:
raise HTTPError(f"Request to multicast address {sa[0]} blocked")
if ip_addr.is_loopback or ip_addr.is_link_local:
raise HTTPError(f"Request to local address {sa[0]} blocked")
if ip_addr.is_private:
raise HTTPError(f"Request to private address {sa[0]} blocked")
sock = None
try:
sock = socket.socket(af, socktype, proto)
# If provided, set socket level options before connecting.
_set_socket_options(sock, socket_options)
if timeout is not _DEFAULT_TIMEOUT:
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
# Break explicitly a reference cycle
err = None
return sock
except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
try:
raise err
finally:
# Break explicitly a reference cycle
err = None
else:
raise OSError("getaddrinfo returns an empty list")
class ProtectionMixin:
def _new_conn(self) -> socket.socket:
# This is 1:1 the version from urllib3.connection.HTTPConnection._new_conn v2.3.0
# just with a call to our own create_connection
try:
sock = create_connection(
(self._dns_host, self.port),
self.timeout,
source_address=self.source_address,
socket_options=self.socket_options,
)
except socket.gaierror as e:
raise NameResolutionError(self.host, self, e) from e
except socket.timeout as e:
raise ConnectTimeoutError(
self,
f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
) from e
except OSError as e:
raise NewConnectionError(
self, f"Failed to establish a new connection: {e}"
) from e
sys.audit("http.client.connect", self, self.host, self.port)
return sock
class ProtectedHTTPConnection(ProtectionMixin, HTTPConnection):
pass
class ProtectedHTTPSConnection(ProtectionMixin, HTTPSConnection):
pass
HTTPConnectionPool.ConnectionCls = ProtectedHTTPConnection
HTTPSConnectionPool.ConnectionCls = ProtectedHTTPSConnection
def monkeypatch_cookie_morsel():
# See https://code.djangoproject.com/ticket/34613
cookies.Morsel._flags.add("partitioned")
@@ -99,4 +230,5 @@ def monkeypatch_all_at_ready():
monkeypatch_vobject_performance()
monkeypatch_pillow_safer()
monkeypatch_requests_timeout()
monkeypatch_urllib3_ssrf_protection()
monkeypatch_cookie_morsel()

View File

@@ -208,6 +208,7 @@ CSRF_TRUSTED_ORIGINS = [urlparse(SITE_URL).scheme + '://' + urlparse(SITE_URL).h
TRUST_X_FORWARDED_FOR = config.getboolean('pretix', 'trust_x_forwarded_for', fallback=False)
USE_X_FORWARDED_HOST = config.getboolean('pretix', 'trust_x_forwarded_host', fallback=False)
ALLOW_HTTP_TO_PRIVATE_NETWORKS = config.getboolean('pretix', 'allow_http_to_private_networks', fallback=False)
REQUEST_ID_HEADER = config.get('pretix', 'request_id_header', fallback=False)
@@ -248,7 +249,8 @@ EMAIL_HOST_PASSWORD = config.get('mail', 'password', fallback='')
EMAIL_USE_TLS = config.getboolean('mail', 'tls', fallback=False)
EMAIL_USE_SSL = config.getboolean('mail', 'ssl', fallback=False)
EMAIL_SUBJECT_PREFIX = '[pretix] '
EMAIL_BACKEND = EMAIL_CUSTOM_SMTP_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_CUSTOM_SMTP_BACKEND = 'pretixbase.email.CheckPrivateNetworkSmtpBackend'
EMAIL_TIMEOUT = 60
ADMINS = [('Admin', n) for n in config.get('mail', 'admins', fallback='').split(",") if n]

View File

@@ -35,8 +35,11 @@
import datetime
import os
import re
import socket
from contextlib import contextmanager
from decimal import Decimal
from email.mime.text import MIMEText
from unittest import mock
import pytest
from django.conf import settings
@@ -591,3 +594,117 @@ def test_attached_ical_localization(env, order):
assert len(djmail.outbox) == 1
assert len(djmail.outbox[0].attachments) == 1
assert description in djmail.outbox[0].attachments[0][1]
PRIVATE_IPS_RES = [
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('10.0.0.3', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('0.0.0.0', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
]
@contextmanager
def test_mail_connection(res, should_connect, use_ssl):
with (
mock.patch('socket.socket') as mock_socket,
mock.patch('socket.getaddrinfo', return_value=res),
mock.patch('smtplib.SMTP.getreply', return_value=(220, "")),
mock.patch('smtplib.SMTP.sendmail'),
mock.patch('ssl.SSLContext.wrap_socket') as mock_ssl
):
yield
if should_connect:
mock_socket.assert_called_once()
mock_socket.return_value.connect.assert_called_once_with(res[0][-1])
if use_ssl:
mock_ssl.assert_called_once()
else:
mock_socket.assert_not_called()
mock_socket.return_value.connect.assert_not_called()
mock_ssl.assert_not_called()
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
@pytest.mark.parametrize("use_ssl", [
True, False
])
def test_private_smtp_ip(res, use_ssl, settings):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = False
with test_mail_connection(res=res, should_connect=False, use_ssl=use_ssl), pytest.raises(match="Request to .* blocked"):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = True
with test_mail_connection(res=res, should_connect=True, use_ssl=use_ssl):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
@pytest.mark.parametrize("use_ssl", [
True, False
])
@pytest.mark.parametrize("allow_private", [
True, False
])
def test_public_smtp_ip(use_ssl, allow_private, settings):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private
with test_mail_connection(res=[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 443))], should_connect=True, use_ssl=use_ssl):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
@pytest.mark.django_db
@pytest.mark.parametrize("use_ssl", [
True, False
])
@pytest.mark.parametrize("allow_private_networks", [
True, False
])
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
def test_send_mail_private_ip(res, use_ssl, allow_private_networks, env):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private_networks
event, user, organizer = env
event.settings.smtp_use_custom = True
event.settings.smtp_host = "example.com"
event.settings.smtp_use_ssl = use_ssl
event.settings.smtp_use_tls = False
def send_mail():
m = OutgoingMail.objects.create(
to=['recipient@example.com'],
subject='Test',
body_plain='Test',
sender='sender@example.com',
event=event
)
assert m.status == OutgoingMail.STATUS_QUEUED
mail_send_task.apply(kwargs={
'outgoing_mail': m.pk,
}, max_retries=0)
m.refresh_from_db()
return m
with test_mail_connection(res=res, should_connect=allow_private_networks, use_ssl=use_ssl):
m = send_mail()
if allow_private_networks:
assert m.status == OutgoingMail.STATUS_SENT
else:
assert m.status == OutgoingMail.STATUS_FAILED

View File

@@ -0,0 +1,93 @@
#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-today pretix GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
from socket import AF_INET, SOCK_STREAM
from unittest import mock
import pytest
import requests
from django.test import override_settings
from dns.inet import AF_INET6
from urllib3.exceptions import HTTPError
def test_local_blocked():
with pytest.raises(HTTPError, match="Request to local address.*"):
requests.get("http://localhost", timeout=0.1)
with pytest.raises(HTTPError, match="Request to local address.*"):
requests.get("https://localhost", timeout=0.1)
def test_private_ip_blocked():
with pytest.raises(HTTPError, match="Request to private address.*"):
requests.get("http://10.0.0.1", timeout=0.1)
with pytest.raises(HTTPError, match="Request to private address.*"):
requests.get("https://10.0.0.1", timeout=0.1)
@pytest.mark.django_db
@pytest.mark.parametrize("res", [
[(AF_INET, SOCK_STREAM, 6, '', ('10.0.0.3', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('0.0.0.0', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
[(AF_INET6, SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
])
def test_dns_resolving_to_local_blocked(res):
with mock.patch('socket.getaddrinfo') as mock_addr:
mock_addr.return_value = res
with pytest.raises(HTTPError, match="Request to (multicast|private|local) address.*"):
requests.get("https://example.org", timeout=0.1)
with pytest.raises(HTTPError, match="Request to (multicast|private|local) address.*"):
requests.get("http://example.org", timeout=0.1)
def test_dns_remote_allowed():
class SocketOk(Exception):
pass
def side_effect(*args, **kwargs):
raise SocketOk
with mock.patch('socket.getaddrinfo') as mock_addr, mock.patch('socket.socket') as mock_socket:
mock_addr.return_value = [(AF_INET, SOCK_STREAM, 6, '', ('8.8.8.8', 443))]
mock_socket.side_effect = side_effect
with pytest.raises(SocketOk):
requests.get("https://example.org", timeout=0.1)
@override_settings(ALLOW_HTTP_TO_PRIVATE_NETWORKS=True)
def test_local_is_allowed():
class SocketOk(Exception):
pass
def side_effect(*args, **kwargs):
raise SocketOk
with mock.patch('socket.getaddrinfo') as mock_addr, mock.patch('socket.socket') as mock_socket:
mock_addr.return_value = [(AF_INET, SOCK_STREAM, 6, '', ('10.0.0.1', 443))]
mock_socket.side_effect = side_effect
with pytest.raises(SocketOk):
requests.get("https://example.org", timeout=0.1)