nginx-amplify-agent/amplify/agent/common/cloud.py

73 lines
1.9 KiB
Python

# -*- coding: utf-8 -*-
from amplify.agent.objects.abstract import AbstractObject
__author__ = "Mike Belov"
__copyright__ = "Copyright (C) Nginx, Inc. All rights reserved."
__license__ = ""
__maintainer__ = "Mike Belov"
__email__ = "dedm@nginx.com"
def _version_to_tuple(version):
return tuple(map(int, str(version).split('.')))
def tuple_to_version(semver):
return '.'.join(map(str, semver))
class Versions(object):
def __init__(self, current=None, obsolete=None, old=None):
self.current = _version_to_tuple(current)
self.obsolete = _version_to_tuple(obsolete)
self.old = _version_to_tuple(old)
class ObjectData(object):
def __init__(self, object=None, config=None, filters=None):
self.definition = object
self.id = AbstractObject.hash(self.definition)
self.type = self.definition.get('type')
self.config = config if config else {}
self.config['filters'] = filters or []
class CloudResponse(object):
def __init__(self, response):
"""
Init a CloudResponse object
:param response: {} raw cloud response
:return: CloudResponse
"""
self.config = response.get('config', {})
self.messages = response.get('messages', [])
self.versions = Versions(**response.get('versions'))
self.capabilities = response.get('capabilities', {})
self.objects = []
for raw_object_data in response.get('objects', []):
self.objects.append(ObjectData(**raw_object_data))
class HTTP503Error(object):
"""
Back pressure status handler.
"""
def __init__(self, http_error):
"""
Init
:param http_error: HTTPError object from requests.exceptions
"""
self.code = 503
self.text = http_error.response.text or '60'
try:
self.delay = int(float(self.text))
except:
self.delay = 60