235 lines
8.7 KiB
Python
235 lines
8.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
import abc
|
|
import hashlib
|
|
import time
|
|
|
|
from gevent import queue
|
|
|
|
try:
|
|
from gevent.hub import BlockingSwitchOutError
|
|
except ImportError:
|
|
# if using an old version of gevent (because this is running on CentOS 6) then
|
|
# create a BlockingSwitchOutError class just to avoid raising NameErrors
|
|
class BlockingSwitchOutError(Exception):
|
|
pass
|
|
|
|
from amplify.agent.data.eventd import EventdClient
|
|
from amplify.agent.data.metad import MetadClient
|
|
from amplify.agent.data.statsd import StatsdClient
|
|
|
|
from amplify.agent.data.configd import ConfigdClient
|
|
from amplify.agent.common.context import context
|
|
from amplify.agent.common.util.threads import spawn
|
|
from amplify.agent.common.util import host, loader
|
|
|
|
from amplify.agent.pipelines.abstract import Pipeline
|
|
|
|
__author__ = "Mike Belov"
|
|
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
|
|
__license__ = ""
|
|
__maintainer__ = "Mike Belov"
|
|
__email__ = "dedm@nginx.com"
|
|
|
|
|
|
class AbstractObject(object):
|
|
"""
|
|
Abstract object. Supervisor for collectors and data client bucket.
|
|
"""
|
|
|
|
# TODO: Refactor our agent objects to be more inline with our backend representations of the same.
|
|
type = 'common'
|
|
|
|
def __init__(self, data=None, **kwargs):
|
|
self.id = None
|
|
self.data = data if data else kwargs
|
|
|
|
self.in_container = bool(context.app_config['credentials']['imagename'])
|
|
self.intervals = context.app_config['containers'].get(self.type, {}).get('poll_intervals', {'default': 10})
|
|
self.running = False
|
|
self.need_restart = False
|
|
self.init_time = int(time.time())
|
|
|
|
self.threads = []
|
|
self.collectors = []
|
|
self.filters = []
|
|
self.queue = queue.Queue()
|
|
|
|
# data clients
|
|
self.statsd = StatsdClient(object=self, interval=max(self.intervals.values()))
|
|
self.eventd = EventdClient(object=self)
|
|
self.metad = MetadClient(object=self)
|
|
self.configd = self.data.get('configd', ConfigdClient(object=self))
|
|
# configd is checked for in data so it can be passed between objects at the manger level. This avoids excess
|
|
# config parsing in nginx objects.
|
|
self.clients = {
|
|
'meta': self.metad,
|
|
'metrics': self.statsd,
|
|
'events': self.eventd,
|
|
'configs': self.configd,
|
|
} # This is a client mapping to aid with lookup during flush by Bridge.
|
|
|
|
self._definition_hash = None
|
|
self._local_id = None
|
|
|
|
self.name = self.data.get('name', None)
|
|
|
|
@abc.abstractproperty
|
|
def definition(self):
|
|
return {'id': self.id, 'type': self.type}
|
|
|
|
@property
|
|
def definition_healthy(self):
|
|
check = {}
|
|
for k, v in self.definition.items():
|
|
if v:
|
|
check[k] = v
|
|
return check == self.definition
|
|
|
|
@property
|
|
def definition_hash(self):
|
|
if not self._definition_hash:
|
|
definition_string = str(list(map(lambda x: u'%s:%s' % (x, self.definition[x]), sorted(list(self.definition.keys()))))).encode('utf-8')
|
|
self._definition_hash = hashlib.sha256(definition_string).hexdigest()
|
|
return self._definition_hash
|
|
|
|
@staticmethod
|
|
def hash(definition):
|
|
definition_string = str(list(map(lambda x: u'%s:%s' % (x, definition[x]), sorted(list(definition.keys()))))).encode('utf-8')
|
|
result = hashlib.sha256(definition_string).hexdigest()
|
|
return result
|
|
|
|
@property
|
|
def local_id_args(self):
|
|
"""
|
|
Class specific local_id_args for local_id hash. Should be overridden by objects that utilize local_id's.
|
|
(Optional for system/root objects)
|
|
(Order sensitive)
|
|
|
|
:return: Tuple String arguments to be used in string hashable
|
|
"""
|
|
return tuple()
|
|
|
|
@property
|
|
def local_id(self):
|
|
"""
|
|
This property will use assigned local_id (from self.local_id_cache) if one exists or construct one from the
|
|
tuple of arguments returned by self.local_id_args.
|
|
|
|
:return: String Hash representation of local_id.
|
|
"""
|
|
# TODO: Refactor Nginx object to use this style local_id property.
|
|
if not self._local_id and len(self.local_id_args):
|
|
args = map(lambda x: str(x.encode('utf-8') if hasattr(x, 'encode') else x), self.local_id_args)
|
|
self._local_id = hashlib.sha256('_'.join(args).encode('utf-8')).hexdigest()
|
|
return self._local_id
|
|
|
|
@staticmethod
|
|
def hash_local(*local_id_args):
|
|
"""
|
|
Helper for hashing passed arguments in local_id style. Helpful for lookup/hash comparisons.
|
|
|
|
:param local_id_args: List Ordered arguments for local_id hash
|
|
:return: String 64 len hash of local_id
|
|
"""
|
|
if len(local_id_args):
|
|
args = map(lambda x: str(x.encode('utf-8') if hasattr(x, 'encode') else x), local_id_args)
|
|
return hashlib.sha256('_'.join(args).encode('utf-8')).hexdigest()
|
|
|
|
@property
|
|
def display_name(self):
|
|
"""
|
|
Generic attribute wrapper for returning a user-friendly/frontend label for an object.
|
|
"""
|
|
|
|
# TOOD: We should clean up and unify our container detection.
|
|
sysidentifier = context.app_config['credentials']['imagename'] or context.hostname
|
|
|
|
if self.name is not None:
|
|
return "%s %s @ %s" % (self.type, self.name, sysidentifier)
|
|
else:
|
|
return "%s @ %s" % (self.type, sysidentifier)
|
|
|
|
def start(self):
|
|
"""
|
|
Starts all of the object's collector threads
|
|
"""
|
|
if not self.running:
|
|
context.log.debug('starting object "%s" %s' % (self.type, self.definition_hash))
|
|
for collector in self.collectors:
|
|
self.threads.append(spawn(collector.run))
|
|
self.running = True
|
|
|
|
def stop(self):
|
|
if self.running:
|
|
context.log.debug('stopping object "%s" %s' % (self.type, self.definition_hash))
|
|
for thread in self.threads:
|
|
try:
|
|
thread.kill()
|
|
except BlockingSwitchOutError:
|
|
pass
|
|
except Exception as e:
|
|
context.log.debug('exception during object stop: {}'.format(e.__class__.__name__), exc_info=True)
|
|
|
|
# For every collector, if the collector has a .tail attribute and is a Pipeline, send a stop signal.
|
|
for collector in self.collectors:
|
|
try:
|
|
if hasattr(collector, 'tail') and isinstance(collector.tail, Pipeline):
|
|
collector.tail.stop()
|
|
except BlockingSwitchOutError:
|
|
pass
|
|
except Exception as e:
|
|
context.log.debug('exception during pipeline stop', exc_info=True)
|
|
|
|
self.running = False
|
|
context.log.debug('stopped object "%s" %s ' % (self.type, self.definition_hash))
|
|
|
|
def _import_collector_class(self, type, target):
|
|
"""
|
|
Import a collector class
|
|
|
|
:param type: str - Object type name (e.g. 'system' or 'nginx')
|
|
:param target: str - what to collect (e.g. 'meta' or 'metrics')
|
|
:return: A collector class that corresponds with the host's distribution
|
|
"""
|
|
distribution = host.linux_name()
|
|
distribution = {
|
|
'ubuntu': '',
|
|
'amzn': 'centos',
|
|
'rhel': 'centos',
|
|
'fedora': 'centos',
|
|
'sles': 'centos'
|
|
}.get(distribution, distribution)
|
|
|
|
try:
|
|
class_name = distribution.title() + type.title() + target.title() + 'Collector'
|
|
class_path = 'amplify.agent.collectors.%s.%s.%s' % (type.lower(), target.lower(), class_name)
|
|
cls = loader.import_class(class_path)
|
|
except AttributeError:
|
|
class_name = 'GenericLinux' + type.title() + target.title() + 'Collector'
|
|
class_path = 'amplify.agent.collectors.%s.%s.%s' % (type.lower(), target.lower(), class_name)
|
|
cls = loader.import_class(class_path)
|
|
|
|
return cls
|
|
|
|
def flush(self, clients=None):
|
|
"""
|
|
Object flush method. Since the object is what has the bins, it should be responsible for managing them.
|
|
|
|
:param clients: List of Strings (names of the bins to flush.
|
|
:return: Dict Flush contents for each named bin. Structure of each is determined by the bin itself.
|
|
"""
|
|
results = {}
|
|
|
|
if clients: # Flush the bins requested.
|
|
if len(clients) != 1:
|
|
for name in clients:
|
|
if name in self.clients:
|
|
results[name] = self.clients[name].flush()
|
|
else:
|
|
results = self.clients[clients[0]].flush()
|
|
else: # Flush all the bins for the object
|
|
for name, client in self.clients.items():
|
|
results[name] = client.flush()
|
|
|
|
return results
|