init api project

pull/22/head
雷二猛 2019-11-08 16:53:16 +08:00
commit 6d6d5e93c6
20 changed files with 675 additions and 0 deletions

7
spug_api/.gitignore vendored Normal file
View File

@ -0,0 +1,7 @@
*.pyc
/venv/
__pycache__/
/.idea/
/db.sqlite3
migrations/
/access.log

View File

View File

@ -0,0 +1,37 @@
from django.db import models
from libs import ModelMixin, human_time
from django.contrib.auth.hashers import make_password, check_password
class User(models.Model, ModelMixin):
username = models.CharField(max_length=100, unique=True)
nickname = models.CharField(max_length=100)
password_hash = models.CharField(max_length=100) # hashed password
is_supper = models.BooleanField(default=False)
is_active = models.BooleanField(default=True)
access_token = models.CharField(max_length=32)
token_expired = models.IntegerField(null=True)
last_login = models.CharField(max_length=20)
created_at = models.CharField(max_length=20, default=human_time)
created_by = models.ForeignKey('User', models.PROTECT, related_name='+', null=True)
deleted_at = models.CharField(max_length=20, null=True)
deleted_by = models.ForeignKey('User', models.PROTECT, related_name='+', null=True)
@staticmethod
def make_password(plain_password: str) -> str:
return make_password(plain_password, hasher='pbkdf2_sha256')
def verify_password(self, plain_password: str) -> bool:
return check_password(plain_password, self.password_hash)
def has_perms(self, codes):
# return self.is_supper or self.role in codes
return self.is_supper
def __repr__(self):
return '<User %r>' % self.username
class Meta:
db_table = 'users'
ordering = ('-id',)

View File

@ -0,0 +1,9 @@
from django.conf.urls import url
from apps.account.views import *
urlpatterns = [
url(r'^login/', login),
url(r'^logout/', logout),
url(r'^user/$', UserView.as_view()),
]

View File

@ -0,0 +1,85 @@
from django.core.cache import cache
from django.views.generic import View
from libs import JsonParser, Argument, human_time, json_response
from .models import User
import time
import uuid
class UserView(View):
def get(self, request):
users = User.objects.filter(is_supper=False, deleted_by_id__isnull=True)
return json_response([x.to_dict(excludes=('access_token', 'password_hash')) for x in users])
def post(self, request):
form, error = JsonParser(
Argument('username', help='请输入登录名'),
Argument('password', help='请输入密码'),
Argument('nickname', help='请输入姓名'),
).parse(request.body)
if error is None:
form.password_hash = User.make_password(form.pop('password'))
form.created_by = request.user
User.objects.create(**form)
return json_response(error=error)
def patch(self, request):
form, error = JsonParser(
Argument('id', type=int, help='请指定操作对象'),
Argument('username', required=False),
Argument('password', required=False),
Argument('nickname', required=False),
Argument('is_active', type=bool, required=False),
).parse(request.body, True)
if error is None:
if form.get('password'):
form.password_hash = User.make_password(form.pop('password'))
User.objects.filter(pk=form.pop('id')).update(**form)
return json_response(error=error)
def delete(self, request):
form, error = JsonParser(
Argument('id', type=int, help='请指定操作对象')
).parse(request.GET)
if error is None:
User.objects.filter(pk=form.id).update(
deleted_at=human_time(),
deleted_by=request.user
)
return json_response(error=error)
def login(request):
form, error = JsonParser(
Argument('username', help='请输入用户名'),
Argument('password', help='请输入密码')
).parse(request.body)
if error is None:
user = User.objects.filter(username=form.username).first()
if user:
if not user.is_active:
return json_response(error="账户已被禁用")
if user.verify_password(form.password):
cache.delete(form.username)
token_isvalid = user.access_token and len(user.access_token) == 32 and user.token_expired >= time.time()
user.access_token = user.access_token if token_isvalid else uuid.uuid4().hex
user.token_expired = time.time() + 8 * 60 * 60
user.last_login = human_time()
user.save()
return json_response({'access_token': user.access_token, 'nickname': user.nickname})
value = cache.get_or_set(form.username, 0, 86400)
if value >= 3:
if user and user.is_active:
user.is_active = False
user.save()
return json_response(error='账户已被禁用')
cache.set(form.username, value + 1, 86400)
return json_response(error="用户名或密码错误,连续多次错误账户将会被禁用")
return json_response(error=error)
def logout(request):
request.user.token_expired = 0
request.user.save()
return json_response()

View File

@ -0,0 +1,5 @@
from .parser import JsonParser, Argument
from .decorators import *
from .validators import *
from .mixins import *
from .utils import *

View File

@ -0,0 +1,37 @@
from functools import wraps
from .utils import json_response
def permission_required_supper(view_func):
@wraps(view_func)
def wrapper(*args, **kwargs):
request = None
for item in args:
if hasattr(item, 'user'):
request = item
break
if request is None or not request.user.is_supper:
return json_response(error='需要管理员权限')
return view_func(*args, **kwargs)
return wrapper
def permission_required(perm_list):
def decorate(view_func):
codes = (perm_list,) if isinstance(perm_list, str) else perm_list
@wraps(view_func)
def wrapper(*args, **kwargs):
request = None
for item in args:
if hasattr(item, 'user'):
request = item
break
if request is None or (not request.user.is_supper and not request.user.has_perms(codes)):
return json_response(error='拒绝访问')
return view_func(*args, **kwargs)
return wrapper
return decorate

View File

@ -0,0 +1,39 @@
from django.utils.deprecation import MiddlewareMixin
from django.conf import settings
from .utils import json_response
from apps.account.models import User
import traceback
import time
class HandleExceptionMiddleware(MiddlewareMixin):
"""
处理试图函数异常
"""
def process_exception(self, request, exception):
traceback.print_exc()
return json_response(error='Exception: %s' % exception)
class AuthenticationMiddleware(MiddlewareMixin):
"""
登录验证
"""
def process_request(self, request):
if request.path in settings.AUTHENTICATION_EXCLUDES:
return None
if any(x.match(request.path) for x in settings.AUTHENTICATION_EXCLUDES if hasattr(x, 'match')):
return None
access_token = request.META.get('HTTP_X_TOKEN') or request.GET.get('x-token')
if access_token and len(access_token) == 32:
user = User.objects.filter(access_token=access_token).first()
if user and user.token_expired >= time.time() and user.is_active:
request.user = user
user.token_expired = time.time() + 8 * 60 * 60
user.save()
return None
response = json_response(error="验证失败,请重新登录")
response.status_code = 401
return response

53
spug_api/libs/mixins.py Normal file
View File

@ -0,0 +1,53 @@
from .utils import json_response
# 混入类提供Model实例to_dict方法
class ModelMixin(object):
__slots__ = ()
def to_dict(self, excludes: tuple = None, selects: tuple = None) -> dict:
if not hasattr(self, '_meta'):
raise TypeError('<%r> does not a django.db.models.Model object.' % self)
elif selects:
return {f: getattr(self, f) for f in selects}
elif excludes:
return {f.attname: getattr(self, f.attname) for f in self._meta.fields if f.attname not in excludes}
else:
return {f.attname: getattr(self, f.attname) for f in self._meta.fields}
# 使用该混入类需要request.user对象实现has_perms方法
class PermissionMixin(object):
"""
CBV mixin which verifies that the current user has all specified
permissions.
"""
permission_required = None
def get_permission_required(self):
"""
Override this method to override the permission_required attribute.
Must return an iterable.
"""
if self.permission_required is None:
raise AttributeError(
'{0} is missing the permission_required attribute. Define {0}.permission_required, or override '
'{0}.get_permission_required().'.format(self.__class__.__name__)
)
if isinstance(self.permission_required, str):
perms = (self.permission_required,)
else:
perms = self.permission_required
return perms
def has_permission(self):
"""
Override this method to customize the way permissions are checked.
"""
perms = self.get_permission_required()
return self.request.user.has_perms(perms)
def dispatch(self, request, *args, **kwargs):
if not self.has_permission():
return json_response(error='拒绝访问')
return super(PermissionMixin, self).dispatch(request, *args, **kwargs)

123
spug_api/libs/parser.py Normal file
View File

@ -0,0 +1,123 @@
import json
from .utils import AttrDict
# 自定义的解析异常
class ParseError(BaseException):
def __init__(self, message):
self.message = message
# 需要校验的参数对象
class Argument(object):
"""
:param name: name of option
:param default: default value if the argument if absent
:param bool required: is required
"""
def __init__(self, name, default=None, required=True, type=str, filter=None, help=None, nullable=False):
self.name = name
self.default = default
self.type = type
self.required = required
self.nullable = nullable
self.filter = filter
self.help = help
if not isinstance(self.name, str):
raise TypeError('Argument name must be string')
if filter and not callable(self.filter):
raise TypeError('Argument filter is not callable')
def parse(self, has_key, value):
if not has_key:
if self.required and self.default is None:
raise ParseError(
self.help or 'Required Error: %s is required' % self.name)
else:
return self.default
elif value in [u'', '', None]:
if self.default is not None:
return self.default
elif not self.nullable and self.required:
raise ParseError(
self.help or 'Value Error: %s must not be null' % self.name)
else:
return None
try:
if self.type:
if self.type in (list, dict) and isinstance(value, str):
value = json.loads(value)
assert isinstance(value, self.type)
elif self.type == bool and isinstance(value, str):
assert value.lower() in ['true', 'false']
value = value.lower() == 'true'
elif not isinstance(value, self.type):
value = self.type(value)
except (TypeError, ValueError, AssertionError):
raise ParseError(self.help or 'Type Error: %s type must be %s' % (
self.name, self.type))
if self.filter:
if not self.filter(value):
raise ParseError(
self.help or 'Value Error: %s filter check failed' % self.name)
return value
# 解析器基类
class BaseParser(object):
def __init__(self, *args):
self.args = []
for e in args:
if isinstance(e, str):
e = Argument(e)
elif not isinstance(e, Argument):
raise TypeError('%r is not instance of Argument' % e)
self.args.append(e)
def _get(self, key):
raise NotImplementedError
def _init(self, data):
raise NotImplementedError
def add_argument(self, **kwargs):
self.args.append(Argument(**kwargs))
def parse(self, data=None, clear=False):
rst = AttrDict()
try:
self._init(data)
for e in self.args:
has_key, value = self._get(e.name)
if clear and has_key is False and e.required is False:
continue
rst[e.name] = e.parse(has_key, value)
except ParseError as err:
return None, err.message
return rst, None
# Json解析器
class JsonParser(BaseParser):
def __init__(self, *args):
self.__data = None
super(JsonParser, self).__init__(*args)
def _get(self, key):
return key in self.__data, self.__data.get(key)
def _init(self, data):
try:
if isinstance(data, (str, bytes)):
data = data.decode('utf-8')
self.__data = json.loads(data) if data else {}
else:
assert hasattr(data, '__contains__')
assert hasattr(data, 'get')
assert callable(data.get)
self.__data = data
except (ValueError, AssertionError):
raise ParseError('Invalid data type for parse')

93
spug_api/libs/utils.py Normal file
View File

@ -0,0 +1,93 @@
from django.http.response import HttpResponse
from django.db.models import QuerySet
from datetime import datetime, date as datetime_date
from decimal import Decimal
import string
import random
import json
# 转换时间格式到字符串
def human_time(date=None):
if date:
assert isinstance(date, datetime)
else:
date = datetime.now()
return date.strftime('%Y-%m-%d %H:%M:%S')
# 转换时间格式到字符串(天)
def human_date(date=None):
if date:
assert isinstance(date, datetime)
else:
date = datetime.now()
return date.strftime('%Y-%m-%d')
# 解析时间类型的数据
def parse_time(value):
if isinstance(value, datetime):
return value
if isinstance(value, str):
if len(value) == 10:
return datetime.strptime(value, '%Y-%m-%d')
elif len(value) == 19:
return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
raise TypeError('Expect a datetime.datetime value')
# 传两个时间得到一个时间差
def human_diff_time(time1, time2):
time1 = parse_time(time1)
time2 = parse_time(time2)
delta = time1 - time2 if time1 > time2 else time2 - time1
if delta.seconds < 60:
text = '%d' % delta.seconds
elif delta.seconds < 3600:
text = '%d' % (delta.seconds / 60)
else:
text = '%d小时' % (delta.seconds / 3600)
return '%d%s' % (delta.days, text) if delta.days else text
def json_response(data='', error=''):
content = AttrDict(data=data, error=error)
if error:
content.data = ''
elif hasattr(data, 'to_dict'):
content.data = data.to_dict()
elif isinstance(data, (list, QuerySet)) and all([hasattr(item, 'to_dict') for item in data]):
content.data = [item.to_dict() for item in data]
return HttpResponse(json.dumps(content, cls=DateTimeEncoder), content_type='application/json')
# 继承自dict实现可以通过.来操作元素
class AttrDict(dict):
def __setattr__(self, key, value):
self.__setitem__(key, value)
def __getattr__(self, item):
return self.__getitem__(item)
def __delattr__(self, item):
self.__delitem__(item)
# 日期json序列化
class DateTimeEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, datetime):
return o.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(o, datetime_date):
return o.strftime('%Y-%m-%d')
elif isinstance(o, Decimal):
return float(o)
return json.JSONEncoder.default(self, o)
# 生成指定长度的随机数
def generate_random_str(length: int = 4, is_digits: bool = True) -> str:
words = string.digits if is_digits else string.ascii_letters + string.digits
return ''.join(random.sample(words, length))

View File

@ -0,0 +1,26 @@
import ipaddress
from datetime import datetime
# 判断是否是ip地址
def ip_validator(value):
try:
ipaddress.ip_address(value)
return True
except ValueError:
return False
# 判断是否是日期字符串,支持 2018-04-11 或 2018-04-11 14:55:30
def date_validator(value: str) -> bool:
value = value.strip()
try:
if len(value) == 10:
datetime.strptime(value, '%Y-%m-%d')
return True
elif len(value) == 19:
datetime.strptime(value, '%Y-%m-%d %H:%M:%S')
return True
except ValueError:
pass
return False

21
spug_api/manage.py Executable file
View File

@ -0,0 +1,21 @@
#!/usr/bin/env python
"""Django's command-line utility for administrative tasks."""
import os
import sys
def main():
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'spug.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
main()

View File

@ -0,0 +1,2 @@
Django==2.2.7
channels==2.3.1

View File

4
spug_api/spug/routing.py Normal file
View File

@ -0,0 +1,4 @@
from channels.routing import ProtocolTypeRouter, URLRouter
application = ProtocolTypeRouter({
})

69
spug_api/spug/settings.py Normal file
View File

@ -0,0 +1,69 @@
"""
Django settings for spug project.
Generated by 'django-admin startproject' using Django 2.2.7.
For more information on this file, see
https://docs.djangoproject.com/en/2.2/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/2.2/ref/settings/
"""
import os
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/2.2/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'vk0do47)egwzz!uk49%(y3s(fpx4+ha@ugt-hcv&%&d@hwr&p7'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'channels',
'apps.account',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.middleware.common.CommonMiddleware',
]
ROOT_URLCONF = 'spug.urls'
WSGI_APPLICATION = 'spug.wsgi.application'
ASGI_APPLICATION = 'spug.routing.application'
# Database
# https://docs.djangoproject.com/en/2.2/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
}
}
# Internationalization
# https://docs.djangoproject.com/en/2.2/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = True
AUTHENTICATION_EXCLUDES = ()

20
spug_api/spug/urls.py Normal file
View File

@ -0,0 +1,20 @@
"""spug URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/2.2/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: path('', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.urls import path, include
urlpatterns = [
path('account/', include('apps.account.urls'))
]

16
spug_api/spug/wsgi.py Normal file
View File

@ -0,0 +1,16 @@
"""
WSGI config for spug project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/2.2/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'spug.settings')
application = get_wsgi_application()

29
spug_api/tools/useradd.py Normal file
View File

@ -0,0 +1,29 @@
import argparse
import django
import sys
import os
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'spug.settings')
django.setup()
from apps.account.models import User
parser = argparse.ArgumentParser(description='创建用户')
parser.add_argument('-u', required=True, metavar='username', help='账户名称')
parser.add_argument('-p', required=True, metavar='password', help='账户密码')
parser.add_argument('-n', default='', metavar='nickname', help='账户昵称')
parser.add_argument('-s', default=False, action='store_true', help='是否是超级用户(默认否)')
args = parser.parse_args()
User.objects.create(
username=args.u,
nickname=args.n,
password_hash=User.make_password(args.p),
is_supper=args.s,
)
print('创建成功')