功能变化: 重构websocket

pull/79/head
猿小天 2022-11-15 17:36:05 +08:00
parent 1bd16a722e
commit 90e47bdee5
7 changed files with 109 additions and 211 deletions

View File

@ -18,10 +18,10 @@ from application.websocketConfig import websocket_application
http_application = get_asgi_application() http_application = get_asgi_application()
async def application(scope,receive,send): # async def application(scope,receive,send):
if scope['type'] == 'http': # if scope['type'] == 'http':
await http_application(scope, receive, send) # await http_application(scope, receive, send)
elif scope['type'] == 'websocket': # elif scope['type'] == 'websocket':
await websocket_application(scope, receive, send) # await websocket_application(scope, receive, send)
else: # else:
raise Exception("未知的scope类型,"+ scope['type']) # raise Exception("未知的scope类型,"+ scope['type'])

View File

@ -1,13 +1,19 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from channels.auth import AuthMiddlewareStack from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter from channels.routing import ProtocolTypeRouter, URLRouter
from dvadmin.system import routing as dvadminRouting
from django.urls import path
from application.websocketConfig import MegCenter
websocket_urlpatterns = [
path('ws/<str:service_uid>/', MegCenter.as_asgi()), #consumers.DvadminWebSocket 是该路由的消费者
]
application = ProtocolTypeRouter({ application = ProtocolTypeRouter({
'websocket': AuthMiddlewareStack( 'websocket': AuthMiddlewareStack(
URLRouter( URLRouter(
dvadminRouting.websocket_urlpatterns# 指明路由文件是devops/routing.py websocket_urlpatterns #指明路由文件是devops/routing.py
) )
), ),
}) })

View File

@ -1,114 +1,100 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import django
django.setup()
import json
import urllib import urllib
#处理websocket传参 from asgiref.sync import sync_to_async, async_to_sync
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer, AsyncWebsocketConsumer
import json
from channels.layers import get_channel_layer
from jwt import InvalidSignatureError from jwt import InvalidSignatureError
from application import settings from application import settings
from dvadmin.system.models import MessageCenter from dvadmin.system.models import MessageCenter
send_dict = {}
# 发送消息结构体
def message(sender, msg_type, msg):
text = {
'sender': sender,
'contentType': msg_type,
'content': msg,
}
return text
#异步获取消息中心的目标用户
@database_sync_to_async
def _get_message_center_instance(message_id):
_MessageCenter = MessageCenter.objects.filter(id=message_id).values_list('target_user',flat=True)
if _MessageCenter:
return _MessageCenter
else:
return []
def request_data(scope): def request_data(scope):
query_string = scope.get('query_string', b'').decode('utf-8') query_string = scope.get('query_string', b'').decode('utf-8')
qs = urllib.parse.parse_qs(query_string) qs = urllib.parse.parse_qs(query_string)
return qs return qs
class DvadminWebSocket(AsyncJsonWebsocketConsumer):
# 全部的websocket sender async def connect(self):
CONNECTIONS = {} try:
import jwt
self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"]
decoded_result = jwt.decode(self.service_uid, settings.SECRET_KEY, algorithms=["HS256"])
if decoded_result:
self.user_id = decoded_result.get('user_id')
self.chat_group_name = "user_"+str(self.user_id)
#收到连接时候处理,
await self.channel_layer.group_add(
self.chat_group_name,
self.channel_name
)
await self.accept()
await self.send_json(message('system', 'SYSTEM', '连接成功'))
except InvalidSignatureError:
await self.disconnect(None)
# 判断用户是否已经连接 async def disconnect(self, close_code):
def check_connection(key): # Leave room group
return key in CONNECTIONS await self.channel_layer.group_discard(self.chat_group_name, self.channel_name)
print("连接关闭")
await self.close(close_code)
# 发送消息结构体 class MegCenter(DvadminWebSocket):
def message(sender, msg_type, msg): """
text = json.dumps({ 消息中心
'sender': sender, """
'contentType': msg_type,
'content': msg, async def receive(self, text_data):
}) # 接受客户端的信息,你处理的函数
return { text_data_json = json.loads(text_data)
'type': 'websocket.send', message_id = text_data_json.get('message_id', None)
'text': text user_list = await _get_message_center_instance(message_id)
for send_user in user_list:
await self.channel_layer.group_send(
"user_" + str(send_user),
{'type': 'push.message', 'message': text_data_json}
)
async def push_message(self, event):
message = event['message']
await self.send(text_data=json.dumps(message))
def push(username, event):
"""
主动推送消息
"""
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
username,
{
"type": "push.message",
"event": event
} }
)
async def websocket_application(scope, receive, send):
while True:
event = await receive()
# print('[event] ', event)
qs = request_data(scope)
print(1,qs)
auth = qs.get('auth', [''])[0]
user_id = None
# 收到建立WebSocket连接的消息
if event['type'] == 'websocket.connect':
# 昵称验证
if not auth:
break
else:
try:
import jwt
decoded_result = jwt.decode(auth, settings.SECRET_KEY, algorithms=["HS256"])
if decoded_result:
user_id = decoded_result.get('user_id')
# 记录
CONNECTIONS[user_id] = send
except InvalidSignatureError:
break
if auth in CONNECTIONS:
break
await send({'type': 'websocket.accept'})
await send(message('system', 'INFO', '连接成功'))
# # 发送好友列表
# friends_list = list(CONNECTIONS.keys())
# await send(message('system', 'INFO', friends_list))
#
# # 向其他人群发消息, 有人登录了
# for other in CONNECTIONS.values():
# await other(message('system', 'addFriend', auth))
# 收到中断WebSocket连接的消息
elif event['type'] == 'websocket.disconnect':
# 移除记录
if user_id in CONNECTIONS:
CONNECTIONS.pop(user_id)
# # 向其他人群发消息, 有人离线了
# for other in CONNECTIONS.values():
# await other(message('system', 'removeFriend', user_id))
# 其他情况,正常的WebSocket消息
elif event['type'] == 'websocket.receive':
if event['text'] == 'ping':
await send(message('system', 'text', 'pong!'))
else:
receive_msg = json.loads(event['text'])
message_id = receive_msg.get('message_id', None)
_MessageCenter = MessageCenter.objects.filter(id=message_id).first()
if _MessageCenter:
user_list = _MessageCenter.target_user.values_list('id',flat=True)
for send_user in user_list:
if send_user in CONNECTIONS:
content_type = receive_msg.get('contentType', 'TEXT')
content = receive_msg.get('content', '')
msg = message(user_id, content_type, content)
await CONNECTIONS[send_user](msg)
else:
msg = message('system', 'text', '对方已下线或不存在')
await send(msg)
else:
print('a1a1a1')
pass
print('[disconnect]')

View File

@ -1,95 +0,0 @@
# -*- coding: utf-8 -*-
import urllib
from asgiref.sync import sync_to_async
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer, AsyncWebsocketConsumer
import json
from jwt import InvalidSignatureError
from application import settings
from dvadmin.system.models import MessageCenter
send_dict = {}
# 发送消息结构体
def message(sender, msg_type, msg):
text = {
'sender': sender,
'contentType': msg_type,
'content': msg,
}
return text
#异步获取消息中心的目标用户
@database_sync_to_async
def _get_message_center_instance(message_id):
_MessageCenter = MessageCenter.objects.filter(id=message_id).values_list('target_user',flat=True)
if _MessageCenter:
return _MessageCenter
else:
return []
def request_data(scope):
query_string = scope.get('query_string', b'').decode('utf-8')
qs = urllib.parse.parse_qs(query_string)
return qs
class DvadminWebSocket(AsyncJsonWebsocketConsumer):
async def connect(self):
try:
import jwt
self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"]
params = request_data(self.scope)
room = params.get('room')[0]
decoded_result = jwt.decode(self.service_uid, settings.SECRET_KEY, algorithms=["HS256"])
if decoded_result:
self.user_id = decoded_result.get('user_id')
self.chat_group_name = room
#收到连接时候处理,
await self.channel_layer.group_add(
self.chat_group_name,
self.channel_name
)
# 将该客户端的信息发送函数与客户端的唯一身份标识绑定,保存至自定义的字典中
if len(send_dict)==0:
send_dict.setdefault(self.chat_group_name, {})
for room in send_dict.keys():
if room == self.chat_group_name:
send_dict[self.chat_group_name][self.user_id] = self.send
else:
send_dict.setdefault(self.chat_group_name,{})
await self.accept()
await self.send_json(message('system', 'SYSTEM', '连接成功'))
except InvalidSignatureError:
await self.disconnect(None)
async def disconnect(self, close_code):
# 删除 send_dict 中对应的信息
del send_dict[self.chat_group_name][self.user_id]
# Leave room group
await self.channel_layer.group_discard(self.chat_group_name, self.channel_name)
print("连接关闭")
await self.close(close_code)
class MegCenter(DvadminWebSocket):
"""
消息中心
"""
async def receive(self, text_data=None, byte_text_data=None):
print(text_data)
try:
text_data_json = json.loads(text_data)
except Exception as e:
print('数据无法被json格式化', e)
await self.disconnect(400)
else:
# 获取将要推送信息的目标身份标识,调用保存在 send_dict中的信息发送函数
message_id = text_data_json.get('message_id', None)
user_list = await _get_message_center_instance(message_id)
for send_user in user_list:
await send_dict[self.chat_group_name][send_user](text_data=json.dumps(text_data_json))

View File

@ -1,7 +0,0 @@
# -*- coding: utf-8 -*-
from django.urls import path
from . import consumers
websocket_urlpatterns = [
path('ws/<str:service_uid>/', consumers.MegCenter.as_asgi()), #consumers.DvadminWebSocket 是该路由的消费者
]

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
from itertools import chain import json
from django_restql.fields import DynamicSerializerMethodField from django_restql.fields import DynamicSerializerMethodField
from rest_framework import serializers from rest_framework import serializers
from rest_framework.decorators import action, permission_classes from rest_framework.decorators import action, permission_classes
@ -181,7 +180,6 @@ class MessageCenterViewSet(CustomModelViewSet):
self_user_id = self.request.user.id self_user_id = self.request.user.id
queryset = MessageCenterTargetUser.objects.filter(users__id=self_user_id).exclude( queryset = MessageCenterTargetUser.objects.filter(users__id=self_user_id).exclude(
messagecenter__is_deleted=True).order_by('create_datetime').last() messagecenter__is_deleted=True).order_by('create_datetime').last()
print(queryset)
data = None data = None
if queryset: if queryset:
serializer = MessageCenterTargetUserListSerializer(queryset, many=False, request=request) serializer = MessageCenterTargetUserListSerializer(queryset, many=False, request=request)

View File

@ -3,7 +3,7 @@ import util from '@/libs/util'
function initWebSocket (e) { function initWebSocket (e) {
const token = util.cookies.get('token') const token = util.cookies.get('token')
if (token) { if (token) {
const wsUri = process.env.VUE_APP_WEBSOCKET + '/ws/' + token + '/?room=message_center' const wsUri = process.env.VUE_APP_WEBSOCKET + '/ws/' + token + '/'
this.socket = new WebSocket(wsUri)// 这里面的this都指向vue this.socket = new WebSocket(wsUri)// 这里面的this都指向vue
this.socket.onerror = webSocketOnError this.socket.onerror = webSocketOnError
this.socket.onmessage = webSocketOnMessage this.socket.onmessage = webSocketOnMessage
@ -20,6 +20,12 @@ function webSocketOnError (e) {
duration: 3000 duration: 3000
}) })
} }
/**
* 接收消息
* @param e
* @returns {any}
*/
function webSocketOnMessage (e) { function webSocketOnMessage (e) {
const data = JSON.parse(e.data) const data = JSON.parse(e.data)
if (data.contentType === 'SYSTEM') { if (data.contentType === 'SYSTEM') {
@ -58,9 +64,13 @@ function closeWebsocket () {
this.socket.close() this.socket.close()
} }
/**
* 发送消息
* @param message
*/
function webSocketSend (message) { function webSocketSend (message) {
this.socket.send(JSON.stringify(message)) this.socket.send(JSON.stringify(message))
} }
export default { export default {
initWebSocket, closeWebsocket, webSocketSend initWebSocket, closeWebsocket, webSocketSend,webSocketOnMessage
} }