Source code for photos.services
import hashlib
import hmac
import logging
import os
import tarfile
from zipfile import ZipInfo, is_zipfile, ZipFile

from django.conf import settings
from django.contrib import messages
from django.core.files import File
from django.db.models import When, Value, BooleanField, ExpressionWrapper, Q, Case
from django.http import Http404
from django.utils.translation import gettext_lazy as _
from PIL.JpegImagePlugin import JpegImageFile
from PIL import ExifTags, Image, UnidentifiedImageError

from photos.models import Photo
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def photo_determine_rotation(pil_image):
    """Return the rotation, in degrees, implied by a JPEG's EXIF orientation.

    Only ``JpegImageFile`` instances are inspected. Any other image type, a
    JPEG without EXIF data, or a missing, zero or unrecognised ``Orientation``
    value yields ``0``.

    Mirrored orientations (2, 4, 5 and 7) map to the same angle as their
    non-mirrored counterparts; the mirroring itself is not reported.

    :param pil_image: the opened PIL image to inspect
    :return: one of 0, 90, 180 or 270
    """
    # EXIF Orientation tag value -> rotation in degrees.
    degrees_by_orientation = {
        1: 0,
        2: 0,
        3: 180,
        4: 180,
        5: 90,
        6: 90,
        7: 270,
        8: 270,
    }

    if not isinstance(pil_image, JpegImageFile):
        return 0

    # Fetch the EXIF mapping once instead of calling _getexif() twice.
    raw_exif = pil_image._getexif()
    if not raw_exif:
        return 0

    # EXIF data is keyed by numeric tag id; find the tag named "Orientation".
    orientation = next(
        (
            value
            for tag, value in raw_exif.items()
            if ExifTags.TAGS.get(tag) == "Orientation"
        ),
        None,
    )

    # Malformed files may carry values outside 1-8; treat those as
    # "no rotation" rather than failing with a KeyError.
    return degrees_by_orientation.get(orientation, 0)
def check_shared_album_token(album, token):
    """Raise ``Http404`` unless *token* matches the album's access token.

    :param album: object exposing the expected token as ``access_token``
    :param token: the token supplied by the caller
    :raises Http404: when the token does not match
    """
    expected = album.access_token
    if isinstance(token, str) and isinstance(expected, str):
        # Compare in constant time so response timing does not reveal how
        # much of a guessed token is correct. Encoding to bytes lets
        # compare_digest handle non-ASCII text instead of raising TypeError.
        mismatch = not hmac.compare_digest(
            token.encode("utf-8"), expected.encode("utf-8")
        )
    else:
        # Non-string values keep the plain inequality semantics.
        mismatch = token != expected

    if mismatch:
        raise Http404("Invalid token.")
def is_album_accessible(request, album):
    """Return whether the requesting member may view *album*.

    A member with a current membership can view every album. A member
    without a current membership can view the album only if one of their
    memberships spans the album's date. Requests without a member are
    refused.

    :param request: request object exposing ``member``
    :param album: album exposing ``date``
    :return: ``True`` when the album is accessible, ``False`` otherwise
    """
    member = request.member
    if not member:
        return False

    if member.current_membership is not None:
        return True

    # This user is currently not a member, so check whether any of their
    # memberships covers the album's date.
    covers_album_date = Q(since__lte=album.date) & Q(until__gte=album.date)
    # exists() lets the database stop at the first match instead of counting
    # every matching membership.
    return member.membership_set.filter(covers_album_date).exists()
# Annotate the albums which are accessible by the user
def get_annotated_accessible_albums(request, albums):
    """Annotate each album in *albums* with a boolean ``accessible`` field.

    * Member with a current membership: every album is accessible.
    * Member without a current membership: an album is accessible when its
      date lies within one of the member's memberships.
    * No member on the request: no album is accessible.

    :param request: request object exposing ``member``
    :param albums: album queryset to annotate
    :return: the annotated queryset
    """

    def constant(flag):
        # Same boolean value for every row.
        return ExpressionWrapper(Value(flag), output_field=BooleanField())

    member = request.member

    if not member:
        return albums.annotate(accessible=constant(False))

    if member.current_membership is not None:
        return albums.annotate(accessible=constant(True))

    # Start from a condition that matches nothing, then OR in the date range
    # of every membership the member has.
    within_any_membership = Q(pk__in=[])
    for membership in member.membership_set.all():
        started = Q(date__gte=membership.since)
        not_ended = Q(date__lte=membership.until)
        within_any_membership = within_any_membership | (started & not_ended)

    accessible_case = Case(
        When(within_any_membership, then=Value(True)),
        default=Value(False),
        output_field=BooleanField(),
    )
    return albums.annotate(accessible=accessible_case)
def save_photo(photo_obj):
    """Deduplicate, downscale and store an uploaded photo as a JPEG.

    The SHA-1 digest of the uploaded file is stored on ``photo_obj._digest``
    and the uploaded file is removed from disk. If another photo in the same
    album already has that digest, ``photo_obj`` is deleted. Otherwise the
    image is shrunk to fit ``settings.PHOTO_UPLOAD_SIZE``, converted to RGB,
    written next to the original location with a ``.jpg`` extension, and
    ``photo_obj`` is saved.

    :param photo_obj: the photo whose ``file`` was just uploaded
    :return: ``False`` when the photo was a duplicate and has been deleted,
        ``True`` when it was processed and saved
    """
    hash_sha1 = hashlib.sha1()
    for chunk in iter(lambda: photo_obj.file.read(4096), b""):
        hash_sha1.update(chunk)
    photo_obj.file.close()
    digest = hash_sha1.hexdigest()
    photo_obj._digest = digest

    original_path = photo_obj.file.path

    # The context manager guarantees the image's file handle is closed on
    # every exit path, including the early duplicate return and errors.
    with Image.open(original_path) as image:
        # The target path may equal the original path (a .jpg upload), so
        # the original is removed before the converted file is written.
        os.remove(original_path)

        is_duplicate = (
            Photo.objects.filter(album=photo_obj.album, _digest=digest)
            .exclude(pk=photo_obj.pk)
            .exists()
        )
        if is_duplicate:
            photo_obj.delete()
            return False

        image_path = "{}.jpg".format(os.path.splitext(original_path)[0])

        photo_obj.rotation = photo_determine_rotation(image)

        # Image.thumbnail does not upscale an image that is smaller.
        # Image.LANCZOS is the same filter as the former Image.ANTIALIAS
        # alias, which newer Pillow releases no longer provide.
        image.thumbnail(settings.PHOTO_UPLOAD_SIZE, Image.LANCZOS)

        logger.info("Trying to save to %s", image_path)
        image.convert("RGB").save(image_path, "JPEG")

    photo_obj.original_file = image_path
    image_name = os.path.splitext(photo_obj.file.name)[0]
    photo_obj.file.name = "{}.jpg".format(image_name)

    photo_obj.save()
    return True