diff --git a/backend/application/asgi.py b/backend/application/asgi.py index c38d873..96d776b 100644 --- a/backend/application/asgi.py +++ b/backend/application/asgi.py @@ -18,10 +18,10 @@ from application.websocketConfig import websocket_application http_application = get_asgi_application() -async def application(scope,receive,send): - if scope['type'] == 'http': - await http_application(scope, receive, send) - elif scope['type'] == 'websocket': - await websocket_application(scope, receive, send) - else: - raise Exception("未知的scope类型,"+ scope['type']) \ No newline at end of file +# async def application(scope,receive,send): +# if scope['type'] == 'http': +# await http_application(scope, receive, send) +# elif scope['type'] == 'websocket': +# await websocket_application(scope, receive, send) +# else: +# raise Exception("未知的scope类型,"+ scope['type']) \ No newline at end of file diff --git a/backend/application/routing.py b/backend/application/routing.py index 166857b..d9590cc 100644 --- a/backend/application/routing.py +++ b/backend/application/routing.py @@ -1,13 +1,19 @@ # -*- coding: utf-8 -*- from channels.auth import AuthMiddlewareStack from channels.routing import ProtocolTypeRouter, URLRouter -from dvadmin.system import routing as dvadminRouting +from django.urls import path + +from application.websocketConfig import MegCenter + +websocket_urlpatterns = [ + path('ws//', MegCenter.as_asgi()), #consumers.DvadminWebSocket 是该路由的消费者 +] application = ProtocolTypeRouter({ 'websocket': AuthMiddlewareStack( URLRouter( - dvadminRouting.websocket_urlpatterns# 指明路由文件是devops/routing.py + websocket_urlpatterns #指明路由文件是devops/routing.py ) ), }) \ No newline at end of file diff --git a/backend/application/websocketConfig.py b/backend/application/websocketConfig.py index 8db1b59..1c811b2 100644 --- a/backend/application/websocketConfig.py +++ b/backend/application/websocketConfig.py @@ -1,114 +1,100 @@ # -*- coding: utf-8 -*- -import django - -django.setup() -import json import urllib -#处理websocket传参 +from asgiref.sync import sync_to_async, async_to_sync +from channels.db import database_sync_to_async +from channels.generic.websocket import AsyncJsonWebsocketConsumer, AsyncWebsocketConsumer +import json + +from channels.layers import get_channel_layer from jwt import InvalidSignatureError from application import settings from dvadmin.system.models import MessageCenter +send_dict = {} + +# 发送消息结构体 +def message(sender, msg_type, msg): + text = { + 'sender': sender, + 'contentType': msg_type, + 'content': msg, + } + return text + +#异步获取消息中心的目标用户 +@database_sync_to_async +def _get_message_center_instance(message_id): + _MessageCenter = MessageCenter.objects.filter(id=message_id).values_list('target_user',flat=True) + if _MessageCenter: + return _MessageCenter + else: + return [] + def request_data(scope): query_string = scope.get('query_string', b'').decode('utf-8') qs = urllib.parse.parse_qs(query_string) return qs - -# 全部的websocket sender -CONNECTIONS = {} +class DvadminWebSocket(AsyncJsonWebsocketConsumer): + async def connect(self): + try: + import jwt + self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"] + decoded_result = jwt.decode(self.service_uid, settings.SECRET_KEY, algorithms=["HS256"]) + if decoded_result: + self.user_id = decoded_result.get('user_id') + self.chat_group_name = "user_"+str(self.user_id) + #收到连接时候处理, + await self.channel_layer.group_add( + self.chat_group_name, + self.channel_name + ) + await self.accept() + await self.send_json(message('system', 'SYSTEM', '连接成功')) + except InvalidSignatureError: + await self.disconnect(None) -# 判断用户是否已经连接 -def check_connection(key): - return key in CONNECTIONS + async def disconnect(self, close_code): + # Leave room group + await self.channel_layer.group_discard(self.chat_group_name, self.channel_name) + print("连接关闭") + await self.close(close_code) -# 发送消息结构体 -def message(sender, msg_type, msg): - text = json.dumps({ - 'sender': sender, - 'contentType': msg_type, - 'content': msg, - }) - return { - 'type': 'websocket.send', - 'text': text +class MegCenter(DvadminWebSocket): + """ + 消息中心 + """ + + async def receive(self, text_data): + # 接受客户端的信息,你处理的函数 + text_data_json = json.loads(text_data) + message_id = text_data_json.get('message_id', None) + user_list = await _get_message_center_instance(message_id) + for send_user in user_list: + await self.channel_layer.group_send( + "user_" + str(send_user), + {'type': 'push.message', 'message': text_data_json} + ) + + async def push_message(self, event): + message = event['message'] + await self.send(text_data=json.dumps(message)) + + +def push(username, event): + """ + 主动推送消息 + """ + channel_layer = get_channel_layer() + async_to_sync(channel_layer.group_send)( + username, + { + "type": "push.message", + "event": event } - - -async def websocket_application(scope, receive, send): - while True: - event = await receive() - # print('[event] ', event) - qs = request_data(scope) - print(1,qs) - auth = qs.get('auth', [''])[0] - user_id = None - # 收到建立WebSocket连接的消息 - if event['type'] == 'websocket.connect': - # 昵称验证 - if not auth: - break - else: - try: - import jwt - decoded_result = jwt.decode(auth, settings.SECRET_KEY, algorithms=["HS256"]) - if decoded_result: - user_id = decoded_result.get('user_id') - # 记录 - CONNECTIONS[user_id] = send - except InvalidSignatureError: - break - if auth in CONNECTIONS: - break - - await send({'type': 'websocket.accept'}) - await send(message('system', 'INFO', '连接成功')) - # # 发送好友列表 - # friends_list = list(CONNECTIONS.keys()) - # await send(message('system', 'INFO', friends_list)) - # - # # 向其他人群发消息, 有人登录了 - # for other in CONNECTIONS.values(): - # await other(message('system', 'addFriend', auth)) - - - # 收到中断WebSocket连接的消息 - elif event['type'] == 'websocket.disconnect': - # 移除记录 - if user_id in CONNECTIONS: - CONNECTIONS.pop(user_id) - - # # 向其他人群发消息, 有人离线了 - # for other in CONNECTIONS.values(): - # await other(message('system', 'removeFriend', user_id)) - - # 其他情况,正常的WebSocket消息 - elif event['type'] == 'websocket.receive': - - if event['text'] == 'ping': - await send(message('system', 'text', 'pong!')) - else: - receive_msg = json.loads(event['text']) - message_id = receive_msg.get('message_id', None) - _MessageCenter = MessageCenter.objects.filter(id=message_id).first() - if _MessageCenter: - user_list = _MessageCenter.target_user.values_list('id',flat=True) - for send_user in user_list: - if send_user in CONNECTIONS: - content_type = receive_msg.get('contentType', 'TEXT') - content = receive_msg.get('content', '') - msg = message(user_id, content_type, content) - await CONNECTIONS[send_user](msg) - else: - msg = message('system', 'text', '对方已下线或不存在') - await send(msg) - else: - print('a1a1a1') - pass - - print('[disconnect]') \ No newline at end of file + ) \ No newline at end of file diff --git a/backend/dvadmin/system/consumers.py b/backend/dvadmin/system/consumers.py deleted file mode 100644 index 044c955..0000000 --- a/backend/dvadmin/system/consumers.py +++ /dev/null @@ -1,95 +0,0 @@ -# -*- coding: utf-8 -*- -import urllib - -from asgiref.sync import sync_to_async -from channels.db import database_sync_to_async -from channels.generic.websocket import AsyncJsonWebsocketConsumer, AsyncWebsocketConsumer -import json - -from jwt import InvalidSignatureError - -from application import settings -from dvadmin.system.models import MessageCenter - -send_dict = {} - -# 发送消息结构体 -def message(sender, msg_type, msg): - text = { - 'sender': sender, - 'contentType': msg_type, - 'content': msg, - } - return text - -#异步获取消息中心的目标用户 -@database_sync_to_async -def _get_message_center_instance(message_id): - _MessageCenter = MessageCenter.objects.filter(id=message_id).values_list('target_user',flat=True) - if _MessageCenter: - return _MessageCenter - else: - return [] - - -def request_data(scope): - query_string = scope.get('query_string', b'').decode('utf-8') - qs = urllib.parse.parse_qs(query_string) - return qs - -class DvadminWebSocket(AsyncJsonWebsocketConsumer): - async def connect(self): - try: - import jwt - self.service_uid = self.scope["url_route"]["kwargs"]["service_uid"] - params = request_data(self.scope) - room = params.get('room')[0] - decoded_result = jwt.decode(self.service_uid, settings.SECRET_KEY, algorithms=["HS256"]) - if decoded_result: - self.user_id = decoded_result.get('user_id') - self.chat_group_name = room - #收到连接时候处理, - await self.channel_layer.group_add( - self.chat_group_name, - self.channel_name - ) - # 将该客户端的信息发送函数与客户端的唯一身份标识绑定,保存至自定义的字典中 - if len(send_dict)==0: - send_dict.setdefault(self.chat_group_name, {}) - for room in send_dict.keys(): - if room == self.chat_group_name: - send_dict[self.chat_group_name][self.user_id] = self.send - else: - send_dict.setdefault(self.chat_group_name,{}) - await self.accept() - await self.send_json(message('system', 'SYSTEM', '连接成功')) - except InvalidSignatureError: - await self.disconnect(None) - - - async def disconnect(self, close_code): - # 删除 send_dict 中对应的信息 - del send_dict[self.chat_group_name][self.user_id] - # Leave room group - await self.channel_layer.group_discard(self.chat_group_name, self.channel_name) - print("连接关闭") - await self.close(close_code) - - -class MegCenter(DvadminWebSocket): - """ - 消息中心 - """ - async def receive(self, text_data=None, byte_text_data=None): - print(text_data) - try: - text_data_json = json.loads(text_data) - except Exception as e: - print('数据无法被json格式化', e) - await self.disconnect(400) - else: - # 获取将要推送信息的目标身份标识,调用保存在 send_dict中的信息发送函数 - message_id = text_data_json.get('message_id', None) - user_list = await _get_message_center_instance(message_id) - for send_user in user_list: - await send_dict[self.chat_group_name][send_user](text_data=json.dumps(text_data_json)) \ No newline at end of file diff --git a/backend/dvadmin/system/routing.py b/backend/dvadmin/system/routing.py deleted file mode 100644 index 3f4ecbb..0000000 --- a/backend/dvadmin/system/routing.py +++ /dev/null @@ -1,7 +0,0 @@ -# -*- coding: utf-8 -*- -from django.urls import path -from . import consumers - -websocket_urlpatterns = [ - path('ws//', consumers.MegCenter.as_asgi()), #consumers.DvadminWebSocket 是该路由的消费者 -] \ No newline at end of file diff --git a/backend/dvadmin/system/views/message_center.py b/backend/dvadmin/system/views/message_center.py index bcd147c..bbda1f9 100644 --- a/backend/dvadmin/system/views/message_center.py +++ b/backend/dvadmin/system/views/message_center.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- -from itertools import chain - +import json from django_restql.fields import DynamicSerializerMethodField from rest_framework import serializers from rest_framework.decorators import action, permission_classes @@ -181,7 +180,6 @@ class MessageCenterViewSet(CustomModelViewSet): self_user_id = self.request.user.id queryset = MessageCenterTargetUser.objects.filter(users__id=self_user_id).exclude( messagecenter__is_deleted=True).order_by('create_datetime').last() - print(queryset) data = None if queryset: serializer = MessageCenterTargetUserListSerializer(queryset, many=False, request=request) diff --git a/web/src/api/websocket.js b/web/src/api/websocket.js index 269f452..d38ad05 100644 --- a/web/src/api/websocket.js +++ b/web/src/api/websocket.js @@ -3,7 +3,7 @@ import util from '@/libs/util' function initWebSocket (e) { const token = util.cookies.get('token') if (token) { - const wsUri = process.env.VUE_APP_WEBSOCKET + '/ws/' + token + '/?room=message_center' + const wsUri = process.env.VUE_APP_WEBSOCKET + '/ws/' + token + '/' this.socket = new WebSocket(wsUri)// 这里面的this都指向vue this.socket.onerror = webSocketOnError this.socket.onmessage = webSocketOnMessage @@ -20,6 +20,12 @@ function webSocketOnError (e) { duration: 3000 }) } + +/** + * 接收消息 + * @param e + * @returns {any} + */ function webSocketOnMessage (e) { const data = JSON.parse(e.data) if (data.contentType === 'SYSTEM') { @@ -58,9 +64,13 @@ function closeWebsocket () { this.socket.close() } +/** + * 发送消息 + * @param message + */ function webSocketSend (message) { this.socket.send(JSON.stringify(message)) } export default { - initWebSocket, closeWebsocket, webSocketSend + initWebSocket, closeWebsocket, webSocketSend,webSocketOnMessage }