# -*- coding: utf-8 -*- import hashlib import psutil from amplify.agent.common.context import context from amplify.agent.common.util import subp from amplify.agent.managers.abstract import launch_method_supported from amplify.agent.data.eventd import INFO from amplify.ext.abstract.manager import ExtObjectManager from amplify.ext.mysql.util import PS_CMD, master_parser, ps_parser from amplify.ext.mysql import AMPLIFY_EXT_KEY from amplify.agent.common.util.configtypes import boolean from amplify.ext.mysql.objects import MySQLObject __author__ = "Andrew Alexeev" __copyright__ = "Copyright (C) Nginx Inc. All rights reserved." __license__ = "" __maintainer__ = "Mike Belov" __email__ = "dedm@nginx.com" class MySQLManager(ExtObjectManager): """ Manager for MySQL objects. """ ext = AMPLIFY_EXT_KEY name = 'mysql_manager' type = 'mysql' types = ('mysql',) def _discover_objects(self): # save the current ids existing_hashes = [ obj.definition_hash for obj in self.objects.find_all(types=self.types) ] discovered_hashes = [] if boolean(context.app_config['mysql'].get('remote', False)): mysql_daemons = self._find_remote() else: mysql_daemons = self._find_local() while len(mysql_daemons): try: data = mysql_daemons.pop() definition = { 'type': 'mysql', 'local_id': data['local_id'], 'root_uuid': context.uuid } definition_hash = MySQLObject.hash(definition) discovered_hashes.append(definition_hash) if definition_hash not in existing_hashes: # New object -- create it new_obj = MySQLObject(data=data) # Send discover event. new_obj.eventd.event( level=INFO, message='mysqld process found, pid %s' % new_obj.pid ) self.objects.register(new_obj, parent_id=self.objects.root_id) elif definition_hash in existing_hashes: for obj in self.objects.find_all(types=self.types): if obj.definition_hash == definition_hash: current_obj = obj break if current_obj.pid != data['pid']: # PIDs changed... MySQL must have been restarted context.log.debug( 'mysqld was restarted (pid was %s now %s)' % ( current_obj.pid, data['pid'] ) ) new_obj = MySQLObject(data=data) # send MySQL restart event new_obj.eventd.event( level=INFO, message='mysqld process was restarted, new pid %s, old pid %s' % ( new_obj.pid, current_obj.pid ) ) # stop and un-register children children_objects = self.objects.find_all( obj_id=current_obj.id, children=True, include_self=False ) for child_obj in children_objects: child_obj.stop() self.objects.unregister(obj=child_obj) # un-register old object self.objects.unregister(current_obj) # stop old object current_obj.stop() self.objects.register(new_obj, parent_id=self.objects.root_id) except psutil.NoSuchProcess: context.log.debug('mysqld is restarting/reloading, pids are changing, agent is waiting') # check if we left something in objects (MySQL could be stopped or something) dropped_hashes = list(filter(lambda x: x not in discovered_hashes, existing_hashes)) if len(dropped_hashes) == 0: return for dropped_hash in dropped_hashes: for obj in self.objects.find_all(types=self.types): if obj.definition_hash == dropped_hash: dropped_obj = obj break context.log.debug('mysqld was stopped (pid was %s)' % dropped_obj.pid) # stop and un-register children children_objects = self.objects.find_all( obj_id=dropped_obj.id, children=True, include_self=False ) for child_obj in children_objects: child_obj.stop() self.objects.unregister(child_obj) dropped_obj.stop() self.objects.unregister(dropped_obj) @staticmethod def _find_local(ps=None): """ Tries to find all mysqld processes :param ps: [] of str, used for debugging our parsing logic - should be None most of the time :return: [] of {} MySQL object definitions """ # get ps info try: # set ps output to passed param or call subp ps, _ = (ps, None) if ps is not None else subp.call(PS_CMD) context.log.debug('ps mysqld output: %s' % ps) except Exception as e: # log error exception_name = e.__class__.__name__ context.log.debug( 'failed to find running mysqld via "%s" due to %s' % ( PS_CMD, exception_name ) ) context.log.debug('additional info:', exc_info=True) # If there is a root_object defined, log an event to send to the # cloud. if context.objects.root_object: context.objects.root_object.eventd.event( level=INFO, message='no mysqld processes found' ) # break processing returning a fault-tolerant empty list return [] if not any('mysqld' in line for line in ps): context.log.info('no mysqld processes found') # break processing returning a fault-tolerant empty list return [] # collect all info about processes masters = {} try: for line in ps: parsed = ps_parser(line) # if not parsed - go to the next line if parsed is None: continue pid, ppid, cmd = parsed # unpack values # match master process if cmd.split(' ', 1)[0].endswith('mysqld'): if not launch_method_supported("mysql", ppid): continue try: conf_path = master_parser(cmd) except Exception as e: context.log.error('failed to find conf_path for %s' % cmd) context.log.debug('additional info:', exc_info=True) else: # calculate local_id local_string_id = '%s_%s' % (cmd, conf_path) local_id = hashlib.sha256(local_string_id.encode('utf-8')).hexdigest() if pid not in masters: masters[pid] = {} masters[pid].update({ 'cmd': cmd.strip(), 'conf_path': conf_path, 'pid': pid, 'local_id': local_id }) except Exception as e: # log error exception_name = e.__class__.__name__ context.log.error('failed to parse ps results due to %s' % exception_name) context.log.debug('additional info:', exc_info=True) # format results results = [] for payload in masters.values(): # only add payloads that have all the keys if 'cmd' in payload and 'conf_path' in payload and 'pid' in payload and 'local_id' in payload: results.append(payload) else: context.log.debug('MySQL "_find_all()" found an incomplete entity %s' % payload) return results @staticmethod def _find_remote(): """ :return: [] of {} MySQL object definition for remote mysqld process """ results = [] try: cmd = "/usr/sbin/mysqld" conf_path = "/etc/mysql/my.cnf" # calculate local_id local_string_id = '%s_%s' % (cmd, conf_path) local_id = hashlib.sha256(local_string_id.encode('utf-8')).hexdigest() results.append({ 'cmd': 'unknown', 'conf_path': 'unknown', 'pid': 'unknown', 'local_id': local_id }) except Exception as e: # log error exception_name = e.__class__.__name__ context.log.error('failed to parse remote mysql results due to %s' % exception_name) context.log.debug('additional info:', exc_info=True) return results