mirror of https://github.com/tp4a/teleport
127 lines
4.4 KiB
Python
127 lines
4.4 KiB
Python
![]() |
|
||
|
"""
|
||
|
"""
|
||
|
|
||
|
import random
|
||
|
|
||
|
from datetime import timedelta
|
||
|
from time import time
|
||
|
from uuid import uuid4
|
||
|
|
||
|
from wheezy.core.collections import last_item_adapter
|
||
|
from wheezy.core.uuid import shrink_uuid
|
||
|
from wheezy.http import CacheProfile
|
||
|
from wheezy.http import HTTPResponse
|
||
|
from wheezy.http import accept_method
|
||
|
from wheezy.http import bad_request
|
||
|
from wheezy.http import response_cache
|
||
|
|
||
|
|
||
|
class FileAdapter(object):
    """Minimal file-like wrapper around an HTTP response.

    Exposes a single ``write`` attribute so that code expecting a
    writable file object (here: the ``save`` call of the captcha image)
    can stream bytes straight into the response.
    """

    def __init__(self, response):
        """Bind ``write`` to the ``write_bytes`` callable of *response*.

        The attribute is resolved once, at construction time, so
        *response* must already provide ``write_bytes``.
        """
        write_bytes = response.write_bytes
        self.write = write_bytes
|
||
|
|
||
|
|
||
|
class CaptchaContext(object):
    """Issues captcha images and validates the answers typed by users.

    A *challenge code* identifies one captcha instance; the *turing
    number* is the text rendered into the image that the user has to
    type back. The pair ``(issue time, turing number)`` is kept in
    ``cache`` under ``prefix + challenge code``.
    """

    def __init__(self, image,
                 cache, prefix='captcha:', namespace=None,
                 timeout=5 * 60, profile=None,
                 chars='ABCDEFGHJKLMNPQRSTUVWXYZ23456789',
                 max_chars=4, wait_timeout=2,
                 challenge_key='c', turing_key='turing_number',
                 enabled=True):
        """Store the configuration and build a default cache profile.

        ``image`` - callable taking the turing number and returning an
            object with a ``save(fp, format, **options)`` method.
        ``cache`` - object providing ``set``, ``get`` and ``delete``.
        ``prefix`` - string prepended to the challenge code to build
            the cache key.
        ``namespace`` - passed through to every cache call and to the
            default cache profile.
        ``timeout`` - lifetime, in seconds, passed to ``cache.set``.
        ``profile`` - response cache profile for the image handler; when
            falsy a ``CacheProfile`` is created from ``challenge_key``,
            ``wait_timeout`` and ``namespace``.
        ``chars`` - alphabet the turing number is drawn from. The typed
            answer is upper-cased before comparison, so lowercase
            characters here can never be matched.
        ``max_chars`` - length of the turing number; must not exceed
            ``len(chars)`` because characters are sampled without
            repetition.
        ``wait_timeout`` - minimum number of seconds between issuing a
            captcha and accepting an answer for it.
        ``challenge_key`` - query / form field holding the challenge
            code.
        ``turing_key`` - form field holding the typed answer; also the
            key under which validation errors are reported.
        ``enabled`` - when false, ``validate`` always succeeds.
        """
        self.image = image
        self.cache = cache
        self.prefix = prefix
        self.namespace = namespace
        self.timeout = timeout
        self.chars = chars
        self.wait_timeout = wait_timeout
        self.max_chars = max_chars
        self.challenge_key = challenge_key
        self.turing_key = turing_key
        self.enabled = enabled
        if profile:
            self.profile = profile
        else:
            self.profile = CacheProfile(
                'server',
                vary_query=[challenge_key],
                duration=timedelta(seconds=wait_timeout),
                no_store=True,
                namespace=namespace)

    def _new_turing_number(self):
        """Return ``max_chars`` distinct characters drawn from ``chars``.

        The value is a secret the client must not be able to guess, so
        it is drawn from the operating system's entropy source rather
        than the predictable default generator of ``random``.
        """
        return ''.join(
            random.SystemRandom().sample(self.chars, self.max_chars))

    # NOTE(review): the registered MIME type for JPEG is ``image/jpeg``;
    # the ``image/jpg`` default is kept for backward compatibility.
    def create_handler(self, content_type='image/jpg', format='JPEG',
                       **options):
        """Build the request handler that serves the captcha image.

        ``content_type`` is used for the response, ``format`` and
        ``options`` are forwarded to the image's ``save`` call.

        The returned handler accepts ``GET`` only and is wrapped with
        the response cache configured by ``self.profile``. It answers
        with a bad request when the challenge code is missing from the
        query or when the cache refuses to store the new turing number.
        """
        @accept_method('GET')
        @response_cache(self.profile)
        def handler(request):
            if self.challenge_key not in request.query:
                return bad_request()
            challenge_code = last_item_adapter(
                request.query)[self.challenge_key]
            turing_number = self._new_turing_number()
            # Remember when the captcha was issued together with the
            # expected answer; ``validate`` needs both.
            if not self.cache.set(self.prefix + challenge_code,
                                  (int(time()), turing_number),
                                  self.timeout, self.namespace):
                return bad_request()
            response = HTTPResponse(content_type)
            self.image(turing_number).save(
                FileAdapter(response), format, **options)
            return response
        return handler

    def get_challenge_code(self, request):
        """Return the challenge code for *request*.

        When the query carries ``challenge_key`` its first value is
        returned, otherwise a fresh code is generated from a random
        UUID.
        """
        if self.challenge_key not in request.query:
            return shrink_uuid(uuid4())
        # NOTE(review): takes the first value while the image handler
        # takes the last one; identical for a single-valued parameter.
        return request.query[self.challenge_key][0]

    def validate(self, request, errors, gettext):
        """Check the captcha answer submitted in ``request.form``.

        Returns ``True`` when the captcha is disabled or the answer
        matches; otherwise appends a message (translated by *gettext*)
        to *errors* under ``turing_key`` and returns ``False``.

        A stored captcha is deleted from the cache as soon as it has
        been read, so each challenge code can be answered only once.
        """
        if not self.enabled:
            return True
        if self.challenge_key not in request.form:
            self.append_error(errors, gettext(
                'The challenge code is not available.'))
            return False
        if self.turing_key not in request.form:
            self.append_error(errors, gettext(
                'The turing number is not available.'))
            return False
        form = last_item_adapter(request.form)
        challenge_code = form[self.challenge_key]
        # presumably 22 is the length of a ``shrink_uuid`` result -
        # verify against wheezy.core
        if len(challenge_code) != 22:
            self.append_error(errors, gettext(
                'The challenge code is invalid.'))
            return False
        entered_turing_number = form[self.turing_key]
        if len(entered_turing_number) != self.max_chars:
            self.append_error(errors, gettext(
                'The turing number is invalid.'))
            return False

        key = self.prefix + challenge_code
        data = self.cache.get(key, self.namespace)
        if not data:
            self.append_error(errors, gettext(
                'The code you typed has expired after %d seconds.')
                % self.timeout)
            return False
        # One attempt per captcha: drop it before checking the answer.
        self.cache.delete(key, 0, self.namespace)
        issued, turing_number = data
        # Answers arriving sooner than ``wait_timeout`` seconds after
        # the image was issued are rejected.
        if issued + self.wait_timeout > int(time()):
            self.append_error(errors, gettext(
                'The code was typed too quickly. Wait at least %d seconds.')
                % self.wait_timeout)
            return False
        if turing_number != entered_turing_number.upper():
            self.append_error(
                errors, gettext('The code you typed has no match.'))
            return False
        return True

    def append_error(self, errors, message):
        """Append *message* to the list stored in *errors* under
        ``turing_key``, creating the list when it does not exist yet.
        """
        errors.setdefault(self.turing_key, []).append(message)
|