Compare commits

..

1 Commits

Author SHA1 Message Date
Raphael Michel 9ebd5d83ca RejectInvalidInputMiddleware: Ignore invalid charsets on body 2026-06-25 17:42:42 +02:00
5 changed files with 11 additions and 15 deletions
-5
View File
@@ -57,8 +57,6 @@ logger = logging.getLogger('pretix.base.email')
T = TypeVar("T", bound=EmailBackend)
_cgnat_net = ipaddress.ip_network('100.64.0.0/10')
def test_custom_smtp_backend(backend: T, from_addr: str) -> None:
try:
@@ -255,15 +253,12 @@ def create_connection(address, timeout=socket.getdefaulttimeout(),
if not getattr(settings, "MAIL_CUSTOM_SMTP_ALLOW_PRIVATE_NETWORKS", False):
ip_addr = ipaddress.ip_address(sa[0])
check_ip4 = ip_addr.ipv4_mapped if getattr(ip_addr, "ipv4_mapped", None) else ip_addr
if ip_addr.is_multicast:
raise socket.error(f"Request to multicast address {sa[0]} blocked")
if ip_addr.is_loopback or ip_addr.is_link_local:
raise socket.error(f"Request to local address {sa[0]} blocked")
if ip_addr.is_private:
raise socket.error(f"Request to private address {sa[0]} blocked")
if check_ip4 in _cgnat_net:
raise socket.error(f"Request to RFC 6598 address {sa[0]} blocked")
sock = None
try:
+10 -2
View File
@@ -370,8 +370,16 @@ class RejectInvalidInputMiddleware(MiddlewareMixin):
if "\x00" in request.META['QUERY_STRING'] or "%00" in request.META['QUERY_STRING']:
raise BadRequest("Invalid characters in input.")
if request.method in ('POST', 'PUT', 'PATCH') and request.content_type == "application/x-www-form-urlencoded":
if any("\x00" in value for key, value_list in request.POST.lists() for value in value_list):
raise BadRequest("Invalid characters in input.")
try:
post_data = request.POST.lists()
except BadRequest:
# Reading request.POST wasn't possible, probably an invalid charset. Django will crash once we actually
# use request.POST, but if we don't, let's not crash it (required for some weird payment provider
# webhooks, e.g. computop).
pass
else:
if any("\x00" in value for key, value_list in post_data for value in value_list):
raise BadRequest("Invalid characters in input.")
class CustomCommonMiddleware(CommonMiddleware):
+1 -2
View File
@@ -151,14 +151,13 @@ def monkeypatch_urllib3_ssrf_protection():
if not getattr(settings, "ALLOW_HTTP_TO_PRIVATE_NETWORKS", False):
ip_addr = ipaddress.ip_address(sa[0])
check_ip4 = ip_addr.ipv4_mapped if getattr(ip_addr, "ipv4_mapped", None) else ip_addr
if ip_addr.is_multicast:
raise HTTPError(f"Request to multicast address {sa[0]} blocked")
if ip_addr.is_loopback or ip_addr.is_link_local:
raise HTTPError(f"Request to local address {sa[0]} blocked")
if ip_addr.is_private:
raise HTTPError(f"Request to private address {sa[0]} blocked")
if check_ip4 in _cgnat_net:
if ip_addr in _cgnat_net:
raise HTTPError(f"Request to RFC 6598 address {sa[0]} blocked")
sock = None
-3
View File
@@ -602,13 +602,10 @@ PRIVATE_IPS_RES = [
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('127.1.1.1', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('192.168.5.3', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('224.0.0.1', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('100.64.0.1', 443))],
[(socket.AF_INET, socket.SOCK_STREAM, 6, '', ('100.100.100.100', 443))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
[(socket.AF_INET6, socket.SOCK_STREAM, 6, '', ('::ffff:100.64.0.1', 443, 0, 0))],
]
-3
View File
@@ -43,8 +43,6 @@ def test_private_ip_blocked():
requests.get("https://10.0.0.1", timeout=0.1)
with pytest.raises(HTTPError, match="Request to RFC 6598 address.*"):
requests.get("https://100.100.100.100", timeout=0.1)
with pytest.raises(HTTPError, match="Request to RFC 6598 address.*"):
requests.get("https://[::ffff:100.64.0.1]", timeout=0.1)
@pytest.mark.django_db
@@ -60,7 +58,6 @@ def test_private_ip_blocked():
[(AF_INET6, SOCK_STREAM, 6, '', ('fe80::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('ff00::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, '', ('fc00::1', 443, 0, 0))],
[(AF_INET6, SOCK_STREAM, 6, "", ("::ffff:100.64.0.1", 443, 0, 0))],
])
def test_dns_resolving_to_local_blocked(res):
with mock.patch('socket.getaddrinfo') as mock_addr: