fix: pubkey auth require svc sign

pull/11704/head
ibuler 2023-09-25 22:58:12 +08:00 committed by Bryan
parent 1f00c00183
commit 9bde2ff6e1
2 changed files with 41 additions and 0 deletions

View File

@ -424,6 +424,7 @@ class AuthMixin(CommonMixin, AuthPreCheckMixin, AuthACLMixin, MFAMixin, AuthPost
key_prefix_captcha = "_LOGIN_INVALID_{}"
def _check_auth_user_is_valid(self, username, password, public_key):
from common.permissions import ServiceAccountSignaturePermission
user = authenticate(
self.request, username=username,
password=password, public_key=public_key
@ -431,6 +432,11 @@ class AuthMixin(CommonMixin, AuthPreCheckMixin, AuthACLMixin, MFAMixin, AuthPost
if not user:
self.raise_credential_error(errors.reason_password_failed)
if public_key:
permission = ServiceAccountSignaturePermission()
if not permission.has_permission(self.request, self):
self.raise_credential_error(errors.reason_password_failed)
self.request.session['auth_backend'] = getattr(user, 'backend', settings.AUTH_BACKEND_MODEL)
if user.is_expired:

View File

@ -86,3 +86,38 @@ class UserConfirmation(permissions.BasePermission):
min_level = ConfirmType.values.index(confirm_type) + 1
name = 'UserConfirmationLevel{}TTL{}'.format(min_level, ttl)
return type(name, (cls,), {'min_level': min_level, 'ttl': ttl, 'confirm_type': confirm_type})
class ServiceAccountSignaturePermission(permissions.BasePermission):
def has_permission(self, request, view):
from authentication.models import AccessKey
from common.utils.crypto import get_aes_crypto
signature = request.META.get('HTTP_X_JMS_SVC', '')
if not signature or not signature.startswith('Sign'):
return False
data = signature[4:].strip()
if not data or ':' not in data:
return False
ak_id, time_sign = data.split(':', 1)
if not ak_id or not time_sign:
return False
ak = AccessKey.objects.filter(id=ak_id).first()
if not ak or not ak.is_active:
return False
if not ak.user or not ak.user.is_active or not ak.user.is_service_account:
return False
aes = get_aes_crypto(str(ak.secret).replace('-', ''), mode='ECB')
try:
timestamp = aes.decrypt(time_sign)
if not timestamp or not timestamp.isdigit():
return False
timestamp = int(timestamp)
interval = abs(int(time.time()) - timestamp)
if interval > 30:
return False
return True
except Exception:
return False
def has_object_permission(self, request, view, obj):
return False