diff --git a/plugins/kafka-logger.lua b/plugins/kafka-logger.lua index 18ec730..b03630f 100644 --- a/plugins/kafka-logger.lua +++ b/plugins/kafka-logger.lua @@ -3,7 +3,6 @@ --- Created by Safe3. --- DateTime: 2022/9/21 20:37 --- -local cjson = require("cjson.safe") local producer = require("resty.kafka.producer") local log = require("waf.log") @@ -26,20 +25,18 @@ function _M.resp_body_filter(waf) end --]] -local function kafka_log(_, broker_list, info) - local kp = producer:new(broker_list, { producer_type = "async" }) - local json = cjson.new() - +local function kafkaLog(_, brokerList, info) + local kp = producer:new(brokerList, { producer_type = "async" }) local key = "key" - local message = json.encode(info) + local message = log.encodeJson(info) local ok, err = kp:send("waf-log", key, message) if not ok then - log.errorlog(_M.name, " send err: ", err) + log.errLog(_M.name, " send err: ", err) end end function _M.log(waf) - local broker_list = { + local brokerList = { { host = "127.0.0.1", port = 9092, @@ -49,12 +46,12 @@ function _M.log(waf) user = "USERNAME", password = "PASSWORD", }, - }, + } } if waf.msg then local country, province, city = log.ip2loc(waf.ip) local info = { rule_id = waf.rule_id, ip = waf.ip, host = waf.host, url = waf.reqUri, data = waf.msg, country = country, province = province, city = city } - log.broker(kafka_log, broker_list, info) + log.broker(kafkaLog, brokerList, info) end end