Gitlab CSE Unil

Commit f05f6ae9 authored by Julien Furrer's avatar Julien Furrer
Browse files

Updated adim.permission module, PERMISSION_CLASSES use instances instead of Class

The TTP permission classes are bulit from settings, no need for hardcoding.
parent 8a856559
......@@ -6,6 +6,7 @@ from django.http.response import HttpResponseRedirect
from django.shortcuts import render
from django.core.cache import cache
import sys
import logging
from .models import AnObjMembership
......@@ -13,10 +14,11 @@ SHARING_MODE_NONE = 0
SHARING_MODE_MANUAL = 1
SHARING_MODE_REGKEY = 4
SHARING_MODE_AAIRULES = 8
SHARING_MODE_TTP_MOODLE = 16
SHARING_MODE_TTP_TOTO = 24
SHARING_MODE_TTP_MY_TTP = 32
# SHARING_MODE_TTP_MOODLE = 16
# SHARING_MODE_TTP_TOTO = 24
# SHARING_MODE_TTP_MY_TTP = 32
logger = logging.getLogger(__name__)
class PermissionClass(object):
"""
......@@ -28,32 +30,26 @@ class PermissionClass(object):
has_interactive_registration = False
ttp = False
@classmethod
def get_interactive_registration_response(cls, request, anobj):
def get_interactive_registration_response(self, request, anobj):
raise NotImplementedError()
@classmethod
def _register_user(cls, user, anobj):
def _register_user(self, user, anobj):
if not anobj.locked:
AnObjMembership.objects.create(anobj=anobj, user=user)
# anobj.members.add(user)
@classmethod
def _revoke_user(cls, user, anobj):
def _revoke_user(self, user, anobj):
if not anobj.locked:
AnObjMembership.objects.filter(anobj=anobj, user=user).delete()
# anobj.members.remove(user)
@classmethod
def check_registration(cls, request, anobj):
def check_registration(self, request, anobj):
raise NotImplementedError()
@classmethod
def check_revocation(cls, request, anobj):
def check_revocation(self, request, anobj):
raise NotImplementedError()
@classmethod
def check_permission(cls, request, anobj):
def check_permission(self, request, anobj):
user = request.user
# Anonymous users have no permissions
if user.is_anonymous():
......@@ -69,19 +65,18 @@ class PermissionClass(object):
if user in members:
if not anobj.locked:
# Check revocation only if anobj is not locked
cls.check_revocation(request, anobj)
self.check_revocation(request, anobj)
else:
if anobj.locked:
# If anobj is locked never try to register
raise PermissionDenied()
cls.check_registration(request, anobj)
self.check_registration(request, anobj)
return True
@classmethod
def has_permission(cls, request, anobj):
def has_permission(self, request, anobj):
try:
return cls.check_permission(request, anobj)
return self.check_permission(request, anobj)
except PermissionDenied:
return False
......@@ -94,8 +89,7 @@ class IsMember(PermissionClass):
no auto revocation -> check_revocation always pass
"""
@classmethod
def check_registration(cls, request, anobj):
def check_registration(self, request, anobj):
"""
Check if the username is listed in the sharing options.
This may occur if the username has been added as member but the
......@@ -107,7 +101,7 @@ class IsMember(PermissionClass):
opts_members = anobj.sharing_opts.get('members', [])
if request.user.username in opts_members:
cls._register_user(request.user, anobj)
self._register_user(request.user, anobj)
# The user is now in the anobj.members list, so we have to remove it
# from the sharing_opts.members list
opts_members.remove(request.user.username)
......@@ -116,8 +110,7 @@ class IsMember(PermissionClass):
else:
raise PermissionDenied()
@classmethod
def check_revocation(cls, request, anobj):
def check_revocation(self, request, anobj):
return anobj
......@@ -126,20 +119,18 @@ class RegistrationKey(PermissionClass):
Register user if she knows the key defined in
sharing options of the anobj
"""
@classmethod
def check_registration(cls, request, anobj):
def check_registration(self, request, anobj):
reg_key = request.POST.get('k')
# We need at least a key and some sharing options
if reg_key is None or not anobj.sharing_opts:
raise PermissionDenied()
if anobj.sharing_opts.get('key') == reg_key:
cls._register_user(request.user, anobj)
self._register_user(request.user, anobj)
else:
raise PermissionDenied()
@classmethod
def check_revocation(cls, request, anobj):
def check_revocation(self, request, anobj):
"""
noop, there is no revocation possible here
"""
......@@ -147,8 +138,7 @@ class RegistrationKey(PermissionClass):
has_interactive_registration = True
@classmethod
def get_interactive_registration_response(cls, request, anobj):
def get_interactive_registration_response(self, request, anobj):
errors = []
if request.POST.get('k') is not None:
errors.append({'code': "invalid_key"})
......@@ -169,63 +159,57 @@ class AAIRules(PermissionClass):
class ATTP(PermissionClass):
has_interactive_registration = True
ttp = True
ttp_id = 'moodle'
ttp_id = None
@classmethod
def set_attp_status(cls, request, anobj, status):
def __init__(self, ttp_id=None):
self.ttp_id = ttp_id
def set_attp_status(self, request, anobj, status):
key = "attp_{ttp_id}{user_id}{uuid}".format(
ttp_id=cls.ttp_id, user_id=request.user.id,
ttp_id=self.ttp_id, user_id=request.user.id,
uuid=anobj.uuid[:12]
)
cache.set(key, status, settings.ATTP['OPTIONS']['CACHE_TIMEOUT'])
return status
@classmethod
def get_attp_status(cls, request, anobj):
def get_attp_status(self, request, anobj):
key = "attp_{ttp_id}{user_id}{uuid}".format(
ttp_id=cls.ttp_id, user_id=request.user.id,
ttp_id=self.ttp_id, user_id=request.user.id,
uuid=anobj.uuid[:12]
)
status = cache.get(key)
logger.debug("ATTP status from cache: {}".format(status is not None))
return status
@classmethod
def clear_attp_status(cls, request, anobj):
# key = "attp_{ttp_id}{user_id}{uuid}".format(
# ttp_id=cls.ttp_id, user_id=request.user.id,
# uuid=anobj.uuid[:12]
# )
def clear_attp_status(self, request, anobj):
session_key = "anobj_{}".format(anobj.uuid[:12])
try:
del(request.session[session_key])
except KeyError:
pass
@classmethod
def check_registration(cls, request, anobj):
def check_registration(self, request, anobj):
pass
@classmethod
def check_revocation(cls, request, anobj):
def check_revocation(self, request, anobj):
pass
@classmethod
def check_permission(cls, request, anobj):
perm_status = cls.get_attp_status(request, anobj)
def check_permission(self, request, anobj):
perm_status = self.get_attp_status(request, anobj)
if perm_status is None:
raise PermissionDenied()
elif perm_status == 'denied':
if not anobj.locked:
# Revoke only if anobj is not locked
cls._revoke_user(request.user, anobj)
self._revoke_user(request.user, anobj)
cls.clear_attp_status(request, anobj)
self.clear_attp_status(request, anobj)
raise PermissionDenied()
else:
if request.user not in anobj.members.all():
cls._register_user(request.user, anobj)
self._register_user(request.user, anobj)
# Check ownership
owners = anobj.owners.all()
......@@ -238,41 +222,34 @@ class ATTP(PermissionClass):
# is not owner, but should -> add
anobj.owners.add(request.user)
# cls.clear_attp_status(request, anobj)
return True
@classmethod
def get_interactive_registration_response(cls, request, anobj):
if cls.get_attp_status(request, anobj) is None:
check_url = settings.ATTP.get(cls.ttp_id, {}).get('CHECK_URL')
def get_interactive_registration_response(self, request, anobj):
if self.get_attp_status(request, anobj) is None:
check_url = settings.ATTP.get(self.ttp_id, {}).get('CHECK_URL')
return HttpResponseRedirect(check_url.format(uuid=anobj.uuid))
else:
cls.clear_attp_status(request, anobj)
self.clear_attp_status(request, anobj)
raise PermissionDenied()
class MoodleTTP(ATTP):
ttp_id = 'moodle'
class TotoTTP(ATTP):
ttp_id = 'toto'
class MyTTP(ATTP):
ttp_id = 'my_ttp'
PERMISSION_CLASSES = {
SHARING_MODE_NONE: None,
SHARING_MODE_MANUAL: IsMember,
SHARING_MODE_REGKEY: RegistrationKey,
SHARING_MODE_AAIRULES: AAIRules,
SHARING_MODE_TTP_MOODLE: MoodleTTP,
SHARING_MODE_TTP_TOTO: TotoTTP,
SHARING_MODE_TTP_MY_TTP: MyTTP
SHARING_MODE_MANUAL: IsMember(),
SHARING_MODE_REGKEY: RegistrationKey(),
SHARING_MODE_AAIRULES: AAIRules(),
}
# Register TTP services defined in the settings
for __ttp_id, __ttp_def in settings.ATTP.items():
if __ttp_id != 'OPTIONS':
attr_name = "SHARING_MODE_TTP_{}".format(__ttp_id.upper())
mode = getattr(sys.modules[__name__], attr_name, -1)
# Mode not defined in the module, create it from settings
if mode == -1:
setattr(sys.modules[__name__], attr_name, __ttp_def['MODE_ID'])
PERMISSION_CLASSES[__ttp_def['MODE_ID']] = ATTP(__ttp_id)
def get_permission_class(sharing_mode):
return PERMISSION_CLASSES.get(sharing_mode)
......@@ -285,7 +262,6 @@ def get_ttp_sharing_mode(ttp_id=''):
:return:
"""
attr_name = 'SHARING_MODE_TTP_{}'.format(ttp_id.upper())
print attr_name
mode = getattr(sys.modules[__name__], attr_name, 0)
return mode
......
......@@ -159,10 +159,10 @@ def annotate(request, anobj_uuid=None):
return HttpResponseRedirect(reverse('adim_app:annotate', kwargs={'anobj_uuid': anobj.uuid}))
# ----- Login check. Not using decorator so we can delegate to Trusted Third Party if needed
permission_class = get_permission_class(anobj.sharing_mode)
permission = get_permission_class(anobj.sharing_mode)
if request.user.is_anonymous():
if permission_class and permission_class.ttp:
check_url = settings.ATTP.get(permission_class.ttp_id, {}).get('CHECK_URL')
if permission and permission.ttp:
check_url = settings.ATTP.get(permission.ttp_id, {}).get('CHECK_URL')
return HttpResponseRedirect(check_url.format(uuid=anobj.uuid))
else:
return redirect_to_login(resolve_url('adim_app:annotate', anobj_uuid=anobj.uuid))
......@@ -177,22 +177,22 @@ def annotate(request, anobj_uuid=None):
# ----- Detailed check for permissions
membership = None
if is_owner and not (permission_class and permission_class.ttp):
# User is owner and anobj is not shared via Trusted Third Party
if is_owner and not (permission and permission.ttp):
if anobj.sharing_mode != SHARING_MODE_NONE:
membership, _ = AnObjMembership.objects.get_or_create(anobj=anobj, user=request.user)
else:
# User is guest or owner and anobj shared via TTP
if permission_class is None:
else:
if permission is None:
# AnObj not shared
raise Http404()
# raise PermissionDenied()
elif not permission_class.has_permission(request, anobj):
elif not permission.has_permission(request, anobj):
# AnObj shared but user has no permission yet
if permission_class.has_interactive_registration:
if permission.has_interactive_registration:
# Interactive registration exists, call it
return permission_class.get_interactive_registration_response(request, anobj)
return permission.get_interactive_registration_response(request, anobj)
# No interactive registration for this sharing model, deny access
raise PermissionDenied()
......@@ -201,7 +201,7 @@ def annotate(request, anobj_uuid=None):
pass
# TTP permission may have changed ownership
if permission_class.ttp:
if permission.ttp:
clear_function_cache(f='adim.models.annotablesis_owned', args=(anobj, request.user.id))
is_owner = anobj.is_owned(request.user.id)
......
......@@ -217,18 +217,35 @@ AAI = {
}
}
# ATTP = {
# 'OPTIONS': {
# 'CACHE_TIMEOUT': <the time in sec the ttp authorization result is kept in cache>
# },
# '<ttp_service_id>': {
# 'CHECK_URL' : <the url for authorization on the ttp>
# 'MODE_ID': <the sharing_mode value for that ttp. this should be a unique int > 15
# }
# }
#
# CAUTION: when adding a ttp service, update the template (templates/adim/aom-modal.inc.html) with MODE_ID where needed
#
ATTP = {
'OPTIONS': {
'CACHE_TIMEOUT': 20, # 30,
},
'moodle': {
'CHECK_URL': "http://localhost/tests/phpupload/gv.php?a={uuid}"
'CHECK_URL': "http://localhost/tests/phpupload/gv.php?a={uuid}",
'MODE_ID': 16
},
'toto': {
'CHECK_URL': "http://localhost/tests/phpupload/toto.php?a={uuid}"
'CHECK_URL': "http://localhost/tests/phpupload/toto.php?a={uuid}",
'MODE_ID': 24
},
'my_ttp': {
'CHECK_URL': "http://my-ttp:8001/ttp/check/{uuid}/"
'CHECK_URL': "http://my-ttp:8001/ttp/check/{uuid}/",
'MODE_ID': 32
}
}
......@@ -309,6 +326,11 @@ LOGGING = {
'level': 'WARNING',
'propagate': True,
},
'adim.permissions': {
'handlers': ['console'],
'level': 'DEBUG', # 'NOTSET',
'propagate': True,
},
'adim_app': {
'handlers': ['console', 'file'],
'level': 'DEBUG',
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment