Source code for zsl_jwt.codec

"""
:mod:`zsl_jwt.codec`
--------------------

The module provides the two main functions :func:`.encode` and :func:`.decode`
which encode and decode the given payload.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import datetime
from builtins import *  # NOQA
from typing import Any  # NOQA
from typing import Dict  # NOQA

import jwt
from zsl import Injected
from zsl import inject
from zsl.db.model.app_model_json_encoder import AppModelJSONEncoder
from zsl.errors import ZslError

from zsl_jwt.configuration import DEFAULT_PROFILE_NAME
from zsl_jwt.configuration import JWTConfiguration

#: The list of JWT claim names.
CLAIMS = ('exp', 'nbf', 'iss', 'aud', 'iat')


[docs]class ZslJwtError(ZslError): """Main exception type raised from this module.""" pass
[docs]class ZslJwtInvalidAudienceError(ZslJwtError): """When the audience of the token does not match the audience of the profile used to decode the token.""" pass
[docs]class ZslJwtExpiredSignatureError(ZslJwtError): pass
[docs]class ZslJwtInvalidNbfClaimError(ZslJwtError): pass
@inject(jwt_configuration=JWTConfiguration)
[docs]def encode(payload, profile=DEFAULT_PROFILE_NAME, jwt_configuration=Injected): # type: (Dict[str, str], str, JWTConfiguration, str)->str """ Encodes the payload. :param payload: The payload to be encoded. :param jwt_configuration: The JWT configuration, it is injected. :param algorithm: Algorithm name. A reasonable default is provided. :param profile: The JWT profile to be used. See :class:`zsl_jwt.configuration.JWTProfile`. :return: The JWT token. """ payload = _append_claims(payload, profile) profile = jwt_configuration[profile] token = jwt.encode(payload, profile.secret, json_encoder=AppModelJSONEncoder, algorithm=profile.algorithm) token = token.decode('ascii') return token
@inject(jwt_configuration=JWTConfiguration)
[docs]def decode(token, profile=DEFAULT_PROFILE_NAME, jwt_configuration=Injected): # type: (str, str, JWTConfiguration)->Dict[str, Any] """ Decodes the encoded token. :param token: The encoded token. :param jwt_configuration: The configuration, injected. :param profile: The profile name used for encoding. :return: The decoded payload. """ try: profile = jwt_configuration[profile] payload = jwt.decode(token, profile.secret, audience=profile.audience, algorithms=[profile.algorithm]) except jwt.InvalidAudienceError as e: raise ZslJwtInvalidAudienceError(e) except jwt.ExpiredSignatureError as e: raise ZslJwtExpiredSignatureError(e) except jwt.ImmatureSignatureError as e: raise ZslJwtInvalidNbfClaimError(e) except Exception as e: raise ZslJwtError(e) payload = _remove_claims(payload) return payload
@inject(jwt_configuration=JWTConfiguration) def _append_claims(payload, profile, jwt_configuration): # type: (Dict[str, str], str, JWTConfiguration)->Dict[str, str] for claim in CLAIMS: if claim in payload: raise ZslJwtError("Claim '%s' found in payload. " "Please use payloads without " "claims - %s.".format(claim, CLAIMS)) payload = payload.copy() profile = jwt_configuration[profile] payload['exp'] = datetime.datetime.utcnow() + profile.expiration payload['nbf'] = datetime.datetime.utcnow() + profile.not_before payload['iss'] = profile.issuer payload['aud'] = profile.audience payload['iat'] = datetime.datetime.utcnow() return payload def _remove_claims(payload): for claim in CLAIMS: del payload[claim] return payload