# -*- coding: utf-8 -*-
import time

from amplify.agent.collectors.nginx.accesslog import NginxAccessLogsCollector
from amplify.agent.collectors.nginx.config import NginxConfigCollector
from amplify.agent.collectors.nginx.errorlog import NginxErrorLogsCollector

from amplify.agent.common.context import context
from amplify.agent.common.util import http, net, plus
from amplify.agent.data.eventd import INFO, WARNING
from amplify.agent.objects.abstract import AbstractObject
from amplify.agent.objects.nginx.binary import nginx_v
from amplify.agent.objects.nginx.filters import Filter
from amplify.agent.pipelines.syslog import SyslogTail
from amplify.agent.pipelines.file import FileTail


__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"


class NginxObject(AbstractObject):
    type = 'nginx'

    def __init__(self, **kwargs):
        super(NginxObject, self).__init__(**kwargs)

        # Have to override intervals here because of new container sub-objects.
        self.intervals = context.app_config['containers'].get('nginx', {}).get('poll_intervals', {'default': 10})
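        # Illustrative shape (assumption, not from the original source): poll_intervals is
        # expected to carry the interval keys consumed by the collector setup below, e.g.
        # {'default': 10, 'meta': 30, 'metrics': 20, 'configs': 300, 'logs': 10}.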

        self.root_uuid = context.uuid
        self._local_id = self.data['local_id']  # Assigned by manager
        self.pid = self.data['pid']
        self.version = self.data['version']
        self.workers = self.data['workers']
        self.prefix = self.data['prefix']
        self.bin_path = self.data['bin_path']
        self.conf_path = self.data['conf_path']
        self.name = self.version

        # agent config
        default_config = context.app_config['containers']['nginx']
        self.upload_config = self.data.get('upload_config') or default_config.get('upload_config', False)
        self.run_config_test = self.data.get('run_test') or default_config.get('run_test', False)
        self.upload_ssl = self.data.get('upload_ssl') or default_config.get('upload_ssl', False)

        # nginx -V data
        self.parsed_v = nginx_v(self.bin_path)

        # filters
        self.filters = [Filter(**raw_filter) for raw_filter in self.data.get('filters') or []]

        # nginx config
        if 'config_data' in self.data:
            self._restore_config_collector(self.data['config_data']['previous'])
        else:
            self._setup_config_collector()

        # api
        self.api_endpoints_to_skip = self.get_api_endpoints_to_skip()
        self.api_external_url, self.api_internal_url = self.get_alive_api_urls()
        self.api_enabled = bool(self.api_external_url or self.api_internal_url)
        api_url = self.api_internal_url if self.api_internal_url is not None else self.api_external_url
        if self.api_enabled and plus.get_latest_supported_api(api_url) is None:
            context.log.debug("API directive was specified but no supported API was found.")
            self.api_enabled = False

        # plus status
        self.plus_status_external_url, self.plus_status_internal_url = self.get_alive_plus_status_urls()
        self.plus_status_enabled = bool(self.plus_status_external_url or self.plus_status_internal_url)

        # stub status
        self.stub_status_url = self.get_alive_stub_status_url()
        self.stub_status_enabled = bool(self.stub_status_url)

        self.processes = []

        self.reloads = self.data.get('reloads', 0)

        self._setup_meta_collector()
        self._setup_metrics_collector()
        self._setup_access_logs()
        self._setup_error_logs()

        # publish events for old object
        for error in self.config.parser_errors:
            self.eventd.event(level=WARNING, message=error)

    @property
    def status_directive_supported(self):
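        # Illustrative note (assumption, not from the original source): the Plus release
        # string parsed below typically looks like 'nginx-plus-r15'; releases up to and
        # including R15 are treated as supporting the legacy "status" directive, while
        # R16+ (where the "api" directive replaces it) are not.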
        release = self.parsed_v['plus']['release']
        if release is not None:
            if release.startswith('nginx-plus-r'):
                r = release.split('-')[2].lstrip('r')
                if r.isdigit():
                    if int(r) <= 15:
                        return True
        return False

    @property
    def definition(self):
        # Type is hard-coded so that it does not differ for ContainerNginxObject.
        return {'type': 'nginx', 'local_id': self.local_id, 'root_uuid': self.root_uuid}

    @property
    def config(self):
        return context.nginx_configs[(self.conf_path, self.prefix, self.bin_path)]

    def get_api_endpoints_to_skip(self):
        """
        Searches the main context for http and stream blocks and returns which ones were not found.
        """
        to_find = {'http', 'stream'}
        main_ctx = set(stmt['directive'] for stmt in self.config.subtree)
        return list(to_find - main_ctx)

    def get_alive_stub_status_url(self):
        """
        Tries to get an alive stub_status url
        Records some events about it

        :return: str stub_status url
        """
        urls_to_check = self.config.stub_status_urls

        if 'stub_status' in context.app_config.get('nginx', {}):
            predefined_uri = context.app_config['nginx']['stub_status']
            urls_to_check.append(http.resolve_uri(predefined_uri))

        stub_status_url = self.__get_alive_status(urls_to_check, what='stub status')
        if stub_status_url:
            # Send stub detected event
            self.eventd.event(
                level=INFO,
                message='nginx stub_status detected, %s' % stub_status_url
            )
        else:
            self.eventd.event(
                level=INFO,
                message='nginx stub_status not found in nginx config'
            )
        return stub_status_url

    def get_alive_plus_status_urls(self):
        """
        Tries to get alive plus status urls
        There are two types of plus status urls: internal and external
        - internal urls are for the agent and usually have a localhost IP in the address
        - external urls are for browsers and usually have a normal server name

        Returns a tuple of (str or None) values - (external_url, internal_url)

        Even if the external status url is not responding (cannot be accessed from the host),
        we should still return it so it can be shown in our UI

        :return: (str or None, str or None)
        """
        internal_urls = self.config.plus_status_internal_urls
        external_urls = self.config.plus_status_external_urls

        if 'plus_status' in context.app_config.get('nginx', {}):
            predefined_uri = context.app_config['nginx']['plus_status']
            internal_urls.append(http.resolve_uri(predefined_uri))

        internal_status_url = self.__get_alive_status(internal_urls, json=True, what='plus status internal')
        if internal_status_url:
            self.eventd.event(
                level=INFO,
                message='nginx internal plus_status detected, %s' % internal_status_url
            )

        external_status_url = self.__get_alive_status(external_urls, json=True, what='plus status external')
        if len(self.config.plus_status_external_urls) > 0:
            if not external_status_url:
                external_status_url = self.config.plus_status_external_urls[0]

            self.eventd.event(
                level=INFO,
                message='nginx external plus_status detected, %s' % external_status_url
            )

        return external_status_url, internal_status_url

    def get_alive_api_urls(self):
        """
        Tries to get alive api urls
        There are two types of api urls: internal and external
        - internal urls are for the agent and usually have a localhost IP in the address
        - external urls are for browsers and usually have a normal server name

        Returns a tuple of (str or None) values - (external_url, internal_url)

        Even if the external api url is not responding (cannot be accessed from the host),
        we should still return it so it can be shown in our UI

        :return: (str or None, str or None)
        """
        internal_urls = self.config.api_internal_urls
        external_urls = self.config.api_external_urls

        if 'api' in context.app_config.get('nginx', {}):
            predefined_uri = context.app_config['nginx']['api']
            internal_urls.append(http.resolve_uri(predefined_uri))

        internal_api_url = self.__get_alive_status(internal_urls, json=True, what='api internal')
        if internal_api_url:
            self.eventd.event(
                level=INFO,
                message='nginx internal api detected, %s' % internal_api_url
            )

        external_api_url = self.__get_alive_status(external_urls, json=True, what='api external')
        if len(self.config.api_external_urls) > 0:
            if not external_api_url:
                external_api_url = self.config.api_external_urls[0]

            self.eventd.event(
                level=INFO,
                message='nginx external api detected, %s' % external_api_url
            )

        return external_api_url, internal_api_url

    def __get_alive_status(self, url_list, json=False, what='api/stub status/plus status'):
        """
        Tries to find an alive status url
        Returns the first alive url, or None if none of the found urls are responding

        :param url_list: [] of urls
        :param json: bool - will try to parse the response as json if True
        :param what: str - what kind of url (used for logging)
        :return: None or str
        """
        for url in url_list:
            if url.startswith('http://'):
                full_urls = [url, 'https://' + url[7:]]
            elif url.startswith('https://'):
                full_urls = [url, 'http://' + url[8:]]
            else:
                full_urls = ['http://' + url, 'https://' + url]

            for full_url in full_urls:
                try:
                    status_response = context.http_client.get(full_url, timeout=0.5, json=json, log=False)
                    if status_response:
                        if json or 'Active connections' in status_response:
                            return full_url
                    else:
                        context.log.debug('bad response from %s url %s' % (what, full_url))
                except Exception:
                    context.log.debug('bad response from %s url %s' % (what, full_url))
        return None

    def __setup_pipeline(self, name):
        """
        Sets up a pipeline/tail object for a collector based on the log name.

        :param name: str
        :return: Pipeline
        """
        tail = None
        try:
            if name.startswith('syslog'):
                address_bucket = name.split(',', 1)[0]
                host, port, address = net.ipv4_address(
                    address=address_bucket.split('=')[1], full_format=True, silent=True
                )
                # Right now we assume an AF_INET address/port, i.e. no support for unix sockets

                if address in context.listeners:
                    port = int(port)  # socket requires an integer port
                    tail = SyslogTail(address=(host, port))
            else:
                tail = FileTail(name)
        except Exception as e:
            context.log.error(
                'failed to initialize pipeline for "%s" due to %s (maybe insufficient permissions?)' % (name, e.__class__.__name__)
            )
            context.log.debug('additional info:', exc_info=True)

        return tail

    def _setup_meta_collector(self):
        collector_cls = self._import_collector_class('nginx', 'meta')
        self.collectors.append(
            collector_cls(object=self, interval=self.intervals['meta'])
        )

    def _setup_metrics_collector(self):
        collector_cls = self._import_collector_class('nginx', 'metrics')
        self.collectors.append(
            collector_cls(object=self, interval=self.intervals['metrics'])
        )

    def _setup_config_collector(self):
        collector = NginxConfigCollector(object=self, interval=self.intervals['configs'])
        try:
            start_time = time.time()
            collector.collect()  # run parse on startup
        finally:
            end_time = time.time()
            context.log.debug(
                '%s config parse on startup in %.3f' % (self.definition_hash, end_time - start_time)
            )
        self.collectors.append(collector)

    def _restore_config_collector(self, previous):
        collector = NginxConfigCollector(object=self, interval=self.intervals['configs'], previous=previous)
        try:
            start_time = time.time()
            collector.collect(no_delay=True)  # run NginxConfigCollector.parse_config on object restart
        finally:
            end_time = time.time()
            context.log.debug(
                '%s restored previous config collector in %.3f' % (self.definition_hash, end_time - start_time)
            )
        self.collectors.append(collector)

    def _setup_access_logs(self):
        # access logs
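        # Hedged note on the assumed shape: self.config.access_logs maps a log target
        # (a file path or a syslog spec) to its parsed data, of which only the
        # 'log_format' name is used here to look up the format definition.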
        for log_description, log_data in self.config.access_logs.items():
            format_name = log_data['log_format']
            log_format = self.config.log_formats.get(format_name)
            tail = self.__setup_pipeline(log_description)

            if tail:
                self.collectors.append(
                    NginxAccessLogsCollector(
                        object=self,
                        interval=self.intervals['logs'],
                        log_format=log_format,
                        tail=tail
                    )
                )

                # Send access log discovery event.
                self.eventd.event(level=INFO, message='nginx access log %s found' % log_description)

    def _setup_error_logs(self):
        # error logs
        for log_description, log_data in self.config.error_logs.items():
            log_level = log_data['log_level']
            tail = self.__setup_pipeline(log_description)

            if tail:
                self.collectors.append(
                    NginxErrorLogsCollector(
                        object=self,
                        interval=self.intervals['logs'],
                        level=log_level,
                        tail=tail
                    )
                )

                # Send error log discovery event.
                self.eventd.event(level=INFO, message='nginx error log %s found' % log_description)


class ContainerNginxObject(NginxObject):
    type = 'container_nginx'