# coding=utf-8 from __future__ import unicode_literals import json import os from django.contrib.auth.views import redirect_to_login from django.views.decorators.csrf import csrf_exempt import ldap import logging from django.conf import settings from django.core.exceptions import PermissionDenied from django.core.urlresolvers import reverse from django.core.cache import cache from django.db.models import Q from django.http.response import HttpResponse, HttpResponseBadRequest, HttpResponseRedirect, Http404, \ HttpResponseForbidden from django.shortcuts import render, get_object_or_404, resolve_url from django.contrib.auth.decorators import login_required from django.contrib.auth.models import User from django.views.decorators.http import require_POST from django.views.decorators.cache import cache_control from adim.models import AnObj, AnObjMembership from adim.permissions import get_permission_class, has_anobj_access, get_ttp_sharing_mode, SHARING_MODE_NONE from adim_ttp.decorators import attp_login from adim_utils.decorators import clear_function_cache from .forms import UploadImageFileForm from sendfile import sendfile from .utils import add_image_border, create_image_thumbnail logger = logging.getLogger(__name__) def home(request): """ Home page :param request: :return: """ context = {} if request.user.is_anonymous(): context['next'] = request.GET.get('next', "") return render(request, "adim/home.html", context) def handle_404(request): if request.user.is_authenticated(): return HttpResponseRedirect(reverse("adim_app:annotate-new")) else: return HttpResponseRedirect(reverse('adim_app:home')) @login_required def essai(request): """ Page d'essais :param request: :return: """ if settings.DEBUG and request.META.get('REMOTE_ADDR') in settings.INTERNAL_IPS: return render(request, "adim/essai.html", {}) else: raise Http404() @login_required def _get_anobj(request, anobj_uuid=None, anobj_id=None): """ Returns an AnObj with the uuid or id passed in parameters if the request.user has sufficient permissions for it Raise a 404 if it cannot return the AnObj :param request: :param anobj_uuid: :return: AnObj :raise: PermissionDenied """ q = {} if anobj_uuid is not None: q['uuid'] = anobj_uuid elif anobj_id is not None: q['id'] = anobj_id if not q: raise Http404() anobj = get_object_or_404(AnObj, **q) if not has_anobj_access(request, anobj): raise PermissionDenied() return anobj @login_required @cache_control(public=True, max_age=120) def send_anobj_img(request, anobj_uuid): try: anobj = _get_anobj(request, anobj_uuid=anobj_uuid) return sendfile(request, anobj.image.path) except Http404: return HttpResponseForbidden('Sorry, you cannot access this file') @login_required def anobj_thumb(request, anobj_uuid): if request.method == 'GET': return send_anobj_thumb(request, anobj_uuid=anobj_uuid) elif request.method == 'POST': return upload_anobj_thumb(request, anobj_uuid=anobj_uuid) @login_required @cache_control(public=True, max_age=1) def send_anobj_thumb(request, anobj_uuid): try: anobj = _get_anobj(request, anobj_uuid=anobj_uuid) thumb_name = "{name}__{user}.png".format(name=os.path.splitext(anobj.image.name)[0], user=request.user.id) thumb_path = os.path.join( settings.MEDIA_ROOT, thumb_name ) # If there is not yet a thumbnail for the current user # return the original one if not os.path.isfile(thumb_path): thumb_name = "{name}__.png".format(name=os.path.splitext(anobj.image.name)[0]) thumb_path = os.path.join( settings.MEDIA_ROOT, thumb_name ) return sendfile(request, thumb_path) except Http404: return HttpResponseForbidden('Sorry, you cannot access this file') @login_required def annotate_new(request): return render(request, "adim/annotation_new.html", {}) # @login_required def annotate(request, anobj_uuid=None): """ Annotation page :param request: :param anobj_uuid: :return: """ # ----- Some preliminary validations if anobj_uuid is None or len(anobj_uuid) < 8: raise Http404() try: anobj = AnObj.objects.select_related('owner').get(uuid__startswith=anobj_uuid) except AnObj.DoesNotExist: raise Http404() # In case of fragmentary uuid, redirect to the url with full uuid if len(anobj_uuid) < 32: return HttpResponseRedirect(reverse('adim_app:annotate', kwargs={'anobj_uuid': anobj.uuid})) # ----- Login check. Not using decorator so we can delegate to Trusted Third Party if needed permission = get_permission_class(anobj.sharing_mode) if request.user.is_anonymous(): if permission and permission.ttp: check_url = settings.ATTP.get(permission.ttp_id, {}).get('CHECK_URL') return HttpResponseRedirect(check_url.format(uuid=anobj.uuid)) else: return redirect_to_login(resolve_url('adim_app:annotate', anobj_uuid=anobj.uuid)) # ----- Build context context = { 'membership': False } # is_owner = request.user == anobj.owner # is_owner = request.user in anobj.owners.all() is_owner = anobj.is_owned(request.user.id) # ----- Detailed check for permissions membership = None # User is owner and anobj is not shared via Trusted Third Party if is_owner and not (permission and permission.ttp): if anobj.sharing_mode != SHARING_MODE_NONE: membership, _ = AnObjMembership.objects.get_or_create(anobj=anobj, user=request.user) # User is guest or owner and anobj shared via TTP else: if permission is None: # AnObj not shared raise Http404() # raise PermissionDenied() elif not permission.has_permission(request, anobj): # AnObj shared but user has no permission yet if permission.has_interactive_registration: # Interactive registration exists, call it return permission.get_interactive_registration_response(request, anobj) # No interactive registration for this sharing model, deny access raise PermissionDenied() else: # AnObj shared, user authorized and registered pass # TTP permission may have changed ownership if permission.ttp: clear_function_cache(f='adim.models.annotablesis_owned', args=(anobj, request.user.id)) is_owner = anobj.is_owned(request.user.id) membership = AnObjMembership.objects.get(anobj=anobj, user=request.user) # Interactive registration may post credentials, if so redirect to current view with GET method if request.method == 'POST': return HttpResponseRedirect(reverse('adim_app:annotate', kwargs={'anobj_uuid': anobj_uuid})) context.update({ 'is_owner': is_owner, 'membership': membership, 'anobj': anobj }) # ----- Determine if we may display shared annotations if is_owner: owner_membership = membership else: try: owner_membership = AnObjMembership.objects.get(anobj=anobj, user=anobj.owner) except AnObjMembership.DoesNotExist: owner_membership = None context.update({'display_shared_annotations': (anobj.sharing_mode != SHARING_MODE_NONE) and ( anobj.is_owned(request.user.id) or anobj.allow_public_publishing or ( owner_membership and owner_membership.publish_mode == 2 ) ) }) # ----- Environment specific settings template_path = ['adim'] if anobj.env: # select template from adim/env// template_path.extend(['env', anobj.env]) # add env-settings to the context if it exists env_settings = settings.ADIM_ENV.get(anobj.env) if env_settings: context.update({'env_settings': env_settings}) template_path.append("annotation.html") return render(request, "/".join(template_path), context) @csrf_exempt @attp_login @login_required def upload_file(request, anobj_uuid=None): """ -- inspired by: https://github.com/miki725/Django-jQuery-File-Uploader-Integration-demo/blob/master/upload/views.py :param request: :return: """ if request.method != 'POST': return HttpResponseBadRequest() response_type = "application/json" response_data = {} user = request.user # if request.user.is_authenticated() else moodle_meta.get('user') form = UploadImageFileForm(request.POST, request.FILES) if form.is_valid(): image_file = request.FILES['image_file'] file_response = _validate_uploaded_file(image_file) response_data.update({ 'error': file_response.get('error'), 'files': [file_response], }) anobj_name = form.cleaned_data['name'] # Create AnObj if not file_response['error']: anobj = None if anobj_uuid: try: anobj = _get_anobj(request, anobj_uuid=anobj_uuid) except Http404: anobj = None if anobj: anobj.image = image_file if anobj_name: anobj.name = anobj_name anobj.save() else: anobj = AnObj.objects.create( owner=user, name=anobj_name or os.path.splitext(image_file.name)[0], image=image_file ) if hasattr(request, 'attp_message'): ttp_id = request.attp_message.get('attp_id') sharing_mode = get_ttp_sharing_mode(ttp_id=ttp_id) if sharing_mode: anobj.sharing_mode = sharing_mode sharing_opts = request.attp_message.get('sharing_opts') if sharing_opts: anobj.sharing_opts = sharing_opts anobj.save() # Create original thumbnail, returned to user who has not yet annotated create_image_thumbnail(anobj.image.path) response_data['next'] = reverse('adim_app:annotate', kwargs={'anobj_uuid': anobj.uuid}) response_data['uuid'] = anobj.uuid # Needed when using iFrame transport if "text/html" in request.META["HTTP_ACCEPT"]: response_type = "text/html" else: response_data['error'] = "invalid" if 'application/json' in request.META.get('HTTP_ACCEPT', ''): return HttpResponse(json.dumps(response_data), content_type=response_type) else: return HttpResponseRedirect(response_data['next']) def _validate_uploaded_file(image_file): options = { # the maximum file size (must be in bytes) "maxfilesize": settings.ADIM_UPLOAD_MAX_FILESIZE * 2 ** 20, # 2 Mb # the file types which are going to be allowed for upload # must be a mimetype "acceptedformats": ( "image/jpeg", "image/jpg", "image/png", ) } error_id = False if image_file.size > options["maxfilesize"]: error_id = "maxFileSize" # allowed file type if image_file.content_type not in options["acceptedformats"]: error_id = "acceptFileTypes" response_data = { "name": image_file.name, "size": image_file.size, "type": image_file.content_type, "error": error_id, } return response_data @login_required @require_POST def upload_anobj_thumb(request, anobj_uuid=None): if anobj_uuid: anobj = _get_anobj(request, anobj_uuid=anobj_uuid) else: try: anobj_id = int(request.POST.get('aid')) anobj = _get_anobj(request, anobj_id=anobj_id) except TypeError: return HttpResponseBadRequest() thumb_name = "{name}__{user}.png".format(name=os.path.splitext(anobj.image.name)[0], user=request.user.id) # thumb_name = "{name}__.png".format(name=os.path.splitext(anobj.image.name)[0]) thumb_path = os.path.join(settings.MEDIA_ROOT, thumb_name) # thumb_url = reverse('adim.app:ao_thumb', kwargs={'anobj_uuid': anobj.uuid}) thumb_file = request.FILES['file'] response_data = _validate_uploaded_file(thumb_file) if response_data['error']: return HttpResponseBadRequest() with open(thumb_path, 'wb+') as destination: for chunk in thumb_file.chunks(): destination.write(chunk) add_image_border(thumb_path, save=True) return HttpResponse() def _handle_uploaded_file(image_file, destination): destination_path = os.path.join(settings.MEDIA_ROOT, destination, image_file.name) with open(destination_path, 'wb+') as destination: for chunk in image_file.chunks(): destination.write(chunk) @login_required def suggest_users(request): """ Return a list of usernames that match a query passed in as a query string This is the end point for the Bloodhound suggestion engine used for user suggestion while adding users to a shared AnObj :param request: :return: """ query_str = request.GET.get('q', "") # tokens = filter(bool, re.compile("\W+").split(query_str)) ## Use this for nonword limit instead of whitespace tokens = query_str.split() matching_users = [] usernames = [] if not tokens: return HttpResponse(content=json.dumps(matching_users), content_type="application/json") # -- Search for local users q = Q() for token in tokens: q = q & Q(username__icontains=token) for user in User.objects.filter(q): matching_users.append({'username': user.username, 'id': user.id}) usernames.append(user.username) # -- Search for ldap users if len(matching_users) < settings.ADIM_SUGGESTION['LIMIT']: cache_key = "ldapusers_" + "_".join(tokens) ldap_users = cache.get(cache_key) if ldap_users is None: filter_str = "(mail=*{}*)".format("*".join(tokens)) print "\n{h} HIT LDAP: {q} {h}\n".format(h="#" * 30, q=filter_str) ldap_object = ldap.initialize(settings.ADIM_SUGGESTION['LDAP']['URL']) try: results = ldap_object.search_st( base=settings.ADIM_SUGGESTION['LDAP']['BASE'], scope=ldap.SCOPE_SUBTREE, filterstr=filter_str, attrlist=(str("mail"),), timeout=settings.ADIM_SUGGESTION['LDAP']['TIMEOUT'] ) except ldap.TIMEOUT: results = [] ldap_users = map(lambda r: r[1].get('mail', [""])[0], results) # ldap_users = [ # "Julien.Furrer@unil.ch", # "Julien.Furrer.1@unil.ch", # "Julien.Furrer.2@unil.ch", # ] cache.set(cache_key, ldap_users, 3600 * 24) matching_users += [ {'username': user} for user in ldap_users if user not in usernames ] return HttpResponse(content=json.dumps(matching_users), content_type="application/json")