Update circonus-gometrics

`vendor circonus-labs/circonus-gometrics`
`vendor circonus-labs/circonus-gometrics/api`
`vendor circonus-labs/circonus-gometrics/checkmgr`
`vendor circonus-labs/circonusllhist`
`vendor hashicorp/go-retryablehttp`
pull/2491/head
matt maier 2016-11-09 15:30:07 -05:00
parent 2f341738a1
commit 101b2cd9da
23 changed files with 943 additions and 312 deletions

View File

@ -0,0 +1,28 @@
Copyright (c) 2016, Circonus, Inc. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
* Neither the name Circonus, Inc. nor the names
of its contributors may be used to endorse or promote products
derived from this software without specific prior written
permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -24,17 +24,37 @@ import (
func main() {
log.Println("Configuring cgm")
logger := log.New(os.Stdout, "", log.LstdFlags)
logger.Println("Configuring cgm")
cmc := &cgm.Config{}
// Interval at which metrics are submitted to Circonus, default: 10 seconds
cmc.Interval = "10s" // 10 seconds
// cmc.Interval = "10s" // 10 seconds
// Enable debug messages, default: false
cmc.Debug = false
cmc.Debug = true
// Send debug messages to specific log.Logger instance
// default: if debug stderr, else, discard
//cmc.CheckManager.Log = ...
cmc.Log = logger
// Reset counter metrics after each submission, default: "true"
// Change to "false" to retain (and continue submitting) the last value.
// cmc.ResetCounters = "true"
// Reset gauge metrics after each submission, default: "true"
// Change to "false" to retain (and continue submitting) the last value.
// cmc.ResetGauges = "true"
// Reset histogram metrics after each submission, default: "true"
// Change to "false" to retain (and continue submitting) the last value.
// cmc.ResetHistograms = "true"
// Reset text metrics after each submission, default: "true"
// Change to "false" to retain (and continue submitting) the last value.
// cmc.ResetText = "true"
// Circonus API configuration options
//
@ -53,10 +73,12 @@ func main() {
// otherwise: if an applicable check is NOT specified or found, an
// attempt will be made to automatically create one
//
// Pre-existing httptrap check submission_url
// Submission URL for an existing [httptrap] check
cmc.CheckManager.Check.SubmissionURL = os.Getenv("CIRCONUS_SUBMISION_URL")
// Pre-existing httptrap check id (check not check bundle)
cmc.CheckManager.Check.ID = ""
// ID of an existing [httptrap] check (note: check id not check bundle id)
cmc.CheckManager.Check.ID = os.Getenv("CIRCONUS_CHECK_ID")
// if neither a submission url nor check id are provided, an attempt will be made to find an existing
// httptrap check by using the circonus api to search for a check matching the following criteria:
// an active check,
@ -68,49 +90,63 @@ func main() {
// default: 'hostname':'program name'
// note: for a persistent instance that is ephemeral or transient where metric continuity is
// desired set this explicitly so that the current hostname will not be used.
cmc.CheckManager.Check.InstanceID = ""
// Search tag - a specific tag which, when coupled with the instanceId serves to identify the
// origin and/or grouping of the metrics
// default: service:application name (e.g. service:consul)
cmc.CheckManager.Check.SearchTag = ""
// cmc.CheckManager.Check.InstanceID = ""
// Search tag - specific tag(s) used in conjunction with isntanceId to search for an
// existing check. comma separated string of tags (spaces will be removed, no commas
// in tag elements).
// default: service:application name (e.g. service:consul service:nomad etc.)
// cmc.CheckManager.Check.SearchTag = ""
// Check secret, default: generated when a check needs to be created
cmc.CheckManager.Check.Secret = ""
// Check tags, array of strings, additional tags to add to a new check, default: none
//cmc.CheckManager.Check.Tags = []string{"category:tagname"}
// cmc.CheckManager.Check.Secret = ""
// Additional tag(s) to add when *creating* a check. comma separated string
// of tags (spaces will be removed, no commas in tag elements).
// (e.g. group:abc or service_role:agent,group:xyz).
// default: none
// cmc.CheckManager.Check.Tags = ""
// max amount of time to to hold on to a submission url
// when a given submission fails (due to retries) if the
// time the url was last updated is > than this, the trap
// url will be refreshed (e.g. if the broker is changed
// in the UI) default 5 minutes
cmc.CheckManager.Check.MaxURLAge = "5m"
// cmc.CheckManager.Check.MaxURLAge = "5m"
// custom display name for check, default: "InstanceId /cgm"
cmc.CheckManager.Check.DisplayName = ""
// cmc.CheckManager.Check.DisplayName = ""
// force metric activation - if a metric has been disabled via the UI
// the default behavior is to *not* re-activate the metric; this setting
// overrides the behavior and will re-activate the metric when it is
// encountered. "(true|false)", default "false"
cmc.CheckManager.Check.ForceMetricActivation = "false"
// cmc.CheckManager.Check.ForceMetricActivation = "false"
// Broker configuration options
//
// Broker ID of specific broker to use, default: random enterprise broker or
// Circonus default if no enterprise brokers are available.
// default: only used if set
cmc.CheckManager.Broker.ID = ""
// used to select a broker with the same tag (e.g. can be used to dictate that a broker
// serving a specific location should be used. "dc:sfo", "location:new_york", "zone:us-west")
// if more than one broker has the tag, one will be selected randomly from the resulting list
// default: not used unless != ""
cmc.CheckManager.Broker.SelectTag = ""
// cmc.CheckManager.Broker.ID = ""
// used to select a broker with the same tag(s) (e.g. can be used to dictate that a broker
// serving a specific location should be used. "dc:sfo", "loc:nyc,dc:nyc01", "zone:us-west")
// if more than one broker has the tag(s), one will be selected randomly from the resulting
// list. comma separated string of tags (spaces will be removed, no commas in tag elements).
// default: none
// cmc.CheckManager.Broker.SelectTag = ""
// longest time to wait for a broker connection (if latency is > the broker will
// be considered invalid and not available for selection.), default: 500 milliseconds
cmc.CheckManager.Broker.MaxResponseTime = "500ms"
// if broker Id or SelectTag are not specified, a broker will be selected randomly
// cmc.CheckManager.Broker.MaxResponseTime = "500ms"
// note: if broker Id or SelectTag are not specified, a broker will be selected randomly
// from the list of brokers available to the api token. enterprise brokers take precedence
// viable brokers are "active", have the "httptrap" module enabled, are reachable and respond
// within MaxResponseTime.
log.Println("Creating new cgm instance")
logger.Println("Creating new cgm instance")
metrics, err := cgm.NewCirconusMetrics(cmc)
if err != nil {
@ -120,23 +156,44 @@ func main() {
src := rand.NewSource(time.Now().UnixNano())
rnd := rand.New(src)
log.Println("Starting cgm internal auto-flush timer")
logger.Println("Starting cgm internal auto-flush timer")
metrics.Start()
log.Println("Starting to send metrics")
logger.Println("Adding ctrl-c trap")
c := make(chan os.Signal, 2)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
logger.Println("Received CTRL-C, flushing outstanding metrics before exit")
metrics.Flush()
os.Exit(0)
}()
// number of "sets" of metrics to send (a minute worth)
// Add metric tags (append to any existing tags on specified metric)
metrics.AddMetricTags("foo", []string{"cgm:test"})
metrics.AddMetricTags("baz", []string{"cgm:test"})
logger.Println("Starting to send metrics")
// number of "sets" of metrics to send
max := 60
for i := 1; i < max; i++ {
log.Printf("\tmetric set %d of %d", i, 60)
metrics.Timing("ding", rnd.Float64()*10)
metrics.Increment("dong")
metrics.Gauge("dang", 10)
time.Sleep(1000 * time.Millisecond)
logger.Printf("\tmetric set %d of %d", i, 60)
metrics.Timing("foo", rnd.Float64()*10)
metrics.Increment("bar")
metrics.Gauge("baz", 10)
if i == 35 {
// Set metric tags (overwrite current tags on specified metric)
metrics.SetMetricTags("baz", []string{"cgm:reset_test", "cgm:test2"})
}
time.Sleep(time.Second)
}
log.Println("Flushing any outstanding metrics manually")
logger.Println("Flushing any outstanding metrics manually")
metrics.Flush()
}
@ -163,7 +220,7 @@ import (
func main() {
cmc := &cgm.Config{}
cmc.CheckManager.API.TokenKey = os.Getenv("CIRCONUS_API_TOKEN")
metrics, err := cgm.NewCirconusMetrics(cmc)
if err != nil {
panic(err)
@ -177,3 +234,5 @@ func main() {
}
```
Unless otherwise noted, the source files are distributed under the BSD-style license found in the LICENSE file.

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package api provides methods for interacting with the Circonus API
package api
@ -20,9 +24,9 @@ const (
// a few sensible defaults
defaultAPIURL = "https://api.circonus.com/v2"
defaultAPIApp = "circonus-gometrics"
minRetryWait = 10 * time.Millisecond
maxRetryWait = 50 * time.Millisecond
maxRetries = 3
minRetryWait = 1 * time.Second
maxRetryWait = 15 * time.Second
maxRetries = 4 // equating to 1 + maxRetries total attempts
)
// TokenKeyType - Circonus API Token key
@ -43,8 +47,11 @@ type URLType string
// SearchQueryType search query
type SearchQueryType string
// SearchTagType search/select tag type
type SearchTagType string
// SearchFilterType search filter
type SearchFilterType string
// TagType search/select/custom tag(s) type
type TagType []string
// Config options for Circonus API
type Config struct {
@ -99,12 +106,13 @@ func NewAPI(ac *Config) (*API, error) {
a := &API{apiURL, key, app, ac.Debug, ac.Log}
a.Debug = ac.Debug
a.Log = ac.Log
if a.Debug && a.Log == nil {
a.Log = log.New(os.Stderr, "", log.LstdFlags)
}
if a.Log == nil {
if a.Debug {
a.Log = log.New(os.Stderr, "", log.LstdFlags)
} else {
a.Log = log.New(ioutil.Discard, "", log.LstdFlags)
}
a.Log = log.New(ioutil.Discard, "", log.LstdFlags)
}
return a, nil
@ -152,40 +160,56 @@ func (a *API) apiCall(reqMethod string, reqPath string, data []byte) ([]byte, er
req.Header.Add("X-Circonus-Auth-Token", string(a.key))
req.Header.Add("X-Circonus-App-Name", string(a.app))
// keep last HTTP error in the event of retry failure
var lastHTTPError error
retryPolicy := func(resp *http.Response, err error) (bool, error) {
if err != nil {
lastHTTPError = err
return true, err
}
// Check the response code. We retry on 500-range responses to allow
// the server time to recover, as 500's are typically not permanent
// errors and may relate to outages on the server side. This will catch
// invalid response codes as well, like 0 and 999.
// Retry on 429 (rate limit) as well.
if resp.StatusCode == 0 || resp.StatusCode >= 500 || resp.StatusCode == 429 {
body, readErr := ioutil.ReadAll(resp.Body)
if readErr != nil {
lastHTTPError = fmt.Errorf("- last HTTP error: %d %+v", resp.StatusCode, readErr)
} else {
lastHTTPError = fmt.Errorf("- last HTTP error: %d %s", resp.StatusCode, string(body))
}
return true, nil
}
return false, nil
}
client := retryablehttp.NewClient()
client.RetryWaitMin = minRetryWait
client.RetryWaitMax = maxRetryWait
client.RetryMax = maxRetries
client.Logger = a.Log
// retryablehttp only groks log or no log
// but, outputs everything as [DEBUG] messages
if a.Debug {
client.Logger = a.Log
} else {
client.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
}
client.CheckRetry = retryPolicy
resp, err := client.Do(req)
if err != nil {
stdClient := &http.Client{}
dataReader.Seek(0, 0)
stdRequest, _ := http.NewRequest(reqMethod, reqURL, dataReader)
stdRequest.Header.Add("Accept", "application/json")
stdRequest.Header.Add("X-Circonus-Auth-Token", string(a.key))
stdRequest.Header.Add("X-Circonus-App-Name", string(a.app))
res, errSC := stdClient.Do(stdRequest)
if errSC != nil {
return nil, fmt.Errorf("[ERROR] fetching %s: %s", reqURL, errSC)
if lastHTTPError != nil {
return nil, lastHTTPError
}
if res != nil && res.Body != nil {
defer res.Body.Close()
body, _ := ioutil.ReadAll(res.Body)
if a.Debug {
a.Log.Printf("[DEBUG] %v\n", string(body))
}
return nil, fmt.Errorf("[ERROR] %s", string(body))
}
return nil, fmt.Errorf("[ERROR] fetching %s: %s", reqURL, err)
return nil, fmt.Errorf("[ERROR] %s: %+v", reqURL, err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("[ERROR] reading body %+v", err)
return nil, fmt.Errorf("[ERROR] reading response %+v", err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {

View File

@ -1,20 +1,27 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
"encoding/json"
"fmt"
"strings"
)
// BrokerDetail instance attributes
type BrokerDetail struct {
CN string `json:"cn"`
IP string `json:"ipaddress"`
MinVer int `json:"minimum_version_required"`
Modules []string `json:"modules"`
Port int `json:"port"`
Skew string `json:"skew"`
Status string `json:"status"`
Version int `json:"version"`
CN string `json:"cn"`
ExternalHost string `json:"external_host"`
ExternalPort int `json:"external_port"`
IP string `json:"ipaddress"`
MinVer int `json:"minimum_version_required"`
Modules []string `json:"modules"`
Port int `json:"port"`
Skew string `json:"skew"`
Status string `json:"status"`
Version int `json:"version"`
}
// Broker definition
@ -51,8 +58,8 @@ func (a *API) FetchBrokerByCID(cid CIDType) (*Broker, error) {
}
// FetchBrokerListByTag return list of brokers with a specific tag
func (a *API) FetchBrokerListByTag(searchTag SearchTagType) ([]Broker, error) {
query := SearchQueryType(fmt.Sprintf("f__tags_has=%s", searchTag))
func (a *API) FetchBrokerListByTag(searchTag TagType) ([]Broker, error) {
query := SearchQueryType(fmt.Sprintf("f__tags_has=%s", strings.Replace(strings.Join(searchTag, ","), ",", "&f__tags_has=", -1)))
return a.BrokerSearch(query)
}
@ -66,7 +73,9 @@ func (a *API) BrokerSearch(query SearchQueryType) ([]Broker, error) {
}
var brokers []Broker
json.Unmarshal(result, &brokers)
if err := json.Unmarshal(result, &brokers); err != nil {
return nil, err
}
return brokers, nil
}
@ -79,7 +88,9 @@ func (a *API) FetchBrokerList() ([]Broker, error) {
}
var response []Broker
json.Unmarshal(result, &response)
if err := json.Unmarshal(result, &response); err != nil {
return nil, err
}
return response, nil
}

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
@ -36,7 +40,9 @@ func (a *API) FetchCheckByCID(cid CIDType) (*Check, error) {
}
check := new(Check)
json.Unmarshal(result, check)
if err := json.Unmarshal(result, check); err != nil {
return nil, err
}
return check, nil
}
@ -52,21 +58,20 @@ func (a *API) FetchCheckBySubmissionURL(submissionURL URLType) (*Check, error) {
// valid trap url: scheme://host[:port]/module/httptrap/UUID/secret
// does it smell like a valid trap url path
if u.Path[:17] != "/module/httptrap/" {
if !strings.Contains(u.Path, "/module/httptrap/") {
return nil, fmt.Errorf("[ERROR] Invalid submission URL '%s', unrecognized path", submissionURL)
}
// extract uuid/secret
pathParts := strings.Split(u.Path[17:len(u.Path)], "/")
// extract uuid
pathParts := strings.Split(strings.Replace(u.Path, "/module/httptrap/", "", 1), "/")
if len(pathParts) != 2 {
return nil, fmt.Errorf("[ERROR] Invalid submission URL '%s', UUID not where expected", submissionURL)
}
uuid := pathParts[0]
query := SearchQueryType(fmt.Sprintf("f__check_uuid=%s", uuid))
filter := SearchFilterType(fmt.Sprintf("f__check_uuid=%s", uuid))
checks, err := a.CheckSearch(query)
checks, err := a.CheckFilterSearch(filter)
if err != nil {
return nil, err
}
@ -93,9 +98,9 @@ func (a *API) FetchCheckBySubmissionURL(submissionURL URLType) (*Check, error) {
}
// CheckSearch returns a list of checks matching a query/filter
// CheckSearch returns a list of checks matching a search query
func (a *API) CheckSearch(query SearchQueryType) ([]Check, error) {
queryURL := fmt.Sprintf("/check?%s", string(query))
queryURL := fmt.Sprintf("/check?search=%s", string(query))
result, err := a.Get(queryURL)
if err != nil {
@ -103,7 +108,26 @@ func (a *API) CheckSearch(query SearchQueryType) ([]Check, error) {
}
var checks []Check
json.Unmarshal(result, &checks)
if err := json.Unmarshal(result, &checks); err != nil {
return nil, err
}
return checks, nil
}
// CheckFilterSearch returns a list of checks matching a filter
func (a *API) CheckFilterSearch(filter SearchFilterType) ([]Check, error) {
filterURL := fmt.Sprintf("/check?%s", string(filter))
result, err := a.Get(filterURL)
if err != nil {
return nil, err
}
var checks []Check
if err := json.Unmarshal(result, &checks); err != nil {
return nil, err
}
return checks, nil
}

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package api
import (
@ -10,14 +14,22 @@ type CheckBundleConfig struct {
AsyncMetrics bool `json:"async_metrics"`
Secret string `json:"secret"`
SubmissionURL string `json:"submission_url"`
ReverseSecret string `json:"reverse:secret_key"`
HTTPVersion string `json:"http_version,omitempty"`
Method string `json:"method,omitempty"`
Payload string `json:"payload,omitempty"`
Port string `json:"port,omitempty"`
ReadLimit string `json:"read_limit,omitempty"`
URL string `json:"url,omitempty"`
}
// CheckBundleMetric individual metric configuration
type CheckBundleMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Units string `json:"units"`
Status string `json:"status"`
Name string `json:"name"`
Type string `json:"type"`
Units string `json:"units"`
Status string `json:"status"`
Tags []string `json:"tags"`
}
// CheckBundle definition
@ -28,7 +40,7 @@ type CheckBundle struct {
Created int `json:"_created,omitempty"`
LastModified int `json:"_last_modified,omitempty"`
LastModifedBy string `json:"_last_modifed_by,omitempty"`
ReverseConnectUrls []string `json:"_reverse_connection_urls,omitempty"`
ReverseConnectURLs []string `json:"_reverse_connection_urls"`
Brokers []string `json:"brokers"`
Config CheckBundleConfig `json:"config"`
DisplayName string `json:"display_name"`
@ -57,7 +69,9 @@ func (a *API) FetchCheckBundleByCID(cid CIDType) (*CheckBundle, error) {
}
checkBundle := &CheckBundle{}
json.Unmarshal(result, checkBundle)
if err := json.Unmarshal(result, checkBundle); err != nil {
return nil, err
}
return checkBundle, nil
}
@ -73,9 +87,8 @@ func (a *API) CheckBundleSearch(searchCriteria SearchQueryType) ([]CheckBundle,
}
var results []CheckBundle
err = json.Unmarshal(response, &results)
if err != nil {
return nil, fmt.Errorf("[ERROR] Parsing JSON response %+v", err)
if err := json.Unmarshal(response, &results); err != nil {
return nil, err
}
return results, nil
@ -94,8 +107,7 @@ func (a *API) CreateCheckBundle(config CheckBundle) (*CheckBundle, error) {
}
checkBundle := &CheckBundle{}
err = json.Unmarshal(response, checkBundle)
if err != nil {
if err := json.Unmarshal(response, checkBundle); err != nil {
return nil, err
}
@ -105,7 +117,7 @@ func (a *API) CreateCheckBundle(config CheckBundle) (*CheckBundle, error) {
// UpdateCheckBundle updates a check bundle configuration
func (a *API) UpdateCheckBundle(config *CheckBundle) (*CheckBundle, error) {
if a.Debug {
a.Log.Printf("[DEBUG] Updating check bundle with new metrics.")
a.Log.Printf("[DEBUG] Updating check bundle.")
}
cfgJSON, err := json.Marshal(config)
@ -119,8 +131,7 @@ func (a *API) UpdateCheckBundle(config *CheckBundle) (*CheckBundle, error) {
}
checkBundle := &CheckBundle{}
err = json.Unmarshal(response, checkBundle)
if err != nil {
if err := json.Unmarshal(response, checkBundle); err != nil {
return nil, err
}

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package checkmgr
import (
@ -6,6 +10,7 @@ import (
"net"
"net/url"
"reflect"
"strconv"
"strings"
"time"
@ -75,7 +80,7 @@ func (cm *CheckManager) selectBroker() (*api.Broker, error) {
var brokerList []api.Broker
var err error
if cm.brokerSelectTag != "" {
if len(cm.brokerSelectTag) > 0 {
brokerList, err = cm.apih.FetchBrokerListByTag(cm.brokerSelectTag)
if err != nil {
return nil, err
@ -141,10 +146,10 @@ func (cm *CheckManager) brokerSupportsCheckType(checkType CheckTypeType, details
// Is the broker valid (active, supports check type, and reachable)
func (cm *CheckManager) isValidBroker(broker *api.Broker) bool {
brokerPort := 0
brokerHost := ""
brokerPort := ""
valid := false
for _, detail := range broker.Details {
brokerPort = 43191
// broker must be active
if detail.Status != statusActive {
@ -162,8 +167,24 @@ func (cm *CheckManager) isValidBroker(broker *api.Broker) bool {
continue
}
if detail.ExternalPort != 0 {
brokerPort = strconv.Itoa(detail.ExternalPort)
} else {
if detail.Port != 0 {
brokerPort = strconv.Itoa(detail.Port)
} else {
brokerPort = "43191"
}
}
if detail.ExternalHost != "" {
brokerHost = detail.ExternalHost
} else {
brokerHost = detail.IP
}
// broker must be reachable and respond within designated time
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", detail.IP, brokerPort), cm.brokerMaxResponseTime)
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%s", brokerHost, brokerPort), cm.brokerMaxResponseTime)
if err != nil {
if detail.CN != "trap.noit.circonus.net" {
if cm.Debug {
@ -172,8 +193,8 @@ func (cm *CheckManager) isValidBroker(broker *api.Broker) bool {
continue // not able to reach the broker (or respone slow enough for it to be considered not usable)
}
// if circonus trap broker, try port 443
brokerPort = 443
conn, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%d", detail.CN, brokerPort), cm.brokerMaxResponseTime)
brokerPort = "443"
conn, err = net.DialTimeout("tcp", fmt.Sprintf("%s:%s", detail.CN, brokerPort), cm.brokerMaxResponseTime)
if err != nil {
if cm.Debug {
cm.Log.Printf("[DEBUG] Broker '%s' unable to connect %v\n", broker.Name, err)

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package checkmgr
import (
@ -70,8 +74,7 @@ func (cm *CheckManager) fetchCert() ([]byte, error) {
}
cadata := new(CACert)
err = json.Unmarshal(response, cadata)
if err != nil {
if err := json.Unmarshal(response, cadata); err != nil {
return nil, err
}

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package checkmgr
import (
@ -13,6 +17,68 @@ import (
"github.com/circonus-labs/circonus-gometrics/api"
)
// UpdateCheck determines if the check needs to be updated (new metrics, tags, etc.)
func (cm *CheckManager) UpdateCheck(newMetrics map[string]*api.CheckBundleMetric) {
// only if check manager is enabled
if !cm.enabled {
return
}
// only if checkBundle has been populated
if cm.checkBundle == nil {
return
}
// only if there is *something* to update
if !cm.forceCheckUpdate && len(newMetrics) == 0 && len(cm.metricTags) == 0 {
return
}
// refresh check bundle (in case there were changes made by other apps or in UI)
checkBundle, err := cm.apih.FetchCheckBundleByCID(api.CIDType(cm.checkBundle.Cid))
if err != nil {
cm.Log.Printf("[ERROR] unable to fetch up-to-date check bundle %v", err)
return
}
cm.cbmu.Lock()
cm.checkBundle = checkBundle
cm.cbmu.Unlock()
cm.addNewMetrics(newMetrics)
if len(cm.metricTags) > 0 {
// note: if a tag has been added (queued) for a metric which never gets sent
// the tags will be discarded. (setting tags does not *create* metrics.)
for metricName, metricTags := range cm.metricTags {
for metricIdx, metric := range cm.checkBundle.Metrics {
if metric.Name == metricName {
cm.checkBundle.Metrics[metricIdx].Tags = metricTags
break
}
}
cm.mtmu.Lock()
delete(cm.metricTags, metricName)
cm.mtmu.Unlock()
}
cm.forceCheckUpdate = true
}
if cm.forceCheckUpdate {
newCheckBundle, err := cm.apih.UpdateCheckBundle(cm.checkBundle)
if err != nil {
cm.Log.Printf("[ERROR] updating check bundle %v", err)
return
}
cm.forceCheckUpdate = false
cm.cbmu.Lock()
cm.checkBundle = newCheckBundle
cm.cbmu.Unlock()
cm.inventoryMetrics()
}
}
// Initialize CirconusMetrics instance. Attempt to find a check otherwise create one.
// use cases:
//
@ -28,6 +94,8 @@ func (cm *CheckManager) initializeTrapURL() error {
cm.trapmu.Lock()
defer cm.trapmu.Unlock()
// special case short-circuit: just send to a url, no check management
// up to user to ensure that if url is https that it will work (e.g. not self-signed)
if cm.checkSubmissionURL != "" {
if !cm.enabled {
cm.trapURL = cm.checkSubmissionURL
@ -50,6 +118,9 @@ func (cm *CheckManager) initializeTrapURL() error {
if err != nil {
return err
}
if !check.Active {
return fmt.Errorf("[ERROR] Check ID %v is not active", check.Cid)
}
// extract check id from check object returned from looking up using submission url
// set m.CheckId to the id
// set m.SubmissionUrl to "" to prevent trying to search on it going forward
@ -71,10 +142,13 @@ func (cm *CheckManager) initializeTrapURL() error {
if err != nil {
return err
}
if !check.Active {
return fmt.Errorf("[ERROR] Check ID %v is not active", check.Cid)
}
} else {
searchCriteria := fmt.Sprintf(
"(active:1)(host:\"%s\")(type:\"%s\")(tags:%s)",
cm.checkInstanceID, cm.checkType, cm.checkSearchTag)
"(active:1)(host:\"%s\")(type:\"%s\")(tags:%s)(notes:%s)",
cm.checkTarget, cm.checkType, strings.Join(cm.checkSearchTag, ","), fmt.Sprintf("cgm_instanceid=%s", cm.checkInstanceID))
checkBundle, err = cm.checkBundleSearch(searchCriteria)
if err != nil {
return err
@ -112,8 +186,19 @@ func (cm *CheckManager) initializeTrapURL() error {
cm.checkBundle = checkBundle
cm.inventoryMetrics()
// url to which metrics should be PUT
cm.trapURL = api.URLType(checkBundle.Config.SubmissionURL)
// determine the trap url to which metrics should be PUT
if checkBundle.Type == "httptrap" {
cm.trapURL = api.URLType(checkBundle.Config.SubmissionURL)
} else {
// build a submission_url for non-httptrap checks out of mtev_reverse url
if len(checkBundle.ReverseConnectURLs) == 0 {
return fmt.Errorf("%s is not an HTTPTRAP check and no reverse connection urls found", checkBundle.Checks[0])
}
mtevURL := checkBundle.ReverseConnectURLs[0]
mtevURL = strings.Replace(mtevURL, "mtev_reverse", "https", 1)
mtevURL = strings.Replace(mtevURL, "check", "module/httptrap", 1)
cm.trapURL = api.URLType(fmt.Sprintf("%s/%s", mtevURL, checkBundle.Config.ReverseSecret))
}
// used when sending as "ServerName" get around certs not having IP SANS
// (cert created with server name as CN but IP used in trap url)
@ -178,11 +263,11 @@ func (cm *CheckManager) createNewCheck() (*api.CheckBundle, *api.Broker, error)
DisplayName: string(cm.checkDisplayName),
Metrics: []api.CheckBundleMetric{},
MetricLimit: 0,
Notes: "",
Notes: fmt.Sprintf("cgm_instanceid=%s", cm.checkInstanceID),
Period: 60,
Status: statusActive,
Tags: append([]string{string(cm.checkSearchTag)}, cm.checkTags...),
Target: string(cm.checkInstanceID),
Tags: append(cm.checkSearchTag, cm.checkTags...),
Target: cm.checkTarget,
Timeout: 10,
Type: string(cm.checkType),
}

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package checkmgr provides a check management interace to circonus-gometrics
package checkmgr
@ -12,6 +16,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync"
"time"
@ -54,7 +59,7 @@ type CheckConfig struct {
// used to search for a check to use
// used as check.target when creating a check
InstanceID string
// unique check searching tag
// unique check searching tag (or tags)
// used to search for a check to use (combined with instanceid)
// used as a regular tag when creating a check
SearchTag string
@ -64,7 +69,7 @@ type CheckConfig struct {
Secret string
// additional tags to add to a check (when creating a check)
// these tags will not be added to an existing check
Tags []string
Tags string
// max amount of time to to hold on to a submission url
// when a given submission fails (due to retries) if the
// time the url was last updated is > than this, the trap
@ -83,8 +88,8 @@ type CheckConfig struct {
type BrokerConfig struct {
// a specific broker id (numeric portion of cid)
ID string
// a tag that can be used to select 1-n brokers from which to select
// when creating a new check (e.g. datacenter:abc)
// one or more tags used to select 1-n brokers from which to select
// when creating a new check (e.g. datacenter:abc or loc:dfw,dc:abc)
SelectTag string
// for a broker to be considered viable it must respond to a
// connection attempt within this amount of time e.g. 200ms, 2s, 1m
@ -114,7 +119,7 @@ type CheckInstanceIDType string
type CheckSecretType string
// CheckTagsType check tags
type CheckTagsType []string
type CheckTagsType string
// CheckDisplayNameType check display name
type CheckDisplayNameType string
@ -133,20 +138,27 @@ type CheckManager struct {
checkType CheckTypeType
checkID api.IDType
checkInstanceID CheckInstanceIDType
checkSearchTag api.SearchTagType
checkTarget string
checkSearchTag api.TagType
checkSecret CheckSecretType
checkTags CheckTagsType
checkTags api.TagType
checkSubmissionURL api.URLType
checkDisplayName CheckDisplayNameType
forceMetricActivation bool
forceCheckUpdate bool
// metric tags
metricTags map[string][]string
mtmu sync.Mutex
// broker
brokerID api.IDType
brokerSelectTag api.SearchTagType
brokerSelectTag api.TagType
brokerMaxResponseTime time.Duration
// state
checkBundle *api.CheckBundle
cbmu sync.Mutex
availableMetrics map[string]bool
trapURL api.URLType
trapCN BrokerCNType
@ -174,14 +186,12 @@ func NewCheckManager(cfg *Config) (*CheckManager, error) {
}
cm.Debug = cfg.Debug
cm.Log = cfg.Log
if cm.Debug && cm.Log == nil {
cm.Log = log.New(os.Stderr, "", log.LstdFlags)
}
if cm.Log == nil {
if cm.Debug {
cm.Log = log.New(os.Stderr, "", log.LstdFlags)
} else {
cm.Log = log.New(ioutil.Discard, "", log.LstdFlags)
}
cm.Log = log.New(ioutil.Discard, "", log.LstdFlags)
}
if cfg.Check.SubmissionURL != "" {
@ -229,9 +239,7 @@ func NewCheckManager(cfg *Config) (*CheckManager, error) {
cm.checkInstanceID = CheckInstanceIDType(cfg.Check.InstanceID)
cm.checkDisplayName = CheckDisplayNameType(cfg.Check.DisplayName)
cm.checkSearchTag = api.SearchTagType(cfg.Check.SearchTag)
cm.checkSecret = CheckSecretType(cfg.Check.Secret)
cm.checkTags = cfg.Check.Tags
fma := defaultForceMetricActivation
if cfg.Check.ForceMetricActivation != "" {
@ -251,13 +259,20 @@ func NewCheckManager(cfg *Config) (*CheckManager, error) {
if cm.checkInstanceID == "" {
cm.checkInstanceID = CheckInstanceIDType(fmt.Sprintf("%s:%s", hn, an))
}
cm.checkTarget = hn
if cm.checkSearchTag == "" {
cm.checkSearchTag = api.SearchTagType(fmt.Sprintf("service:%s", an))
if cfg.Check.SearchTag == "" {
cm.checkSearchTag = []string{fmt.Sprintf("service:%s", an)}
} else {
cm.checkSearchTag = strings.Split(strings.Replace(cfg.Check.SearchTag, " ", "", -1), ",")
}
if cfg.Check.Tags != "" {
cm.checkTags = strings.Split(strings.Replace(cfg.Check.Tags, " ", "", -1), ",")
}
if cm.checkDisplayName == "" {
cm.checkDisplayName = CheckDisplayNameType(fmt.Sprintf("%s /cgm", string(cm.checkInstanceID)))
cm.checkDisplayName = CheckDisplayNameType(fmt.Sprintf("%s", string(cm.checkInstanceID)))
}
dur := cfg.Check.MaxURLAge
@ -282,7 +297,9 @@ func NewCheckManager(cfg *Config) (*CheckManager, error) {
}
cm.brokerID = api.IDType(id)
cm.brokerSelectTag = api.SearchTagType(cfg.Broker.SelectTag)
if cfg.Broker.SelectTag != "" {
cm.brokerSelectTag = strings.Split(strings.Replace(cfg.Broker.SelectTag, " ", "", -1), ",")
}
dur = cfg.Broker.MaxResponseTime
if dur == "" {
@ -296,6 +313,7 @@ func NewCheckManager(cfg *Config) (*CheckManager, error) {
// metrics
cm.availableMetrics = make(map[string]bool)
cm.metricTags = make(map[string][]string)
if err := cm.initializeTrapURL(); err != nil {
return nil, err

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package checkmgr
import (
@ -25,44 +29,85 @@ func (cm *CheckManager) ActivateMetric(name string) bool {
return false
}
// AddNewMetrics updates a check bundle with new metrics
func (cm *CheckManager) AddNewMetrics(newMetrics map[string]*api.CheckBundleMetric) {
// only if check manager is enabled
if !cm.enabled {
return
// AddMetricTags updates check bundle metrics with tags
func (cm *CheckManager) AddMetricTags(metricName string, tags []string, appendTags bool) bool {
tagsUpdated := false
if len(tags) == 0 {
return tagsUpdated
}
// only if checkBundle has been populated
if cm.checkBundle == nil {
return
if _, exists := cm.metricTags[metricName]; !exists {
foundMetric := false
for _, metric := range cm.checkBundle.Metrics {
if metric.Name == metricName {
foundMetric = true
cm.metricTags[metricName] = metric.Tags
break
}
}
if !foundMetric {
cm.metricTags[metricName] = []string{}
}
}
newCheckBundle := cm.checkBundle
numCurrMetrics := len(newCheckBundle.Metrics)
action := "no new"
if appendTags {
numNewTags := countNewTags(cm.metricTags[metricName], tags)
if numNewTags > 0 {
action = "Added"
cm.metricTags[metricName] = append(cm.metricTags[metricName], tags...)
tagsUpdated = true
}
} else {
action = "Set"
cm.metricTags[metricName] = tags
tagsUpdated = true
}
if cm.Debug {
cm.Log.Printf("[DEBUG] %s metric tag(s) %s %v\n", action, metricName, tags)
}
return tagsUpdated
}
// addNewMetrics updates a check bundle with new metrics
func (cm *CheckManager) addNewMetrics(newMetrics map[string]*api.CheckBundleMetric) bool {
updatedCheckBundle := false
if cm.checkBundle == nil || len(newMetrics) == 0 {
return updatedCheckBundle
}
cm.cbmu.Lock()
defer cm.cbmu.Unlock()
numCurrMetrics := len(cm.checkBundle.Metrics)
numNewMetrics := len(newMetrics)
if numCurrMetrics+numNewMetrics >= cap(newCheckBundle.Metrics) {
if numCurrMetrics+numNewMetrics >= cap(cm.checkBundle.Metrics) {
nm := make([]api.CheckBundleMetric, numCurrMetrics+numNewMetrics)
copy(nm, newCheckBundle.Metrics)
newCheckBundle.Metrics = nm
copy(nm, cm.checkBundle.Metrics)
cm.checkBundle.Metrics = nm
}
newCheckBundle.Metrics = newCheckBundle.Metrics[0 : numCurrMetrics+numNewMetrics]
cm.checkBundle.Metrics = cm.checkBundle.Metrics[0 : numCurrMetrics+numNewMetrics]
i := 0
for _, metric := range newMetrics {
newCheckBundle.Metrics[numCurrMetrics+i] = *metric
cm.checkBundle.Metrics[numCurrMetrics+i] = *metric
i++
updatedCheckBundle = true
}
checkBundle, err := cm.apih.UpdateCheckBundle(newCheckBundle)
if err != nil {
cm.Log.Printf("[ERROR] updating check bundle with new metrics %v", err)
return
if updatedCheckBundle {
cm.forceCheckUpdate = true
}
cm.checkBundle = checkBundle
cm.inventoryMetrics()
return updatedCheckBundle
}
// inventoryMetrics creates list of active metrics in check bundle
@ -73,3 +118,31 @@ func (cm *CheckManager) inventoryMetrics() {
}
cm.availableMetrics = availableMetrics
}
// countNewTags returns a count of new tags which do not exist in the current list of tags
func countNewTags(currTags []string, newTags []string) int {
if len(newTags) == 0 {
return 0
}
if len(currTags) == 0 {
return len(newTags)
}
newTagCount := 0
for _, newTag := range newTags {
found := false
for _, currTag := range currTags {
if newTag == currTag {
found = true
break
}
}
if !found {
newTagCount++
}
}
return newTagCount
}

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package circonusgometrics provides instrumentation for your applications in the form
// of counters, gauges and histograms and allows you to publish them to
// Circonus
@ -30,6 +34,7 @@ import (
"io/ioutil"
"log"
"os"
"strconv"
"sync"
"time"
@ -43,8 +48,12 @@ const (
// Config options for circonus-gometrics
type Config struct {
Log *log.Logger
Debug bool
Log *log.Logger
Debug bool
ResetCounters string // reset/delete counters on flush (default true)
ResetGauges string // reset/delete gauges on flush (default true)
ResetHistograms string // reset/delete histograms on flush (default true)
ResetText string // reset/delete text on flush (default true)
// API, Check and Broker configuration options
CheckManager checkmgr.Config
@ -55,12 +64,16 @@ type Config struct {
// CirconusMetrics state
type CirconusMetrics struct {
Log *log.Logger
Debug bool
flushInterval time.Duration
flushing bool
flushmu sync.Mutex
check *checkmgr.CheckManager
Log *log.Logger
Debug bool
resetCounters bool
resetGauges bool
resetHistograms bool
resetText bool
flushInterval time.Duration
flushing bool
flushmu sync.Mutex
check *checkmgr.CheckManager
counters map[string]uint64
cm sync.Mutex
@ -102,12 +115,10 @@ func NewCirconusMetrics(cfg *Config) (*CirconusMetrics, error) {
}
cm.Debug = cfg.Debug
if cm.Debug {
if cfg.Log == nil {
cm.Log = log.New(os.Stderr, "", log.LstdFlags)
} else {
cm.Log = cfg.Log
}
cm.Log = cfg.Log
if cm.Debug && cfg.Log == nil {
cm.Log = log.New(os.Stderr, "", log.LstdFlags)
}
if cm.Log == nil {
cm.Log = log.New(ioutil.Discard, "", log.LstdFlags)
@ -124,6 +135,36 @@ func NewCirconusMetrics(cfg *Config) (*CirconusMetrics, error) {
}
cm.flushInterval = dur
var setting bool
cm.resetCounters = true
if cfg.ResetCounters != "" {
if setting, err = strconv.ParseBool(cfg.ResetCounters); err == nil {
cm.resetCounters = setting
}
}
cm.resetGauges = true
if cfg.ResetGauges != "" {
if setting, err = strconv.ParseBool(cfg.ResetGauges); err == nil {
cm.resetGauges = setting
}
}
cm.resetHistograms = true
if cfg.ResetHistograms != "" {
if setting, err = strconv.ParseBool(cfg.ResetHistograms); err == nil {
cm.resetHistograms = setting
}
}
cm.resetText = true
if cfg.ResetText != "" {
if setting, err = strconv.ParseBool(cfg.ResetText); err == nil {
cm.resetText = setting
}
}
cfg.CheckManager.Debug = cm.Debug
cfg.CheckManager.Log = cm.Log
@ -242,7 +283,13 @@ func (m *CirconusMetrics) Flush() {
}
}
m.submit(output, newMetrics)
if len(output) > 0 {
m.submit(output, newMetrics)
} else {
if m.Debug {
m.Log.Println("[DEBUG] No metrics to send, skipping")
}
}
m.flushmu.Lock()
m.flushing = false

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
// A Counter is a monotonically increasing unsigned integer.

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
// A Gauge is an instantaneous measurement of a value.

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
import (
@ -25,19 +29,20 @@ func (m *CirconusMetrics) RecordValue(metric string, val float64) {
// SetHistogramValue adds a value to a histogram
func (m *CirconusMetrics) SetHistogramValue(metric string, val float64) {
m.NewHistogram(metric)
hist := m.NewHistogram(metric)
m.histograms[metric].rw.Lock()
defer m.histograms[metric].rw.Unlock()
m.histograms[metric].hist.RecordValue(val)
m.hm.Lock()
hist.rw.Lock()
hist.hist.RecordValue(val)
hist.rw.Unlock()
m.hm.Unlock()
}
// RemoveHistogram removes a histogram
func (m *CirconusMetrics) RemoveHistogram(metric string) {
m.hm.Lock()
defer m.hm.Unlock()
delete(m.histograms, metric)
m.hm.Unlock()
}
// NewHistogram returns a histogram instance.
@ -67,7 +72,6 @@ func (h *Histogram) Name() string {
// RecordValue records the given value to a histogram instance
func (h *Histogram) RecordValue(v float64) {
h.rw.Lock()
defer h.rw.Unlock()
h.hist.RecordValue(v)
h.rw.Unlock()
}

View File

@ -0,0 +1,15 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
// SetMetricTags sets the tags for the named metric and flags a check update is needed
func (m *CirconusMetrics) SetMetricTags(name string, tags []string) bool {
return m.check.AddMetricTags(name, tags, false)
}
// AddMetricTags appends tags to any existing tags for the named metric and flags a check update is needed
func (m *CirconusMetrics) AddMetricTags(name string, tags []string) bool {
return m.check.AddMetricTags(name, tags, true)
}

View File

@ -1,9 +1,14 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"log"
"net"
@ -16,9 +21,9 @@ import (
)
func (m *CirconusMetrics) submit(output map[string]interface{}, newMetrics map[string]*api.CheckBundleMetric) {
if len(newMetrics) > 0 {
m.check.AddNewMetrics(newMetrics)
}
// update check if there are any new metrics or, if metric tags have been added since last submit
m.check.UpdateCheck(newMetrics)
str, err := json.Marshal(output)
if err != nil {
@ -49,8 +54,32 @@ func (m *CirconusMetrics) trapCall(payload []byte) (int, error) {
if err != nil {
return 0, err
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Accept", "application/json")
// keep last HTTP error in the event of retry failure
var lastHTTPError error
retryPolicy := func(resp *http.Response, err error) (bool, error) {
if err != nil {
lastHTTPError = err
return true, err
}
// Check the response code. We retry on 500-range responses to allow
// the server time to recover, as 500's are typically not permanent
// errors and may relate to outages on the server side. This will catch
// invalid response codes as well, like 0 and 999.
if resp.StatusCode == 0 || resp.StatusCode >= 500 {
body, readErr := ioutil.ReadAll(resp.Body)
if readErr != nil {
lastHTTPError = fmt.Errorf("- last HTTP error: %d %+v", resp.StatusCode, readErr)
} else {
lastHTTPError = fmt.Errorf("- last HTTP error: %d %s", resp.StatusCode, string(body))
}
return true, nil
}
return false, nil
}
client := retryablehttp.NewClient()
if trap.URL.Scheme == "https" {
client.HTTPClient.Transport = &http.Transport{
@ -78,10 +107,17 @@ func (m *CirconusMetrics) trapCall(payload []byte) (int, error) {
DisableCompression: true,
}
}
client.RetryWaitMin = 10 * time.Millisecond
client.RetryWaitMax = 50 * time.Millisecond
client.RetryWaitMin = 1 * time.Second
client.RetryWaitMax = 5 * time.Second
client.RetryMax = 3
client.Logger = m.Log
// retryablehttp only groks log or no log
// but, outputs everything as [DEBUG] messages
if m.Debug {
client.Logger = m.Log
} else {
client.Logger = log.New(ioutil.Discard, "", log.LstdFlags)
}
client.CheckRetry = retryPolicy
attempts := -1
client.RequestLogHook = func(logger *log.Logger, req *http.Request, retryNumber int) {
@ -90,6 +126,9 @@ func (m *CirconusMetrics) trapCall(payload []byte) (int, error) {
resp, err := client.Do(req)
if err != nil {
if lastHTTPError != nil {
return 0, fmt.Errorf("[ERROR] submitting: %+v %+v", err, lastHTTPError)
}
if attempts == client.RetryMax {
m.check.RefreshTrap()
}
@ -103,9 +142,8 @@ func (m *CirconusMetrics) trapCall(payload []byte) (int, error) {
}
var response map[string]interface{}
err = json.Unmarshal(body, &response)
if err != nil {
m.Log.Printf("[ERROR] parsing body, proceeding. %s\n", err)
if err := json.Unmarshal(body, &response); err != nil {
m.Log.Printf("[ERROR] parsing body, proceeding. %v (%s)\n", err, body)
}
if resp.StatusCode != 200 {

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
// A Text metric is an arbitrary string

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
import (

View File

@ -1,3 +1,7 @@
// Copyright 2016 Circonus, Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package circonusgometrics
import (
@ -80,7 +84,9 @@ func (m *CirconusMetrics) snapshot() (c map[string]uint64, g map[string]string,
h = make(map[string]*circonusllhist.Histogram, len(m.histograms))
for n, hist := range m.histograms {
hist.rw.Lock()
h[n] = hist.hist.CopyAndReset()
hist.rw.Unlock()
}
t = make(map[string]string, len(m.text)+len(m.textFuncs))
@ -92,5 +98,24 @@ func (m *CirconusMetrics) snapshot() (c map[string]uint64, g map[string]string,
t[n] = f()
}
if m.resetCounters {
m.counters = make(map[string]uint64)
m.counterFuncs = make(map[string]func() uint64)
}
if m.resetGauges {
m.gauges = make(map[string]string)
m.gaugeFuncs = make(map[string]func() int64)
}
if m.resetHistograms {
m.histograms = make(map[string]*Histogram)
}
if m.resetText {
m.text = make(map[string]string)
m.textFuncs = make(map[string]func() string)
}
return
}

View File

@ -15,7 +15,7 @@ import (
)
const (
DEFAULT_HIST_SIZE = int16(100)
DEFAULT_HIST_SIZE = uint16(100)
)
var power_of_ten = [...]float64{
@ -70,6 +70,14 @@ func NewBinFromFloat64(d float64) *Bin {
hb.SetFromFloat64(d)
return hb
}
type FastL2 struct {
l1, l2 int
}
func (hb *Bin) fastl2() FastL2 {
return FastL2{l1: int(uint8(hb.exp)), l2: int(uint8(hb.val))}
}
func (hb *Bin) SetFromFloat64(d float64) *Bin {
hb.val = -1
if math.IsInf(d, 0) || math.IsNaN(d) {
@ -117,10 +125,7 @@ func (hb *Bin) SetFromFloat64(d float64) *Bin {
return hb
}
func (hb *Bin) PowerOfTen() float64 {
idx := int(hb.exp)
if idx < 0 {
idx = 256 + idx
}
idx := int(uint8(hb.exp))
return power_of_ten[idx]
}
@ -183,69 +188,58 @@ func (hb *Bin) Left() float64 {
}
func (h1 *Bin) Compare(h2 *Bin) int {
if h1.val == h2.val && h1.exp == h2.exp {
return 0
var v1, v2 int
// slide exp positive,
// shift by size of val
// multiple by (val != 0)
// then add or subtract val accordingly
if h1.val >= 0 {
v1 = ((int(h1.exp)+256)<<8)*int(((h1.val|(^h1.val+1))>>8)&1) + int(h1.val)
} else {
v1 = ((int(h1.exp)+256)<<8)*int(((h1.val|(^h1.val+1))>>8)&1) - int(h1.val)
}
if h1.val == -1 {
return 1
if h2.val >= 0 {
v2 = ((int(h2.exp)+256)<<8)*int(((h2.val|(^h2.val+1))>>8)&1) + int(h2.val)
} else {
v2 = ((int(h2.exp)+256)<<8)*int(((h2.val|(^h2.val+1))>>8)&1) - int(h2.val)
}
if h2.val == -1 {
return -1
}
if h1.val == 0 {
if h2.val > 0 {
return 1
}
return -1
}
if h2.val == 0 {
if h1.val < 0 {
return 1
}
return -1
}
if h1.val < 0 && h2.val > 0 {
return 1
}
if h1.val > 0 && h2.val < 0 {
return -1
}
if h1.exp == h2.exp {
if h1.val < h2.val {
return 1
}
return -1
}
if h1.exp > h2.exp {
if h1.val < 0 {
return 1
}
return -1
}
if h1.exp < h2.exp {
if h1.val < 0 {
return -1
}
return 1
}
return 0
// return the difference
return v2 - v1
}
// This histogram structure tracks values are two decimal digits of precision
// with a bounded error that remains bounded upon composition
type Histogram struct {
mutex sync.Mutex
bvs []Bin
used int16
allocd int16
used uint16
allocd uint16
lookup [256][]uint16
mutex sync.Mutex
useLocks bool
}
// New returns a new Histogram
func New() *Histogram {
return &Histogram{
allocd: DEFAULT_HIST_SIZE,
used: 0,
bvs: make([]Bin, DEFAULT_HIST_SIZE),
allocd: DEFAULT_HIST_SIZE,
used: 0,
bvs: make([]Bin, DEFAULT_HIST_SIZE),
useLocks: true,
}
}
// New returns a Histogram without locking
func NewNoLocks() *Histogram {
return &Histogram{
allocd: DEFAULT_HIST_SIZE,
used: 0,
bvs: make([]Bin, DEFAULT_HIST_SIZE),
useLocks: false,
}
}
@ -266,9 +260,24 @@ func (h *Histogram) Mean() float64 {
// Reset forgets all bins in the histogram (they remain allocated)
func (h *Histogram) Reset() {
h.mutex.Lock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
for i := 0; i < 256; i++ {
if h.lookup[i] != nil {
for j := range h.lookup[i] {
h.lookup[i][j] = 0
}
}
}
h.used = 0
h.mutex.Unlock()
}
// RecordIntScale records an integer scaler value, returning an error if the
// value is out of range.
func (h *Histogram) RecordIntScale(val, scale int) error {
return h.RecordIntScales(val, scale, 1)
}
// RecordValue records the given value, returning an error if the value is out
@ -304,13 +313,19 @@ func (h *Histogram) RecordCorrectedValue(v, expectedInterval int64) error {
}
// find where a new bin should go
func (h *Histogram) InternalFind(hb *Bin) (bool, int16) {
func (h *Histogram) InternalFind(hb *Bin) (bool, uint16) {
if h.used == 0 {
return false, 0
}
f2 := hb.fastl2()
if h.lookup[f2.l1] != nil {
if idx := h.lookup[f2.l1][f2.l2]; idx != 0 {
return true, idx - 1
}
}
rv := -1
idx := int16(0)
l := int16(0)
idx := uint16(0)
l := uint16(0)
r := h.used - 1
for l < r {
check := (r + l) / 2
@ -339,10 +354,9 @@ func (h *Histogram) InternalFind(hb *Bin) (bool, int16) {
}
func (h *Histogram) InsertBin(hb *Bin, count int64) uint64 {
h.mutex.Lock()
defer h.mutex.Unlock()
if count == 0 {
return 0
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
found, idx := h.InternalFind(hb)
if !found {
@ -363,13 +377,20 @@ func (h *Histogram) InsertBin(hb *Bin, count int64) uint64 {
h.bvs[idx].exp = hb.exp
h.bvs[idx].count = uint64(count)
h.used++
for i := idx; i < h.used; i++ {
f2 := h.bvs[i].fastl2()
if h.lookup[f2.l1] == nil {
h.lookup[f2.l1] = make([]uint16, 256)
}
h.lookup[f2.l1][f2.l2] = uint16(i) + 1
}
return h.bvs[idx].count
}
var newval uint64
if count < 0 {
newval = h.bvs[idx].count - uint64(-count)
} else {
if count >= 0 {
newval = h.bvs[idx].count + uint64(count)
} else {
newval = h.bvs[idx].count - uint64(-count)
}
if newval < h.bvs[idx].count { //rolled
newval = ^uint64(0)
@ -378,6 +399,39 @@ func (h *Histogram) InsertBin(hb *Bin, count int64) uint64 {
return newval - h.bvs[idx].count
}
// RecordIntScales records n occurrences of the given value, returning an error if
// the value is out of range.
func (h *Histogram) RecordIntScales(val, scale int, n int64) error {
sign := 1
if val == 0 {
scale = 0
} else {
if val < 0 {
val = 0 - val
sign = -1
}
if val < 10 {
val *= 10
scale -= 1
}
for val > 100 {
val /= 10
scale++
}
}
if scale < -128 {
val = 0
scale = 0
} else if scale > 127 {
val = 0xff
scale = 0
}
val *= sign
hb := Bin{val: int8(val), exp: int8(scale), count: 0}
h.InsertBin(&hb, n)
return nil
}
// RecordValues records n occurrences of the given value, returning an error if
// the value is out of range.
func (h *Histogram) RecordValues(v float64, n int64) error {
@ -389,11 +443,13 @@ func (h *Histogram) RecordValues(v float64, n int64) error {
// Approximate mean
func (h *Histogram) ApproxMean() float64 {
h.mutex.Lock()
defer h.mutex.Unlock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
divisor := 0.0
sum := 0.0
for i := int16(0); i < h.used; i++ {
for i := uint16(0); i < h.used; i++ {
midpoint := h.bvs[i].Midpoint()
cardinality := float64(h.bvs[i].count)
divisor += cardinality
@ -407,10 +463,12 @@ func (h *Histogram) ApproxMean() float64 {
// Approximate sum
func (h *Histogram) ApproxSum() float64 {
h.mutex.Lock()
defer h.mutex.Unlock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
sum := 0.0
for i := int16(0); i < h.used; i++ {
for i := uint16(0); i < h.used; i++ {
midpoint := h.bvs[i].Midpoint()
cardinality := float64(h.bvs[i].count)
sum += midpoint * cardinality
@ -419,10 +477,12 @@ func (h *Histogram) ApproxSum() float64 {
}
func (h *Histogram) ApproxQuantile(q_in []float64) ([]float64, error) {
h.mutex.Lock()
defer h.mutex.Unlock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
q_out := make([]float64, len(q_in))
i_q, i_b := 0, int16(0)
i_q, i_b := 0, uint16(0)
total_cnt, bin_width, bin_left, lower_cnt, upper_cnt := 0.0, 0.0, 0.0, 0.0, 0.0
if len(q_in) == 0 {
return q_out, nil
@ -485,8 +545,10 @@ func (h *Histogram) ApproxQuantile(q_in []float64) ([]float64, error) {
// ValueAtQuantile returns the recorded value at the given quantile (0..1).
func (h *Histogram) ValueAtQuantile(q float64) float64 {
h.mutex.Lock()
defer h.mutex.Unlock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
q_in := make([]float64, 1)
q_in[0] = q
q_out, err := h.ApproxQuantile(q_in)
@ -505,16 +567,20 @@ func (h *Histogram) SignificantFigures() int64 {
// Equals returns true if the two Histograms are equivalent, false if not.
func (h *Histogram) Equals(other *Histogram) bool {
h.mutex.Lock()
other.mutex.Lock()
defer h.mutex.Unlock()
defer other.mutex.Unlock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
if other.useLocks {
other.mutex.Lock()
defer other.mutex.Unlock()
}
switch {
case
h.used != other.used:
return false
default:
for i := int16(0); i < h.used; i++ {
for i := uint16(0); i < h.used; i++ {
if h.bvs[i].Compare(&other.bvs[i]) != 0 {
return false
}
@ -527,8 +593,10 @@ func (h *Histogram) Equals(other *Histogram) bool {
}
func (h *Histogram) CopyAndReset() *Histogram {
h.mutex.Lock()
defer h.mutex.Unlock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
newhist := &Histogram{
allocd: h.allocd,
used: h.used,
@ -537,11 +605,20 @@ func (h *Histogram) CopyAndReset() *Histogram {
h.allocd = DEFAULT_HIST_SIZE
h.bvs = make([]Bin, DEFAULT_HIST_SIZE)
h.used = 0
for i := 0; i < 256; i++ {
if h.lookup[i] != nil {
for j := range h.lookup[i] {
h.lookup[i][j] = 0
}
}
}
return newhist
}
func (h *Histogram) DecStrings() []string {
h.mutex.Lock()
defer h.mutex.Unlock()
if h.useLocks {
h.mutex.Lock()
defer h.mutex.Unlock()
}
out := make([]string, h.used)
for i, bin := range h.bvs[0:h.used] {
var buffer bytes.Buffer

View File

@ -32,12 +32,16 @@ import (
var (
// Default retry configuration
defaultRetryWaitMin = 1 * time.Second
defaultRetryWaitMax = 5 * time.Minute
defaultRetryMax = 32
defaultRetryWaitMax = 30 * time.Second
defaultRetryMax = 4
// defaultClient is used for performing requests without explicitly making
// a new client. It is purposely private to avoid modifications.
defaultClient = NewClient()
// We need to consume response bodies to maintain http connections, but
// limit the size we consume to respReadLimit.
respReadLimit = int64(4096)
)
// LenReader is an interface implemented by many in-memory io.Reader's. Used
@ -93,6 +97,16 @@ type RequestLogHook func(*log.Logger, *http.Request, int)
// from this method, this will affect the response returned from Do().
type ResponseLogHook func(*log.Logger, *http.Response)
// CheckRetry specifies a policy for handling retries. It is called
// following each request with the response and error values returned by
// the http.Client. If CheckRetry returns false, the Client stops retrying
// and returns the response to the caller. If CheckRetry returns an error,
// that error value is returned in lieu of the error from the request. The
// Client will close any response body when retrying, but if the retry is
// aborted it is up to the CheckResponse callback to properly close any
// response body before returning.
type CheckRetry func(resp *http.Response, err error) (bool, error)
// Client is used to make HTTP requests. It adds additional functionality
// like automatic retries to tolerate minor outages.
type Client struct {
@ -110,6 +124,10 @@ type Client struct {
// ResponseLogHook allows a user-supplied function to be called
// with the response from each HTTP request executed.
ResponseLogHook ResponseLogHook
// CheckRetry specifies the policy for handling retries, and is called
// after each request. The default policy is DefaultRetryPolicy.
CheckRetry CheckRetry
}
// NewClient creates a new Client with default settings.
@ -120,9 +138,27 @@ func NewClient() *Client {
RetryWaitMin: defaultRetryWaitMin,
RetryWaitMax: defaultRetryWaitMax,
RetryMax: defaultRetryMax,
CheckRetry: DefaultRetryPolicy,
}
}
// DefaultRetryPolicy provides a default callback for Client.CheckRetry, which
// will retry on connection errors and server errors.
func DefaultRetryPolicy(resp *http.Response, err error) (bool, error) {
if err != nil {
return true, err
}
// Check the response code. We retry on 500-range responses to allow
// the server time to recover, as 500's are typically not permanent
// errors and may relate to outages on the server side. This will catch
// invalid response codes as well, like 0 and 999.
if resp.StatusCode == 0 || resp.StatusCode >= 500 {
return true, nil
}
return false, nil
}
// Do wraps calling an HTTP method with retries.
func (c *Client) Do(req *Request) (*http.Response, error) {
c.Logger.Printf("[DEBUG] %s %s", req.Method, req.URL)
@ -143,27 +179,34 @@ func (c *Client) Do(req *Request) (*http.Response, error) {
// Attempt the request
resp, err := c.HTTPClient.Do(req.Request)
// Check if we should continue with retries.
checkOK, checkErr := c.CheckRetry(resp, err)
if err != nil {
c.Logger.Printf("[ERR] %s %s request failed: %v", req.Method, req.URL, err)
goto RETRY
}
code = resp.StatusCode
// Call the response logger function if provided.
if c.ResponseLogHook != nil {
c.ResponseLogHook(c.Logger, resp)
} else {
// Call this here to maintain the behavior of logging all requests,
// even if CheckRetry signals to stop.
if c.ResponseLogHook != nil {
// Call the response logger function if provided.
c.ResponseLogHook(c.Logger, resp)
}
}
// Check the response code. We retry on 500-range responses to allow
// the server time to recover, as 500's are typically not permanent
// errors and may relate to outages on the server side.
if code%500 < 100 {
resp.Body.Close()
goto RETRY
// Now decide if we should continue.
if !checkOK {
if checkErr != nil {
err = checkErr
}
return resp, err
}
// We're going to retry, consume any response to reuse the connection.
if err == nil {
c.drainBody(resp.Body)
}
return resp, nil
RETRY:
remain := c.RetryMax - i
if remain == 0 {
break
@ -182,6 +225,15 @@ func (c *Client) Do(req *Request) (*http.Response, error) {
req.Method, req.URL, c.RetryMax+1)
}
// Try to read the response body so we can reuse this connection.
func (c *Client) drainBody(body io.ReadCloser) {
defer body.Close()
_, err := io.Copy(ioutil.Discard, io.LimitReader(body, respReadLimit))
if err != nil {
c.Logger.Printf("[ERR] error reading response body: %v", err)
}
}
// Get is a shortcut for doing a GET request without making a new client.
func Get(url string) (*http.Response, error) {
return defaultClient.Get(url)

30
vendor/vendor.json vendored
View File

@ -220,28 +220,28 @@
"versionExact": "v1.2.1"
},
{
"checksumSHA1": "b5zgHT9TxBAVh/KP9kQi7QVoz9w=",
"checksumSHA1": "szvY4u7TlXkrQ3PW8wmyJaIFy0U=",
"path": "github.com/circonus-labs/circonus-gometrics",
"revision": "a7c30e0dcc6e2341053132470dcedc12bc7705ef",
"revisionTime": "2016-07-22T17:27:10Z"
"revision": "d17a8420c36e800fcb46bbd4d2a15b93c68605ea",
"revisionTime": "2016-11-09T19:23:37Z"
},
{
"checksumSHA1": "IFiYTxu8jshL4A8BCttUaDhp1m4=",
"checksumSHA1": "WUE6oF152uN5FcLmmq+nO3Yl7pU=",
"path": "github.com/circonus-labs/circonus-gometrics/api",
"revision": "a7c30e0dcc6e2341053132470dcedc12bc7705ef",
"revisionTime": "2016-07-22T17:27:10Z"
"revision": "d17a8420c36e800fcb46bbd4d2a15b93c68605ea",
"revisionTime": "2016-11-09T19:23:37Z"
},
{
"checksumSHA1": "+9vcRzlTdvEjH/Uf8fKC5MXdjNw=",
"checksumSHA1": "beRBHHy2+V6Ht4cACIMmZOCnzgg=",
"path": "github.com/circonus-labs/circonus-gometrics/checkmgr",
"revision": "a7c30e0dcc6e2341053132470dcedc12bc7705ef",
"revisionTime": "2016-07-22T17:27:10Z"
"revision": "d17a8420c36e800fcb46bbd4d2a15b93c68605ea",
"revisionTime": "2016-11-09T19:23:37Z"
},
{
"checksumSHA1": "C4Z7+l5GOpOCW5DcvNYzheGvQRE=",
"checksumSHA1": "eYWKyMvUWZpmuXjDEIGy9lBddK0=",
"path": "github.com/circonus-labs/circonusllhist",
"revision": "d724266ae5270ae8b87a5d2e8081f04e307c3c18",
"revisionTime": "2016-05-26T04:38:13Z"
"revision": "f3bb61c09a65a1bb5833219937fe4e7042dadab4",
"revisionTime": "2016-11-09T20:01:07Z"
},
{
"checksumSHA1": "fXAinpJ5bOcborK7AiO1rnW60BI=",
@ -392,10 +392,10 @@
"revisionTime": "2015-09-16T20:57:42Z"
},
{
"checksumSHA1": "bfVGm7xZ2VFpddJp3KEPUZ8Y9Po=",
"checksumSHA1": "ErJHGU6AVPZM9yoY/xV11TwSjQs=",
"path": "github.com/hashicorp/go-retryablehttp",
"revision": "886ce0458bc81ccca0fb7044c1be0e9ab590bed7",
"revisionTime": "2016-07-18T23:34:41Z"
"revision": "6e85be8fee1dcaa02c0eaaac2df5a8fbecf94145",
"revisionTime": "2016-09-30T03:51:02Z"
},
{
"checksumSHA1": "xZ7Ban1x//6uUIU1xtrTbCYNHBc=",