添加SSE订阅字段过滤参数

pull/1834/head
lyswhut 2024-03-30 12:20:12 +08:00
parent a7ce24c84b
commit 703327ed41
1 changed files with 44 additions and 21 deletions

View File

@ -1,4 +1,5 @@
import http from 'node:http' import http from 'node:http'
import querystring from 'node:querystring'
import type { Socket } from 'node:net' import type { Socket } from 'node:net'
let status: LX.OpenAPI.Status = { let status: LX.OpenAPI.Status = {
@ -7,14 +8,42 @@ let status: LX.OpenAPI.Status = {
address: '', address: '',
} }
type SubscribeKeys = keyof LX.Player.Status
let httpServer: http.Server let httpServer: http.Server
let sockets = new Set<Socket>() let sockets = new Set<Socket>()
let responses = new Set<http.ServerResponse<http.IncomingMessage>>() let responses = new Map<http.ServerResponse<http.IncomingMessage>, SubscribeKeys[]>()
const parseFilter = (filter: any) => {
const keys = Object.keys(global.lx.player_status) as SubscribeKeys[]
if (typeof filter != 'string') return keys
filter = filter.split(',')
const subKeys = keys.filter(k => filter.includes(k))
return subKeys.length ? subKeys : keys
}
const handleSubscribePlayerStatus = (req: http.IncomingMessage, res: http.ServerResponse<http.IncomingMessage>, query?: string) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
})
req.socket.setTimeout(0)
req.on('close', () => {
res.end('OK')
responses.delete(res)
})
const keys = parseFilter(querystring.parse(query ?? '').filter)
responses.set(res, keys)
for (const [k, v] of Object.entries(global.lx.player_status)) {
if (!keys.includes(k as SubscribeKeys)) continue
res.write(`event: ${k}\n`)
res.write(`data: ${JSON.stringify(v)}\n\n`)
}
}
const handleStartServer = async(port = 9000, ip = '127.0.0.1') => new Promise<void>((resolve, reject) => { const handleStartServer = async(port = 9000, ip = '127.0.0.1') => new Promise<void>((resolve, reject) => {
httpServer = http.createServer((req, res) => { httpServer = http.createServer((req, res): void => {
// console.log(req.url) const [endUrl, query] = `/${req.url?.split('/').at(-1) ?? ''}`.split('?')
const endUrl = `/${req.url?.split('/').at(-1) ?? ''}`
let code let code
let msg let msg
switch (endUrl) { switch (endUrl) {
@ -83,22 +112,15 @@ const handleStartServer = async(port = 9000, ip = '127.0.0.1') => new Promise<vo
msg = global.lx.player_status.lyric msg = global.lx.player_status.lyric
break break
case '/subscribe-player-status': case '/subscribe-player-status':
res.writeHead(200, { try {
'Content-Type': 'text/event-stream', handleSubscribePlayerStatus(req, res, query)
Connection: 'keep-alive',
'Cache-Control': 'no-cache',
})
req.socket.setTimeout(0)
req.on('close', () => {
res.end('OK')
responses.delete(res)
})
for (const [k, v] of Object.entries(global.lx.player_status)) {
res.write(`event: ${k}\n`)
res.write(`data: ${JSON.stringify(v)}\n\n`)
}
responses.add(res)
return return
} catch (err) {
console.log(err)
code = 500
msg = 'Error'
}
break
default: default:
code = 401 code = 401
msg = 'Forbidden' msg = 'Forbidden'
@ -149,8 +171,9 @@ const handleStopServer = async() => new Promise<void>((resolve, reject) => {
const sendStatus = (status: Partial<LX.Player.Status>) => { const sendStatus = (status: Partial<LX.Player.Status>) => {
if (!responses.size) return if (!responses.size) return
for (const [resp, keys] of responses) {
for (const [k, v] of Object.entries(status)) { for (const [k, v] of Object.entries(status)) {
for (const resp of responses) { if (!keys.includes(k as SubscribeKeys)) continue
resp.write(`event: ${k}\n`) resp.write(`event: ${k}\n`)
resp.write(`data: ${JSON.stringify(v)}\n\n`) resp.write(`data: ${JSON.stringify(v)}\n\n`)
} }