Gitlab CSE Unil

permissions.py 9.09 KB
Newer Older
1
2
# coding=utf-8
from __future__ import unicode_literals
3
from django.conf import settings
4
from django.core.exceptions import PermissionDenied
5
from django.http.response import HttpResponseRedirect
6
from django.shortcuts import render
7
8
from django.core.cache import cache
import sys
9
import logging
10

11
from .models import AnObjMembership
12
13
14
15
16

SHARING_MODE_NONE = 0
SHARING_MODE_MANUAL = 1
SHARING_MODE_REGKEY = 4
SHARING_MODE_AAIRULES = 8
17
18
19
# SHARING_MODE_TTP_MOODLE = 16
# SHARING_MODE_TTP_TOTO = 24
# SHARING_MODE_TTP_MY_TTP = 32
20

21
logger = logging.getLogger(__name__)
22
23
24
25
26
27
28
29
30

class PermissionClass(object):
    """
    Base class for Permission Classes.
    Cannot be used directly, has to be extended.
    Child classes need to implement ``check_registration``
    and ``check_revocation``methods
    """
    has_interactive_registration = False
31
    ttp = False
32

33
    def get_interactive_registration_response(self, request, anobj):
34
35
        raise NotImplementedError()

36
    def _register_user(self, user, anobj):
37
        if not anobj.locked:
38
39
            AnObjMembership.objects.create(anobj=anobj, user=user)
            # anobj.members.add(user)
40

41
    def _revoke_user(self, user, anobj):
42
        if not anobj.locked:
43
44
            AnObjMembership.objects.filter(anobj=anobj, user=user).delete()
            # anobj.members.remove(user)
45

46
    def check_registration(self, request, anobj):
47
48
        raise NotImplementedError()

49
    def check_revocation(self, request, anobj):
50
51
        raise NotImplementedError()

52
    def check_permission(self, request, anobj):
53
54
55
56
57
58
        user = request.user
        # Anonymous users have no permissions
        if user.is_anonymous():
            raise PermissionDenied()

        # Owner can always access
59
60
61
        # if anobj.owner == user:
        # if user in anobj.owners.all():
        if anobj.is_owned(user.id):
62
63
64
65
66
67
            return True

        members = anobj.members.all()
        if user in members:
            if not anobj.locked:
                # Check revocation only if anobj is not locked
68
                self.check_revocation(request, anobj)
69
70
71
72
        else:
            if anobj.locked:
                # If anobj is locked never try to register
                raise PermissionDenied()
73
            self.check_registration(request, anobj)
74
75
76

        return True

77
    def has_permission(self, request, anobj):
78
        try:
79
            return self.check_permission(request, anobj)
80
81
82
83
84
85
86
87
88
89
90
91
        except PermissionDenied:
            return False


class IsMember(PermissionClass):
    """
    The simplest permission mode:
        (#no auto registration -> check_registration always fails#)

        no auto revocation -> check_revocation always pass
    """

92
    def check_registration(self, request, anobj):
93
94
95
96
97
98
99
100
101
102
103
        """
        Check if the username is listed in the sharing options.
        This may occur if the username has been added as member but the
        corresponding user object was not yet created.
        """
        # obviously, we need some sharing options to perform the check
        if not anobj.sharing_opts:
            raise PermissionDenied()

        opts_members = anobj.sharing_opts.get('members', [])
        if request.user.username in opts_members:
104
            self._register_user(request.user, anobj)
105
106
107
108
109
110
111
112
            # The user is now in the anobj.members list, so we have to remove it
            #   from the sharing_opts.members list
            opts_members.remove(request.user.username)
            anobj.sharing_opts['members'] = opts_members
            anobj.save()
        else:
            raise PermissionDenied()

113
    def check_revocation(self, request, anobj):
114
115
116
117
118
119
120
121
        return anobj


class RegistrationKey(PermissionClass):
    """
    Register user if she knows the key defined in
    sharing options of the anobj
    """
122
    def check_registration(self, request, anobj):
123
124
125
126
127
128
        reg_key = request.POST.get('k')
        # We need at least a key and some sharing options
        if reg_key is None or not anobj.sharing_opts:
            raise PermissionDenied()

        if anobj.sharing_opts.get('key') == reg_key:
129
            self._register_user(request.user, anobj)
130
131
132
        else:
            raise PermissionDenied()

133
    def check_revocation(self, request, anobj):
134
135
136
137
138
139
140
        """
        noop, there is no revocation possible here
        """
        return anobj

    has_interactive_registration = True

141
    def get_interactive_registration_response(self, request, anobj):
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
        errors = []
        if request.POST.get('k') is not None:
            errors.append({'code': "invalid_key"})
        if anobj.locked:
            errors.append({'code': "locked_anobj"})

        context = {
            'anobj': anobj,
            'errors': errors
        }
        return render(request, "registration/adim/registration_key.html", context)


class AAIRules(PermissionClass):
    pass


159
160
161
class ATTP(PermissionClass):
    has_interactive_registration = True
    ttp = True
162
    ttp_id = None
163

164
165
166
167
    def __init__(self, ttp_id=None):
        self.ttp_id = ttp_id

    def set_attp_status(self, request, anobj, status):
168
        key = "attp_{ttp_id}{user_id}{uuid}".format(
169
            ttp_id=self.ttp_id, user_id=request.user.id,
170
171
172
173
174
            uuid=anobj.uuid[:12]
        )
        cache.set(key, status, settings.ATTP['OPTIONS']['CACHE_TIMEOUT'])
        return status

175
    def get_attp_status(self, request, anobj):
176
        key = "attp_{ttp_id}{user_id}{uuid}".format(
177
            ttp_id=self.ttp_id, user_id=request.user.id,
178
179
180
            uuid=anobj.uuid[:12]
        )
        status = cache.get(key)
181
        logger.debug("ATTP status from cache: {}".format(status is not None))
182
183
        return status

184
    def clear_attp_status(self, request, anobj):
185
186
187
188
189
190
        session_key = "anobj_{}".format(anobj.uuid[:12])
        try:
            del(request.session[session_key])
        except KeyError:
            pass

191
    def check_registration(self, request, anobj):
192
193
        pass

194
    def check_revocation(self, request, anobj):
195
196
        pass

197
198
    def check_permission(self, request, anobj):
        perm_status = self.get_attp_status(request, anobj)
199
200
201
202
203
204
        if perm_status is None:
            raise PermissionDenied()

        elif perm_status == 'denied':
            if not anobj.locked:
                # Revoke only if anobj is not locked
205
                self._revoke_user(request.user, anobj)
206

207
            self.clear_attp_status(request, anobj)
208
209
210
211
            raise PermissionDenied()

        else:
            if request.user not in anobj.members.all():
212
                self._register_user(request.user, anobj)
213
214
215
216
217
218
219
220
221
222
223
224
225
226

            # Check ownership
            owners = anobj.owners.all()
            if request.user in owners:
                if perm_status != 'owner' and len(owners) > 1:
                    # is owner, but shouldn't -> remove only if not last one
                    anobj.owners.remove(request.user)
            else:
                if perm_status == 'owner':
                    # is not owner, but should -> add
                    anobj.owners.add(request.user)

        return True

227
228
229
    def get_interactive_registration_response(self, request, anobj):
        if self.get_attp_status(request, anobj) is None:
            check_url = settings.ATTP.get(self.ttp_id, {}).get('CHECK_URL')
230
231
            return HttpResponseRedirect(check_url.format(uuid=anobj.uuid))
        else:
232
            self.clear_attp_status(request, anobj)
233
234
235
            raise PermissionDenied()


236
237
PERMISSION_CLASSES = {
    SHARING_MODE_NONE: None,
238
239
240
    SHARING_MODE_MANUAL: IsMember(),
    SHARING_MODE_REGKEY: RegistrationKey(),
    SHARING_MODE_AAIRULES: AAIRules(),
241
242
}

243
244
245
246
247
248
249
250
251
252
# Register TTP services defined in the settings
for __ttp_id, __ttp_def in settings.ATTP.items():
    if __ttp_id != 'OPTIONS':
        attr_name = "SHARING_MODE_TTP_{}".format(__ttp_id.upper())
        mode = getattr(sys.modules[__name__], attr_name, -1)
        # Mode not defined in the module, create it from settings
        if mode == -1:
            setattr(sys.modules[__name__], attr_name, __ttp_def['MODE_ID'])
            PERMISSION_CLASSES[__ttp_def['MODE_ID']] = ATTP(__ttp_id)

253
254
255
256
257

def get_permission_class(sharing_mode):
    return PERMISSION_CLASSES.get(sharing_mode)


258
259
260
261
262
263
264
265
266
267
268
def get_ttp_sharing_mode(ttp_id=''):
    """
    Return the sharing mode value for the given ttp_id
    :param ttp_id:
    :return:
    """
    attr_name = 'SHARING_MODE_TTP_{}'.format(ttp_id.upper())
    mode = getattr(sys.modules[__name__], attr_name, 0)
    return mode


269
270
271
272
def check_anobj_permission(request, anobj):
    p = get_permission_class(anobj.sharing_mode)
    if p is not None:
        p.check_permission(request, anobj)
273
274
275
    # elif request.user != anobj.owner:
    # elif request.user not in anobj.owners.all():
    elif not anobj.is_owned(request.user.id):
276
277
278
279
280
281
282
283
284
285
286
287
288
        raise PermissionDenied()


def has_anobj_access(request, anobj):
    """
    Verify that request.user has access to the anobj.
    Doesn't apply registration nor revocation,
    no need to know what kind of sharing_mode we have.

    :param request:
    :param anobj:
    :return: boolean
    """
289
290
291
    # return request.user == anobj.owner or request.user in anobj.members.all()
    # return request.user in anobj.owners.all() or request.user in anobj.members.all()
    return anobj.is_owned(request.user.id) or request.user in anobj.members.all()