mirror of
https://github.com/pretix/pretix.git
synced 2026-04-23 23:22:32 +00:00
Compare commits
5 Commits
fix-delete
...
ssrf-prote
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f66f97bee1 | ||
|
|
998a62add2 | ||
|
|
614408f5ae | ||
|
|
1383e967df | ||
|
|
c743e9fd3f |
@@ -93,7 +93,7 @@ dependencies = [
|
||||
"redis==7.1.*",
|
||||
"reportlab==4.4.*",
|
||||
"requests==2.32.*",
|
||||
"sentry-sdk==2.54.*",
|
||||
"sentry-sdk==2.56.*",
|
||||
"sepaxml==2.7.*",
|
||||
"stripe==7.9.*",
|
||||
"text-unidecode==1.*",
|
||||
|
||||
@@ -19,7 +19,10 @@
|
||||
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
|
||||
# <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
import ipaddress
|
||||
import logging
|
||||
import smtplib
|
||||
import socket
|
||||
from itertools import groupby
|
||||
from smtplib import SMTPResponseException
|
||||
from typing import TypeVar
|
||||
@@ -237,3 +240,80 @@ def base_renderers(sender, **kwargs):
|
||||
|
||||
def get_email_context(**kwargs):
|
||||
return PlaceholderContext(**kwargs).render_all()
|
||||
|
||||
|
||||
def create_connection(address, timeout=socket.getdefaulttimeout(),
|
||||
source_address=None, *, all_errors=False):
|
||||
# Taken from the python stdlib, extended with a check for local ips
|
||||
|
||||
host, port = address
|
||||
exceptions = []
|
||||
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
|
||||
af, socktype, proto, canonname, sa = res
|
||||
|
||||
if not settings.get("MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS", False):
|
||||
ip_addr = ipaddress.ip_address(sa[0])
|
||||
if ip_addr.is_multicast:
|
||||
raise socket.error(f"Request to multicast address {sa[0]} blocked")
|
||||
if ip_addr.is_loopback or ip_addr.is_link_local:
|
||||
raise socket.error(f"Request to local address {sa[0]} blocked")
|
||||
if ip_addr.is_private:
|
||||
raise socket.error(f"Request to private address {sa[0]} blocked")
|
||||
|
||||
sock = None
|
||||
try:
|
||||
sock = socket.socket(af, socktype, proto)
|
||||
if timeout is not socket.getdefaulttimeout():
|
||||
sock.settimeout(timeout)
|
||||
if source_address:
|
||||
sock.bind(source_address)
|
||||
sock.connect(sa)
|
||||
# Break explicitly a reference cycle
|
||||
exceptions.clear()
|
||||
return sock
|
||||
|
||||
except socket.error as exc:
|
||||
if not all_errors:
|
||||
exceptions.clear() # raise only the last error
|
||||
exceptions.append(exc)
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
|
||||
if len(exceptions):
|
||||
try:
|
||||
if not all_errors:
|
||||
raise exceptions[0]
|
||||
raise ExceptionGroup("create_connection failed", exceptions)
|
||||
finally:
|
||||
# Break explicitly a reference cycle
|
||||
exceptions.clear()
|
||||
else:
|
||||
raise socket.error("getaddrinfo returns an empty list")
|
||||
|
||||
|
||||
class CheckPrivateNetworkMixin:
|
||||
# _get_socket taken 1:1 from smtplib, just with a call to our own create_connection
|
||||
def _get_socket(self, host, port, timeout):
|
||||
# This makes it simpler for SMTP_SSL to use the SMTP connect code
|
||||
# and just alter the socket connection bit.
|
||||
if timeout is not None and not timeout:
|
||||
raise ValueError('Non-blocking socket (timeout=0) is not supported')
|
||||
if self.debuglevel > 0:
|
||||
self._print_debug('connect: to', (host, port), self.source_address)
|
||||
return create_connection((host, port), timeout, self.source_address)
|
||||
|
||||
|
||||
class SMTP(CheckPrivateNetworkMixin, smtplib.SMTP):
|
||||
pass
|
||||
|
||||
|
||||
# SMTP used here instead of mixin, because smtp.SMTP_SSL._get_socket calls super()._get_socket and then wraps this socket
|
||||
# super()._get_socket needs to be our version from the mixin
|
||||
class SMTP_SSL(smtplib.SMTP_SSL, SMTP): # noqa: N801
|
||||
pass
|
||||
|
||||
|
||||
class CheckPrivateNetworkSmtpBackend(EmailBackend):
|
||||
@property
|
||||
def connection_class(self):
|
||||
return SMTP_SSL if self.use_ssl else SMTP
|
||||
|
||||
@@ -100,7 +100,7 @@ def primary_font_kwargs():
|
||||
|
||||
choices = [('Open Sans', 'Open Sans')]
|
||||
choices += sorted([
|
||||
(a, {"title": a, "data": v}) for a, v in get_fonts(pdf_support_required=False).items()
|
||||
(a, FontSelect.FontOption(title=a, data=v)) for a, v in get_fonts(pdf_support_required=False).items()
|
||||
], key=lambda a: a[0])
|
||||
return {
|
||||
'choices': choices,
|
||||
|
||||
@@ -27,11 +27,11 @@ from django.template import TemplateDoesNotExist, loader
|
||||
from django.template.loader import get_template
|
||||
from django.utils.functional import Promise
|
||||
from django.utils.translation import gettext as _
|
||||
from django.views.decorators.csrf import requires_csrf_token
|
||||
from sentry_sdk import last_event_id
|
||||
|
||||
from pretix.base.i18n import language
|
||||
from pretix.base.middleware import get_language_from_request
|
||||
from pretix.multidomain.middlewares import requires_csrf_token
|
||||
|
||||
|
||||
def csrf_failure(request, reason=""):
|
||||
|
||||
@@ -34,6 +34,7 @@
|
||||
|
||||
import datetime
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
|
||||
from django import forms
|
||||
from django.conf import settings
|
||||
@@ -420,6 +421,11 @@ class SplitDateTimeField(forms.SplitDateTimeField):
|
||||
class FontSelect(forms.RadioSelect):
|
||||
option_template_name = 'pretixcontrol/font_option.html'
|
||||
|
||||
@dataclass
|
||||
class FontOption:
|
||||
title: str
|
||||
data: str
|
||||
|
||||
|
||||
class ItemMultipleChoiceField(SafeModelMultipleChoiceField):
|
||||
def label_from_instance(self, obj):
|
||||
|
||||
@@ -34,7 +34,6 @@
|
||||
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
from dataclasses import dataclass
|
||||
from decimal import Decimal
|
||||
from urllib.parse import urlencode
|
||||
from zoneinfo import ZoneInfo
|
||||
@@ -74,8 +73,8 @@ from pretix.base.settings import (
|
||||
)
|
||||
from pretix.base.validators import multimail_validate
|
||||
from pretix.control.forms import (
|
||||
MultipleLanguagesWidget, SalesChannelCheckboxSelectMultiple, SlugWidget,
|
||||
SplitDateTimeField, SplitDateTimePickerWidget,
|
||||
FontSelect, MultipleLanguagesWidget, SalesChannelCheckboxSelectMultiple,
|
||||
SlugWidget, SplitDateTimeField, SplitDateTimePickerWidget,
|
||||
)
|
||||
from pretix.control.forms.widgets import Select2
|
||||
from pretix.helpers.countries import CachedCountries
|
||||
@@ -579,12 +578,6 @@ class EventSettingsValidationMixin:
|
||||
del self.cleaned_data[field]
|
||||
|
||||
|
||||
@dataclass
|
||||
class FontOption:
|
||||
title: str
|
||||
data: str
|
||||
|
||||
|
||||
class EventSettingsForm(EventSettingsValidationMixin, FormPlaceholderMixin, SettingsForm):
|
||||
timezone = forms.ChoiceField(
|
||||
choices=((a, a) for a in common_timezones),
|
||||
@@ -736,7 +729,7 @@ class EventSettingsForm(EventSettingsValidationMixin, FormPlaceholderMixin, Sett
|
||||
del self.fields['event_list_filters']
|
||||
del self.fields['event_calendar_future_only']
|
||||
self.fields['primary_font'].choices = [('Open Sans', 'Open Sans')] + sorted([
|
||||
(a, FontOption(title=a, data=v)) for a, v in get_fonts(self.event, pdf_support_required=False).items()
|
||||
(a, FontSelect.FontOption(title=a, data=v)) for a, v in get_fonts(self.event, pdf_support_required=False).items()
|
||||
], key=lambda a: a[0])
|
||||
|
||||
# create "virtual" fields for better UX when editing <name>_asked and <name>_required fields
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
# <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
import re
|
||||
from datetime import datetime
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
@@ -49,10 +48,6 @@ def set_cookie_without_samesite(request, response, key, *args, **kwargs):
|
||||
response.cookies[key]['Partitioned'] = True
|
||||
|
||||
|
||||
def delete_cookie_without_samesite(request, response, key, *args, **kwargs):
|
||||
kwargs['expires'] = datetime.fromtimestamp(0).strftime("%a, %d %b %Y %H:%M:%S GMT")
|
||||
set_cookie_without_samesite(request, response, key, *args, **kwargs)
|
||||
|
||||
# Based on https://www.chromium.org/updates/same-site/incompatible-clients
|
||||
# Copyright 2019 Google LLC.
|
||||
# SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
@@ -19,12 +19,26 @@
|
||||
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
|
||||
# <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
import ipaddress
|
||||
import socket
|
||||
import sys
|
||||
import types
|
||||
from datetime import datetime
|
||||
from http import cookies
|
||||
|
||||
from django.conf import settings
|
||||
from PIL import Image
|
||||
from requests.adapters import HTTPAdapter
|
||||
from urllib3.connection import HTTPConnection, HTTPSConnection
|
||||
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
|
||||
from urllib3.exceptions import (
|
||||
ConnectTimeoutError, HTTPError, LocationParseError, NameResolutionError,
|
||||
NewConnectionError,
|
||||
)
|
||||
from urllib3.util.connection import (
|
||||
_TYPE_SOCKET_OPTIONS, _set_socket_options, allowed_gai_family,
|
||||
)
|
||||
from urllib3.util.timeout import _DEFAULT_TIMEOUT
|
||||
|
||||
|
||||
def monkeypatch_vobject_performance():
|
||||
@@ -89,6 +103,123 @@ def monkeypatch_requests_timeout():
|
||||
HTTPAdapter.send = httpadapter_send
|
||||
|
||||
|
||||
def monkeypatch_urllib3_ssrf_protection():
|
||||
"""
|
||||
pretix allows HTTP requests to untrusted URLs, e.g. through webhooks or external API URLs. This is dangerous since
|
||||
it can allow access to private networks that should not be reachable by users ("server-side request forgery", SSRF).
|
||||
Validating URLs at submission is not sufficient, since with DNS rebinding an attacker can make a domain name pass
|
||||
validation and then resolve to a private IP address on actual execution. Unfortunately, there seems no clean solution
|
||||
to this in Python land, so we monkeypatch urllib3's connection management to check the IP address to be external
|
||||
*after* the DNS resolution.
|
||||
|
||||
This does not work when a global http(s) proxy is used, but in that scenario the proxy can perform the validation.
|
||||
"""
|
||||
if getattr(settings, "ALLOW_HTTP_TO_PRIVATE_NETWORKS", False):
|
||||
# Settings are not supposed to change during runtime, so we can optimize performance and complexity by skipping
|
||||
# this if not needed.
|
||||
return
|
||||
|
||||
def create_connection(
|
||||
address: tuple[str, int],
|
||||
timeout=_DEFAULT_TIMEOUT,
|
||||
source_address: tuple[str, int] | None = None,
|
||||
socket_options: _TYPE_SOCKET_OPTIONS | None = None,
|
||||
) -> socket.socket:
|
||||
# This is copied from urllib3.util.connection v2.3.0
|
||||
host, port = address
|
||||
if host.startswith("["):
|
||||
host = host.strip("[]")
|
||||
err = None
|
||||
|
||||
# Using the value from allowed_gai_family() in the context of getaddrinfo lets
|
||||
# us select whether to work with IPv4 DNS records, IPv6 records, or both.
|
||||
# The original create_connection function always returns all records.
|
||||
family = allowed_gai_family()
|
||||
|
||||
try:
|
||||
host.encode("idna")
|
||||
except UnicodeError:
|
||||
raise LocationParseError(f"'{host}', label empty or too long") from None
|
||||
|
||||
for res in socket.getaddrinfo(host, port, family, socket.SOCK_STREAM):
|
||||
af, socktype, proto, canonname, sa = res
|
||||
|
||||
if not getattr(settings, "ALLOW_HTTP_TO_PRIVATE_NETWORKS", False):
|
||||
ip_addr = ipaddress.ip_address(sa[0])
|
||||
if ip_addr.is_multicast:
|
||||
raise HTTPError(f"Request to multicast address {sa[0]} blocked")
|
||||
if ip_addr.is_loopback or ip_addr.is_link_local:
|
||||
raise HTTPError(f"Request to local address {sa[0]} blocked")
|
||||
if ip_addr.is_private:
|
||||
raise HTTPError(f"Request to private address {sa[0]} blocked")
|
||||
|
||||
sock = None
|
||||
try:
|
||||
sock = socket.socket(af, socktype, proto)
|
||||
|
||||
# If provided, set socket level options before connecting.
|
||||
_set_socket_options(sock, socket_options)
|
||||
|
||||
if timeout is not _DEFAULT_TIMEOUT:
|
||||
sock.settimeout(timeout)
|
||||
if source_address:
|
||||
sock.bind(source_address)
|
||||
sock.connect(sa)
|
||||
# Break explicitly a reference cycle
|
||||
err = None
|
||||
return sock
|
||||
|
||||
except OSError as _:
|
||||
err = _
|
||||
if sock is not None:
|
||||
sock.close()
|
||||
|
||||
if err is not None:
|
||||
try:
|
||||
raise err
|
||||
finally:
|
||||
# Break explicitly a reference cycle
|
||||
err = None
|
||||
else:
|
||||
raise OSError("getaddrinfo returns an empty list")
|
||||
|
||||
class ProtectionMixin:
|
||||
def _new_conn(self) -> socket.socket:
|
||||
# This is 1:1 the version from urllib3.connection.HTTPConnection._new_conn v2.3.0
|
||||
# just with a call to our own create_connection
|
||||
try:
|
||||
sock = create_connection(
|
||||
(self._dns_host, self.port),
|
||||
self.timeout,
|
||||
source_address=self.source_address,
|
||||
socket_options=self.socket_options,
|
||||
)
|
||||
except socket.gaierror as e:
|
||||
raise NameResolutionError(self.host, self, e) from e
|
||||
except socket.timeout as e:
|
||||
raise ConnectTimeoutError(
|
||||
self,
|
||||
f"Connection to {self.host} timed out. (connect timeout={self.timeout})",
|
||||
) from e
|
||||
|
||||
except OSError as e:
|
||||
raise NewConnectionError(
|
||||
self, f"Failed to establish a new connection: {e}"
|
||||
) from e
|
||||
|
||||
sys.audit("http.client.connect", self, self.host, self.port)
|
||||
return sock
|
||||
|
||||
class ProtectedHTTPConnection(ProtectionMixin, HTTPConnection):
|
||||
pass
|
||||
|
||||
class ProtectedHTTPSConnection(ProtectionMixin, HTTPSConnection):
|
||||
pass
|
||||
|
||||
HTTPConnectionPool.ConnectionCls = ProtectedHTTPConnection
|
||||
HTTPSConnectionPool.ConnectionCls = ProtectedHTTPSConnection
|
||||
|
||||
|
||||
def monkeypatch_cookie_morsel():
|
||||
# See https://code.djangoproject.com/ticket/34613
|
||||
cookies.Morsel._flags.add("partitioned")
|
||||
@@ -99,4 +230,5 @@ def monkeypatch_all_at_ready():
|
||||
monkeypatch_vobject_performance()
|
||||
monkeypatch_pillow_safer()
|
||||
monkeypatch_requests_timeout()
|
||||
monkeypatch_urllib3_ssrf_protection()
|
||||
monkeypatch_cookie_morsel()
|
||||
|
||||
@@ -50,15 +50,12 @@ from django.middleware.csrf import (
|
||||
from django.shortcuts import render
|
||||
from django.urls import set_urlconf
|
||||
from django.utils.cache import patch_vary_headers
|
||||
from django.utils.decorators import decorator_from_middleware
|
||||
from django.utils.deprecation import MiddlewareMixin
|
||||
from django.utils.http import http_date
|
||||
from django_scopes import scopes_disabled
|
||||
|
||||
from pretix.base.models import Event, Organizer
|
||||
from pretix.helpers.cookies import (
|
||||
delete_cookie_without_samesite, set_cookie_without_samesite,
|
||||
)
|
||||
from pretix.helpers.cookies import set_cookie_without_samesite
|
||||
from pretix.multidomain.models import KnownDomain
|
||||
|
||||
LOCAL_HOST_NAMES = ('testserver', 'localhost')
|
||||
@@ -179,23 +176,9 @@ class SessionMiddleware(BaseSessionMiddleware):
|
||||
# The session should be deleted only if the session is entirely empty
|
||||
is_secure = request.scheme == 'https'
|
||||
if '__Host-' + settings.SESSION_COOKIE_NAME in request.COOKIES and empty:
|
||||
# response.delete_cookie does not work as we might have set a partitioned cookie
|
||||
delete_cookie_without_samesite(
|
||||
request, response,
|
||||
'__Host-' + settings.SESSION_COOKIE_NAME,
|
||||
path=settings.SESSION_COOKIE_PATH,
|
||||
secure=is_secure,
|
||||
httponly=settings.SESSION_COOKIE_HTTPONLY or None
|
||||
)
|
||||
response.delete_cookie('__Host-' + settings.SESSION_COOKIE_NAME)
|
||||
elif settings.SESSION_COOKIE_NAME in request.COOKIES and empty:
|
||||
# response.delete_cookie does not work as we might have set a partitioned cookie
|
||||
delete_cookie_without_samesite(
|
||||
request, response,
|
||||
settings.SESSION_COOKIE_NAME,
|
||||
path=settings.SESSION_COOKIE_PATH,
|
||||
secure=is_secure,
|
||||
httponly=settings.SESSION_COOKIE_HTTPONLY or None
|
||||
)
|
||||
response.delete_cookie(settings.SESSION_COOKIE_NAME)
|
||||
else:
|
||||
if accessed:
|
||||
patch_vary_headers(response, ('Cookie',))
|
||||
@@ -212,21 +195,15 @@ class SessionMiddleware(BaseSessionMiddleware):
|
||||
if response.status_code != 500:
|
||||
request.session.save()
|
||||
if is_secure and settings.SESSION_COOKIE_NAME in request.COOKIES: # remove legacy cookie
|
||||
# response.delete_cookie does not work as we might have set a partitioned cookie
|
||||
delete_cookie_without_samesite(
|
||||
request, response,
|
||||
settings.SESSION_COOKIE_NAME,
|
||||
path=settings.SESSION_COOKIE_PATH,
|
||||
secure=is_secure,
|
||||
httponly=settings.SESSION_COOKIE_HTTPONLY or None
|
||||
)
|
||||
response.delete_cookie(settings.SESSION_COOKIE_NAME)
|
||||
response.delete_cookie(settings.SESSION_COOKIE_NAME, samesite="None")
|
||||
set_cookie_without_samesite(
|
||||
request, response,
|
||||
'__Host-' + settings.SESSION_COOKIE_NAME if is_secure else settings.SESSION_COOKIE_NAME,
|
||||
request.session.session_key, max_age=max_age,
|
||||
expires=expires,
|
||||
path=settings.SESSION_COOKIE_PATH,
|
||||
secure=is_secure,
|
||||
secure=request.scheme == 'https',
|
||||
httponly=settings.SESSION_COOKIE_HTTPONLY or None
|
||||
)
|
||||
return response
|
||||
@@ -271,50 +248,20 @@ class CsrfViewMiddleware(BaseCsrfMiddleware):
|
||||
if request.session.get(CSRF_SESSION_KEY) != request.META["CSRF_COOKIE"]:
|
||||
request.session[CSRF_SESSION_KEY] = request.META["CSRF_COOKIE"]
|
||||
else:
|
||||
is_secure = request.scheme == 'https'
|
||||
# Set the CSRF cookie even if it's already set, so we renew
|
||||
# the expiry timer.
|
||||
if request.is_secure() and settings.CSRF_COOKIE_NAME in request.COOKIES: # remove legacy cookie
|
||||
# response.delete_cookie does not work as we might have set a partitioned cookie
|
||||
delete_cookie_without_samesite(
|
||||
request, response,
|
||||
settings.CSRF_COOKIE_NAME,
|
||||
path=settings.CSRF_COOKIE_PATH,
|
||||
secure=request.is_secure(),
|
||||
httponly=settings.CSRF_COOKIE_HTTPONLY
|
||||
)
|
||||
if is_secure and settings.CSRF_COOKIE_NAME in request.COOKIES: # remove legacy cookie
|
||||
response.delete_cookie(settings.CSRF_COOKIE_NAME)
|
||||
response.delete_cookie(settings.CSRF_COOKIE_NAME, samesite="None")
|
||||
set_cookie_without_samesite(
|
||||
request, response,
|
||||
'__Host-' + settings.CSRF_COOKIE_NAME if request.is_secure() else settings.CSRF_COOKIE_NAME,
|
||||
'__Host-' + settings.CSRF_COOKIE_NAME if is_secure else settings.CSRF_COOKIE_NAME,
|
||||
request.META["CSRF_COOKIE"],
|
||||
max_age=settings.CSRF_COOKIE_AGE,
|
||||
path=settings.CSRF_COOKIE_PATH,
|
||||
secure=request.is_secure(),
|
||||
secure=is_secure,
|
||||
httponly=settings.CSRF_COOKIE_HTTPONLY
|
||||
)
|
||||
# Content varies with the CSRF cookie, so set the Vary header.
|
||||
patch_vary_headers(response, ('Cookie',))
|
||||
|
||||
def process_response(self, request, response):
|
||||
if (
|
||||
not settings.CSRF_USE_SESSIONS
|
||||
and request.is_secure()
|
||||
and settings.CSRF_COOKIE_NAME in response.cookies
|
||||
and response.cookies[settings.CSRF_COOKIE_NAME].value
|
||||
):
|
||||
raise ValueError("Usage of djangos CsrfViewMiddleware detected (legacy cookie found in response). "
|
||||
"This may be caused by using csrf_project or requires_csrf_token from django.views.decorators.csrf. "
|
||||
"Use the pretix.multidomain.middlewares equivalent instead.")
|
||||
|
||||
return super().process_response(request, response)
|
||||
|
||||
|
||||
csrf_protect = decorator_from_middleware(CsrfViewMiddleware)
|
||||
|
||||
|
||||
class _EnsureCsrfToken(CsrfViewMiddleware):
|
||||
# Behave like CsrfViewMiddleware but don't reject requests or log warnings.
|
||||
def _reject(self, request, reason):
|
||||
return None
|
||||
|
||||
|
||||
requires_csrf_token = decorator_from_middleware(_EnsureCsrfToken)
|
||||
|
||||
@@ -42,6 +42,7 @@ from django.utils.functional import cached_property
|
||||
from django.utils.http import url_has_allowed_host_and_scheme
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django.views.decorators.cache import never_cache
|
||||
from django.views.decorators.csrf import csrf_protect
|
||||
from django.views.decorators.debug import sensitive_post_parameters
|
||||
from django.views.generic import FormView, ListView, View
|
||||
|
||||
@@ -98,6 +99,7 @@ class LoginView(RedirectBackMixin, FormView):
|
||||
redirect_authenticated_user = True
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
@@ -209,6 +211,7 @@ class RegistrationView(RedirectBackMixin, FormView):
|
||||
redirect_authenticated_user = True
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
@@ -252,6 +255,7 @@ class SetPasswordView(FormView):
|
||||
template_name = 'pretixpresale/organizers/customer_setpassword.html'
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
@@ -295,6 +299,7 @@ class ResetPasswordView(FormView):
|
||||
template_name = 'pretixpresale/organizers/customer_resetpw.html'
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
@@ -518,6 +523,7 @@ class ChangePasswordView(CustomerAccountBaseMixin, FormView):
|
||||
form_class = ChangePasswordForm
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
@@ -551,6 +557,7 @@ class ChangeInformationView(CustomerAccountBaseMixin, FormView):
|
||||
form_class = ChangeInfoForm
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
@@ -658,6 +665,7 @@ class SSOLoginView(RedirectBackMixin, View):
|
||||
redirect_authenticated_user = True
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
@@ -720,6 +728,7 @@ class SSOLoginReturnView(RedirectBackMixin, View):
|
||||
redirect_authenticated_user = True
|
||||
|
||||
@method_decorator(sensitive_post_parameters())
|
||||
@method_decorator(csrf_protect)
|
||||
@method_decorator(never_cache)
|
||||
def dispatch(self, request, *args, **kwargs):
|
||||
if not request.organizer.settings.customer_accounts:
|
||||
|
||||
@@ -208,6 +208,7 @@ CSRF_TRUSTED_ORIGINS = [urlparse(SITE_URL).scheme + '://' + urlparse(SITE_URL).h
|
||||
|
||||
TRUST_X_FORWARDED_FOR = config.getboolean('pretix', 'trust_x_forwarded_for', fallback=False)
|
||||
USE_X_FORWARDED_HOST = config.getboolean('pretix', 'trust_x_forwarded_host', fallback=False)
|
||||
ALLOW_HTTP_TO_PRIVATE_NETWORKS = config.getboolean('pretix', 'allow_http_to_private_networks', fallback=False)
|
||||
|
||||
|
||||
REQUEST_ID_HEADER = config.get('pretix', 'request_id_header', fallback=False)
|
||||
@@ -248,7 +249,8 @@ EMAIL_HOST_PASSWORD = config.get('mail', 'password', fallback='')
|
||||
EMAIL_USE_TLS = config.getboolean('mail', 'tls', fallback=False)
|
||||
EMAIL_USE_SSL = config.getboolean('mail', 'ssl', fallback=False)
|
||||
EMAIL_SUBJECT_PREFIX = '[pretix] '
|
||||
EMAIL_BACKEND = EMAIL_CUSTOM_SMTP_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
|
||||
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
|
||||
EMAIL_CUSTOM_SMTP_BACKEND = 'pretixbase.email.CheckPrivateNetworkSmtpBackend'
|
||||
EMAIL_TIMEOUT = 60
|
||||
|
||||
ADMINS = [('Admin', n) for n in config.get('mail', 'admins', fallback='').split(",") if n]
|
||||
|
||||
@@ -35,8 +35,11 @@
|
||||
import datetime
|
||||
import os
|
||||
import re
|
||||
import socket
|
||||
from contextlib import contextmanager
|
||||
from decimal import Decimal
|
||||
from email.mime.text import MIMEText
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
from django.conf import settings
|
||||
@@ -591,3 +594,117 @@ def test_attached_ical_localization(env, order):
|
||||
assert len(djmail.outbox) == 1
|
||||
assert len(djmail.outbox[0].attachments) == 1
|
||||
assert description in djmail.outbox[0].attachments[0][1]
|
||||
|
||||
|
||||
PRIVATE_IPS_RES = [
|
||||
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('10.0.0.3', 443))],
|
||||
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('0.0.0.0', 443))],
|
||||
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
|
||||
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
|
||||
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
|
||||
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
|
||||
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
|
||||
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
|
||||
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
|
||||
]
|
||||
|
||||
|
||||
@contextmanager
|
||||
def test_mail_connection(res, should_connect, use_ssl):
|
||||
with (
|
||||
mock.patch('socket.socket') as mock_socket,
|
||||
mock.patch('socket.getaddrinfo', return_value=res),
|
||||
mock.patch('smtplib.SMTP.getreply', return_value=(220, "")),
|
||||
mock.patch('smtplib.SMTP.sendmail'),
|
||||
mock.patch('ssl.SSLContext.wrap_socket') as mock_ssl
|
||||
):
|
||||
yield
|
||||
|
||||
if should_connect:
|
||||
mock_socket.assert_called_once()
|
||||
mock_socket.return_value.connect.assert_called_once_with(res[0][-1])
|
||||
if use_ssl:
|
||||
mock_ssl.assert_called_once()
|
||||
else:
|
||||
mock_socket.assert_not_called()
|
||||
mock_socket.return_value.connect.assert_not_called()
|
||||
mock_ssl.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
|
||||
@pytest.mark.parametrize("use_ssl", [
|
||||
True, False
|
||||
])
|
||||
def test_private_smtp_ip(res, use_ssl, settings):
|
||||
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
|
||||
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = False
|
||||
with test_mail_connection(res=res, should_connect=False, use_ssl=use_ssl), pytest.raises(match="Request to .* blocked"):
|
||||
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
|
||||
host="localhost",
|
||||
use_ssl=use_ssl)
|
||||
connection.open()
|
||||
|
||||
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = True
|
||||
with test_mail_connection(res=res, should_connect=True, use_ssl=use_ssl):
|
||||
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
|
||||
host="localhost",
|
||||
use_ssl=use_ssl)
|
||||
connection.open()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("use_ssl", [
|
||||
True, False
|
||||
])
|
||||
@pytest.mark.parametrize("allow_private", [
|
||||
True, False
|
||||
])
|
||||
def test_public_smtp_ip(use_ssl, allow_private, settings):
|
||||
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
|
||||
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private
|
||||
|
||||
with test_mail_connection(res=[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('8.8.8.8', 443))], should_connect=True, use_ssl=use_ssl):
|
||||
connection = djmail.get_connection(backend=settings.EMAIL_CUSTOM_SMTP_BACKEND,
|
||||
host="localhost",
|
||||
use_ssl=use_ssl)
|
||||
connection.open()
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize("use_ssl", [
|
||||
True, False
|
||||
])
|
||||
@pytest.mark.parametrize("allow_private_networks", [
|
||||
True, False
|
||||
])
|
||||
@pytest.mark.parametrize("res", PRIVATE_IPS_RES)
|
||||
def test_send_mail_private_ip(res, use_ssl, allow_private_networks, env):
|
||||
settings.EMAIL_CUSTOM_SMTP_BACKEND = 'pretix.base.email.CheckPrivateNetworkSmtpBackend'
|
||||
settings.MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS = allow_private_networks
|
||||
|
||||
event, user, organizer = env
|
||||
event.settings.smtp_use_custom = True
|
||||
event.settings.smtp_host = "example.com"
|
||||
event.settings.smtp_use_ssl = use_ssl
|
||||
event.settings.smtp_use_tls = False
|
||||
|
||||
def send_mail():
|
||||
m = OutgoingMail.objects.create(
|
||||
to=['recipient@example.com'],
|
||||
subject='Test',
|
||||
body_plain='Test',
|
||||
sender='sender@example.com',
|
||||
event=event
|
||||
)
|
||||
assert m.status == OutgoingMail.STATUS_QUEUED
|
||||
mail_send_task.apply(kwargs={
|
||||
'outgoing_mail': m.pk,
|
||||
}, max_retries=0)
|
||||
m.refresh_from_db()
|
||||
return m
|
||||
|
||||
with test_mail_connection(res=res, should_connect=allow_private_networks, use_ssl=use_ssl):
|
||||
m = send_mail()
|
||||
if allow_private_networks:
|
||||
assert m.status == OutgoingMail.STATUS_SENT
|
||||
else:
|
||||
assert m.status == OutgoingMail.STATUS_FAILED
|
||||
|
||||
93
src/tests/helpers/test_urllib.py
Normal file
93
src/tests/helpers/test_urllib.py
Normal file
@@ -0,0 +1,93 @@
|
||||
#
|
||||
# This file is part of pretix (Community Edition).
|
||||
#
|
||||
# Copyright (C) 2014-2020 Raphael Michel and contributors
|
||||
# Copyright (C) 2020-today pretix GmbH and contributors
|
||||
#
|
||||
# This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General
|
||||
# Public License as published by the Free Software Foundation in version 3 of the License.
|
||||
#
|
||||
# ADDITIONAL TERMS APPLY: Pursuant to Section 7 of the GNU Affero General Public License, additional terms are
|
||||
# applicable granting you additional permissions and placing additional restrictions on your usage of this software.
|
||||
# Please refer to the pretix LICENSE file to obtain the full terms applicable to this work. If you did not receive
|
||||
# this file, see <https://pretix.eu/about/en/license>.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied
|
||||
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
|
||||
# details.
|
||||
#
|
||||
# You should have received a copy of the GNU Affero General Public License along with this program. If not, see
|
||||
# <https://www.gnu.org/licenses/>.
|
||||
#
|
||||
from socket import AF_INET, SOCK_STREAM
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
import requests
|
||||
from django.test import override_settings
|
||||
from dns.inet import AF_INET6
|
||||
from urllib3.exceptions import HTTPError
|
||||
|
||||
|
||||
def test_local_blocked():
|
||||
with pytest.raises(HTTPError, match="Request to local address.*"):
|
||||
requests.get("http://localhost", timeout=0.1)
|
||||
with pytest.raises(HTTPError, match="Request to local address.*"):
|
||||
requests.get("https://localhost", timeout=0.1)
|
||||
|
||||
|
||||
def test_private_ip_blocked():
|
||||
with pytest.raises(HTTPError, match="Request to private address.*"):
|
||||
requests.get("http://10.0.0.1", timeout=0.1)
|
||||
with pytest.raises(HTTPError, match="Request to private address.*"):
|
||||
requests.get("https://10.0.0.1", timeout=0.1)
|
||||
|
||||
|
||||
@pytest.mark.django_db
|
||||
@pytest.mark.parametrize("res", [
|
||||
[(AF_INET, SOCK_STREAM, 6, '', ('10.0.0.3', 443))],
|
||||
[(AF_INET, SOCK_STREAM, 6, '', ('0.0.0.0', 443))],
|
||||
[(AF_INET, SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
|
||||
[(AF_INET, SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
|
||||
[(AF_INET, SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
|
||||
[(AF_INET6, SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
|
||||
[(AF_INET6, SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
|
||||
[(AF_INET6, SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
|
||||
[(AF_INET6, SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
|
||||
])
|
||||
def test_dns_resolving_to_local_blocked(res):
|
||||
with mock.patch('socket.getaddrinfo') as mock_addr:
|
||||
mock_addr.return_value = res
|
||||
with pytest.raises(HTTPError, match="Request to (multicast|private|local) address.*"):
|
||||
requests.get("https://example.org", timeout=0.1)
|
||||
with pytest.raises(HTTPError, match="Request to (multicast|private|local) address.*"):
|
||||
requests.get("http://example.org", timeout=0.1)
|
||||
|
||||
|
||||
def test_dns_remote_allowed():
|
||||
class SocketOk(Exception):
|
||||
pass
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
raise SocketOk
|
||||
|
||||
with mock.patch('socket.getaddrinfo') as mock_addr, mock.patch('socket.socket') as mock_socket:
|
||||
mock_addr.return_value = [(AF_INET, SOCK_STREAM, 6, '', ('8.8.8.8', 443))]
|
||||
mock_socket.side_effect = side_effect
|
||||
with pytest.raises(SocketOk):
|
||||
requests.get("https://example.org", timeout=0.1)
|
||||
|
||||
|
||||
@override_settings(ALLOW_HTTP_TO_PRIVATE_NETWORKS=True)
|
||||
def test_local_is_allowed():
|
||||
class SocketOk(Exception):
|
||||
pass
|
||||
|
||||
def side_effect(*args, **kwargs):
|
||||
raise SocketOk
|
||||
|
||||
with mock.patch('socket.getaddrinfo') as mock_addr, mock.patch('socket.socket') as mock_socket:
|
||||
mock_addr.return_value = [(AF_INET, SOCK_STREAM, 6, '', ('10.0.0.1', 443))]
|
||||
mock_socket.side_effect = side_effect
|
||||
with pytest.raises(SocketOk):
|
||||
requests.get("https://example.org", timeout=0.1)
|
||||
Reference in New Issue
Block a user