nginx-amplify-agent/amplify/agent/managers/bridge.py

236 lines
7.8 KiB
Python

# -*- coding: utf-8 -*-
import gc
import time
from collections import deque
from requests.exceptions import HTTPError
from amplify.agent.common.context import context
from amplify.agent.common.cloud import HTTP503Error
from amplify.agent.common.util.backoff import exponential_delay
from amplify.agent.managers.abstract import AbstractManager
__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"
class Bridge(AbstractManager):
"""
Manager that flushes object bins and stores them in deques. These deques are then sent to backend.
"""
name = 'bridge_manager'
def __init__(self, **kwargs):
if 'interval' not in kwargs:
kwargs['interval'] = context.app_config['cloud']['push_interval']
super(Bridge, self).__init__(**kwargs)
self.payload = {}
self.first_run = True
self.last_http_attempt = 0
self.http_fail_count = 0
self.http_delay = 0
# Instantiate payload with appropriate keys and buckets.
self._reset_payload()
@staticmethod
def look_around():
"""
Checks everything around and make appropriate tree structure
:return: dict of structure
"""
# TODO check docker or OS around
tree = {'system': ['nginx']}
return tree
def _run(self):
try:
self.flush_all()
gc.collect()
except:
context.default_log.error('failed', exc_info=True)
raise
def flush_metrics(self):
"""
Flushes only metrics
"""
flush_data = self._flush_metrics()
if flush_data:
self.payload['metrics'].append(flush_data)
self._send_payload()
def flush_all(self, force=False):
"""
Flushes all data
"""
clients = {
'meta': self._flush_meta,
'metrics': self._flush_metrics,
'events': self._flush_events,
'configs': self._flush_configs
}
# Flush data and add to appropriate payload bucket.
if self.first_run:
# If this is the first run, flush meta only to ensure object creation.
flush_data = self._flush_meta()
if flush_data:
self.payload['meta'].append(flush_data)
else:
for client_type in self.payload.keys():
if client_type in clients:
flush_data = clients[client_type].__call__()
if flush_data:
self.payload[client_type].append(flush_data)
now = time.time()
if force or (
now >= (self.last_http_attempt + self.interval + self.http_delay) and
now > context.backpressure_time
):
self._send_payload()
def _send_payload(self):
"""
Sends current payload to backend
"""
context.log.debug(
'modified payload; current payload stats: '
'meta - %s, metrics - %s, events - %s, configs - %s' % (
len(self.payload['meta']),
len(self.payload['metrics']),
len(self.payload['events']),
len(self.payload['configs'])
)
)
# Send payload to backend.
try:
self.last_http_attempt = time.time()
self._pre_process_payload() # Convert deques to lists for encoding
context.http_client.post('update/', data=self.payload)
context.default_log.debug(self.payload)
self._reset_payload() # Clear payload after successful
if self.first_run:
self.first_run = False # Set first_run to False after first successful send
if self.http_delay:
self.http_fail_count = 0
self.http_delay = 0 # Reset HTTP delay on success
context.log.debug('successful update, reset http delay')
except Exception as e:
self._post_process_payload() # Convert lists to deques since send failed
if isinstance(e, HTTPError) and e.response.status_code == 503:
backpressure_error = HTTP503Error(e)
context.backpressure_time = int(time.time() + backpressure_error.delay)
context.log.debug(
'back pressure delay %s added (next talk: %s)' % (
backpressure_error.delay,
context.backpressure_time
)
)
else:
self.http_fail_count += 1
self.http_delay = exponential_delay(self.http_fail_count)
context.log.debug('http delay set to %s (fails: %s)' % (self.http_delay, self.http_fail_count))
exception_name = e.__class__.__name__
context.log.error('failed to push data due to %s' % exception_name)
context.log.debug('additional info:', exc_info=True)
context.log.debug(
'finished flush_all; new payload stats: '
'meta - %s, metrics - %s, events - %s, configs - %s' % (
len(self.payload['meta']),
len(self.payload['metrics']),
len(self.payload['events']),
len(self.payload['configs'])
)
)
def _flush_meta(self):
return self._flush(clients=['meta'])
def _flush_metrics(self):
return self._flush(clients=['metrics'])
def _flush_events(self):
return self._flush(clients=['events'])
def _flush_configs(self):
return self._flush(clients=['configs'])
def _flush(self, clients=None):
# get structure
objects_structure = context.objects.tree()
# recursive flush
results = self._recursive_object_flush(objects_structure, clients=clients) if objects_structure else None
return results
@staticmethod
def _empty_flush(flush_dict):
"""Helper for determining whether or not a flush payload is empty or not. Checks to see if _any_ key other
than object was included in the flush payload and assumes it is non-empty if so."""
empty = True
for key in flush_dict.keys():
if key != 'object':
empty = False
return empty
def _recursive_object_flush(self, tree, clients=None):
results = {}
object_flush = tree['object'].flush(clients=clients)
if object_flush:
results.update(object_flush)
if tree['children']:
children_results = []
for child_tree in tree['children']:
child_result = self._recursive_object_flush(child_tree, clients=clients)
if child_result:
children_results.append(child_result)
if children_results:
results['children'] = children_results
if not self._empty_flush(results):
return results
def _reset_payload(self):
"""
After payload has been successfully sent, clear the queues (reset them to empty deques).
"""
self.payload = {
'meta': deque(maxlen=360),
'metrics': deque(maxlen=360),
'events': deque(maxlen=360),
'configs': deque(maxlen=360)
}
def _pre_process_payload(self):
"""
ujson.encode does not handle deque objects well. So before attempting a send, convert all the deques to lists.
"""
for key in self.payload.keys():
self.payload[key] = list(self.payload[key])
def _post_process_payload(self):
"""
If a payload is NOT reset (cannot be sent), then we should reconvert the lists to deques with maxlen to enforce
memory management.
"""
for key in self.payload.keys():
self.payload[key] = deque(self.payload[key], maxlen=360)