214 lines
7.3 KiB
Python
214 lines
7.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
import abc
|
|
import time
|
|
|
|
from threading import current_thread
|
|
from greenlet import GreenletExit
|
|
|
|
from amplify.agent.common.context import context
|
|
from amplify.agent.common.util import subp
|
|
|
|
|
|
__author__ = "Grant Hulegaard"
|
|
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
|
|
__license__ = ""
|
|
__maintainer__ = "Grant Hulegaard"
|
|
__email__ = "grant.hulegaard@nginx.com"
|
|
|
|
|
|
def get_launchers():
|
|
# built-in
|
|
launchers = ['supervisord', 'supervisorctl', 'runsv', 'supervise', 'mysqld_safe']
|
|
|
|
# config values
|
|
if context.app_config is not None:
|
|
config_launchers = context.app_config['agent'].get('launchers', launchers)
|
|
# support single values
|
|
if not isinstance(launchers, list):
|
|
config_launchers = [launchers]
|
|
|
|
for config_launcher in config_launchers:
|
|
if config_launcher not in launchers:
|
|
launchers.append(config_launcher)
|
|
|
|
return launchers
|
|
|
|
|
|
def launch_method_supported(manager_type, ppid):
|
|
"""
|
|
Skip handling if master process is managed by an unsupported launcher
|
|
and/or the launcher is in a container (master process will still show up on host machine's ps output)
|
|
|
|
:param manager_type: string - nginx, mysql, or phpfpm, etc.
|
|
:param ppid: int - ppid of master process
|
|
:param supported_launchers: list of strings
|
|
:return:
|
|
"""
|
|
if ppid not in (0, 1):
|
|
out, err = subp.call('ps o "ppid,command" %d' % ppid)
|
|
# take the second line because the first is a header
|
|
launcher_ppid, parent_command = out[1].split(None, 1)
|
|
if not any(x in parent_command for x in get_launchers()):
|
|
context.log.debug(
|
|
'launching %s with "%s" is not currently supported' %
|
|
(manager_type, parent_command)
|
|
)
|
|
return False
|
|
if int(launcher_ppid) not in (0, 1):
|
|
context.log.debug(
|
|
'master process for %s is being skipped because its launcher (%s) is in a container' %
|
|
(manager_type, parent_command)
|
|
)
|
|
return False
|
|
return True
|
|
|
|
|
|
class AbstractManager(object):
|
|
"""
|
|
A manager is an encapsulated body that is spawned by supervisor. Every manager, regardless of encapsulated purpose
|
|
should have a run action that will be run in a while loop in .start().
|
|
|
|
This manager object is also useful for easily encapsulating asynchronous logic. Much of the encapsulation here
|
|
is necessary due to our mandatory agent requirements to support Python versions as old as 2.6.
|
|
"""
|
|
name = 'abstract_manager'
|
|
|
|
def __init__(self, **kwargs):
|
|
self.running = False
|
|
self.interval = float(kwargs.get('interval', 5.0)) # Run interval for manager
|
|
self.in_container = bool(context.app_config['credentials']['imagename'])
|
|
|
|
@property
|
|
def status(self):
|
|
return 'running' if self.running else 'stopped'
|
|
|
|
@abc.abstractmethod
|
|
def _run(self):
|
|
# Example since this is an abstract method.
|
|
try:
|
|
pass # Do something here...
|
|
except:
|
|
context.default_log.error('failed', exc_info=True)
|
|
raise
|
|
|
|
@staticmethod
|
|
def _wait(seconds):
|
|
time.sleep(seconds) # Releases the GIL.
|
|
# TODO: Investigate more efficient methods for releasing the GIL. Probably use more functionality from gevent.
|
|
|
|
def start(self):
|
|
"""
|
|
Primary execution loop. Follows the pattern: wait, increment action id, call manager run method.
|
|
"""
|
|
# TODO: Standardize this with collectors.
|
|
current_thread().name = self.name
|
|
context.setup_thread_id()
|
|
|
|
self.running = True
|
|
|
|
try:
|
|
while self.running:
|
|
self._wait(self.interval)
|
|
context.inc_action_id()
|
|
self._run()
|
|
except GreenletExit as e:
|
|
self.stop()
|
|
raise e
|
|
except Exception as e:
|
|
context.log.error('manager execution failed due to "%s"' % e.__class__.__name__)
|
|
context.log.debug('additional info:', exc_info=True)
|
|
raise e
|
|
|
|
def stop(self):
|
|
# TODO: Think about whether or not this is necessary. Managers should probably be receiving thread.kill().
|
|
context.teardown_thread_id()
|
|
self.running = False
|
|
|
|
def __del__(self):
|
|
if self.running:
|
|
self.stop()
|
|
|
|
|
|
class ObjectManager(AbstractManager):
|
|
"""
|
|
Common Object manager. Object managers manage objects of a specific type. There should a be a different object
|
|
manager for each type ('system' and 'nginx' for now).
|
|
|
|
Object managers should have a run action that follows the following run pattern: discover, start objects, schedule
|
|
cloud commands.
|
|
"""
|
|
name = 'object_manager'
|
|
type = 'common'
|
|
types = ('common',)
|
|
|
|
def __init__(self, object_configs=None, **kwargs):
|
|
super(ObjectManager, self).__init__(**kwargs)
|
|
self.config = context.app_config['containers'].get(self.type) or {}
|
|
self.config_intervals = self.config.get('poll_intervals') or {}
|
|
self.object_configs = object_configs if object_configs else {}
|
|
self.objects = context.objects # Object tank
|
|
self.last_discover = 0
|
|
|
|
@abc.abstractmethod
|
|
def _discover_objects(self):
|
|
"""
|
|
Abstract discovering method. Should be overridden.
|
|
"""
|
|
pass
|
|
|
|
# Step 1: Discover
|
|
def _discover(self):
|
|
"""
|
|
Wrapper for _discover_objects - runs discovering with period
|
|
"""
|
|
if time.time() > self.last_discover + (self.config_intervals.get('discover') or self.interval):
|
|
self._discover_objects()
|
|
context.log.debug('%s objects: %s' % (
|
|
self.type,
|
|
[obj.definition_hash for obj in self.objects.find_all(types=self.types)]
|
|
))
|
|
|
|
# Step 2: Start objects
|
|
def _start_objects(self):
|
|
"""
|
|
Starts all objects.
|
|
"""
|
|
for managed_obj in self.objects.find_all(types=self.types):
|
|
managed_obj.start()
|
|
for child_obj in self.objects.find_all(obj_id=managed_obj.id, children=True, include_self=False):
|
|
child_obj.start()
|
|
|
|
# Step 3: Schedule cloud commands
|
|
def _schedule_cloud_commands(self):
|
|
"""
|
|
Reads global cloud command queue and applies commands to specific objects. Optionally overridden.
|
|
"""
|
|
pass
|
|
|
|
def _run(self):
|
|
try:
|
|
self._discover()
|
|
self._start_objects()
|
|
self._schedule_cloud_commands()
|
|
except:
|
|
context.default_log.error('run failed', exc_info=True)
|
|
|
|
def run(self):
|
|
"""
|
|
Unprotected wrapper for _run. Ideally, ObjectManagers would be run as coroutines with gevent, but given some
|
|
problems this is a work around where run method is called explicitly in an main loop from supervisor.
|
|
"""
|
|
self._run()
|
|
|
|
def stop(self):
|
|
super(ObjectManager, self).stop()
|
|
self._stop_objects()
|
|
|
|
def _stop_objects(self):
|
|
for managed_obj in self.objects.find_all(types=self.types):
|
|
for child_obj in self.objects.find_all(obj_id=managed_obj.id, children=True, include_self=False):
|
|
child_obj.stop()
|
|
self.objects.unregister(obj=child_obj)
|
|
managed_obj.stop()
|
|
self.objects.unregister(obj=managed_obj)
|