forked from CGM_Public/pretix_original
Send notifications about login with new client or country (#4032)
* Send notifications about login with new client or country * Rebase migration * Remove immediately * Fix isort * Text update
This commit is contained in:
48
src/pretix/base/migrations/0261_userknownloginsource.py
Normal file
48
src/pretix/base/migrations/0261_userknownloginsource.py
Normal file
@@ -0,0 +1,48 @@
|
||||
# Generated by Django 4.2.10 on 2024-04-02 15:37
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
import pretix.helpers.countries
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("pretixbase", "0260_alter_reusablemedium_index_together"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="UserKnownLoginSource",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True, primary_key=True, serialize=False
|
||||
),
|
||||
),
|
||||
("agent_type", models.CharField(max_length=255, null=True)),
|
||||
("device_type", models.CharField(max_length=255, null=True)),
|
||||
("os_type", models.CharField(max_length=255, null=True)),
|
||||
(
|
||||
"country",
|
||||
pretix.helpers.countries.FastCountryField(
|
||||
countries=pretix.helpers.countries.CachedCountries,
|
||||
max_length=2,
|
||||
null=True,
|
||||
),
|
||||
),
|
||||
("last_seen", models.DateTimeField()),
|
||||
(
|
||||
"user",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
related_name="known_login_sources",
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
]
|
||||
@@ -56,6 +56,7 @@ from webauthn.helpers.structs import PublicKeyCredentialDescriptor
|
||||
from pretix.base.i18n import language
|
||||
from pretix.helpers.urls import build_absolute_uri
|
||||
|
||||
from ...helpers.countries import FastCountryField
|
||||
from ...helpers.u2f import pub_key_from_der, websafe_decode
|
||||
from .base import LoggingMixin
|
||||
|
||||
@@ -579,6 +580,15 @@ class User(AbstractBaseUser, PermissionsMixin, LoggingMixin):
|
||||
self.save(update_fields=['session_token'])
|
||||
|
||||
|
||||
class UserKnownLoginSource(models.Model):
|
||||
user = models.ForeignKey('User', on_delete=models.CASCADE, related_name="known_login_sources")
|
||||
agent_type = models.CharField(max_length=255, null=True, blank=True)
|
||||
device_type = models.CharField(max_length=255, null=True, blank=True)
|
||||
os_type = models.CharField(max_length=255, null=True, blank=True)
|
||||
country = FastCountryField(null=True, blank=True)
|
||||
last_seen = models.DateTimeField()
|
||||
|
||||
|
||||
class StaffSession(models.Model):
|
||||
user = models.ForeignKey('User', on_delete=models.PROTECT)
|
||||
date_start = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
@@ -31,6 +31,7 @@ from pretix.base.models import CachedCombinedTicket, CachedTicket
|
||||
from pretix.base.models.customers import CustomerSSOGrant
|
||||
|
||||
from ..models import CachedFile, CartPosition, InvoiceAddress
|
||||
from ..models.auth import UserKnownLoginSource
|
||||
from ..signals import periodic_task
|
||||
|
||||
|
||||
@@ -75,3 +76,9 @@ def clearsessions(sender, **kwargs):
|
||||
@scopes_disabled()
|
||||
def clear_oidc_data(sender, **kwargs):
|
||||
CustomerSSOGrant.objects.filter(expires__lt=now() - timedelta(days=14)).delete()
|
||||
|
||||
|
||||
@receiver(signal=periodic_task)
|
||||
@scopes_disabled()
|
||||
def clear_old_login_sources(sender, **kwargs):
|
||||
UserKnownLoginSource.objects.filter(last_seen__lt=now() - timedelta(days=365)).delete()
|
||||
|
||||
@@ -455,6 +455,8 @@ def pretixcontrol_logentry_display(sender: Event, logentry: LogEntry, **kwargs):
|
||||
'pretix.event.export.schedule.executed': _('A scheduled export has been executed.'),
|
||||
'pretix.event.export.schedule.failed': _('A scheduled export has failed: {reason}.'),
|
||||
'pretix.control.auth.user.created': _('The user has been created.'),
|
||||
'pretix.control.auth.user.new_source': _('A first login using {agent_type} on {os_type} from {country} has '
|
||||
'been detected.'),
|
||||
'pretix.user.settings.2fa.enabled': _('Two-factor authentication has been enabled.'),
|
||||
'pretix.user.settings.2fa.disabled': _('Two-factor authentication has been disabled.'),
|
||||
'pretix.user.settings.2fa.regenemergency': _('Your two-factor emergency codes have been regenerated.'),
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
{% load i18n %}{% blocktrans with url=url|safe os=source.os_type agent=source.agent_type %}Hello,
|
||||
|
||||
a login to your {{ instance }} account from an unusual or new location was detected. The login was performed using {{ agent }} on {{ os }} from {{ country }}.
|
||||
|
||||
If this was you, you can safely ignore this email.
|
||||
|
||||
If this was not you, we recommend that you change your password in your account settings:
|
||||
|
||||
{{ url }}
|
||||
|
||||
Best regards,
|
||||
Your {{ instance }} team
|
||||
{% endblocktrans %}
|
||||
@@ -67,6 +67,7 @@ from pretix.base.metrics import pretix_failed_logins, pretix_successful_logins
|
||||
from pretix.base.models import TeamInvite, U2FDevice, User, WebAuthnDevice
|
||||
from pretix.base.services.mail import SendMailException
|
||||
from pretix.helpers.http import get_client_ip, redirect_to_url
|
||||
from pretix.helpers.security import handle_login_source
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -91,6 +92,7 @@ def process_login(request, user, keep_logged_in):
|
||||
else:
|
||||
logger.info(f"Backend login successful for user {user.pk}.")
|
||||
pretix_successful_logins.inc(1)
|
||||
handle_login_source(user, request)
|
||||
auth_login(request, user)
|
||||
request.session['pretix_auth_login_time'] = int(time.time())
|
||||
if next_url and url_has_allowed_host_and_scheme(next_url, allowed_hosts=None):
|
||||
@@ -532,9 +534,10 @@ class Login2FAView(TemplateView):
|
||||
valid = match_token(self.user, token)
|
||||
|
||||
if valid:
|
||||
auth_login(request, self.user)
|
||||
logger.info(f"Backend login successful for user {self.user.pk} with 2FA.")
|
||||
pretix_successful_logins.inc(1)
|
||||
handle_login_source(self.user, request)
|
||||
auth_login(request, self.user)
|
||||
request.session['pretix_auth_login_time'] = int(time.time())
|
||||
del request.session['pretix_auth_2fa_user']
|
||||
del request.session['pretix_auth_2fa_time']
|
||||
|
||||
@@ -26,9 +26,15 @@ import time
|
||||
from django.conf import settings
|
||||
from django.contrib.gis.geoip2 import GeoIP2
|
||||
from django.core.cache import cache
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from django_countries.fields import Country
|
||||
from geoip2.errors import AddressNotFoundError
|
||||
|
||||
from pretix.base.i18n import language
|
||||
from pretix.base.services.mail import SendMailException, mail
|
||||
from pretix.helpers.http import get_client_ip
|
||||
from pretix.helpers.urls import build_absolute_uri
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -119,3 +125,55 @@ def assert_session_valid(request):
|
||||
raise Session2FASetupRequired()
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def handle_login_source(user, request):
|
||||
from ua_parser import user_agent_parser
|
||||
|
||||
parsed_string = user_agent_parser.Parse(request.headers.get("User-Agent", ""))
|
||||
country = None
|
||||
|
||||
if settings.HAS_GEOIP:
|
||||
client_ip = get_client_ip(request)
|
||||
hashed_client_ip = hashlib.sha256(client_ip.encode()).hexdigest()
|
||||
country = cache.get_or_set(f'geoip_country_{hashed_client_ip}', lambda: _get_country(request), timeout=300)
|
||||
if country == "None":
|
||||
country = None
|
||||
|
||||
src, created = user.known_login_sources.update_or_create(
|
||||
agent_type=parsed_string.get("user_agent").get("family"),
|
||||
os_type=parsed_string.get("os").get("family"),
|
||||
device_type=parsed_string.get("device").get("family"),
|
||||
country=country,
|
||||
defaults={
|
||||
"last_seen": now(),
|
||||
}
|
||||
)
|
||||
|
||||
if created:
|
||||
user.log_action('pretix.control.auth.user.new_source', user=user, data={
|
||||
"agent_type": src.agent_type,
|
||||
"os_type": src.os_type,
|
||||
"device_type": src.device_type,
|
||||
"country": str(src.country) if src.country else "?",
|
||||
})
|
||||
if user.known_login_sources.count() > 1:
|
||||
# Do not send on first login or first login after introduction of this feature:
|
||||
try:
|
||||
with language(user.locale):
|
||||
mail(
|
||||
user.email,
|
||||
_('Login from new source detected'),
|
||||
'pretixcontrol/email/login_notice.txt',
|
||||
{
|
||||
'source': src,
|
||||
'country': Country(str(country)).name if country else _('Unknown country'),
|
||||
'instance': settings.PRETIX_INSTANCE_NAME,
|
||||
'url': build_absolute_uri('control:user.settings')
|
||||
},
|
||||
event=None,
|
||||
user=user,
|
||||
locale=user.locale
|
||||
)
|
||||
except SendMailException:
|
||||
pass # Not much we can do
|
||||
|
||||
Reference in New Issue
Block a user