# -*- coding: utf-8 -*-
# nginx-amplify-agent/amplify/agent/pipelines/syslog.py
"""
Asyncore implementation of a syslog interface. Adapted from "Tiny Syslog Server in Python" (
https://gist.github.com/marcelom/4218010) using Asyncore (https://docs.python.org/2/library/asyncore.html). Some
inspiration for asyncore implementation derived from pymotw (https://pymotw.com/2/asyncore/).
SyslogTail spawns a coroutine which in turn spawns an asyncore-implemented syslog server and handler/cache, and
returns the received messages when iterated.
"""
# -*- coding: utf-8 -*-
import copy
import asyncore
import socket
from collections import deque
from threading import current_thread
from amplify.agent.common.util.threads import spawn
from amplify.agent.common.context import context
from amplify.agent.common.errors import AmplifyException
from amplify.agent.managers.abstract import AbstractManager
from amplify.agent.pipelines.abstract import Pipeline
__author__ = "Grant Hulegaard"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Grant Hulegaard"
__email__ = "grant.hulegaard@nginx.com"
SYSLOG_ADDRESSES = set()
class AmplifyAddresssAlreadyInUse(AmplifyException):
description = "Couldn't start socket listener because address already in use"
class SyslogServer(asyncore.dispatcher):
"""Simple socket server that creates a socket and listens for and caches UDP packets"""
def __init__(self, cache, address, chunk_size=8192):
# Explicitly passed shared cache object
self.cache = cache
# Custom constants
self.chunk_size = chunk_size
# Old-style class super
asyncore.dispatcher.__init__(self)
# asyncore server init
self.create_socket(socket.AF_INET, socket.SOCK_DGRAM) # asyncore socket wrapper
self.bind(address) # bind afore wrapped socket to address
self.address = self.socket.getsockname() # use socket api to retrieve address (address we actually bound to)
SYSLOG_ADDRESSES.add(self.address)
context.log.debug('syslog server binding to %s' % str(self.address))
def handle_read(self):
"""Called when a read event happens on the socket"""
data = bytes.decode(self.recv(self.chunk_size).strip())
try:
log_record = data.split('amplify: ', 1)[1] # this implicitly relies on the nginx syslog format specifically
self.cache.append(log_record)
except Exception:
context.log.error('error handling syslog message (address:%s, message:"%s")' % (self.address, data))
context.log.debug('additional info:', exc_info=True)
def close(self):
context.log.debug('syslog server closing')
asyncore.dispatcher.close(self)
class SyslogListener(AbstractManager):
"""This is just a container to manage the SyslogServer listen/handle loop."""
name = 'syslog_listener'
def __init__(self, cache, address, **kwargs):
super(SyslogListener, self).__init__(**kwargs)
self.server = SyslogServer(cache, address)
def start(self):
current_thread().name = self.name
context.setup_thread_id()
self.running = True
while self.running:
self._wait(0.1)
# This means that we don't increment every time a UDP message is handled, but rather every listen "period"
context.inc_action_id()
asyncore.loop(timeout=self.interval, count=10)
# count is arbitrary since timeout is unreliable at breaking asyncore.loop
def stop(self):
self.server.close()
context.teardown_thread_id()
super(SyslogListener, self).stop()
class SyslogTail(Pipeline):
"""Generalized Pipeline wrapper to provide a developer API for interacting with UDP listener."""
def __init__(self, address, maxlen=10000, **kwargs):
super(SyslogTail, self).__init__(name='syslog:%s' % str(address))
self.kwargs = kwargs # only have to record this due to new listener fail-over logic
self.maxlen = maxlen
self.cache = deque(maxlen=self.maxlen)
self.address = address # This stores the address that we were passed
self.listener = None
self.listener_setup_attempts = 0
self.thread = None
# Try to start listener right away, handle the exception
try:
self._setup_listener(**self.kwargs)
except AmplifyAddresssAlreadyInUse as e:
context.log.warning(
'failed to start listener during syslog tail init due to "%s", will try later (attempts: %s)' % (
e.__class__.__name__,
self.listener_setup_attempts
)
)
context.log.debug('additional info:', exc_info=True)
self.running = True
def __iter__(self):
if not self.listener and self.listener_setup_attempts < 3:
try:
self._setup_listener(**self.kwargs)
context.log.info(
'successfully started listener during "SyslogTail.__iter__()" after %s failed attempt(s)' % (
self.listener_setup_attempts
)
)
self.listener_setup_attempts = 0 # reset attempt counter
except AmplifyAddresssAlreadyInUse as e:
if self.listener_setup_attempts < 3:
context.log.warning(
'failed to start listener during "SyslogTail.__iter__()" due to "%s", '
'will try again (attempts: %s)' % (
e.__class__.__name__,
self.listener_setup_attempts
)
)
context.log.debug('additional info:', exc_info=True)
else:
context.log.error(
'failed to start listener %s times, will not try again' % self.listener_setup_attempts
)
context.log.debug('additional info:', exc_info=True)
current_cache = copy.deepcopy(self.cache)
context.log.debug('syslog tail returned %s lines captured from %s' % (len(current_cache), self.name))
self.cache.clear()
return iter(current_cache)
def _setup_listener(self, **kwargs):
if self.address in SYSLOG_ADDRESSES:
self.listener_setup_attempts += 1
raise AmplifyAddresssAlreadyInUse(
message='cannot initialize "%s" because address is already in use' % self.name,
payload=dict(
address=self.address,
used=list(SYSLOG_ADDRESSES)
)
)
SYSLOG_ADDRESSES.add(self.address)
self.listener = SyslogListener(cache=self.cache, address=self.address, **kwargs)
self.thread = spawn(self.listener.start)
def stop(self):
if self.running:
# Remove from used addresses
for address in set((self.address, self.listener.server.address)):
SYSLOG_ADDRESSES.remove(address)
self.listener.stop() # Close the UDP server
self.thread.kill() # Kill the greenlet
# Unassign variables to reduce reference count for GC
self.listener = None
self.thread = None
# For good measure clear the cache to free memory and set running variable manually to False
self.cache.clear()
self.running = False
context.log.debug('syslog tail stopped')
def __del__(self):
self.stop()