perf: 支持 loki log

pull/13241/head
Eric 7 months ago committed by Bryan
parent a7316bc7c1
commit 79edff5fca

@ -1278,5 +1278,6 @@
"ZoneEnabled": "Enable zone", "ZoneEnabled": "Enable zone",
"ZoneHelpMessage": "The zone is the location where assets are located, which can be a data center, public cloud, or VPC. Gateways can be set up within the region. When the network cannot be directly accessed, users can utilize gateways to log in to the assets.", "ZoneHelpMessage": "The zone is the location where assets are located, which can be a data center, public cloud, or VPC. Gateways can be set up within the region. When the network cannot be directly accessed, users can utilize gateways to log in to the assets.",
"ZoneList": "Zones", "ZoneList": "Zones",
"ZoneUpdate": "Update the zone" "ZoneUpdate": "Update the zone",
} "TailLog": "Tail Log"
}

@ -1278,5 +1278,6 @@
"ZoneUpdate": "更新区域", "ZoneUpdate": "更新区域",
"YourProfile": "个人信息", "YourProfile": "个人信息",
"InformationModification": "信息更改", "InformationModification": "信息更改",
"Phone": "手机" "Phone": "手机",
"TailLog": "追踪日志"
} }

@ -619,7 +619,10 @@ class Config(dict):
# Ansible Receptor # Ansible Receptor
'RECEPTOR_ENABLED': False, 'RECEPTOR_ENABLED': False,
'ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST': 'jms_celery', 'ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST': 'jms_celery',
'ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS': 'receptor:7521' 'ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS': 'receptor:7521',
'LOKI_LOG_ENABLED': False,
'LOKI_BASE_URL': 'http://loki:3100',
} }

@ -235,3 +235,6 @@ TICKET_APPLY_ASSET_SCOPE = CONFIG.TICKET_APPLY_ASSET_SCOPE
RECEPTOR_ENABLED = CONFIG.RECEPTOR_ENABLED RECEPTOR_ENABLED = CONFIG.RECEPTOR_ENABLED
ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST = CONFIG.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST = CONFIG.ANSIBLE_RECEPTOR_GATEWAY_PROXY_HOST
ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS = CONFIG.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS = CONFIG.ANSIBLE_RECEPTOR_TCP_LISTEN_ADDRESS
LOKI_LOG_ENABLED = CONFIG.LOKI_LOG_ENABLED
LOKI_BASE_URL = CONFIG.LOKI_BASE_URL

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
from .celery_flower import *
from .error_views import *
from .index import * from .index import *
from .other import * from .other import *
from .celery_flower import *
from .swagger import * from .swagger import *
from .error_views import *

@ -62,6 +62,7 @@ class PrivateSettingSerializer(PublicSettingSerializer):
CHAT_AI_ENABLED = serializers.BooleanField() CHAT_AI_ENABLED = serializers.BooleanField()
GPT_MODEL = serializers.CharField() GPT_MODEL = serializers.CharField()
FILE_UPLOAD_SIZE_LIMIT_MB = serializers.IntegerField() FILE_UPLOAD_SIZE_LIMIT_MB = serializers.IntegerField()
LOKI_LOG_ENABLED = serializers.BooleanField()
class ServerInfoSerializer(serializers.Serializer): class ServerInfoSerializer(serializers.Serializer):

@ -1,5 +1,6 @@
from .connect_methods import * from .connect_methods import *
from .endpoint import * from .endpoint import *
from .loki_log import *
from .status import * from .status import *
from .storage import * from .storage import *
from .terminal import * from .terminal import *

@ -0,0 +1,35 @@
from rest_framework.response import Response
from rest_framework.views import APIView
from common.permissions import OnlySuperUser
from common.utils import get_logger
from terminal import serializers
from terminal.mixin import LokiMixin
__all__ = ['LokiLogAPI', ]
logger = get_logger(__name__)
class LokiLogAPI(APIView, LokiMixin):
http_method_names = ['get', ]
permission_classes = [OnlySuperUser]
def get(self, request, *args, **kwargs):
serializer = serializers.LokiLogSerializer(data=request.query_params)
serializer.is_valid(raise_exception=True)
components = serializer.validated_data.get('components')
search = serializer.validated_data.get('search', '')
start = serializer.validated_data.get('start', )
end = serializer.validated_data.get('end', )
loki_logs = self.query_components_log(components, search, start, end)
return Response(data=loki_logs)
def query_components_log(self, components, search, start, end):
# 秒转纳秒
start_ns = int(start * 1e9)
end_ns = int(end * 1e9)
query = self.create_loki_query(components, search)
loki_client = self.get_loki_client()
loki_response = loki_client.query_range(query, start_ns, end_ns, limit=100)
return loki_response['data']['result']

@ -0,0 +1,15 @@
from terminal.utils.loki_client import get_loki_client
__all__ = ['LokiMixin', ]
class LokiMixin:
def get_loki_client(self):
return get_loki_client()
def create_loki_query(self, components, search):
stream_selector = '{component!=""}'
if components:
stream_selector = '{component=~"%s"}' % components
query = f'{stream_selector} |="{search}"'
return query

@ -11,3 +11,4 @@ from .task import *
from .terminal import * from .terminal import *
from .virtualapp import * from .virtualapp import *
from .virtualapp_provider import * from .virtualapp_provider import *
from .loki import *

@ -0,0 +1,14 @@
import time
from rest_framework import serializers
__all__ = [
'LokiLogSerializer',
]
class LokiLogSerializer(serializers.Serializer):
components = serializers.CharField(required=False, )
start = serializers.IntegerField()
end = serializers.IntegerField(default=time.time)
search = serializers.CharField(required=False, default='')

@ -54,6 +54,7 @@ urlpatterns = [
# components # components
path('components/metrics/', api.ComponentsMetricsAPIView.as_view(), name='components-metrics'), path('components/metrics/', api.ComponentsMetricsAPIView.as_view(), name='components-metrics'),
path('components/connect-methods/', api.ConnectMethodListApi.as_view(), name='connect-methods'), path('components/connect-methods/', api.ConnectMethodListApi.as_view(), name='connect-methods'),
path('loki/logs/', api.LokiLogAPI.as_view(), name='loki-logs'),
] ]
urlpatterns += router.urls urlpatterns += router.urls

@ -6,4 +6,5 @@ app_name = 'terminal'
urlpatterns = [ urlpatterns = [
path('ws/terminal-task/', ws.TerminalTaskWebsocket.as_asgi(), name='terminal-task-ws'), path('ws/terminal-task/', ws.TerminalTaskWebsocket.as_asgi(), name='terminal-task-ws'),
path('ws/component-log-tail/', ws.LokiTailWebsocket.as_asgi(), name='component-log-tail-ws'),
] ]

@ -0,0 +1,57 @@
import urllib.parse
import requests
from django.conf import settings
from websockets.sync.client import connect as ws_connect
def get_loki_client():
# TODO: 补充 auth 认证相关
return LokiClient(base_url=settings.LOKI_BASE_URL)
# https://grafana.com/docs/loki/latest/reference/loki-http-api/
class LokiClient(object):
query_range_url = '/loki/api/v1/query_range'
tail_url = '/loki/api/v1/tail'
def __init__(self, base_url: str):
self.base_url = base_url.rstrip('/')
def query_range(self, query, start, end, limit=100):
params = {
'query': query,
'start': start,
'end': end,
'limit': limit,
}
url = f"{self.base_url}{self.query_range_url}"
response = requests.get(url, params=params)
if response.status_code != 200:
raise Exception(response.text)
return response.json()
def create_tail_ws(self, query, limit=100):
data = {'query': query, 'limit': limit}
params = urllib.parse.urlencode(data)
ws_url = f"ws://{self.base_url[7:]}"
if self.base_url.startswith('https://'):
ws_url = f"wss://{self.base_url[8:]}"
url = f"{ws_url}{self.tail_url}?{params}"
ws = ws_connect(url)
return LokiTailWs(ws)
class LokiTailWs(object):
def __init__(self, ws):
self._ws = ws
def messages(self):
for message in self._ws:
yield message
def close(self):
if self._ws:
self._ws.close()

@ -1,4 +1,5 @@
import datetime import datetime
from threading import Thread
from channels.generic.websocket import JsonWebsocketConsumer from channels.generic.websocket import JsonWebsocketConsumer
from django.utils import timezone from django.utils import timezone
@ -10,6 +11,7 @@ from common.utils.connection import Subscription
from terminal.const import TaskNameType from terminal.const import TaskNameType
from terminal.models import Session, Terminal from terminal.models import Session, Terminal
from terminal.serializers import TaskSerializer, StatSerializer from terminal.serializers import TaskSerializer, StatSerializer
from .mixin import LokiMixin
from .signal_handlers import component_event_chan from .signal_handlers import component_event_chan
logger = get_logger(__name__) logger = get_logger(__name__)
@ -77,3 +79,40 @@ class TerminalTaskWebsocket(JsonWebsocketConsumer):
if self.sub is None: if self.sub is None:
return return
self.sub.unsubscribe() self.sub.unsubscribe()
class LokiTailWebsocket(JsonWebsocketConsumer, LokiMixin):
loki_tail_ws = None
def connect(self):
user = self.scope["user"]
if user.is_authenticated and user.is_superuser:
self.accept()
logger.info('Loki tail websocket connected')
else:
self.close()
def receive_json(self, content, **kwargs):
if not content:
return
components = content.get('components')
search = content.get('search', '')
query = self.create_loki_query(components, search)
self.handle_query(query)
def send_tail_msg(self, tail_ws):
for message in tail_ws.messages():
self.send(text_data=message)
logger.info('Loki tail thread finished')
def handle_query(self, query):
loki_client = self.get_loki_client()
self.loki_tail_ws = loki_client.create_tail_ws(query)
threader = Thread(target=self.send_tail_msg, args=(self.loki_tail_ws,))
threader.start()
logger.debug('Start loki tail thread')
def disconnect(self, close_code):
if self.loki_tail_ws:
self.loki_tail_ws.close()
logger.info('Loki tail websocket client closed')

Loading…
Cancel
Save