Source code for registrations.services

"""The services defined by the registrations package."""
import string
import unicodedata
from typing import Union

from django.contrib.admin.models import LogEntry, CHANGE
from django.contrib.admin.options import get_content_type_for_model
from django.contrib.auth import get_user_model
from django.db.models import Q, QuerySet
from django.utils import timezone

import members
from members.models import Membership, Profile, Member
from registrations import emails
from registrations.models import Entry, Registration, Renewal
from utils.snippets import datetime_to_lectureyear


def _generate_username(registration: Registration) -> str:
    """Create username from first and lastname.

    :param registration: Model containing first and last name
    :type registration: Registration
    :return: Created username
    :rtype: str
    """
    username = (registration.first_name[0] + registration.last_name).lower()
    username = "".join(c for c in username if c.isalpha())
    username = "".join(
        c for c in unicodedata.normalize("NFKD", username) if c in string.ascii_letters
    ).lower()

    # Limit length to 150 characters since Django doesn't support longer
    if len(username) > 150:
        username = username[:150]

    return username


[docs]def check_unique_user(entry: Entry) -> bool: """Check that the username and email address of the entry are unique. :param entry: Registration entry :type entry: Entry :return: True if unique, False if not unique :rtype: boolean """ try: registration = entry.registration username = _generate_username(registration) if ( get_user_model().objects.filter(username=username).exists() and registration.username is not None ): username = registration.username return not ( get_user_model() .objects.filter(Q(email=registration.email) | Q(username=username)) .exists() ) except Registration.DoesNotExist: pass return True
[docs]def confirm_entry(queryset: QuerySet) -> int: """Confirm all entries in the queryset. :param queryset: queryset of entries :type queryset: Queryset[Entry] :return: number of updated rows :rtype: integer """ queryset = queryset.filter(status=Entry.STATUS_CONFIRM) rows_updated = queryset.update( status=Entry.STATUS_REVIEW, updated_at=timezone.now() ) return rows_updated
[docs]def reject_entries(user_id: int, queryset: QuerySet) -> int: """Reject all entries in the queryset. :param user_id: Id of the user executing this action :param queryset: queryset of entries :type queryset: Queryset[Entry] :return: number of updated rows :rtype: integer """ queryset = queryset.filter(status=Entry.STATUS_REVIEW) entries = list(queryset.all()) rows_updated = queryset.update( status=Entry.STATUS_REJECTED, updated_at=timezone.now() ) for entry in entries: log_obj = None try: emails.send_registration_rejected_message(entry.registration) log_obj = entry.registration except Registration.DoesNotExist: try: emails.send_renewal_rejected_message(entry.renewal) log_obj = entry.renewal except Renewal.DoesNotExist: pass if log_obj: LogEntry.objects.log_action( user_id=user_id, content_type_id=get_content_type_for_model(log_obj).pk, object_id=log_obj.pk, object_repr=str(log_obj), action_flag=CHANGE, change_message="Changed status to rejected", ) return rows_updated
[docs]def accept_entries(user_id: int, queryset: QuerySet) -> int: """Accept all entries in the queryset. :param user_id: Id of the user executing this action :param queryset: queryset of entries :type queryset: Queryset[Entry] :return: number of updated rows :rtype: integer """ queryset = queryset.filter(status=Entry.STATUS_REVIEW) entries = queryset.all() updated_entries = [] for entry in entries: # Check if the user is unique if not check_unique_user(entry): # User is not unique, do not proceed continue entry.status = Entry.STATUS_ACCEPTED entry.updated_at = timezone.now() log_obj = None try: if entry.registration.username is None: entry.registration.username = _generate_username(entry.registration) entry.registration.save() emails.send_registration_accepted_message(entry.registration) log_obj = entry.registration except Registration.DoesNotExist: try: emails.send_renewal_accepted_message(entry.renewal) log_obj = entry.renewal except Renewal.DoesNotExist: pass if log_obj: LogEntry.objects.log_action( user_id=user_id, content_type_id=get_content_type_for_model(log_obj).pk, object_id=log_obj.pk, object_repr=str(log_obj), action_flag=CHANGE, change_message="Change status to approved", ) entry.save() updated_entries.append(entry.pk) return len(updated_entries)
[docs]def revert_entry(user_id: int, entry: Entry) -> None: """Revert status of entry to review so that it can be corrected. :param user_id: Id of the user executing this action :param entry: Entry that should be reverted """ if not (entry.status in [Entry.STATUS_ACCEPTED, Entry.STATUS_REJECTED]): return payment = entry.payment entry.status = Entry.STATUS_REVIEW entry.updated_at = timezone.now() entry.payment = None entry.save() if payment is not None: payment.delete() log_obj = None try: log_obj = entry.registration except Registration.DoesNotExist: try: log_obj = entry.renewal except Renewal.DoesNotExist: pass if log_obj: LogEntry.objects.log_action( user_id=user_id, content_type_id=get_content_type_for_model(log_obj).pk, object_id=log_obj.pk, object_repr=str(log_obj), action_flag=CHANGE, change_message="Revert status to review", )
def _create_member_from_registration(registration: Registration) -> Member: """Create User and Member model from Registration. :param registration: Registration model :type registration: Registration :return: Created member object :rtype: Member """ # Generate random password for user that we can send to the new user password = get_user_model().objects.make_random_password(length=15) # Make sure the username and email are unique if not check_unique_user(registration): raise ValueError("Username or email address of the registration are not unique") # Create user user = get_user_model().objects.create_user( username=registration.username.lower(), email=registration.email, password=password, first_name=registration.first_name, last_name=registration.last_name, ) # Add profile to created user Profile.objects.create( user=user, programme=registration.programme, student_number=registration.student_number, starting_year=registration.starting_year, address_street=registration.address_street, address_street2=registration.address_street2, address_postal_code=registration.address_postal_code, address_city=registration.address_city, address_country=registration.address_country, phone_number=registration.phone_number, birthday=registration.birthday, show_birthday=registration.optin_birthday, receive_optin=registration.optin_mailinglist, ) # Send welcome message to new member members.emails.send_welcome_message(user, password, registration.language) return Member.objects.get(pk=user.pk)
[docs]def calculate_membership_since() -> timezone.datetime: """Calculate the start date of a membership. If it's August we act as if it's the next lecture year already and we start new memberships in September :return: """ since = timezone.now().date() if timezone.now().month == 8: since = since.replace(month=9, day=1) return since
def _create_membership_from_entry( entry: Entry, member: Member = None ) -> Union[Membership, None]: """Create or update Membership model based on Entry model information. :param entry: Entry model :type entry: Entry :return: The created or updated membership :rtype: Membership """ lecture_year = datetime_to_lectureyear(timezone.now()) since = calculate_membership_since() until = None if timezone.now().month == 8: lecture_year += 1 if entry.length == Entry.MEMBERSHIP_STUDY: try: renewal = entry.renewal member = renewal.member membership = member.latest_membership # Having a latest membership which has an until date implies that # this membership last(s/ed) till the end of the lecture year # This means it's possible to renew the 'year' membership # to a 'study' membership thus the until date should now be None # and no new membership is needed. # The rules for this behaviour are taken from the HR if membership is not None: if membership.until is None: raise ValueError( "This member already has a never ending membership" ) if entry.created_at.date() < membership.until: membership.until = None membership.save() return membership except Renewal.DoesNotExist: pass else: # If entry is Renewal set since to current membership until + 1 day # Unless there is no current membership try: member = entry.renewal.member membership = member.current_membership if membership is not None: if membership.until is None: raise ValueError( "This member already has a never ending membership" ) since = membership.until except Renewal.DoesNotExist: pass until = timezone.datetime(year=lecture_year + 1, month=9, day=1).date() return Membership.objects.create( user=member, since=since, until=until, type=entry.membership_type )
[docs]def process_entry_save(entry: Entry) -> None: """Once an entry is saved, process the entry if it is paid. :param entry: The entry that should be processed :type entry: Entry """ if not entry or not entry.payment: return if entry.status != Entry.STATUS_ACCEPTED: return try: registration = entry.registration # Create user and member member = _create_member_from_registration(registration) except Registration.DoesNotExist: # Get member from renewal renewal = entry.renewal member = renewal.member # Send email of payment confirmation for renewal, # not needed for registration since a new member already # gets the welcome email emails.send_renewal_complete_message(entry.renewal) entry.payment.paid_by = member # This should actually be a PaymentUser, but as PaymentUser is a proxy model of Member, this doesn't break entry.payment.save() membership = _create_membership_from_entry(entry, member) entry.membership = membership entry.status = Entry.STATUS_COMPLETED entry.save()
[docs]def execute_data_minimisation(dry_run=False): """Delete completed or rejected registrations that were modified at least 31 days ago. :param dry_run: does not really remove data if True :return: number of removed registrations """ deletion_period = timezone.now() - timezone.timedelta(days=31) objects = Entry.objects.filter( (Q(status=Entry.STATUS_COMPLETED) | Q(status=Entry.STATUS_REJECTED)) & Q(updated_at__lt=deletion_period) ) if dry_run: return objects.count() return objects.delete()[0]