nginx-amplify-agent/amplify/agent/managers/nginx.py

# -*- coding: utf-8 -*-
import hashlib
import re

import psutil

from amplify.agent.data.eventd import INFO
from amplify.agent.common.util import subp
from amplify.agent.common.context import context
from amplify.agent.managers.abstract import ObjectManager, launch_method_supported
from amplify.agent.objects.nginx.object import NginxObject, ContainerNginxObject
from amplify.agent.objects.nginx.binary import get_prefix_and_conf_path

__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"


class NginxManager(ObjectManager):
    """
    Manager for Nginx objects.
    """
    name = 'nginx_manager'
    type = 'nginx'
    types = ('nginx', 'container_nginx')

    def _init_nginx_object(self, data=None):
        """
        Initializes a new NGINX object, choosing a container (Docker) object
        or a regular one as needed.

        :param data: Dict Data dict for object init
        :return: NginxObject/ContainerNginxObject
        """
        if self.in_container:
            return ContainerNginxObject(data=data)
        else:
            return NginxObject(data=data)

    def _restart_nginx_object(self, current_obj, data):
        """
        Restarts an object by initializing a new object with new data,
        stopping and unregistering the children of the old object, replacing
        the old object with the new one in the object tank, and finally
        stopping the old object.

        There are two conditions that can trigger a restart, which is why
        this logic is encapsulated in its own method.
        """
        context.log.debug(
            'nginx object restarting (master pid was %s)' % current_obj.pid
        )

        # push cloud config
        data.update(self.object_configs.get(current_obj.definition_hash, {}))

        # pass on data from the last config collection to the new object
        config_collector = current_obj.collectors[0]
        data['config_data'] = {
            'previous': config_collector.previous
        }

        # if there is information in the configd store, pass it from the old to the new object
        if current_obj.configd.current:
            data['configd'] = current_obj.configd

        # also pass on the reloads counter
        data['reloads'] = current_obj.reloads

        new_obj = self._init_nginx_object(data=data)

        # send nginx config changed event
        new_obj.eventd.event(
            level=INFO,
            message='nginx-%s config changed, read from %s' % (
                new_obj.version, new_obj.conf_path
            )
        )

        new_obj.id = current_obj.id

        # stop and unregister children of the old object
        for child_obj in self.objects.find_all(
            obj_id=current_obj.id,
            children=True,
            include_self=False
        ):
            child_obj.stop()
            self.objects.unregister(obj=child_obj)

        # replace the old object in the tank and stop it
        self.objects.objects[current_obj.id] = new_obj
        current_obj.stop()  # stop old object

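    # _restart_nginx_object() above is invoked from _discover_objects() in two cases:
    # when an existing object has its `need_restart` flag set, and when its set of
    # worker pids changes (i.e. nginx was reloaded).
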
    def _discover_objects(self):
        """
        Finds running nginx master processes and reconciles them with the
        objects already being managed: new masters are registered, changed
        masters are restarted, and masters that disappeared are stopped and
        unregistered.
        """
        # save the definition hashes of the objects we are already managing
        existing_hashes = [obj.definition_hash for obj in self.objects.find_all(types=self.types)]

        # discover running nginx masters
        nginxs = self._find_all()

        # process all found nginxs
        discovered_hashes = []
        while len(nginxs):
            try:
                definition, data = nginxs.pop()
                definition_hash = NginxObject.hash(definition)
                discovered_hashes.append(definition_hash)

                if definition_hash not in existing_hashes:
                    # new object -- create it
                    data.update(self.object_configs.get(definition_hash, {}))  # push cloud config
                    new_obj = self._init_nginx_object(data=data)

                    # send discover event
                    new_obj.eventd.event(
                        level=INFO,
                        message='nginx-%s master process found, pid %s' % (new_obj.version, new_obj.pid)
                    )

                    self.objects.register(new_obj, parent_id=self.objects.root_id)
                elif definition_hash in existing_hashes:
                    for obj in self.objects.find_all(types=self.types):
                        if obj.definition_hash == definition_hash:
                            current_obj = obj
                            break  # TODO: Think about adding a definition hash - id map to objects tank.

                    if current_obj.need_restart:
                        # restart object if needed
                        self._restart_nginx_object(current_obj, data)
                        # this is usually triggered by errors bubbled up from
                        # coroutines; it should not typically happen but is
                        # handled here for resilience
                        context.log.debug(
                            'nginx object was restarted by "need_restart" flag'
                        )
                    elif current_obj.pid != data['pid']:
                        # the master pid changed, so nginx was restarted
                        context.log.debug(
                            'nginx was restarted (pid was %s now %s)' % (
                                current_obj.pid, data['pid']
                            )
                        )
                        data.update(self.object_configs.get(definition_hash, {}))
                        new_obj = self._init_nginx_object(data=data)

                        # send nginx master process restart event
                        new_obj.eventd.event(
                            level=INFO,
                            message='nginx-%s master process restarted, new pid %s, old pid %s' % (
                                new_obj.version,
                                new_obj.pid,
                                current_obj.pid
                            )
                        )

                        new_obj.id = current_obj.id

                        # stop and unregister children
                        for child_obj in self.objects.find_all(
                            obj_id=current_obj.id,
                            children=True,
                            include_self=False
                        ):
                            child_obj.stop()
                            self.objects.unregister(obj=child_obj)

                        self.objects.objects[current_obj.id] = new_obj
                        current_obj.stop()  # stop old object
                    elif current_obj.workers != data['workers']:
                        # if the workers changed, nginx was reloaded -- increment the reload counter
                        current_obj.reloads += 1
                        context.log.debug(
                            'nginx was reloaded (workers were %s now %s)' % (
                                current_obj.workers, data['workers']
                            )
                        )
                        self._restart_nginx_object(current_obj, data)
            except psutil.NoSuchProcess:
                context.log.debug('nginx is restarting/reloading, pids are changing, agent is waiting')

        # check if we left something in objects (nginx could have been stopped, for example)
        dropped_hashes = list(filter(lambda x: x not in discovered_hashes, existing_hashes))
        if len(dropped_hashes):
            for dropped_hash in dropped_hashes:
                for obj in self.objects.find_all(types=self.types):
                    if obj.definition_hash == dropped_hash:
                        dropped_obj = obj
                        break  # TODO: Think about adding a definition hash - id map to objects tank.

                context.log.debug('nginx was stopped (pid was %s)' % dropped_obj.pid)

                for child_obj in self.objects.find_all(
                    obj_id=dropped_obj.id,
                    children=True,
                    include_self=False
                ):
                    child_obj.stop()
                    self.objects.unregister(child_obj)

                dropped_obj.stop()
                self.objects.unregister(dropped_obj)

        # manage nginx configs
        self._manage_configs()

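    # Note: objects are keyed by NginxObject.hash(definition), and the definition's
    # local_id is derived from bin_path, conf_path and prefix in _find_all() below,
    # so a pid change alone does not produce a new definition hash.
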
    @staticmethod
    def _find_all():
        """
        Tries to find all nginx master processes.

        :return: list of (definition, data) tuples describing each nginx master found
        """
        # get ps info
        ps_cmd = "ps xao pid,ppid,command | grep 'nginx[:]'"
        try:
            ps, _ = subp.call(ps_cmd)
            context.log.debug('ps nginx output: %s' % ps)
        except:
            context.log.debug('failed to find running nginx via %s' % ps_cmd)
            context.log.debug('additional info:', exc_info=True)
            if context.objects.root_object:
                context.objects.root_object.eventd.event(
                    level=INFO,
                    message='no nginx found'
                )
            return []

        # return an empty list if there are no master processes
        if not any('nginx: master process' in line for line in ps):
            context.log.debug('nginx masters amount is zero')
            return []

        # collect all info about processes
        masters = {}
        try:
            for line in ps:
                # parse a ps response line, e.g.:
                # 21355 1 nginx: master process /usr/sbin/nginx
                gwe = re.match(r'\s*(?P<pid>\d+)\s+(?P<ppid>\d+)\s+(?P<cmd>.+)\s*', line)

                # if the line could not be parsed, go to the next one
                if not gwe:
                    continue

                pid, ppid, cmd = int(gwe.group('pid')), int(gwe.group('ppid')), gwe.group('cmd').rstrip()

                # match nginx master process
                if 'nginx: master process' in cmd:
                    if not launch_method_supported("nginx", ppid):
                        continue

                    # get path to binary, prefix and conf_path
                    try:
                        bin_path, prefix, conf_path, version = get_prefix_and_conf_path(cmd)
                    except:
                        context.log.debug('failed to find bin_path, prefix and conf_path for %s' % cmd)
                        context.log.debug('', exc_info=True)
                    else:
                        # calculate local id
                        local_string_id = '%s_%s_%s' % (bin_path, conf_path, prefix)
                        local_id = hashlib.sha256(local_string_id.encode('utf-8')).hexdigest()

                        if pid not in masters:
                            masters[pid] = {'workers': []}

                        masters[pid].update({
                            'version': version,
                            'bin_path': bin_path,
                            'conf_path': conf_path,
                            'prefix': prefix,
                            'pid': pid,
                            'local_id': local_id
                        })
                # match worker process
                elif 'nginx: worker process' in cmd:
                    if ppid in masters:
                        masters[ppid]['workers'].append(pid)
                    else:
                        masters[ppid] = dict(workers=[pid])
        except Exception as e:
            exception_name = e.__class__.__name__
            context.log.error('failed to parse ps results due to %s' % exception_name)
            context.log.debug('additional info:', exc_info=True)

        # collect results
        results = []
        for pid, description in masters.items():
            if 'bin_path' in description:  # filter out entries with non-executable nginx -V (relative paths, etc.)
                definition = {
                    'local_id': description['local_id'],
                    'type': NginxManager.type,
                    'root_uuid': context.uuid
                }
                results.append((definition, description))
        return results

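    # Illustrative shape of a single (definition, data) tuple returned by _find_all()
    # above (all values are examples only):
    #
    #     definition = {
    #         'local_id': '<sha256 of "bin_path_conf_path_prefix">',
    #         'type': 'nginx',
    #         'root_uuid': '<context.uuid>'
    #     }
    #     data = {
    #         'workers': [21356, 21357],
    #         'version': '1.25.3',
    #         'bin_path': '/usr/sbin/nginx',
    #         'conf_path': '/etc/nginx/nginx.conf',
    #         'prefix': '/etc/nginx',
    #         'pid': 21355,
    #         'local_id': '<same sha256 as in the definition>'
    #     }
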
    def _manage_configs(self):
        """
        Removes entries from the nginx config tank that are no longer
        referenced by any existing nginx object.
        """
        # go through existing objects and collect the ident tuples of their configs
        existing_object_configs = set()
        for nginx_obj in self.objects.find_all(types=self.types):
            existing_object_configs.add((nginx_obj.conf_path, nginx_obj.prefix, nginx_obj.bin_path))

        # create a set of the ident tuples currently in the tank
        configs = set(context.nginx_configs.keys())

        # remove idents that are in the tank but not referenced by any existing nginx object
        for filename, prefix, bin_path in configs.difference(existing_object_configs):
            del context.nginx_configs[(filename, prefix, bin_path)]
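
    # Illustrative example of the pruning in _manage_configs() above (paths are
    # examples only): if context.nginx_configs holds an entry keyed by
    # ('/etc/nginx/nginx.conf', '/etc/nginx', '/usr/sbin/nginx') and no running nginx
    # object references that (conf_path, prefix, bin_path) triple any more, the entry
    # is deleted from the tank.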