Compare commits

..

5 Commits

Author SHA1 Message Date
pajowu
f66f97bee1 Check custom smtp ip at usage too (#6073) 2026-04-10 10:55:07 +02:00
Raphael Michel
998a62add2 Fix crash on build 2026-04-10 09:45:47 +02:00
Raphael Michel
614408f5ae Add default protection for SSRF 2026-03-26 14:05:41 +01:00
Raphael Michel
1383e967df Hotfix font select in organizer 2026-03-25 15:14:20 +01:00
dependabot[bot]
c743e9fd3f Update sentry-sdk requirement from ==2.54.* to ==2.56.* (#6023)
Updates the requirements on [sentry-sdk](https://github.com/getsentry/sentry-python) to permit the latest version.
- [Release notes](https://github.com/getsentry/sentry-python/releases)
- [Changelog](https://github.com/getsentry/sentry-python/blob/master/CHANGELOG.md)
- [Commits](https://github.com/getsentry/sentry-python/compare/2.54.0...2.56.0)

---
updated-dependencies:
- dependency-name: sentry-sdk
  dependency-version: 2.56.0
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2026-03-25 13:41:13 +01:00
13 changed files with 458 additions and 84 deletions

View File

@@ -93,7 +93,7 @@ dependencies = [
"redis==7.1.*",
"reportlab==4.4.*",
"requests==2.32.*",
"sentry-sdk==2.54.*",
"sentry-sdk==2.56.*",
"sepaxml==2.7.*",
"stripe==7.9.*",
"text-unidecode==1.*",

View File

@@ -19,7 +19,10 @@
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import ipaddress
import logging
import smtplib
import socket
from itertools import groupby
from smtplib import SMTPResponseException
from typing import TypeVar
@@ -237,3 +240,80 @@ def base_renderers(sender, **kwargs):
def get_email_context(**kwargs):
return PlaceholderContext(**kwargs).render_all()
def create_connection(address, timeout=socket.getdefaulttimeout(),
source_address=None, *, all_errors=False):
# Taken from the python stdlib, extended with a check for local ips
host, port = address
exceptions = []
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
if not settings.get("MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS", False):
ip_addr = ipaddress.ip_address(sa[0])
if ip_addr.is_multicast:
raise socket.error(f"Request to multicast address {sa[0]} blocked")
if ip_addr.is_loopback or ip_addr.is_link_local:
raise socket.error(f"Request to local address {sa[0]} blocked")
if ip_addr.is_private:
raise socket.error(f"Request to private address {sa[0]} blocked")
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not socket.getdefaulttimeout():
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
# Break explicitly a reference cycle
exceptions.clear()
return sock
except socket.error as exc:
if not all_errors:
exceptions.clear() # raise only the last error
exceptions.append(exc)
if sock is not None:
sock.close()
if len(exceptions):
try:
if not all_errors:
raise exceptions[0]
raise ExceptionGroup("create_connection failed", exceptions)
finally:
# Break explicitly a reference cycle
exceptions.clear()
else:
raise socket.error("getaddrinfo returns an empty list")
class CheckPrivateNetworkMixin:
# _get_socket taken 1:1 from smtplib, just with a call to our own create_connection
def _get_socket(self, host, port, timeout):
# This makes it simpler for SMTP_SSL to use the SMTP connect code
# and just alter the socket connection bit.
if timeout is not None and not timeout:
raise ValueError('Non-blocking socket (timeout=0) is not supported')
if self.debuglevel > 0:
self._print_debug('connect: to', (host, port), self.source_address)
return create_connection((host, port), timeout, self.source_address)
class SMTP(CheckPrivateNetworkMixin, smtplib.SMTP):
pass
# SMTP used here instead of mixin, because smtp.SMTP_SSL._get_socket calls super()._get_socket and then wraps this socket
# super()._get_socket needs to be our version from the mixin
class SMTP_SSL(smtplib.SMTP_SSL, SMTP): # noqa: N801
pass
class CheckPrivateNetworkSmtpBackend(EmailBackend):
@property
def connection_class(self):
return SMTP_SSL if self.use_ssl else SMTP

View File

@@ -100,7 +100,7 @@ def primary_font_kwargs():
choices = [('Open Sans', 'Open Sans')]
choices += sorted([
(a, {"title": a, "data": v}) for a, v in get_fonts(pdf_support_required=False).items()
(a, FontSelect.FontOption(title=a, data=v)) for a, v in get_fonts(pdf_support_required=False).items()
], key=lambda a: a[0])
return {
'choices': choices,

View File

@@ -27,11 +27,11 @@ from django.template import TemplateDoesNotExist, loader
from django.template.loader import get_template
from django.utils.functional import Promise
from django.utils.translation import gettext as _
from django.views.decorators.csrf import requires_csrf_token
from sentry_sdk import last_event_id
from pretix.base.i18n import language
from pretix.base.middleware import get_language_from_request
from pretix.multidomain.middlewares import requires_csrf_token
def csrf_failure(request, reason=""):

View File

@@ -34,6 +34,7 @@
import datetime
import os
from dataclasses import dataclass
from django import forms
from django.conf import settings
@@ -420,6 +421,11 @@ class SplitDateTimeField(forms.SplitDateTimeField):
class FontSelect(forms.RadioSelect):
option_template_name = 'pretixcontrol/font_option.html'
@dataclass
class FontOption:
title: str
data: str
class ItemMultipleChoiceField(SafeModelMultipleChoiceField):
def label_from_instance(self, obj):

View File

@@ -34,7 +34,6 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under the License.
from dataclasses import dataclass
from decimal import Decimal
from urllib.parse import urlencode
from zoneinfo import ZoneInfo
@@ -74,8 +73,8 @@ from pretix.base.settings import (
)
from pretix.base.validators import multimail_validate
from pretix.control.forms import (
MultipleLanguagesWidget, SalesChannelCheckboxSelectMultiple, SlugWidget,
SplitDateTimeField, SplitDateTimePickerWidget,
FontSelect, MultipleLanguagesWidget, SalesChannelCheckboxSelectMultiple,
SlugWidget, SplitDateTimeField, SplitDateTimePickerWidget,
)
from pretix.control.forms.widgets import Select2
from pretix.helpers.countries import CachedCountries
@@ -579,12 +578,6 @@ class EventSettingsValidationMixin:
del self.cleaned_data[field]
@dataclass
class FontOption:
title: str
data: str
class EventSettingsForm(EventSettingsValidationMixin, FormPlaceholderMixin, SettingsForm):
timezone = forms.ChoiceField(
choices=((a, a) for a in common_timezones),
@@ -736,7 +729,7 @@ class EventSettingsForm(EventSettingsValidationMixin, FormPlaceholderMixin, Sett
del self.fields['event_list_filters']
del self.fields['event_calendar_future_only']
self.fields['primary_font'].choices = [('Open Sans', 'Open Sans')] + sorted([
(a, FontOption(title=a, data=v)) for a, v in get_fonts(self.event, pdf_support_required=False).items()
(a, FontSelect.FontOption(title=a, data=v)) for a, v in get_fonts(self.event, pdf_support_required=False).items()
], key=lambda a: a[0])
# create "virtual" fields for better UX when editing <name>_asked and <name>_required fields

View File

@@ -20,7 +20,6 @@
# <https://www.gnu.org/licenses/>.
#
import re
from datetime import datetime
from django.conf import settings
@@ -49,10 +48,6 @@ def set_cookie_without_samesite(request, response, key, *args, **kwargs):
response.cookies[key]['Partitioned'] = True
def delete_cookie_without_samesite(request, response, key, *args, **kwargs):
kwargs['expires'] = datetime.fromtimestamp(0).strftime("%a, %d %b %Y %H:%M:%S GMT")
set_cookie_without_samesite(request, response, key, *args, **kwargs)
# Based on https://www.chromium.org/updates/same-site/incompatible-clients
# Copyright 2019 Google LLC.
# SPDX-License-Identifier: Apache-2.0

View File

@@ -19,12 +19,26 @@
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
import ipaddress
import socket
import sys
import types
from datetime import datetime
from http import cookies
from django.conf import settings
from PIL import Image
from requests.adapters import HTTPAdapter
from urllib3.connection import HTTPConnection, HTTPSConnection
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from urllib3.exceptions import (
ConnectTimeoutError, HTTPError, LocationParseError, NameResolutionError,
NewConnectionError,
)
from urllib3.util.connection import (
_TYPE_SOCKET_OPTIONS, _set_socket_options, allowed_gai_family,
)
from urllib3.util.timeout import _DEFAULT_TIMEOUT
def monkeypatch_vobject_performance():
@@ -89,6 +103,123 @@ def monkeypatch_requests_timeout():
HTTPAdapter.send = httpadapter_send
def monkeypatch_urllib3_ssrf_protection():
"""
pretix allows HTTP requests to untrusted URLs, e.g. through webhooks or external API URLs. This is dangerous since
it can allow access to private networks that should not be reachable by users ("server-side request forgery", SSRF).
Validating URLs at submission is not sufficient, since with DNS rebinding an attacker can make a domain name pass
validation and then resolve to a private IP address on actual execution. Unfortunately, there seems no clean solution
to this in Python land, so we monkeypatch urllib3's connection management to check the IP address to be external
*after* the DNS resolution.
This does not work when a global http(s) proxy is used, but in that scenario the proxy can perform the validation.
"""
if getattr(settings, "ALLOW_HTTP_TO_PRIVATE_NETWORKS", False):
# Settings are not supposed to change during runtime, so we can optimize performance and complexity by skipping
# this if not needed.
return
def create_connection(
address: tuple[str, int],
timeout=_DEFAULT_TIMEOUT,
source_address: tuple[str, int] | None = None,
socket_options: _TYPE_SOCKET_OPTIONS | None = None,
) -> socket.socket:
# This is copied from urllib3.util.connection v2.3.0
host, port = address
if host.startswith("["):
host = host.strip("[]")
err = None
# Using the value from allowed_gai_family() in the context of getaddrinfo lets
# us select whether to work with IPv4 DNS records, IPv6 records, or both.
# The original create_connection function always returns all records.
family = allowed_gai_family()
try:
host.encode("idna")
except UnicodeError:
raise LocationParseError(f"'{host}', label empty or too long") from None
for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
if not getattr(settings, "ALLOW_HTTP_TO_PRIVATE_NETWORKS", False):
ip_addr = ipaddress.ip_address(sa[0])
if ip_addr.is_multicast:
raise HTTPError(f"Request to multicast address {sa[0]} blocked")
if ip_addr.is_loopback or ip_addr.is_link_local:
raise HTTPError(f"Request to local address {sa[0]} blocked")
if ip_addr.is_private:
raise HTTPError(f"Request to private address {sa[0]} blocked")
sock = None
try:
sock = socket.socket(af, socktype, proto)
# If provided, set socket level options before connecting.
_set_socket_options(sock, socket_options)
if timeout is not _DEFAULT_TIMEOUT:
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
# Break explicitly a reference cycle
err = None
return sock
except OSError as _:
err = _
if sock is not None:
sock.close()
if err is not None:
try:
raise err
finally:
# Break explicitly a reference cycle
err = None
else:
raise OSError("getaddrinfo returns an empty list")
class ProtectionMixin:
def _new_conn(self) -> socket.socket:
# This is 1:1 the version from urllib3.connection.HTTPConnection._new_conn v2.3.0
# just with a call to our own create_connection
try:
sock = create_connection(
(self._dns_host, self.port),
self.timeout,
source_address=self.source_address,
socket_options=self.socket_options,
)
except socket.gaierror as e:
raise NameResolutionError(self.host, self, e) from e
except socket.timeout as e:
raise ConnectTimeoutError(
self,
f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
) from e
except OSError as e:
raise NewConnectionError(
self, f"Failed to establish a new connection: {e}"
) from e
sys.audit("http.client.connect", self, self.host, self.port)
return sock
class ProtectedHTTPConnection(ProtectionMixin, HTTPConnection):
pass
class ProtectedHTTPSConnection(ProtectionMixin, HTTPSConnection):
pass
HTTPConnectionPool.ConnectionCls = ProtectedHTTPConnection
HTTPSConnectionPool.ConnectionCls = ProtectedHTTPSConnection
def monkeypatch_cookie_morsel():
# See https://code.djangoproject.com/ticket/34613
cookies.Morsel._flags.add("partitioned")
@@ -99,4 +230,5 @@ def monkeypatch_all_at_ready():
monkeypatch_vobject_performance()
monkeypatch_pillow_safer()
monkeypatch_requests_timeout()
monkeypatch_urllib3_ssrf_protection()
monkeypatch_cookie_morsel()

View File

@@ -50,15 +50,12 @@ from django.middleware.csrf import (
from django.shortcuts import render
from django.urls import set_urlconf
from django.utils.cache import patch_vary_headers
from django.utils.decorators import decorator_from_middleware
from django.utils.deprecation import MiddlewareMixin
from django.utils.http import http_date
from django_scopes import scopes_disabled
from pretix.base.models import Event, Organizer
from pretix.helpers.cookies import (
delete_cookie_without_samesite, set_cookie_without_samesite,
)
from pretix.helpers.cookies import set_cookie_without_samesite
from pretix.multidomain.models import KnownDomain
LOCAL_HOST_NAMES = ('testserver', 'localhost')
@@ -179,23 +176,9 @@ class SessionMiddleware(BaseSessionMiddleware):
# The session should be deleted only if the session is entirely empty
is_secure = request.scheme == 'https'
if '__Host-' + settings.SESSION_COOKIE_NAME in request.COOKIES and empty:
# response.delete_cookie does not work as we might have set a partitioned cookie
delete_cookie_without_samesite(
request, response,
'__Host-' + settings.SESSION_COOKIE_NAME,
path=settings.SESSION_COOKIE_PATH,
secure=is_secure,
httponly=settings.SESSION_COOKIE_HTTPONLY or None
)
response.delete_cookie('__Host-' + settings.SESSION_COOKIE_NAME)
elif settings.SESSION_COOKIE_NAME in request.COOKIES and empty:
# response.delete_cookie does not work as we might have set a partitioned cookie
delete_cookie_without_samesite(
request, response,
settings.SESSION_COOKIE_NAME,
path=settings.SESSION_COOKIE_PATH,
secure=is_secure,
httponly=settings.SESSION_COOKIE_HTTPONLY or None
)
response.delete_cookie(settings.SESSION_COOKIE_NAME)
else:
if accessed:
patch_vary_headers(response, ('Cookie',))
@@ -212,21 +195,15 @@ class SessionMiddleware(BaseSessionMiddleware):
if response.status_code != 500:
request.session.save()
if is_secure and settings.SESSION_COOKIE_NAME in request.COOKIES: # remove legacy cookie
# response.delete_cookie does not work as we might have set a partitioned cookie
delete_cookie_without_samesite(
request, response,
settings.SESSION_COOKIE_NAME,
path=settings.SESSION_COOKIE_PATH,
secure=is_secure,
httponly=settings.SESSION_COOKIE_HTTPONLY or None
)
response.delete_cookie(settings.SESSION_COOKIE_NAME)
response.delete_cookie(settings.SESSION_COOKIE_NAME, samesite="None")
set_cookie_without_samesite(
request, response,
'__Host-' + settings.SESSION_COOKIE_NAME if is_secure else settings.SESSION_COOKIE_NAME,
request.session.session_key, max_age=max_age,
expires=expires,
path=settings.SESSION_COOKIE_PATH,
secure=is_secure,
secure=request.scheme == 'https',
httponly=settings.SESSION_COOKIE_HTTPONLY or None
)
return response
@@ -271,50 +248,20 @@ class CsrfViewMiddleware(BaseCsrfMiddleware):
if request.session.get(CSRF_SESSION_KEY) != request.META["CSRF_COOKIE"]:
request.session[CSRF_SESSION_KEY] = request.META["CSRF_COOKIE"]
else:
is_secure = request.scheme == 'https'
# Set the CSRF cookie even if it's already set, so we renew
# the expiry timer.
if request.is_secure() and settings.CSRF_COOKIE_NAME in request.COOKIES: # remove legacy cookie
# response.delete_cookie does not work as we might have set a partitioned cookie
delete_cookie_without_samesite(
request, response,
settings.CSRF_COOKIE_NAME,
path=settings.CSRF_COOKIE_PATH,
secure=request.is_secure(),
httponly=settings.CSRF_COOKIE_HTTPONLY
)
if is_secure and settings.CSRF_COOKIE_NAME in request.COOKIES: # remove legacy cookie
response.delete_cookie(settings.CSRF_COOKIE_NAME)
response.delete_cookie(settings.CSRF_COOKIE_NAME, samesite="None")
set_cookie_without_samesite(
request, response,
'__Host-' + settings.CSRF_COOKIE_NAME if request.is_secure() else settings.CSRF_COOKIE_NAME,
'__Host-' + settings.CSRF_COOKIE_NAME if is_secure else settings.CSRF_COOKIE_NAME,
request.META["CSRF_COOKIE"],
max_age=settings.CSRF_COOKIE_AGE,
path=settings.CSRF_COOKIE_PATH,
secure=request.is_secure(),
secure=is_secure,
httponly=settings.CSRF_COOKIE_HTTPONLY
)
# Content varies with the CSRF cookie, so set the Vary header.
patch_vary_headers(response, ('Cookie',))
def process_response(self, request, response):
if (
not settings.CSRF_USE_SESSIONS
and request.is_secure()
and settings.CSRF_COOKIE_NAME in response.cookies
and response.cookies[settings.CSRF_COOKIE_NAME].value
):
raise ValueError("Usage of djangos CsrfViewMiddleware detected (legacy cookie found in response). "
"This may be caused by using csrf_project or requires_csrf_token from django.views.decorators.csrf. "
"Use the pretix.multidomain.middlewares equivalent instead.")
return super().process_response(request, response)
csrf_protect = decorator_from_middleware(CsrfViewMiddleware)
class _EnsureCsrfToken(CsrfViewMiddleware):
# Behave like CsrfViewMiddleware but don't reject requests or log warnings.
def _reject(self, request, reason):
return None
requires_csrf_token = decorator_from_middleware(_EnsureCsrfToken)

View File

@@ -42,6 +42,7 @@ from django.utils.functional import cached_property
from django.utils.http import url_has_allowed_host_and_scheme
from django.utils.translation import gettext_lazy as _
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.debug import sensitive_post_parameters
from django.views.generic import FormView, ListView, View
@@ -98,6 +99,7 @@ class LoginView(RedirectBackMixin, FormView):
redirect_authenticated_user = True
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:
@@ -209,6 +211,7 @@ class RegistrationView(RedirectBackMixin, FormView):
redirect_authenticated_user = True
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:
@@ -252,6 +255,7 @@ class SetPasswordView(FormView):
template_name = 'pretixpresale/organizers/customer_setpassword.html'
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:
@@ -295,6 +299,7 @@ class ResetPasswordView(FormView):
template_name = 'pretixpresale/organizers/customer_resetpw.html'
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:
@@ -518,6 +523,7 @@ class ChangePasswordView(CustomerAccountBaseMixin, FormView):
form_class = ChangePasswordForm
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:
@@ -551,6 +557,7 @@ class ChangeInformationView(CustomerAccountBaseMixin, FormView):
form_class = ChangeInfoForm
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:
@@ -658,6 +665,7 @@ class SSOLoginView(RedirectBackMixin, View):
redirect_authenticated_user = True
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:
@@ -720,6 +728,7 @@ class SSOLoginReturnView(RedirectBackMixin, View):
redirect_authenticated_user = True
@method_decorator(sensitive_post_parameters())
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
if not request.organizer.settings.customer_accounts:

View File

@@ -208,6 +208,7 @@ CSRF_TRUSTED_ORIGINS = [urlparse(SITE_URL).scheme + '://' + urlparse(SITE_URL).h
TRUST_X_FORWARDED_FOR = config.getboolean('pretix', 'trust_x_forwarded_for', fallback=False)
USE_X_FORWARDED_HOST = config.getboolean('pretix', 'trust_x_forwarded_host', fallback=False)
ALLOW_HTTP_TO_PRIVATE_NETWORKS = config.getboolean('pretix', 'allow_http_to_private_networks', fallback=False)
REQUEST_ID_HEADER = config.get('pretix', 'request_id_header', fallback=False)
@@ -248,7 +249,8 @@ EMAIL_HOST_PASSWORD = config.get('mail', 'password', fallback='')
EMAIL_USE_TLS = config.getboolean('mail', 'tls', fallback=False)
EMAIL_USE_SSL = config.getboolean('mail', 'ssl', fallback=False)
EMAIL_SUBJECT_PREFIX = '[pretix] '
EMAIL_BACKEND = EMAIL_CUSTOM_SMTP_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_CUSTOM_SMTP_BACKEND = 'pretixbase.email.CheckPrivateNetworkSmtpBackend'
EMAIL_TIMEOUT = 60
ADMINS = [('Admin', n) for n in config.get('mail', 'admins', fallback='').split(",") if n]

View File

@@ -35,8 +35,11 @@
import datetime
import os
import re
import socket
from contextlib import contextmanager
from decimal import Decimal
from email.mime.text import MIMEText
from unittest import mock
import pytest
from django.conf import settings
@@ -591,3 +594,117 @@ def test_attached_ical_localization(env, order):
assert len(djmail.outbox) == 1
assert len(djmail.outbox[0].attachments) == 1
assert description in djmail.outbox[0].attachments[0][1]
PRIVATE_IPS_RES = [
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('10.0.0.3', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('0.0.0.0', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
]
@contextmanager
def test_mail_connection(res, should_connect, use_ssl):
with (
mock.patch('socket.socket') as mock_socket,
mock.patch('socket.getaddrinfo', return_value=res),
mock.patch('smtplib.SMTP.getreply', return_value=(220, "")),
mock.patch('smtplib.SMTP.sendmail'),
mock.patch('ssl.SSLContext.wrap_socket') as mock_ssl
):
yield
if should_connect:
mock_socket.assert_called_once()
mock_socket.return_value.connect.assert_called_once_with(res[0][-1])
if use_ssl:
mock_ssl.assert_called_once()
else:
mock_socket.assert_not_called()
mock_socket.return_value.connect.assert_not_called()
mock_ssl.assert_not_called()
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
@pytest.mark.parametrize("use_ssl", [
True, False
])
def test_private_smtp_ip(res, use_ssl, settings):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = False
with test_mail_connection(res=res, should_connect=False, use_ssl=use_ssl), pytest.raises(match="Request to .* blocked"):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = True
with test_mail_connection(res=res, should_connect=True, use_ssl=use_ssl):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
@pytest.mark.parametrize("use_ssl", [
True, False
])
@pytest.mark.parametrize("allow_private", [
True, False
])
def test_public_smtp_ip(use_ssl, allow_private, settings):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private
with test_mail_connection(res=[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 443))], should_connect=True, use_ssl=use_ssl):
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
host="localhost",
use_ssl=use_ssl)
connection.open()
@pytest.mark.django_db
@pytest.mark.parametrize("use_ssl", [
True, False
])
@pytest.mark.parametrize("allow_private_networks", [
True, False
])
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
def test_send_mail_private_ip(res, use_ssl, allow_private_networks, env):
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private_networks
event, user, organizer = env
event.settings.smtp_use_custom = True
event.settings.smtp_host = "example.com"
event.settings.smtp_use_ssl = use_ssl
event.settings.smtp_use_tls = False
def send_mail():
m = OutgoingMail.objects.create(
to=['recipient@example.com'],
subject='Test',
body_plain='Test',
sender='sender@example.com',
event=event
)
assert m.status == OutgoingMail.STATUS_QUEUED
mail_send_task.apply(kwargs={
'outgoing_mail': m.pk,
}, max_retries=0)
m.refresh_from_db()
return m
with test_mail_connection(res=res, should_connect=allow_private_networks, use_ssl=use_ssl):
m = send_mail()
if allow_private_networks:
assert m.status == OutgoingMail.STATUS_SENT
else:
assert m.status == OutgoingMail.STATUS_FAILED

View File

@@ -0,0 +1,93 @@
#
# This file is part of pretix (Community Edition).
#
# Copyright (C) 2014-2020 Raphael Michel and contributors
# Copyright (C) 2020-today pretix GmbH and contributors
#
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software Foundation in version 3 of the License.
#
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
# this file, see <https://pretix.eu/about/en/license>.
#
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
#
from socket import AF_INET, SOCK_STREAM
from unittest import mock
import pytest
import requests
from django.test import override_settings
from dns.inet import AF_INET6
from urllib3.exceptions import HTTPError
def test_local_blocked():
with pytest.raises(HTTPError, match="Request to local address.*"):
requests.get("http://localhost", timeout=0.1)
with pytest.raises(HTTPError, match="Request to local address.*"):
requests.get("https://localhost", timeout=0.1)
def test_private_ip_blocked():
with pytest.raises(HTTPError, match="Request to private address.*"):
requests.get("http://10.0.0.1", timeout=0.1)
with pytest.raises(HTTPError, match="Request to private address.*"):
requests.get("https://10.0.0.1", timeout=0.1)
@pytest.mark.django_db
@pytest.mark.parametrize("res", [
[(AF_INET, SOCK_STREAM, 6, '', ('10.0.0.3', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('0.0.0.0', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
[(AF_INET, SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
[(AF_INET6, SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
])
def test_dns_resolving_to_local_blocked(res):
with mock.patch('socket.getaddrinfo') as mock_addr:
mock_addr.return_value = res
with pytest.raises(HTTPError, match="Request to (multicast|private|local) address.*"):
requests.get("https://example.org", timeout=0.1)
with pytest.raises(HTTPError, match="Request to (multicast|private|local) address.*"):
requests.get("http://example.org", timeout=0.1)
def test_dns_remote_allowed():
class SocketOk(Exception):
pass
def side_effect(*args, **kwargs):
raise SocketOk
with mock.patch('socket.getaddrinfo') as mock_addr, mock.patch('socket.socket') as mock_socket:
mock_addr.return_value = [(AF_INET, SOCK_STREAM, 6, '', ('8.8.8.8', 443))]
mock_socket.side_effect = side_effect
with pytest.raises(SocketOk):
requests.get("https://example.org", timeout=0.1)
@override_settings(ALLOW_HTTP_TO_PRIVATE_NETWORKS=True)
def test_local_is_allowed():
class SocketOk(Exception):
pass
def side_effect(*args, **kwargs):
raise SocketOk
with mock.patch('socket.getaddrinfo') as mock_addr, mock.patch('socket.socket') as mock_socket:
mock_addr.return_value = [(AF_INET, SOCK_STREAM, 6, '', ('10.0.0.1', 443))]
mock_socket.side_effect = side_effect
with pytest.raises(SocketOk):
requests.get("https://example.org", timeout=0.1)