Gitlab CSE Unil

utils.py 3.42 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
"""

"""

from hashlib import sha256
import hmac
from base64 import b64decode
import json
from time import time
import logging

from django.conf import settings
from django.core.cache import cache

15
16
17
18
# try:
#     from shibauth.models import ShibbUserAttributes
# except ImportError:
#     ShibbUserAttributes = None
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60


logger = logging.getLogger(__name__)

ATTP_MSG_TIMEOUT = 60


def get_attp_message(attp_msg64, attp_hash):
    """
    Deserialize and validate the attp_message
    Return the message as a dict

    :param attp_msg64:
    :param attp_hash:
    :return:
    """
    msg_data = None
    try:
        msg_str = b64decode(attp_msg64)
        msg = json.loads(msg_str)
    except (TypeError, ValueError, IndexError):
        # malformed massage
        # TODO: log malformed message error
        logger.debug("malformed message")
        msg = None

    if msg:
        attp_id = msg.get('attp_id', '')

        # Check timeout
        try:
            attp_time = int(msg.get('time', ''))
        except ValueError:
            attp_time = 0
        if time() - attp_time > ATTP_MSG_TIMEOUT:
            # Message timeout reached
            raise Exception("ATTP message timeout error")

        # Check for reuse before timeout
        attp_msg_cache_key = "attp_{}{}".format(attp_id, attp_hash)
        if cache.get(attp_msg_cache_key) is not None:
            # Message hash already used
Julien Furrer's avatar
Julien Furrer committed
61
            raise Exception("ATTP message reuse error")
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
        cache.set(attp_msg_cache_key, 1, ATTP_MSG_TIMEOUT + 1)

        # Check hash validity
        attp_salt = msg.get('salt', '')
        ctrl_hmac = hmac.new(
            "{salt}{time}{key}".format(salt=attp_salt, time=attp_time, key=settings.ATTP_KEYS.get(attp_id)),
            attp_msg64, sha256
        )
        if attp_hash != ctrl_hmac.hexdigest():
            # Invalid message control
            # TODO: implement throttling if too many invalid message
            raise Exception("ATTP invalid message error")

        msg_data = msg.get('data', {})
        msg_data['attp_id'] = msg.get('attp_id')
    return msg_data


def get_request_attp(request):
    """
    Retrieve ATTP message from different origin:
        1 - Headers
        2 - QueryString
        3 - ...

    :param request:
    :return:
    """
    attp_msg64 = request.META.get('HTTP_X_ATTP_MSG')
    attp_hash = request.META.get('HTTP_X_ATTP_H')
    if not attp_msg64:
        attp_msg64 = request.GET.get('attp_msg')
        attp_hash = request.GET.get('attp_h')

    if not attp_msg64:
        attp_msg64 = request.POST.get('attp_msg')
        attp_hash = request.POST.get('attp_h')

    # TODO: apply some pre-validation on ``attp_msg64`` and ``attp_hash`` (regex matching)

    if attp_msg64 and attp_hash:
        try:
            return get_attp_message(attp_msg64=attp_msg64, attp_hash=attp_hash)
        except Exception as e:
            logger.warning(e, extra={'request': request})
            return None


# def store_attp_message(request, attp_message=None):
#     """
#     Persist attp_message for reuse in next Request, only if associated with an anobj. (not for login message)
#     For now, use session. In future use cache with timeout
#     :param request:
#     :param attp_message:
#     :return:
#     """
#     pass
#
#
# def clear_attp_message(request, anobj_uuid=None):
#     """
#     Try to clear an attp_message from persistence store.
#     :param request:
#     :param attp_message:
#     :return:
#     """
#     pass