Source code for activemembers.gsuite
import hashlib
import logging
from base64 import b16encode
from django.utils.translation import gettext_lazy as _, override as lang_override
from googleapiclient.errors import HttpError
from members.models import Member
from utils.google_api import get_directory_api
from django.conf import settings
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class GSuiteUserService:
    """Create, update, suspend, delete and list G Suite users for members.

    All operations go through ``self.directory_api`` and address accounts in
    the ``settings.GSUITE_MEMBERS_DOMAIN`` domain.
    """

    def __init__(self, directory_api=None):
        """
        Set up the service

        :param directory_api: the directory API client to use; when omitted
            (or ``None``) one is obtained from ``get_directory_api()``
        """
        super().__init__()
        # Resolve the default here instead of in the signature: a call in the
        # signature is evaluated once, when the class is defined (i.e. at
        # import time), and the resulting object is then shared by every
        # instance that relies on the default.
        if directory_api is None:
            directory_api = get_directory_api()
        self.directory_api = directory_api

    def create_user(self, member: Member):
        """
        Create a new GSuite user based on the provided data

        If the insert is rejected with HTTP status 409 (presumably because
        the account already exists -- confirm against the Directory API
        docs), the account is updated through :meth:`update_user` instead
        and that method's result is returned.

        :param member: The member that gets an account
        :return: a tuple with the primary email of the account and the
            password (the plain-text generated password for a new account)
        :raises HttpError: for any API error other than status 409
        """
        plain_password = Member.objects.make_random_password(15)
        # The API receives the password as the upper-case hexadecimal SHA-1
        # digest, matching the "hashFunction" declared in the request body.
        digest_password = hashlib.sha1(plain_password.encode("utf-8")).digest()
        encoded_password = b16encode(digest_password).decode("utf-8")

        try:
            response = (
                self.directory_api.users()
                .insert(
                    body={
                        "name": {
                            "familyName": member.last_name,
                            "givenName": member.first_name,
                        },
                        "primaryEmail": f"{member.username}@{settings.GSUITE_MEMBERS_DOMAIN}",
                        "password": encoded_password,
                        "hashFunction": "SHA-1",
                        "changePasswordAtNextLogin": "true",
                        "externalIds": [{"value": f"{member.pk}", "type": "login_id"}],
                        "includeInGlobalAddressList": "false",
                        "orgUnitPath": "/",
                    },
                )
                .execute()
            )
        except HttpError as e:
            if e.resp.status == 409:
                return self.update_user(member, member.username)
            # Bare raise keeps the original traceback intact.
            raise

        return response["primaryEmail"], plain_password

    def update_user(self, member: Member, username: str):
        """
        Update the GSuite user of the provided member

        The account currently addressed by ``username`` is unsuspended and
        its primary email is set to the member's current username. When the
        two usernames differ, the alias for the old address is removed from
        the account afterwards.

        :param member: The member whose account is updated
        :param username: The username the account is currently known by
        :return: a tuple with the primary email of the account and a
            placeholder text (in the member's profile language) stating that
            the password is known by the user
        """
        response = (
            self.directory_api.users()
            .patch(
                body={
                    "suspended": "false",
                    "primaryEmail": f"{member.username}@{settings.GSUITE_MEMBERS_DOMAIN}",
                },
                userKey=f"{username}@{settings.GSUITE_MEMBERS_DOMAIN}",
            )
            .execute()
        )

        if username != member.username:
            # NOTE(review): presumably the rename leaves the old address
            # behind as an alias on the account -- confirm.
            self.directory_api.users().aliases().delete(
                userKey=f"{member.username}@{settings.GSUITE_MEMBERS_DOMAIN}",
                alias=f"{username}@{settings.GSUITE_MEMBERS_DOMAIN}",
            ).execute()

        with lang_override(member.profile.language):
            # ``_`` is gettext_lazy, so the text must be evaluated while the
            # override is active; a lazy proxy returned from here would only
            # be translated later, in whatever language is active then.
            password = str(_("known by the user"))

        return response["primaryEmail"], password

    def suspend_user(self, username):
        """
        Suspends the user in GSuite

        :param username: username of the user
        """
        self.directory_api.users().patch(
            body={"suspended": "true"},
            userKey=f"{username}@{settings.GSUITE_MEMBERS_DOMAIN}",
        ).execute()

    def delete_user(self, email):
        """
        Deletes the user from GSuite

        :param email: primary email of the user
        """
        self.directory_api.users().delete(userKey=email).execute()

    def get_suspended_users(self):
        """
        Get all the suspended users

        :return: the ``users`` entry of the API response, or an empty list
            when the response has no such entry
        """
        response = (
            self.directory_api.users()
            .list(domain=settings.GSUITE_MEMBERS_DOMAIN, query="isSuspended=true")
            .execute()
        )
        return response.get("users", [])