mirror of https://github.com/jumpserver/jumpserver
Aaron3S
1 week ago
committed by
Bryan
22 changed files with 512 additions and 19 deletions
@ -0,0 +1,53 @@
|
||||
from django.core.cache import cache |
||||
|
||||
from authentication.mfa.base import BaseMFA |
||||
from django.shortcuts import reverse |
||||
from django.utils.translation import gettext_lazy as _ |
||||
|
||||
from authentication.mixins import MFAFaceMixin |
||||
from settings.api import settings |
||||
|
||||
|
||||
class MFAFace(BaseMFA, MFAFaceMixin): |
||||
name = "face" |
||||
display_name = _('Face Recognition') |
||||
placeholder = 'Face Recognition' |
||||
|
||||
def check_code(self, code): |
||||
|
||||
assert self.is_authenticated() |
||||
|
||||
try: |
||||
code = self.get_face_code() |
||||
if not self.user.check_face(code): |
||||
return False, _('Facial comparison failed') |
||||
except Exception as e: |
||||
return False, "{}:{}".format(_('Facial comparison failed'), str(e)) |
||||
return True, '' |
||||
|
||||
def is_active(self): |
||||
if not self.is_authenticated(): |
||||
return True |
||||
return bool(self.user.face_vector) |
||||
|
||||
@staticmethod |
||||
def global_enabled(): |
||||
return settings.XPACK_LICENSE_IS_VALID and settings.FACE_RECOGNITION_ENABLED |
||||
|
||||
def get_enable_url(self) -> str: |
||||
return reverse('authentication:user-face-enable') |
||||
|
||||
def get_disable_url(self) -> str: |
||||
return reverse('authentication:user-face-disable') |
||||
|
||||
def disable(self): |
||||
assert self.is_authenticated() |
||||
self.user.face_vector = '' |
||||
self.user.save(update_fields=['face_vector']) |
||||
|
||||
def can_disable(self) -> bool: |
||||
return True |
||||
|
||||
@staticmethod |
||||
def help_text_of_enable(): |
||||
return _("Frontal Face Recognition") |
@ -0,0 +1,92 @@
|
||||
{% extends '_base_only_content.html' %} |
||||
{% load i18n %} |
||||
{% load static %} |
||||
|
||||
{% block title %} |
||||
<div style="text-align: center"> |
||||
{% trans 'Face Recognition' %} |
||||
</div> |
||||
{% endblock %} |
||||
|
||||
{% block content %} |
||||
<form class="m-t" role="form" method="post" action=""> |
||||
{% csrf_token %} |
||||
{% if 'code' in form.errors %} |
||||
<p class="red-fonts">{{ form.code.errors.as_text }}</p> |
||||
{% endif %} |
||||
<button id="submit_button" type="submit" style="display: none"></button> |
||||
</form> |
||||
|
||||
<div id="iframe_container" |
||||
style="display: none; justify-content: center; align-items: center; height: 800px; width: 100%; background-color: #0a6aa1"> |
||||
<iframe |
||||
title="face capture" |
||||
id="face_capture_iframe" |
||||
allow="camera" |
||||
sandbox="allow-scripts allow-same-origin" |
||||
style="width: 100%; height: 100%;border: none;"> |
||||
</iframe> |
||||
</div> |
||||
|
||||
<div id="retry_container" style="text-align: center; margin-top: 20px; display: none;"> |
||||
<button id="retry_button" class="btn btn-primary">{% trans 'Retry' %}</button> |
||||
</div> |
||||
|
||||
<script> |
||||
$(document).ready(function () { |
||||
const apiUrl = "{% url 'api-auth:mfa-face-context' %}"; |
||||
const faceCaptureUrl = "/faceliving/capture"; |
||||
let token; |
||||
|
||||
function createFaceCaptureToken() { |
||||
const csrf = getCookie('jms_csrftoken'); |
||||
$.ajax({ |
||||
url: apiUrl, |
||||
method: 'POST', |
||||
headers: { |
||||
'X-CSRFToken': csrf |
||||
}, |
||||
success: function (data) { |
||||
token = data.token; |
||||
$('#iframe_container').show(); |
||||
$('#face_capture_iframe').attr('src', `${faceCaptureUrl}?token=${token}`); |
||||
startCheckingStatus(); |
||||
}, |
||||
error: function (error) { |
||||
$('#retry_container').show(); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
function startCheckingStatus() { |
||||
const interval = 1000; |
||||
const timer = setInterval(function () { |
||||
$.ajax({ |
||||
url: `${apiUrl}?token=${token}`, |
||||
method: 'GET', |
||||
success: function (data) { |
||||
if (data.is_finished) { |
||||
clearInterval(timer); |
||||
$('#submit_button').click(); |
||||
} |
||||
}, |
||||
error: function (error) { |
||||
console.error('API request failed:', error); |
||||
} |
||||
}); |
||||
}, interval); |
||||
} |
||||
|
||||
const active = "{{ active }}"; |
||||
if (active) { |
||||
createFaceCaptureToken(); |
||||
} else { |
||||
$('#retry_container').show(); |
||||
} |
||||
|
||||
$('#retry_button').on('click', function () { |
||||
window.location.href = "{% url 'authentication:login-face-capture' %}"; |
||||
}); |
||||
}); |
||||
</script> |
||||
{% endblock %} |
@ -0,0 +1,19 @@
|
||||
# Generated by Django 4.1.13 on 2024-11-08 03:33 |
||||
|
||||
import common.db.fields |
||||
from django.db import migrations |
||||
|
||||
|
||||
class Migration(migrations.Migration): |
||||
|
||||
dependencies = [ |
||||
('users', '0001_initial'), |
||||
] |
||||
|
||||
operations = [ |
||||
migrations.AddField( |
||||
model_name='user', |
||||
name='face_vector', |
||||
field=common.db.fields.EncryptTextField(blank=True, max_length=2048, null=True, verbose_name='Face Vector'), |
||||
), |
||||
] |
@ -0,0 +1,56 @@
|
||||
import base64 |
||||
import struct |
||||
|
||||
import math |
||||
from django.conf import settings |
||||
from django.core.exceptions import ValidationError |
||||
|
||||
from common.utils import ( |
||||
get_logger, |
||||
) |
||||
|
||||
logger = get_logger(__file__) |
||||
|
||||
|
||||
class FaceMixin: |
||||
face_vector = None |
||||
|
||||
def get_face_vector(self) -> list[float]: |
||||
if not self.face_vector: |
||||
raise ValidationError("Face vector is not set.") |
||||
return self._decode_base64_vector(str(self.face_vector)) |
||||
|
||||
def check_face(self, code) -> bool: |
||||
distance = self.compare_euclidean_distance(code) |
||||
similarity = self.compare_cosine_similarity(code) |
||||
|
||||
return distance < settings.FACE_RECOGNITION_DISTANCE_THRESHOLD \ |
||||
and similarity > settings.FACE_RECOGNITION_COSINE_THRESHOLD |
||||
|
||||
def compare_euclidean_distance(self, base64_vector: str) -> float: |
||||
target_vector = self._decode_base64_vector(base64_vector) |
||||
current_vector = self.get_face_vector() |
||||
return self._calculate_euclidean_distance(current_vector, target_vector) |
||||
|
||||
def compare_cosine_similarity(self, base64_vector: str) -> float: |
||||
target_vector = self._decode_base64_vector(base64_vector) |
||||
current_vector = self.get_face_vector() |
||||
return self._calculate_cosine_similarity(current_vector, target_vector) |
||||
|
||||
@staticmethod |
||||
def _decode_base64_vector(base64_vector: str) -> list[float]: |
||||
byte_data = base64.b64decode(base64_vector) |
||||
return list(struct.unpack('<128d', byte_data)) |
||||
|
||||
@staticmethod |
||||
def _calculate_euclidean_distance(vec1: list[float], vec2: list[float]) -> float: |
||||
return sum((x - y) ** 2 for x, y in zip(vec1, vec2)) ** 0.5 |
||||
|
||||
@staticmethod |
||||
def _calculate_cosine_similarity(vec1: list[float], vec2: list[float]) -> float: |
||||
dot_product = sum(x * y for x, y in zip(vec1, vec2)) |
||||
magnitude_vec1 = math.sqrt(sum(x ** 2 for x in vec1)) |
||||
magnitude_vec2 = math.sqrt(sum(y ** 2 for y in vec2)) |
||||
if magnitude_vec1 == 0 or magnitude_vec2 == 0: |
||||
raise ValueError("Vector magnitude cannot be zero.") |
||||
return dot_product / (magnitude_vec1 * magnitude_vec2) |
@ -0,0 +1,62 @@
|
||||
from django.contrib.auth import logout as auth_logout |
||||
from django.shortcuts import redirect |
||||
from django.views.generic import FormView |
||||
from django import forms |
||||
|
||||
from authentication import errors |
||||
from authentication.mixins import AuthMixin, MFAFaceMixin |
||||
|
||||
__all__ = ['UserFaceCaptureView', 'UserFaceEnableView', |
||||
'UserFaceDisableView'] |
||||
|
||||
|
||||
class UserFaceCaptureForm(forms.Form): |
||||
code = forms.CharField(label='MFA Code', max_length=128, required=False) |
||||
|
||||
|
||||
class UserFaceCaptureView(AuthMixin, FormView): |
||||
template_name = 'authentication/face_capture.html' |
||||
form_class = UserFaceCaptureForm |
||||
mfa_type = 'face' |
||||
code = '' |
||||
|
||||
def form_valid(self, form): |
||||
raise NotImplementedError |
||||
|
||||
def get_context_data(self, **kwargs): |
||||
context = super().get_context_data() |
||||
|
||||
if not self.get_form().is_bound: |
||||
context.update({ |
||||
"active": True, |
||||
}) |
||||
|
||||
kwargs.update(context) |
||||
return kwargs |
||||
|
||||
|
||||
class UserFaceEnableView(UserFaceCaptureView, MFAFaceMixin): |
||||
def form_valid(self, form): |
||||
code = self.get_face_code() |
||||
|
||||
user = self.get_user_from_session() |
||||
user.face_vector = code |
||||
user.save(update_fields=['face_vector']) |
||||
|
||||
auth_logout(self.request) |
||||
return redirect('authentication:login') |
||||
|
||||
|
||||
class UserFaceDisableView(UserFaceCaptureView): |
||||
def form_valid(self, form): |
||||
try: |
||||
self._do_check_user_mfa(self.code, self.mfa_type) |
||||
user = self.get_user_from_session() |
||||
user.face_vector = None |
||||
user.save(update_fields=['face_vector']) |
||||
auth_logout(self.request) |
||||
except (errors.MFAFailedError, errors.BlockMFAError) as e: |
||||
form.add_error('code', e.msg) |
||||
return super().form_invalid(form) |
||||
|
||||
return redirect('authentication:login') |
Loading…
Reference in new issue