Source code for registrations.services

"""The services defined by the registrations package"""
import string
import unicodedata
from typing import Union

from django.conf import settings
from django.contrib.admin.models import LogEntry, CHANGE
from django.contrib.admin.options import get_content_type_for_model
from django.contrib.auth import get_user_model
from django.db.models import Q, QuerySet
from django.utils import timezone

import members
from members.models import Membership, Profile, Member
from payments.models import Payment
from registrations import emails
from registrations.models import Entry, Registration, Renewal
from utils.snippets import datetime_to_lectureyear


def _generate_username(registration: Registration) -> str:
    """
    Create username from first and lastname

    :param registration: Model containing first and last name
    :type registration: Registration
    :return: Created username
    :rtype: str
    """
    username = (registration.first_name[0] + registration.last_name).lower()
    username = "".join(c for c in username if c.isalpha())
    username = "".join(
        c for c in unicodedata.normalize("NFKD", username) if c in string.ascii_letters
    ).lower()

    # Limit length to 150 characters since Django doesn't support longer
    if len(username) > 150:
        username = username[:150]

    return username


[docs]def check_unique_user(entry: Entry) -> bool: """ Check that the username and email address of the entry are unique. :param entry: Registration entry :type entry: Entry :return: True if unique, False if not unique :rtype: boolean """ try: registration = entry.registration username = _generate_username(registration) if ( get_user_model().objects.filter(username=username).exists() and registration.username is not None ): username = registration.username return not ( get_user_model() .objects.filter(Q(email=registration.email) | Q(username=username)) .exists() ) except Registration.DoesNotExist: pass return True
[docs]def confirm_entry(queryset: QuerySet) -> int: """ Confirm all entries in the queryset :param queryset: queryset of entries :type queryset: Queryset[Entry] :return: number of updated rows :rtype: integer """ queryset = queryset.filter(status=Entry.STATUS_CONFIRM) rows_updated = queryset.update( status=Entry.STATUS_REVIEW, updated_at=timezone.now() ) return rows_updated
[docs]def reject_entries(user_id: int, queryset: QuerySet) -> int: """ Reject all entries in the queryset :param user_id: Id of the user executing this action :param queryset: queryset of entries :type queryset: Queryset[Entry] :return: number of updated rows :rtype: integer """ queryset = queryset.filter(status=Entry.STATUS_REVIEW) entries = list(queryset.all()) rows_updated = queryset.update( status=Entry.STATUS_REJECTED, updated_at=timezone.now() ) for entry in entries: log_obj = None try: emails.send_registration_rejected_message(entry.registration) log_obj = entry.registration except Registration.DoesNotExist: try: emails.send_renewal_rejected_message(entry.renewal) log_obj = entry.renewal except Renewal.DoesNotExist: pass if log_obj: LogEntry.objects.log_action( user_id=user_id, content_type_id=get_content_type_for_model(log_obj).pk, object_id=log_obj.pk, object_repr=str(log_obj), action_flag=CHANGE, change_message="Changed status to rejected", ) return rows_updated
[docs]def accept_entries(user_id: int, queryset: QuerySet) -> int: """ Accept all entries in the queryset :param user_id: Id of the user executing this action :param queryset: queryset of entries :type queryset: Queryset[Entry] :return: number of updated rows :rtype: integer """ queryset = queryset.filter(status=Entry.STATUS_REVIEW) entries = queryset.all() updated_entries = [] for entry in entries: # Check if the user is unique if not check_unique_user(entry): # User is not unique, do not proceed continue entry.status = Entry.STATUS_ACCEPTED entry.updated_at = timezone.now() entry.payment = _create_payment_for_entry(entry) log_obj = None try: if entry.registration.username is None: entry.registration.username = _generate_username(entry.registration) entry.registration.save() emails.send_registration_accepted_message(entry.registration, entry.payment) log_obj = entry.registration except Registration.DoesNotExist: try: emails.send_renewal_accepted_message(entry.renewal, entry.payment) log_obj = entry.renewal except Renewal.DoesNotExist: pass if log_obj: LogEntry.objects.log_action( user_id=user_id, content_type_id=get_content_type_for_model(log_obj).pk, object_id=log_obj.pk, object_repr=str(log_obj), action_flag=CHANGE, change_message="Change status to approved", ) entry.save() updated_entries.append(entry.pk) return len(updated_entries)
[docs]def revert_entry(user_id: int, entry: Entry) -> None: """ Revert status of entry to review so that it can be corrected :param user_id: Id of the user executing this action :param entry: Entry that should be reverted """ if not (entry.status in [Entry.STATUS_ACCEPTED, Entry.STATUS_REJECTED]): return payment = entry.payment entry.status = Entry.STATUS_REVIEW entry.updated_at = timezone.now() entry.payment = None entry.save() if payment is not None: payment.delete() log_obj = None try: log_obj = entry.registration except Registration.DoesNotExist: try: log_obj = entry.renewal except Renewal.DoesNotExist: pass if log_obj: LogEntry.objects.log_action( user_id=user_id, content_type_id=get_content_type_for_model(log_obj).pk, object_id=log_obj.pk, object_repr=str(log_obj), action_flag=CHANGE, change_message="Revert status to review", )
def _create_payment_for_entry(entry: Entry) -> Payment: """ Create payment model for entry :param entry: Registration or Renewal model :type entry: Entry :return: Payment connected to the entry with the right price :rtype: Payment """ amount = settings.MEMBERSHIP_PRICES[entry.length] if entry.contribution and entry.membership_type == Membership.BENEFACTOR: amount = entry.contribution notes = f"Membership registration. {entry.get_membership_type_display()}." topic = f"Member registration [{entry.membership_type.upper()}]" try: renewal = entry.renewal membership = renewal.member.latest_membership notes = f"Membership renewal. {entry.get_membership_type_display()}." topic = f"Member renewal [{entry.membership_type.upper()}]" # Having a latest membership which has an until date implies that this # membership lasts/lasted till the end of the lecture year # This means it's possible to renew the 'year' membership # to a 'study' membership and the price should be adjusted since # it is considered an upgrade without paying twice # The rules for this behaviour are taken from the HR # Since it is possible for people to renew their membership # but processing to occur _after_ the membership ended # we're checking if that is the case so that these members # still get the discount price if ( membership is not None and membership.until is not None and entry.created_at.date() < membership.until and renewal.length == Entry.MEMBERSHIP_STUDY ): amount = ( settings.MEMBERSHIP_PRICES[Entry.MEMBERSHIP_STUDY] - settings.MEMBERSHIP_PRICES[Entry.MEMBERSHIP_YEAR] ) except Renewal.DoesNotExist: pass return Payment.objects.create(amount=amount, notes=notes, topic=topic) def _create_member_from_registration(registration: Registration) -> Member: """ Create User and Member model from Registration :param registration: Registration model :type registration: Registration :return: Created member object :rtype: Member """ # Generate random password for user that we can send to the new user password = get_user_model().objects.make_random_password(length=15) # Make sure the username and email are unique if not check_unique_user(registration): raise ValueError("Username or email address of the registration are not unique") # Create user user = get_user_model().objects.create_user( username=registration.username.lower(), email=registration.email, password=password, first_name=registration.first_name, last_name=registration.last_name, ) # Add profile to created user Profile.objects.create( user=user, programme=registration.programme, student_number=registration.student_number, starting_year=registration.starting_year, address_street=registration.address_street, address_street2=registration.address_street2, address_postal_code=registration.address_postal_code, address_city=registration.address_city, address_country=registration.address_country, phone_number=registration.phone_number, birthday=registration.birthday, language=registration.language, show_birthday=registration.optin_birthday, receive_optin=registration.optin_mailinglist, ) # Send welcome message to new member members.emails.send_welcome_message(user, password, registration.language) return Member.objects.get(pk=user.pk)
[docs]def calculate_membership_since() -> timezone.datetime: """ Calculate the start date of a membership If it's August we act as if it's the next lecture year already and we start new memberships in September :return: """ since = timezone.now().date() if timezone.now().month == 8: since = since.replace(month=9, day=1) return since
def _create_membership_from_entry( entry: Entry, member: Member = None ) -> Union[Membership, None]: """ Create or update Membership model based on Entry model information :param entry: Entry model :type entry: Entry :return: The created or updated membership :rtype: Membership """ lecture_year = datetime_to_lectureyear(timezone.now()) since = calculate_membership_since() until = None if timezone.now().month == 8: lecture_year += 1 if entry.length == Entry.MEMBERSHIP_YEAR: # If entry is Renewal set since to current membership until + 1 day # Unless there is no current membership try: member = entry.renewal.member membership = member.current_membership if membership is not None: if membership.until is None: raise ValueError( "This member already has a never ending membership" ) since = membership.until except Renewal.DoesNotExist: pass until = timezone.datetime(year=lecture_year + 1, month=9, day=1).date() elif entry.length == Entry.MEMBERSHIP_STUDY: try: renewal = entry.renewal member = renewal.member membership = member.latest_membership # Having a latest membership which has an until date implies that # this membership last(s/ed) till the end of the lecture year # This means it's possible to renew the 'year' membership # to a 'study' membership thus the until date should now be None # and no new membership is needed. # The rules for this behaviour are taken from the HR if membership is not None: if membership.until is None: raise ValueError( "This member already has a never ending membership" ) if entry.created_at.date() < membership.until: membership.until = None membership.save() return membership except Renewal.DoesNotExist: pass else: return None return Membership.objects.create( user=member, since=since, until=until, type=entry.membership_type )
[docs]def process_payment(payment: Payment) -> None: """ Process the payment for the entry and send the right emails :param payment: The payment that should be processed :type payment: Payment """ if not payment.processed: return try: entry = payment.registrations_entry except Entry.DoesNotExist: return if entry.status != Entry.STATUS_ACCEPTED: return member = None try: registration = entry.registration # Create user and member member = _create_member_from_registration(registration) except Registration.DoesNotExist: try: # Get member from renewal renewal = entry.renewal member = renewal.member # Send email of payment confirmation for renewal, # not needed for registration since a new member already # gets the welcome email emails.send_renewal_complete_message(entry.renewal) except Renewal.DoesNotExist: pass # If member was retrieved, then create a new membership if member is not None: Payment.objects.filter(pk=payment.pk).update(paid_by=member) membership = _create_membership_from_entry(entry, member) entry.membership = membership entry.status = Entry.STATUS_COMPLETED entry.save()
[docs]def execute_data_minimisation(dry_run=False): """ Delete completed or rejected registrations that were modified at least 31 days ago :param dry_run: does not really remove data if True :return: number of removed registrations """ deletion_period = timezone.now() - timezone.timedelta(days=31) objects = Entry.objects.filter( (Q(status=Entry.STATUS_COMPLETED) | Q(status=Entry.STATUS_REJECTED)) & Q(updated_at__lt=deletion_period) ) if dry_run: return objects.count() return objects.delete()[0]