mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			113 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			113 lines
		
	
	
		
			3.9 KiB
		
	
	
	
		
			Python
		
	
	
| from rest_framework import authentication
 | |
| from rest_framework import exceptions
 | |
| 
 | |
| from httpsig import HeaderVerifier, utils
 | |
| 
 | |
| """
 | |
| Reusing failure exceptions serves several purposes:
 | |
| 
 | |
|     1. Lack of useful information regarding the failure inhibits attackers
 | |
|     from learning about valid keyIDs or other forms of information leakage.
 | |
|     Using the same actual object for any failure makes preventing such
 | |
|     leakage through mistakenly-distinct error messages less likely.
 | |
| 
 | |
|     2. In an API scenario, the object is created once and raised many times
 | |
|     rather than generated on every failure, which could lead to higher loads
 | |
|     or memory usage in high-volume attack scenarios.
 | |
| 
 | |
| """
 | |
| FAILED = exceptions.AuthenticationFailed('Invalid signature.')
 | |
| 
 | |
| 
 | |
| class SignatureAuthentication(authentication.BaseAuthentication):
 | |
|     """
 | |
|     DRF authentication class for HTTP Signature support.
 | |
| 
 | |
|     You must subclass this class in your own project and implement the
 | |
|     `fetch_user_data(self, keyId, algorithm)` method, returning a tuple of
 | |
|     the User object and a bytes object containing the user's secret. Note
 | |
|     that key_id and algorithm are DIRTY as they are supplied by the client
 | |
|     and so must be verified in your subclass!
 | |
| 
 | |
|     You may set the following class properties in your subclass to configure
 | |
|     authentication for your particular use case:
 | |
| 
 | |
|     :param www_authenticate_realm:  Default: "api"
 | |
|     :param required_headers:        Default: ["(request-target)", "date"]
 | |
|     """
 | |
| 
 | |
|     www_authenticate_realm = "api"
 | |
|     required_headers = ["(request-target)", "date"]
 | |
| 
 | |
|     def fetch_user_data(self, key_id, algorithm=None):
 | |
|         """Retuns a tuple (User, secret) or (None, None)."""
 | |
|         raise NotImplementedError()
 | |
| 
 | |
|     def authenticate_header(self, request):
 | |
|         """
 | |
|         DRF sends this for unauthenticated responses if we're the primary
 | |
|         authenticator.
 | |
|         """
 | |
|         h = " ".join(self.required_headers)
 | |
|         return 'Signature realm="%s",headers="%s"' % (
 | |
|         self.www_authenticate_realm, h)
 | |
| 
 | |
|     def authenticate(self, request):
 | |
|         """
 | |
|         Perform the actual authentication.
 | |
| 
 | |
|         Note that the exception raised is always the same. This is so that we
 | |
|         don't leak information about in/valid keyIds and other such useful
 | |
|         things.
 | |
|         """
 | |
|         auth_header = authentication.get_authorization_header(request)
 | |
|         if not auth_header or len(auth_header) == 0:
 | |
|             return None
 | |
| 
 | |
|         method, fields = utils.parse_authorization_header(auth_header)
 | |
| 
 | |
|         # Ignore foreign Authorization headers.
 | |
|         if method.lower() != 'signature':
 | |
|             return None
 | |
| 
 | |
|         # Verify basic header structure.
 | |
|         if len(fields) == 0:
 | |
|             raise FAILED
 | |
| 
 | |
|         # Ensure all required fields were included.
 | |
|         if len({"keyid", "algorithm", "signature"} - set(fields.keys())) > 0:
 | |
|             raise FAILED
 | |
| 
 | |
|         # Fetch the secret associated with the keyid
 | |
|         user, secret = self.fetch_user_data(
 | |
|             fields["keyid"],
 | |
|             algorithm=fields["algorithm"]
 | |
|         )
 | |
| 
 | |
|         if not (user and secret):
 | |
|             raise FAILED
 | |
| 
 | |
|         # Gather all request headers and translate them as stated in the Django docs:
 | |
|         # https://docs.djangoproject.com/en/1.6/ref/request-response/#django.http.HttpRequest.META
 | |
|         headers = {}
 | |
|         for key in request.META.keys():
 | |
|             if key.startswith("HTTP_") or \
 | |
|                     key in ("CONTENT_TYPE", "CONTENT_LENGTH"):
 | |
|                 header = key[5:].lower().replace('_', '-')
 | |
|                 headers[header] = request.META[key]
 | |
| 
 | |
|         # Verify headers
 | |
|         hs = HeaderVerifier(
 | |
|             headers,
 | |
|             secret,
 | |
|             required_headers=self.required_headers,
 | |
|             method=request.method.lower(),
 | |
|             path=request.get_full_path()
 | |
|         )
 | |
| 
 | |
|         # All of that just to get to this.
 | |
|         if not hs.verify():
 | |
|             raise FAILED
 | |
| 
 | |
|         return user, fields["keyid"]
 |