mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			113 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			113 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
| """
 | |
|     OpenID Connect relying party (RP) utilities
 | |
|     ===========================================
 | |
| 
 | |
|     This modules defines utilities allowing to manipulate ID tokens and other common helpers.
 | |
| 
 | |
| """
 | |
| 
 | |
| import datetime as dt
 | |
| from calendar import timegm
 | |
| from urllib.parse import urlparse
 | |
| 
 | |
| from django.core.exceptions import SuspiciousOperation
 | |
| from django.utils.encoding import force_bytes, smart_bytes
 | |
| from jwkest import JWKESTException
 | |
| from jwkest.jwk import KEYS
 | |
| from jwkest.jws import JWS
 | |
| from django.conf import settings
 | |
| 
 | |
| from common.utils import get_logger
 | |
| 
 | |
| 
 | |
| logger = get_logger(__file__)
 | |
| 
 | |
| 
 | |
| def validate_and_return_id_token(jws, nonce=None, validate_nonce=True):
 | |
|     """ Validates the id_token according to the OpenID Connect specification. """
 | |
|     log_prompt = "Validate ID Token: {}"
 | |
|     logger.debug(log_prompt.format('Get shared key'))
 | |
|     shared_key = settings.AUTH_OPENID_CLIENT_ID \
 | |
|         if settings.AUTH_OPENID_PROVIDER_SIGNATURE_ALG == 'HS256' \
 | |
|         else settings.AUTH_OPENID_PROVIDER_SIGNATURE_KEY  # RS256
 | |
| 
 | |
|     try:
 | |
|         # Decodes the JSON Web Token and raise an error if the signature is invalid.
 | |
|         logger.debug(log_prompt.format('Verify compact jwk'))
 | |
|         id_token = JWS().verify_compact(force_bytes(jws), _get_jwks_keys(shared_key))
 | |
|     except JWKESTException as e:
 | |
|         logger.debug(log_prompt.format('Verify compact jwkest exception: {}'.format(str(e))))
 | |
|         return
 | |
| 
 | |
|     # Validates the claims embedded in the id_token.
 | |
|     logger.debug(log_prompt.format('Validate claims'))
 | |
|     _validate_claims(id_token, nonce=nonce, validate_nonce=validate_nonce)
 | |
| 
 | |
|     return id_token
 | |
| 
 | |
| 
 | |
| def _get_jwks_keys(shared_key):
 | |
|     """ Returns JWKS keys used to decrypt id_token values. """
 | |
|     # The OpenID Connect Provider (OP) uses RSA keys to sign/enrypt ID tokens and generate public
 | |
|     # keys allowing to decrypt them. These public keys are exposed through the 'jwks_uri' and should
 | |
|     # be used to decrypt the JWS - JSON Web Signature.
 | |
|     log_prompt = "Get jwks keys: {}"
 | |
|     logger.debug(log_prompt.format('Start'))
 | |
|     jwks_keys = KEYS()
 | |
|     logger.debug(log_prompt.format('Load from provider jwks endpoint'))
 | |
|     jwks_keys.load_from_url(settings.AUTH_OPENID_PROVIDER_JWKS_ENDPOINT)
 | |
|     # Adds the shared key (which can correspond to the client_secret) as an oct key so it can be
 | |
|     # used for HMAC signatures.
 | |
|     logger.debug(log_prompt.format('Add key'))
 | |
|     jwks_keys.add({'key': smart_bytes(shared_key), 'kty': 'oct'})
 | |
|     logger.debug(log_prompt.format('End'))
 | |
|     return jwks_keys
 | |
| 
 | |
| 
 | |
| def _validate_claims(id_token, nonce=None, validate_nonce=True):
 | |
|     """ Validates the claims embedded in the JSON Web Token. """
 | |
|     log_prompt = "Validate claims: {}"
 | |
|     logger.debug(log_prompt.format('Start'))
 | |
| 
 | |
|     iss_parsed_url = urlparse(id_token['iss'])
 | |
|     provider_parsed_url = urlparse(settings.AUTH_OPENID_PROVIDER_ENDPOINT)
 | |
|     if iss_parsed_url.netloc != provider_parsed_url.netloc:
 | |
|         logger.debug(log_prompt.format('Invalid issuer'))
 | |
|         raise SuspiciousOperation('Invalid issuer')
 | |
| 
 | |
|     if isinstance(id_token['aud'], str):
 | |
|         id_token['aud'] = [id_token['aud']]
 | |
| 
 | |
|     if settings.AUTH_OPENID_CLIENT_ID not in id_token['aud']:
 | |
|         logger.debug(log_prompt.format('Invalid audience'))
 | |
|         raise SuspiciousOperation('Invalid audience')
 | |
| 
 | |
|     if len(id_token['aud']) > 1 and 'azp' not in id_token:
 | |
|         logger.debug(log_prompt.format('Incorrect id_token: azp'))
 | |
|         raise SuspiciousOperation('Incorrect id_token: azp')
 | |
| 
 | |
|     if 'azp' in id_token and id_token['azp'] != settings.AUTH_OPENID_CLIENT_ID:
 | |
|         raise SuspiciousOperation('Incorrect id_token: azp')
 | |
| 
 | |
|     utc_timestamp = timegm(dt.datetime.utcnow().utctimetuple())
 | |
|     if utc_timestamp > id_token['exp']:
 | |
|         logger.debug(log_prompt.format('Signature has expired'))
 | |
|         raise SuspiciousOperation('Signature has expired')
 | |
| 
 | |
|     if 'nbf' in id_token and utc_timestamp < id_token['nbf']:
 | |
|         logger.debug(log_prompt.format('Incorrect id_token: nbf'))
 | |
|         raise SuspiciousOperation('Incorrect id_token: nbf')
 | |
| 
 | |
|     # Verifies that the token was issued in the allowed timeframe.
 | |
|     if utc_timestamp > id_token['iat'] + settings.AUTH_OPENID_ID_TOKEN_MAX_AGE:
 | |
|         logger.debug(log_prompt.format('Incorrect id_token: iat'))
 | |
|         raise SuspiciousOperation('Incorrect id_token: iat')
 | |
| 
 | |
|     # Validate the nonce to ensure the request was not modified if applicable.
 | |
|     id_token_nonce = id_token.get('nonce', None)
 | |
|     if validate_nonce and settings.AUTH_OPENID_USE_NONCE and id_token_nonce != nonce:
 | |
|         logger.debug(log_prompt.format('Incorrect id_token: nonce'))
 | |
|         raise SuspiciousOperation('Incorrect id_token: nonce')
 | |
| 
 | |
|     logger.debug(log_prompt.format('End'))
 |