Source code for zsl_jwt.auth.controller

"""
:mod:`zsl_jwt.controller`
-------------------------

Contains the login function.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import http
from builtins import *  # NOQA
from typing import Any  # NOQA
from typing import Union  # NOQA

from zsl import inject
from zsl.application.error_handler import ErrorResponse
from zsl.task.job_context import StatusCodeResponder
from zsl.task.job_context import add_responder
from zsl.utils.http import get_http_status_code_value

from zsl_jwt.auth.service import AuthenticationService
from zsl_jwt.auth.service import StandardUserInformation  # NOQA
from zsl_jwt.auth.service import create_standard_user_information
from zsl_jwt.decorators import jwt_output

ERROR_CODE_INVALID_CREDENTIALS = 'INVALID_CREDENTIALS'
ERROR_MESSAGE_INVALID_CREDENTIALS = "Username and password can not be verified."


@inject(authentication_service=AuthenticationService)
[docs]def authenticate(username, password, authentication_service): # type: (str, str, AuthenticationService)->Union[ErrorResponse,StandardUserInformation] """ The "login" function, from the given username/password returns the JWT. Use only as a delegate function in handling of tasks. :param username: :param password: :param authentication_service: Injected. Uses :class:`zsl_jwt.auth.service.AuthenticationService` to query the database. :return: error response if the credentials are invalid or jwt. """ # type:(str, str, AuthenticationService)->Any if not authentication_service.verify_password(username, password): return _respond_with_error() return _respond_with_jwt(username, authentication_service)
def _respond_with_error(): # type: ()->ErrorResponse add_responder(StatusCodeResponder(get_http_status_code_value(http.client.FORBIDDEN))) return ErrorResponse(ERROR_CODE_INVALID_CREDENTIALS, ERROR_MESSAGE_INVALID_CREDENTIALS) @jwt_output() def _respond_with_jwt(username, authentication_service): # type:(str, AuthenticationService)->StandardUserInformation roles, user_object = authentication_service.get_user_information(username) return create_standard_user_information(username, roles, user_object).get_attributes()