from django.contrib.auth import login as auth_login from django.contrib.auth.models import User from django.db import IntegrityError from rest_framework.authentication import BaseAuthentication from rest_framework.exceptions import AuthenticationFailed from .utils import get_request_attp try: from shibauth.models import ShibbUserAttributes except ImportError: ShibbUserAttributes = None def validate_username(email): """ Return a unique username based on the email truncated to 30 char. If a username already exists, try to build a new one by incrementing last char :param email: :return: """ candidate = email[:30] if not User.objects.filter(username=candidate).exists(): return candidate # A user exists with that username, change it until it's unique found = False inc = 0 while not found: inc += 1 if inc > 2000: raise Exception("Unexpected error, max iteration reached") candidate = candidate[:-len(str(inc))] + str(inc) if not User.objects.filter(username=candidate).exists(): found = True return candidate def get_or_create_user(attp_user): """ Try to find an existing user corresponding to the parameters passed by attp_user. The first attribute with data in attp_user will be used for lookup :param attp_user: :return: """ # attp_attr is the name of the attribute in attp_user, # model_attr is the corresponding attribute name on the User model, # given in attr_choice as (attp_attr, model_attr) attr_choice = (('id', 'pk'), ('uniqueId', 'shibbuserattributes__uid'), ('email', 'email')) # Lookup the user, using the various attributes user = None add_uniqueid = False for (attp_attr, model_attr) in attr_choice: # The attribute was not provided in the message, continue with next attribute if not attp_user.get(attp_attr): continue # If the user cannot be found with the current attribute, try with the next ones try: user = User.objects.get(**{"{}__iexact".format(model_attr): attp_user.get(attp_attr)}) except User.DoesNotExist: continue add_uniqueid = (model_attr != 'shibbuserattributes__uid') # user is found, get out of the loop break user_changed = False # If the user could not be found, create it if user is None: user = User(username=validate_username(attp_user.get('email')), email=attp_user.get('email')) user_changed = True # Update User data if attp_user.get('first_name') and user.first_name != attp_user['first_name']: # Update first_name, if provided user.first_name = attp_user['first_name'] user_changed = True if attp_user.get('last_name') and user.last_name != attp_user['last_name']: # Update last_name if provided user.last_name = attp_user['last_name'] user_changed = True if attp_user.get('email') and user.email != attp_user['email']: # Update email and username if provided user.email = attp_user['email'] user.username = validate_username(attp_user.get('email')) user_changed = True if user_changed: user.save() # Save uniqueId if user has not already one and a value is given if add_uniqueid and attp_user.get('uniqueId') and ShibbUserAttributes: try: ShibbUserAttributes.objects.create(user=user, uid=attp_user.get('uniqueId')) except IntegrityError: pass return user def login_attp_user(request, attp_message, persist=True): """ Authenticate a user based on the data given in attp_message['user'] The attp_message is considered valid :param request: :param attp_message: :return: """ attp_user = attp_message.get('user') if not attp_user: return user = get_or_create_user(attp_user) if request.user.is_anonymous() or request.user != user: if persist: user.backend = 'django.contrib.auth.backends.ModelBackend' auth_login(request, user) else: request.user = user class TTPAuthentication(BaseAuthentication): """ """ def authenticate(self, request): attp_message = get_request_attp(request) if not attp_message: return None attp_user = attp_message.get('user') if not attp_user: return None user = get_or_create_user(attp_user) return user, None