nginx-amplify-agent/amplify/agent/managers/abstract.py

214 lines
7.3 KiB
Python

# -*- coding: utf-8 -*-
import abc
import time
from threading import current_thread
from greenlet import GreenletExit
from amplify.agent.common.context import context
from amplify.agent.common.util import subp
# Module authorship and licensing metadata.
__author__ = "Grant Hulegaard"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Grant Hulegaard"
__email__ = "grant.hulegaard@nginx.com"
def get_launchers():
    """
    Build the list of launcher names accepted as the parent of a master process.

    Starts from the built-in launchers and appends any extra ones configured under the ``launchers`` key of the
    ``agent`` section of the app config, skipping duplicates.  The configured value may be a collection of names
    or a single name.

    :return: list of strings - built-in launchers followed by any configured ones
    """
    # built-in
    launchers = ['supervisord', 'supervisorctl', 'runsv', 'supervise', 'mysqld_safe']

    # config values
    if context.app_config is not None:
        config_launchers = context.app_config['agent'].get('launchers')

        if config_launchers is None:
            config_launchers = []
        elif not isinstance(config_launchers, (list, tuple, set, frozenset)):
            # support single values: wrap the *configured* value (not the built-in list) so that a plain string
            # is treated as one launcher name instead of being iterated character by character
            config_launchers = [config_launchers]

        for config_launcher in config_launchers:
            # skip empty entries: '' is a substring of every command line, so it would make every launcher
            # look supported to callers doing substring matching
            if config_launcher and config_launcher not in launchers:
                launchers.append(config_launcher)

    return launchers
def launch_method_supported(manager_type, ppid):
    """
    Decide whether a master process should be handled, based on what launched it.

    A master whose parent pid is 0 or 1 is always handled.  Otherwise the parent is looked up with ``ps`` and the
    master is skipped when the parent's command matches none of the names from ``get_launchers()``, or when that
    launcher's own parent pid is not 0 or 1 (treated as the launcher being in a container; the master process
    will still show up on the host machine's ps output).

    :param manager_type: string - nginx, mysql, or phpfpm, etc. (only used in log messages)
    :param ppid: int - ppid of master process
    :return: bool - True if the master process should be handled, False if it should be skipped
    """
    if ppid in (0, 1):
        return True

    ps_lines, _ = subp.call('ps o "ppid,command" %d' % ppid)

    # index 0 is the ps header, index 1 describes the parent process
    grandparent_pid, launcher_command = ps_lines[1].split(None, 1)

    recognized = any(name in launcher_command for name in get_launchers())
    if not recognized:
        context.log.debug(
            'launching %s with "%s" is not currently supported' %
            (manager_type, launcher_command)
        )
        return False

    if int(grandparent_pid) not in (0, 1):
        context.log.debug(
            'master process for %s is being skipped because its launcher (%s) is in a container' %
            (manager_type, launcher_command)
        )
        return False

    return True
class AbstractManager(object):
    """
    Base class for managers spawned by supervisor.

    A manager wraps a single recurring action: subclasses provide ``_run`` and ``start`` calls it in a loop,
    waiting ``interval`` seconds before each call, for as long as the manager is running.

    This object is also a convenient place to encapsulate asynchronous logic.  Much of the encapsulation here
    exists because the agent has to support Python versions as old as 2.6.
    """
    name = 'abstract_manager'

    def __init__(self, **kwargs):
        """
        :param kwargs: optional settings; only ``interval`` (seconds between runs, default 5.0) is read here
        """
        self.running = False

        run_interval = kwargs.get('interval', 5.0)
        self.interval = float(run_interval)  # Seconds to wait before each run

        image_name = context.app_config['credentials']['imagename']
        self.in_container = bool(image_name)

    @property
    def status(self):
        """
        :return: string - 'running' while the manager is marked as running, otherwise 'stopped'
        """
        if self.running:
            return 'running'
        return 'stopped'

    @abc.abstractmethod
    def _run(self):
        """
        Single iteration of the manager's work.  Meant to be overridden; the body below is only a template.
        """
        try:
            pass  # Real managers do their work here...
        except:
            # Log with traceback, then let the failure propagate to the caller.
            context.default_log.error('failed', exc_info=True)
            raise

    @staticmethod
    def _wait(seconds):
        """
        Block the caller for the given number of seconds.

        :param seconds: float - how long to sleep
        """
        time.sleep(seconds)  # Releases the GIL.
        # TODO: Investigate more efficient methods for releasing the GIL. Probably use more functionality from gevent.

    def start(self):
        """
        Main loop of the manager.  Each iteration: wait for the interval, increment the action id, call ``_run``.

        GreenletExit stops the manager before being re-raised; any other exception is logged and re-raised.
        """
        # TODO: Standardize this with collectors.
        thread = current_thread()
        thread.name = self.name
        context.setup_thread_id()

        self.running = True

        try:
            while self.running:
                self._wait(self.interval)
                context.inc_action_id()
                self._run()
        except GreenletExit as exit_signal:
            self.stop()
            raise exit_signal
        except Exception as error:
            failure_name = error.__class__.__name__
            context.log.error('manager execution failed due to "%s"' % failure_name)
            context.log.debug('additional info:', exc_info=True)
            raise error

    def stop(self):
        """
        Tear down the thread id registered in ``start`` and mark the manager as stopped.
        """
        # TODO: Think about whether or not this is necessary. Managers should probably be receiving thread.kill().
        context.teardown_thread_id()
        self.running = False

    def __del__(self):
        """
        Stop the manager on garbage collection if it is still marked as running.
        """
        if not self.running:
            return
        self.stop()
class ObjectManager(AbstractManager):
    """
    Common Object manager.  Object managers manage objects of a specific type.  There should be a different object
    manager for each type ('system' and 'nginx' for now).

    Object managers should have a run action that follows the following run pattern: discover, start objects,
    schedule cloud commands.
    """
    name = 'object_manager'
    type = 'common'
    types = ('common',)

    def __init__(self, object_configs=None, **kwargs):
        """
        :param object_configs: optional dict of object configs; any falsy value is replaced by an empty dict
        :param kwargs: passed through to AbstractManager (e.g. ``interval``)
        """
        super(ObjectManager, self).__init__(**kwargs)
        # Per-type section of the 'containers' config; missing/empty sections fall back to {}
        self.config = context.app_config['containers'].get(self.type) or {}
        self.config_intervals = self.config.get('poll_intervals') or {}
        self.object_configs = object_configs if object_configs else {}
        self.objects = context.objects  # Object tank
        self.last_discover = 0  # time.time() of the last completed discovery; 0 means "never"

    @abc.abstractmethod
    def _discover_objects(self):
        """
        Abstract discovering method. Should be overridden.
        """
        pass

    # Step 1: Discover
    def _discover(self):
        """
        Wrapper for _discover_objects - runs discovering with period.

        The period is the configured 'discover' poll interval, falling back to the manager's run interval.
        """
        discover_interval = self.config_intervals.get('discover') or self.interval

        if time.time() > self.last_discover + discover_interval:
            self._discover_objects()
            # Record the time only after discovery returned without raising, so a failed discovery is retried
            # on the next run.  Without this update the period check above is always true.
            self.last_discover = time.time()

        context.log.debug('%s objects: %s' % (
            self.type,
            [obj.definition_hash for obj in self.objects.find_all(types=self.types)]
        ))

    # Step 2: Start objects
    def _start_objects(self):
        """
        Starts all objects of this manager's types, each followed by its child objects.
        """
        for managed_obj in self.objects.find_all(types=self.types):
            managed_obj.start()
            for child_obj in self.objects.find_all(obj_id=managed_obj.id, children=True, include_self=False):
                child_obj.start()

    # Step 3: Schedule cloud commands
    def _schedule_cloud_commands(self):
        """
        Reads global cloud command queue and applies commands to specific objects. Optionally overridden.
        """
        pass

    def _run(self):
        """
        One manager iteration: discover, start objects, schedule cloud commands.  Failures are logged with a
        traceback and not re-raised.
        """
        try:
            self._discover()
            self._start_objects()
            self._schedule_cloud_commands()
        except:
            # NOTE(review): the bare except also traps BaseException subclasses (e.g. KeyboardInterrupt) -
            # confirm this is intended before narrowing it.
            context.default_log.error('run failed', exc_info=True)

    def run(self):
        """
        Unprotected wrapper for _run. Ideally, ObjectManagers would be run as coroutines with gevent, but given some
        problems this is a work around where run method is called explicitly in a main loop from supervisor.
        """
        self._run()

    def stop(self):
        """
        Stop the manager itself, then stop and unregister every object it manages.
        """
        super(ObjectManager, self).stop()
        self._stop_objects()

    def _stop_objects(self):
        """
        Stop and unregister all managed objects; the children of each object are handled before the object itself.
        """
        # Iterate over snapshots: objects are unregistered inside the loops, which must not disturb the
        # iteration in case find_all() hands back a live view or a lazy iterator.
        for managed_obj in list(self.objects.find_all(types=self.types)):
            children = list(self.objects.find_all(obj_id=managed_obj.id, children=True, include_self=False))
            for child_obj in children:
                child_obj.stop()
                self.objects.unregister(obj=child_obj)

            managed_obj.stop()
            self.objects.unregister(obj=managed_obj)