Add metrics to all major gce operations {latency, errors}

The new metrics are:

  cloudprovider_gce_api_request_duration_seconds{request, region, zone}
  cloudprovider_gce_api_request_errors{request, region, zone}

`request` is the specific function that is used.
`region` is the target region (will be "<n/a>" if not applicable).
`zone` is the target zone (will be "<n/a>" if not applicable).

Note: this fixes some issues with the previous implementation of
metrics for disks:
- The tracked duration covered only the initial API call, not the
  entire operation.
- The metric label tuple would have resulted in many independent
  histograms being stored, one for each disk (these did not aggregate
  well).
pull/6/head
Bowei Du 2017-04-12 11:57:20 -07:00
parent 310c914a6d
commit ee847ebf8a
18 changed files with 527 additions and 273 deletions

View File

@ -23,7 +23,6 @@ go_library(
"gce_instancegroup.go",
"gce_instances.go",
"gce_loadbalancer.go",
"gce_metrics.go",
"gce_op.go",
"gce_routes.go",
"gce_staticip.go",
@ -31,6 +30,7 @@ go_library(
"gce_urlmap.go",
"gce_util.go",
"gce_zones.go",
"metrics.go",
"token_source.go",
],
tags = ["automanaged"],
@ -43,13 +43,11 @@ go_library(
"//vendor/cloud.google.com/go/compute/metadata:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/golang.org/x/oauth2:go_default_library",
"//vendor/golang.org/x/oauth2/google:go_default_library",
"//vendor/google.golang.org/api/compute/v0.alpha:go_default_library",
"//vendor/google.golang.org/api/compute/v1:go_default_library",
"//vendor/google.golang.org/api/container/v1:go_default_library",
"//vendor/google.golang.org/api/gensupport:go_default_library",
"//vendor/google.golang.org/api/googleapi:go_default_library",
"//vendor/gopkg.in/gcfg.v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",

View File

@ -25,7 +25,6 @@ import (
"time"
"cloud.google.com/go/compute/metadata"
"golang.org/x/net/context"
"gopkg.in/gcfg.v1"
@ -39,7 +38,6 @@ import (
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
container "google.golang.org/api/container/v1"
"google.golang.org/api/gensupport"
)
const (
@ -103,47 +101,12 @@ type Config struct {
}
}
// ApiWithNamespace stores api and namespace in context
type apiWithNamespace struct {
namespace string
apiCall string
}
func init() {
registerMetrics()
cloudprovider.RegisterCloudProvider(
ProviderName,
func(config io.Reader) (cloudprovider.Interface, error) {
return newGCECloud(config)
})
gensupport.RegisterHook(trackAPILatency)
}
func trackAPILatency(ctx context.Context, req *http.Request) func(resp *http.Response) {
requestTime := time.Now()
t := ctx.Value("kube-api-namespace")
apiNamespace, ok := t.(apiWithNamespace)
if !ok {
return nil
}
apiResponseReceived := func(resp *http.Response) {
timeTaken := time.Since(requestTime).Seconds()
if mi, ok := gceMetricMap[apiNamespace.apiCall]; ok {
mi.WithLabelValues(apiNamespace.namespace).Observe(timeTaken)
}
}
return apiResponseReceived
}
func contextWithNamespace(namespace string, apiCall string) context.Context {
rootContext := context.Background()
apiNamespace := apiWithNamespace{
namespace: namespace,
apiCall: apiCall,
}
return context.WithValue(rootContext, "kube-api-namespace", apiNamespace)
}
// Raw access to the underlying GCE service, probably should only be used for e2e tests
@ -340,7 +303,7 @@ func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, e
}
if networkList == nil || len(networkList.Items) <= 0 {
return "", fmt.Errorf("GCE Network List call returned no networks for project %q.", projectID)
return "", fmt.Errorf("GCE Network List call returned no networks for project %q", projectID)
}
return networkList.Items[0].Name, nil

View File

@ -18,11 +18,17 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// BackendService Management
// newBackendServiceMetricContext starts a metricContext for a backend
// service request. The start time is captured immediately; the request
// label is "backendservice_" + request, and the region and zone labels are
// both set to unusedMetricLabel.
func newBackendServiceMetricContext(request string) *metricContext {
	mc := &metricContext{start: time.Now()}
	mc.attributes = []string{
		"backendservice_" + request,
		unusedMetricLabel,
		unusedMetricLabel,
	}
	return mc
}
// GetBackendService retrieves a backend by name.
func (gce *GCECloud) GetBackendService(name string) (*compute.BackendService, error) {
@ -31,32 +37,38 @@ func (gce *GCECloud) GetBackendService(name string) (*compute.BackendService, er
// UpdateBackendService applies the given BackendService as an update to an existing service.
func (gce *GCECloud) UpdateBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("update")
op, err := gce.service.BackendServices.Update(gce.projectID, bg.Name, bg).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteBackendService deletes the given BackendService by name.
// An HTTP 404 from the delete call is treated as success and nil is
// returned.
//
// NOTE(review): the not-found path returns without calling mc.Observe, so
// that request is never recorded, whereas DeleteGlobalForwardingRule calls
// mc.Observe(nil) in the same situation. Confirm which behaviour is
// intended and make the delete paths consistent.
func (gce *GCECloud) DeleteBackendService(name string) error {
mc := newBackendServiceMetricContext("delete")
op, err := gce.service.BackendServices.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil
}
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateBackendService creates the given BackendService.
func (gce *GCECloud) CreateBackendService(bg *compute.BackendService) error {
mc := newBackendServiceMetricContext("create")
op, err := gce.service.BackendServices.Insert(gce.projectID, bg).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListBackendServices lists all backend services in the project.

View File

@ -18,11 +18,17 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// SSL Certificate management
// newCertMetricContext starts a metricContext for an SSL certificate
// request. The request label is "cert_" + request; the region and zone
// labels are both unusedMetricLabel.
func newCertMetricContext(request string) *metricContext {
	began := time.Now()
	return &metricContext{
		start:      began,
		attributes: []string{"cert_" + request, unusedMetricLabel, unusedMetricLabel},
	}
}
// GetSslCertificate returns the SslCertificate by name.
func (gce *GCECloud) GetSslCertificate(name string) (*compute.SslCertificate, error) {
@ -31,26 +37,34 @@ func (gce *GCECloud) GetSslCertificate(name string) (*compute.SslCertificate, er
// CreateSslCertificate creates and returns a SslCertificate.
// It inserts the certificate, waits for the resulting global operation,
// and then fetches the certificate by sslCerts.Name.
//
// NOTE(review): mc is handed to waitForGlobalOp and mc.Observe(err) is then
// called again when the wait fails. If waitForGlobalOp already observes mc
// (its body is not visible here; confirm), a failed wait is recorded
// twice. Other callers in this change, e.g. CreateGlobalForwardingRule,
// simply return err after the wait.
func (gce *GCECloud) CreateSslCertificate(sslCerts *compute.SslCertificate) (*compute.SslCertificate, error) {
mc := newCertMetricContext("create")
op, err := gce.service.SslCertificates.Insert(gce.projectID, sslCerts).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
return nil, err
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, mc.Observe(err)
}
return gce.GetSslCertificate(sslCerts.Name)
}
// DeleteSslCertificate deletes the SslCertificate by name.
func (gce *GCECloud) DeleteSslCertificate(name string) error {
mc := newCertMetricContext("delete")
op, err := gce.service.SslCertificates.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil
}
return err
return mc.Observe(err)
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListSslCertificates lists all SslCertificates in the project.

View File

@ -22,6 +22,7 @@ import (
"net/http"
"path"
"strings"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -84,6 +85,13 @@ type GCEDisk struct {
Type string
}
// newDiskMetricContext starts a metricContext for a disk request in the
// given zone. The request label is "disk_" + request, the region label is
// unusedMetricLabel, and the zone label is the zone argument.
func newDiskMetricContext(request, zone string) *metricContext {
	mc := new(metricContext)
	mc.start = time.Now()
	mc.attributes = []string{"disk_" + request, unusedMetricLabel, zone}
	return mc
}
func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOnly bool) error {
instanceName := mapNodeNameToInstanceName(nodeName)
instance, err := gce.getInstanceByName(instanceName)
@ -99,13 +107,16 @@ func (gce *GCECloud) AttachDisk(diskName string, nodeName types.NodeName, readOn
readWrite = "READ_ONLY"
}
attachedDisk := gce.convertDiskToAttachedDisk(disk, readWrite)
dc := contextWithNamespace(diskName, "gce_attach_disk")
attachOp, err := gce.service.Instances.AttachDisk(gce.projectID, disk.Zone, instance.Name, attachedDisk).Context(dc).Do()
mc := newDiskMetricContext("attach", instance.Zone)
attachOp, err := gce.service.Instances.AttachDisk(
gce.projectID, disk.Zone, instance.Name, attachedDisk).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForZoneOp(attachOp, disk.Zone)
return gce.waitForZoneOp(attachOp, disk.Zone, mc)
}
func (gce *GCECloud) DetachDisk(devicePath string, nodeName types.NodeName) error {
@ -123,13 +134,14 @@ func (gce *GCECloud) DetachDisk(devicePath string, nodeName types.NodeName) erro
return fmt.Errorf("error getting instance %q", instanceName)
}
dc := contextWithNamespace(devicePath, "gce_detach_disk")
detachOp, err := gce.service.Instances.DetachDisk(gce.projectID, inst.Zone, inst.Name, devicePath).Context(dc).Do()
mc := newDiskMetricContext("detach", inst.Zone)
detachOp, err := gce.service.Instances.DetachDisk(gce.projectID, inst.Zone, inst.Name, devicePath).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForZoneOp(detachOp, inst.Zone)
return gce.waitForZoneOp(detachOp, inst.Zone, mc)
}
func (gce *GCECloud) DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) {
@ -193,7 +205,9 @@ func (gce *GCECloud) DisksAreAttached(diskNames []string, nodeName types.NodeNam
// CreateDisk creates a new Persistent Disk, with the specified name &
// size, in the specified zone. It stores specified tags encoded in
// JSON in Description field.
func (gce *GCECloud) CreateDisk(name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {
func (gce *GCECloud) CreateDisk(
name string, diskType string, zone string, sizeGb int64, tags map[string]string) error {
// Do not allow creation of PDs in zones that are not managed. Such PDs
// then cannot be deleted by DeleteDisk.
isManaged := false
@ -228,13 +242,14 @@ func (gce *GCECloud) CreateDisk(name string, diskType string, zone string, sizeG
Description: tagsStr,
Type: diskTypeUri,
}
dc := contextWithNamespace(name, "gce_disk_insert")
createOp, err := gce.service.Disks.Insert(gce.projectID, zone, diskToCreate).Context(dc).Do()
mc := newDiskMetricContext("create", zone)
createOp, err := gce.service.Disks.Insert(gce.projectID, zone, diskToCreate).Do()
if err != nil {
return err
return mc.Observe(err)
}
err = gce.waitForZoneOp(createOp, zone)
err = gce.waitForZoneOp(createOp, zone, mc)
if isGCEError(err, "alreadyExists") {
glog.Warningf("GCE PD %q already exists, reusing", name)
return nil
@ -304,8 +319,7 @@ func (gce *GCECloud) GetAutoLabelsForPD(name string, zone string) (map[string]st
// Returns a GCEDisk for the disk, if it is found in the specified zone.
// If not found, returns (nil, nil)
func (gce *GCECloud) findDiskByName(diskName string, zone string) (*GCEDisk, error) {
dc := contextWithNamespace(diskName, "gce_list_disk")
disk, err := gce.service.Disks.Get(gce.projectID, zone, diskName).Context(dc).Do()
disk, err := gce.service.Disks.Get(gce.projectID, zone, diskName).Do()
if err == nil {
d := &GCEDisk{
Zone: lastComponent(disk.Zone),
@ -390,13 +404,14 @@ func (gce *GCECloud) doDeleteDisk(diskToDelete string) error {
return err
}
dc := contextWithNamespace(diskToDelete, "gce_disk_delete")
deleteOp, err := gce.service.Disks.Delete(gce.projectID, disk.Zone, disk.Name).Context(dc).Do()
mc := newDiskMetricContext("delete", disk.Zone)
deleteOp, err := gce.service.Disks.Delete(gce.projectID, disk.Zone, disk.Name).Do()
if err != nil {
return err
return mc.Observe(err)
}
return gce.waitForZoneOp(deleteOp, disk.Zone)
return gce.waitForZoneOp(deleteOp, disk.Zone, mc)
}
// Converts a Disk resource to an AttachedDisk resource.

View File

@ -17,14 +17,20 @@ limitations under the License.
package gce
import (
"time"
"k8s.io/kubernetes/pkg/api/v1"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
compute "google.golang.org/api/compute/v1"
)
// Firewall management: These methods are just passthrough to the existing
// internal firewall creation methods used to manage TCPLoadBalancer.
// newFirewallMetricContext starts a metricContext for a firewall request
// in the given region. The request label is "firewall_" + request, the
// region label is the region argument, and the zone label is
// unusedMetricLabel.
func newFirewallMetricContext(request string, region string) *metricContext {
	mc := &metricContext{start: time.Now()}
	mc.attributes = []string{
		"firewall_" + request,
		region,
		unusedMetricLabel,
	}
	return mc
}
// GetFirewall returns the Firewall by name.
func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
@ -37,22 +43,29 @@ func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges netsets.IPNe
if err != nil {
return err
}
// TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take v1.ServicePorts.
mc := newFirewallMetricContext("create", region)
// TODO: This completely breaks modularity in the cloudprovider but
// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
svcPorts := []v1.ServicePort{}
// TODO: Currently the only consumer of this method is the GCE L7
// loadbalancer controller, which never needs a protocol other than TCP.
// We should pipe through a mapping of port:protocol and default to TCP
// if UDP ports are required. This means the method signature will change
// forcing downstream clients to refactor interfaces.
// loadbalancer controller, which never needs a protocol other than
// TCP. We should pipe through a mapping of port:protocol and
// default to TCP if UDP ports are required. This means the method
// signature will change forcing downstream clients to refactor
// interfaces.
for _, p := range ports {
svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
mc.Observe(err)
return err
}
return gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
return mc.Observe(gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
}
// DeleteFirewall deletes the given firewall rule.
@ -61,30 +74,41 @@ func (gce *GCECloud) DeleteFirewall(name string) error {
if err != nil {
return err
}
return gce.deleteFirewall(name, region)
mc := newFirewallMetricContext("delete", region)
return mc.Observe(gce.deleteFirewall(name, region))
}
// UpdateFirewall applies the given firewall rule as an update to an existing
// firewall rule with the same name.
// UpdateFirewall applies the given firewall rule as an update to an
// existing firewall rule with the same name.
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges netsets.IPNet, ports []int64, hostNames []string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
return err
}
// TODO: This completely breaks modularity in the cloudprovider but the methods
// shared with the TCPLoadBalancer take v1.ServicePorts.
mc := newFirewallMetricContext("update", region)
// TODO: This completely breaks modularity in the cloudprovider but
// the methods shared with the TCPLoadBalancer take v1.ServicePorts.
svcPorts := []v1.ServicePort{}
// TODO: Currently the only consumer of this method is the GCE L7
// loadbalancer controller, which never needs a protocol other than TCP.
// We should pipe through a mapping of port:protocol and default to TCP
// if UDP ports are required. This means the method signature will change,
// forcing downstream clients to refactor interfaces.
// loadbalancer controller, which never needs a protocol other than
// TCP. We should pipe through a mapping of port:protocol and
// default to TCP if UDP ports are required. This means the method
// signature will change, forcing downstream clients to refactor
// interfaces.
for _, p := range ports {
svcPorts = append(svcPorts, v1.ServicePort{Port: int32(p), Protocol: v1.ProtocolTCP})
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
mc.Observe(err)
return err
}
return gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
return mc.Observe(gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts))
}

View File

@ -18,16 +18,24 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// GlobalForwardingRule management
// newForwardingRuleMetricContext starts a metricContext for a forwarding
// rule request. The request label is "forwardingrule_" + request, the
// region label is the region argument exactly as passed by the caller, and
// the zone label is unusedMetricLabel.
func newForwardingRuleMetricContext(request, region string) *metricContext {
	began := time.Now()
	return &metricContext{
		start:      began,
		attributes: []string{"forwardingrule_" + request, region, unusedMetricLabel},
	}
}
// CreateGlobalForwardingRule creates and returns a
// GlobalForwardingRule that points to the given TargetHttp(s)Proxy.
// targetProxyLink is the SelfLink of a TargetHttp(s)Proxy.
func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portRange string) (*compute.ForwardingRule, error) {
mc := newForwardingRuleMetricContext("create", "")
rule := &compute.ForwardingRule{
Name: name,
IPAddress: ip,
@ -37,34 +45,47 @@ func (gce *GCECloud) CreateGlobalForwardingRule(targetProxyLink, ip, name, portR
}
op, err := gce.service.GlobalForwardingRules.Insert(gce.projectID, rule).Do()
if err != nil {
mc.Observe(err)
return nil, err
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetGlobalForwardingRule(name)
}
// SetProxyForGlobalForwardingRule links the given TargetHttp(s)Proxy with the given GlobalForwardingRule.
// targetProxyLink is the SelfLink of a TargetHttp(s)Proxy.
//
// NOTE(review): the metric context is created with an empty region ("")
// rather than unusedMetricLabel, which the other constructors in this
// change use for a label that does not apply. Confirm the empty label value
// is intended for global forwarding rules.
func (gce *GCECloud) SetProxyForGlobalForwardingRule(fw *compute.ForwardingRule, targetProxyLink string) error {
op, err := gce.service.GlobalForwardingRules.SetTarget(gce.projectID, fw.Name, &compute.TargetReference{Target: targetProxyLink}).Do()
mc := newForwardingRuleMetricContext("set_proxy", "")
op, err := gce.service.GlobalForwardingRules.SetTarget(
gce.projectID, fw.Name, &compute.TargetReference{Target: targetProxyLink}).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteGlobalForwardingRule deletes the GlobalForwardingRule by name.
// An HTTP 404 from the delete call is recorded as a successful request
// (mc.Observe(nil)) and nil is returned.
//
// NOTE(review): the metric context is created with an empty region ("")
// rather than unusedMetricLabel, which the other constructors in this
// change use for a label that does not apply. Confirm the empty label value
// is intended for global forwarding rules.
func (gce *GCECloud) DeleteGlobalForwardingRule(name string) error {
mc := newForwardingRuleMetricContext("delete", "")
op, err := gce.service.GlobalForwardingRules.Delete(gce.projectID, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
mc.Observe(nil)
return nil
}
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// GetGlobalForwardingRule returns the GlobalForwardingRule by name.

View File

@ -16,9 +16,18 @@ limitations under the License.
package gce
import compute "google.golang.org/api/compute/v1"
import (
"time"
// Legacy HTTP Health Checks
compute "google.golang.org/api/compute/v1"
)
// newHealthcheckMetricContext starts a metricContext for a health check
// request. The request label is "healthcheck_" + request; the region and
// zone labels are both unusedMetricLabel.
func newHealthcheckMetricContext(request string) *metricContext {
	mc := new(metricContext)
	mc.start = time.Now()
	mc.attributes = []string{"healthcheck_" + request, unusedMetricLabel, unusedMetricLabel}
	return mc
}
// GetHttpHealthCheck returns the given HttpHealthCheck by name.
func (gce *GCECloud) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
@ -27,29 +36,41 @@ func (gce *GCECloud) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck,
// UpdateHttpHealthCheck applies the given HttpHealthCheck as an update.
func (gce *GCECloud) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
mc := newHealthcheckMetricContext("update_legacy")
op, err := gce.service.HttpHealthChecks.Update(gce.projectID, hc.Name, hc).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteHttpHealthCheck deletes the given HttpHealthCheck by name.
func (gce *GCECloud) DeleteHttpHealthCheck(name string) error {
mc := newHealthcheckMetricContext("delete_legacy")
op, err := gce.service.HttpHealthChecks.Delete(gce.projectID, name).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateHttpHealthCheck creates the given HttpHealthCheck.
func (gce *GCECloud) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
mc := newHealthcheckMetricContext("create_legacy")
op, err := gce.service.HttpHealthChecks.Insert(gce.projectID, hc).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListHttpHealthChecks lists all HttpHealthChecks in the project.
@ -67,29 +88,38 @@ func (gce *GCECloud) GetHttpsHealthCheck(name string) (*compute.HttpsHealthCheck
// UpdateHttpsHealthCheck applies the given HttpsHealthCheck as an update.
//
// NOTE(review): the request label "update_legacy" is the same one used by
// UpdateHttpHealthCheck, so HTTP and HTTPS updates are recorded under the
// same "healthcheck_update_legacy" series. Confirm that not distinguishing
// the two is intended.
func (gce *GCECloud) UpdateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
mc := newHealthcheckMetricContext("update_legacy")
op, err := gce.service.HttpsHealthChecks.Update(gce.projectID, hc.Name, hc).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteHttpsHealthCheck deletes the given HttpsHealthCheck by name.
//
// NOTE(review): the request label "delete_legacy" is the same one used by
// DeleteHttpHealthCheck, so HTTP and HTTPS deletes are recorded under the
// same "healthcheck_delete_legacy" series. Confirm that not distinguishing
// the two is intended.
func (gce *GCECloud) DeleteHttpsHealthCheck(name string) error {
mc := newHealthcheckMetricContext("delete_legacy")
op, err := gce.service.HttpsHealthChecks.Delete(gce.projectID, name).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateHttpsHealthCheck creates the given HttpsHealthCheck.
//
// NOTE(review): the request label "create_legacy" is the same one used by
// CreateHttpHealthCheck, so HTTP and HTTPS creates are recorded under the
// same "healthcheck_create_legacy" series. Confirm that not distinguishing
// the two is intended.
func (gce *GCECloud) CreateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
mc := newHealthcheckMetricContext("create_legacy")
op, err := gce.service.HttpsHealthChecks.Insert(gce.projectID, hc).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListHttpsHealthChecks lists all HttpsHealthChecks in the project.
@ -107,29 +137,43 @@ func (gce *GCECloud) GetHealthCheck(name string) (*compute.HealthCheck, error) {
// UpdateHealthCheck applies the given HealthCheck as an update.
func (gce *GCECloud) UpdateHealthCheck(hc *compute.HealthCheck) error {
mc := newHealthcheckMetricContext("update")
op, err := gce.service.HealthChecks.Update(gce.projectID, hc.Name, hc).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// DeleteHealthCheck deletes the given HealthCheck by name.
func (gce *GCECloud) DeleteHealthCheck(name string) error {
mc := newHealthcheckMetricContext("delete")
op, err := gce.service.HealthChecks.Delete(gce.projectID, name).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// CreateHealthCheck creates the given HealthCheck.
func (gce *GCECloud) CreateHealthCheck(hc *compute.HealthCheck) error {
mc := newHealthcheckMetricContext("create")
op, err := gce.service.HealthChecks.Insert(gce.projectID, hc).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForGlobalOp(op)
return gce.waitForGlobalOp(op, mc)
}
// ListHealthChecks lists all HealthCheck in the project.

View File

@ -20,43 +20,63 @@ import (
"fmt"
"net/http"
"strings"
"time"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
)
// InstanceGroup Management
// newInstanceGroupMetricContext starts a metricContext for an instance
// group request in the given zone. The request label is
// "instancegroup_" + request, the region label is unusedMetricLabel, and
// the zone label is the zone argument.
func newInstanceGroupMetricContext(request string, zone string) *metricContext {
	mc := &metricContext{start: time.Now()}
	mc.attributes = []string{
		"instancegroup_" + request,
		unusedMetricLabel,
		zone,
	}
	return mc
}
// CreateInstanceGroup creates an instance group with the given instances. It is the callers responsibility to add named ports.
// CreateInstanceGroup creates an instance group with the given
// instances. It is the callers responsibility to add named ports.
func (gce *GCECloud) CreateInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
mc := newInstanceGroupMetricContext("create", zone)
op, err := gce.service.InstanceGroups.Insert(
gce.projectID, zone, &compute.InstanceGroup{Name: name}).Do()
if err != nil {
mc.Observe(err)
return nil, err
}
if err = gce.waitForZoneOp(op, zone); err != nil {
if err = gce.waitForZoneOp(op, zone, mc); err != nil {
return nil, err
}
return gce.GetInstanceGroup(name, zone)
}
// DeleteInstanceGroup deletes an instance group.
func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
mc := newInstanceGroupMetricContext("delete", zone)
op, err := gce.service.InstanceGroups.Delete(
gce.projectID, zone, name).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForZoneOp(op, zone)
return gce.waitForZoneOp(op, zone, mc)
}
// ListInstanceGroups lists all InstanceGroups in the project and zone.
// ListInstanceGroups lists all InstanceGroups in the project and
// zone.
func (gce *GCECloud) ListInstanceGroups(zone string) (*compute.InstanceGroupList, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.List(gce.projectID, zone).Do()
}
// ListInstancesInInstanceGroup lists all the instances in a given instance group and state.
// ListInstancesInInstanceGroup lists all the instances in a given
// instance group and state.
func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) (*compute.InstanceGroupsListInstances, error) {
// TODO: use PageToken to list all not just the first 500
return gce.service.InstanceGroups.ListInstances(
@ -64,8 +84,11 @@ func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, stat
&compute.InstanceGroupsListInstancesRequest{InstanceState: state}).Do()
}
// AddInstancesToInstanceGroup adds the given instances to the given instance group.
// AddInstancesToInstanceGroup adds the given instances to the given
// instance group.
func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceNames []string) error {
mc := newInstanceGroupMetricContext("add_instances", zone)
if len(instanceNames) == 0 {
return nil
}
@ -81,13 +104,18 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta
}).Do()
if err != nil {
mc.Observe(err)
return err
}
return gce.waitForZoneOp(op, zone)
return gce.waitForZoneOp(op, zone, mc)
}
// RemoveInstancesFromInstanceGroup removes the given instances from the instance group.
// RemoveInstancesFromInstanceGroup removes the given instances from
// the instance group.
func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceNames []string) error {
mc := newInstanceGroupMetricContext("remove_instances", zone)
if len(instanceNames) == 0 {
return nil
}
@ -104,21 +132,28 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string,
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
mc.Observe(nil)
return nil
}
mc.Observe(err)
return err
}
return gce.waitForZoneOp(op, zone)
return gce.waitForZoneOp(op, zone, mc)
}
// AddPortToInstanceGroup adds a port to the given instance group.
func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int64) (*compute.NamedPort, error) {
mc := newInstanceGroupMetricContext("add_port", ig.Zone)
for _, np := range ig.NamedPorts {
if np.Port == port {
glog.V(3).Infof("Instance group %v already has named port %+v", ig.Name, np)
return np, nil
}
}
glog.Infof("Adding port %v to instance group %v with %d ports", port, ig.Name, len(ig.NamedPorts))
namedPort := compute.NamedPort{Name: fmt.Sprintf("port%v", port), Port: port}
ig.NamedPorts = append(ig.NamedPorts, &namedPort)
@ -133,12 +168,16 @@ func (gce *GCECloud) AddPortToInstanceGroup(ig *compute.InstanceGroup, port int6
gce.projectID, zone, ig.Name,
&compute.InstanceGroupsSetNamedPortsRequest{
NamedPorts: ig.NamedPorts}).Do()
if err != nil {
mc.Observe(err)
return nil, err
}
if err = gce.waitForZoneOp(op, zone); err != nil {
if err = gce.waitForZoneOp(op, zone, mc); err != nil {
return nil, err
}
return &namedPort, nil
}

View File

@ -36,6 +36,13 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)
// newInstancesMetricContext starts a metricContext for an instances
// request. The request label is "instances_" + request; the region and
// zone labels are both unusedMetricLabel.
func newInstancesMetricContext(request string) *metricContext {
	began := time.Now()
	return &metricContext{
		start:      began,
		attributes: []string{"instances_" + request, unusedMetricLabel, unusedMetricLabel},
	}
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (gce *GCECloud) NodeAddresses(_ types.NodeName) ([]v1.NodeAddress, error) {
internalIP, err := metadata.Get("instance/network-interfaces/0/ip")
@ -156,15 +163,22 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error
Value: &keyString,
})
}
op, err := gce.service.Projects.SetCommonInstanceMetadata(gce.projectID, project.CommonInstanceMetadata).Do()
mc := newInstancesMetricContext("add_ssh_key")
op, err := gce.service.Projects.SetCommonInstanceMetadata(
gce.projectID, project.CommonInstanceMetadata).Do()
if err != nil {
glog.Errorf("Could not Set Metadata: %v", err)
mc.Observe(err)
return false, nil
}
if err := gce.waitForGlobalOp(op); err != nil {
if err := gce.waitForGlobalOp(op, mc); err != nil {
glog.Errorf("Could not Set Metadata: %v", err)
return false, nil
}
glog.Infof("Successfully added sshKey to project metadata")
return true, nil
})
@ -324,8 +338,7 @@ func (gce *GCECloud) getInstanceByName(name string) (*gceInstance, error) {
// Avoid changing behaviour when not managing multiple zones
for _, zone := range gce.managedZones {
name = canonicalizeInstanceName(name)
dc := contextWithNamespace(name, "gce_instance_list")
res, err := gce.service.Instances.Get(gce.projectID, zone, name).Context(dc).Do()
res, err := gce.service.Instances.Get(gce.projectID, zone, name).Do()
if err != nil {
glog.Errorf("getInstanceByName: failed to get instance %s; err: %v", name, err)

View File

@ -24,6 +24,7 @@ import (
"sort"
"strconv"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -42,7 +43,30 @@ type cidrs struct {
isSet bool
}
var lbSrcRngsFlag cidrs
var (
lbSrcRngsFlag cidrs
)
// newLoadBalancerMetricContext starts a metricContext for a load balancer
// request in the given region. The request label is
// "loadbalancer_" + request, the region label is the region argument, and
// the zone label is unusedMetricLabel.
func newLoadBalancerMetricContext(request, region string) *metricContext {
	mc := new(metricContext)
	mc.start = time.Now()
	mc.attributes = []string{"loadbalancer_" + request, region, unusedMetricLabel}
	return mc
}
// newTargetPoolMetricContext starts a metricContext for a target pool
// request in the given region. The request label is
// "targetpool_" + request, the region label is the region argument, and
// the zone label is unusedMetricLabel.
func newTargetPoolMetricContext(request, region string) *metricContext {
	mc := &metricContext{start: time.Now()}
	mc.attributes = []string{
		"targetpool_" + request,
		region,
		unusedMetricLabel,
	}
	return mc
}
// newAddressMetricContext starts a metricContext for a static address
// request in the given region. The request label is "address_" + request,
// the region label is the region argument, and the zone label is
// unusedMetricLabel.
func newAddressMetricContext(request, region string) *metricContext {
	began := time.Now()
	return &metricContext{
		start:      began,
		attributes: []string{"address_" + request, region, unusedMetricLabel},
	}
}
func init() {
var err error
@ -106,6 +130,7 @@ func (gce *GCECloud) GetLoadBalancer(clusterName string, service *v1.Service) (*
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
//
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
@ -131,7 +156,8 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
affinityType := apiService.Spec.SessionAffinity
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations)
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
loadBalancerName, gce.region, loadBalancerIP, portStr, hostNames, serviceName, apiService.Annotations)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports)
@ -139,7 +165,8 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
return nil, err
}
if !fwdRuleExists {
glog.Infof("Forwarding rule %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
glog.V(2).Infof("Forwarding rule %v for Service %v/%v doesn't exist",
loadBalancerName, apiService.Namespace, apiService.Name)
}
// Make sure we know which IP address will be used and have properly reserved
@ -454,8 +481,6 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
return nil
}
// XXX ----------
func (gce *GCECloud) DeleteForwardingRule(name string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
@ -465,15 +490,18 @@ func (gce *GCECloud) DeleteForwardingRule(name string) error {
}
func (gce *GCECloud) deleteForwardingRule(name, region string) error {
mc := newForwardingRuleMetricContext("delete", region)
op, err := gce.service.ForwardingRules.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Forwarding rule %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete forwarding rule %s: got error %s.", name, err.Error())
return err
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.", name, err.Error())
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for forwarding rule %s to be deleted: got error %s.",
name, err.Error())
return err
}
}
@ -490,18 +518,22 @@ func (gce *GCECloud) DeleteTargetPool(name string, hc *compute.HttpHealthCheck)
}
func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error {
mc := newTargetPoolMetricContext("delete", region)
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete target pool %s, got error %s.", name, err.Error())
return err
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.", name, err.Error())
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for target pool %s to be deleted: got error %s.",
name, err.Error())
return err
}
}
// Deletion of health checks is allowed only after the TargetPool reference is deleted
if hc != nil {
glog.Infof("Deleting health check %v", hc.Name)
@ -546,12 +578,14 @@ func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []
SessionAffinity: translateAffinityType(affinityType),
HealthChecks: hcLinks,
}
mc := newTargetPoolMetricContext("insert", region)
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForRegionOp(op, region)
err = gce.waitForRegionOp(op, region, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
@ -575,22 +609,25 @@ func (gce *GCECloud) updateTargetPool(loadBalancerName string, existing sets.Str
if len(toAdd) > 0 {
add := &compute.TargetPoolsAddInstanceRequest{Instances: toAdd}
mc := newTargetPoolMetricContext("update", gce.region)
op, err := gce.service.TargetPools.AddInstance(gce.projectID, gce.region, loadBalancerName, add).Do()
if err != nil {
return err
return mc.Observe(err)
}
if err := gce.waitForRegionOp(op, gce.region); err != nil {
if err := gce.waitForRegionOp(op, gce.region, mc); err != nil {
return err
}
}
if len(toRemove) > 0 {
mc := newTargetPoolMetricContext("delete", gce.region)
rm := &compute.TargetPoolsRemoveInstanceRequest{Instances: toRemove}
op, err := gce.service.TargetPools.RemoveInstance(gce.projectID, gce.region, loadBalancerName, rm).Do()
if err != nil {
return err
return mc.Observe(err)
}
if err := gce.waitForRegionOp(op, gce.region); err != nil {
if err := gce.waitForRegionOp(op, gce.region, mc); err != nil {
return err
}
}
@ -866,12 +903,13 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
Target: gce.targetPoolURL(name, region),
}
mc := newForwardingRuleMetricContext("create", region)
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForRegionOp(op, region)
err = gce.waitForRegionOp(op, region, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
@ -880,16 +918,17 @@ func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress s
}
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
mc := newFirewallMetricContext("create", region)
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
return mc.Observe(err)
}
op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForGlobalOp(op)
err = gce.waitForGlobalOp(op, mc)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
@ -898,16 +937,17 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges nets
}
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []v1.ServicePort, hosts []*gceInstance) error {
mc := newFirewallMetricContext("update", region)
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
return mc.Observe(err)
}
op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
return mc.Observe(err)
}
if op != nil {
err = gce.waitForGlobalOp(op)
err = gce.waitForGlobalOp(op, mc)
if err != nil {
return err
}
@ -1065,19 +1105,23 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
}
if existingIP != "" {
addressObj.Address = existingIP
}
mc := newAddressMetricContext("create", region)
op, err := gce.service.Addresses.Insert(gce.projectID, region, addressObj).Do()
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", false, fmt.Errorf("error creating gce static IP address: %v", err)
return "", false, fmt.Errorf("error creating gce static IP address: %v",
mc.Observe(err))
}
// StatusConflict == the IP exists already.
existed = true
}
if op != nil {
err := gce.waitForRegionOp(op, region)
err := gce.waitForRegionOp(op, region, mc)
if err != nil {
if !isHTTPErrorCode(err, http.StatusConflict) {
return "", false, fmt.Errorf("error waiting for gce static IP address to be created: %v", err)
@ -1096,15 +1140,17 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
}
func (gce *GCECloud) deleteFirewall(name, region string) error {
mc := newFirewallMetricContext("delete", region)
fwName := makeFirewallName(name)
op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err)
return err
return mc.Observe(err)
} else {
if err := gce.waitForGlobalOp(op); err != nil {
if err := gce.waitForGlobalOp(op, mc); err != nil {
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err)
return err
}
@ -1113,14 +1159,15 @@ func (gce *GCECloud) deleteFirewall(name, region string) error {
}
func (gce *GCECloud) deleteStaticIP(name, region string) error {
mc := newAddressMetricContext("delete", region)
op, err := gce.service.Addresses.Delete(gce.projectID, region, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Static IP address %s is not reserved", name)
} else if err != nil {
glog.Warningf("Failed to delete static IP address %s, got error %v", name, err)
return err
return mc.Observe(err)
} else {
if err := gce.waitForRegionOp(op, region); err != nil {
if err := gce.waitForRegionOp(op, region, mc); err != nil {
glog.Warningf("Failed waiting for address %s to be deleted, got error: %v", name, err)
return err
}

View File

@ -1,70 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import "github.com/prometheus/client_golang/prometheus"
// gceMetricMap holds one latency histogram per tracked GCE call, keyed by a
// short call identifier. Every histogram has a single "namespace" label.
var gceMetricMap = map[string]*prometheus.HistogramVec{
	"gce_instance_list": newGCELatencyHistogram("gce_instance_list_duration_seconds", "Latency of instance listing calls"),
	"gce_disk_insert":   newGCELatencyHistogram("gce_disk_insert_duration_seconds", "Latency of disk insert calls"),
	"gce_disk_delete":   newGCELatencyHistogram("gce_disk_delete_duration_seconds", "Latency of disk delete calls"),
	"gce_attach_disk":   newGCELatencyHistogram("gce_attach_disk_duration_seconds", "Latency of attach disk calls"),
	"gce_detach_disk":   newGCELatencyHistogram("gce_detach_disk_duration_seconds", "Latency of detach disk calls"),
	"gce_list_disk":     newGCELatencyHistogram("gce_list_disk_duration_seconds", "Latency of list disk calls"),
}

// newGCELatencyHistogram builds an unregistered histogram vector with the given
// metric name and help text, partitioned by the "namespace" label.
func newGCELatencyHistogram(name, help string) *prometheus.HistogramVec {
	opts := prometheus.HistogramOpts{
		Name: name,
		Help: help,
	}
	return prometheus.NewHistogramVec(opts, []string{"namespace"})
}
// registerMetrics registers every histogram in gceMetricMap with the default
// Prometheus registry. MustRegister panics if a registration fails.
func registerMetrics() {
	for key := range gceMetricMap {
		prometheus.MustRegister(gceMetricMap[key])
	}
}

View File

@ -27,9 +27,9 @@ import (
"google.golang.org/api/googleapi"
)
func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error {
func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error), mc *metricContext) error {
if op == nil {
return fmt.Errorf("operation must not be nil")
return mc.Observe(fmt.Errorf("operation must not be nil"))
}
if opIsDone(op) {
@ -38,18 +38,20 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio
opStart := time.Now()
opName := op.Name
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
start := time.Now()
gce.operationPollRateLimiter.Accept()
duration := time.Now().Sub(start)
if duration > 5*time.Second {
glog.Infof("pollOperation: throttled %v for %v", duration, opName)
glog.V(2).Infof("pollOperation: throttled %v for %v", duration, opName)
}
pollOp, err := getOperation(opName)
if err != nil {
glog.Warningf("GCE poll operation %s failed: pollOp: [%v] err: [%v] getErrorFromOp: [%v]",
opName, pollOp, err, getErrorFromOp(pollOp))
}
done := opIsDone(pollOp)
if done {
duration := time.Now().Sub(opStart)
@ -60,12 +62,13 @@ func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operatio
glog.Warningf("waitForOperation: long operation (%v): %v (failed to encode to JSON: %v)",
duration, pollOp, err)
} else {
glog.Infof("waitForOperation: long operation (%v): %v",
glog.V(2).Infof("waitForOperation: long operation (%v): %v",
duration, string(enc))
}
}
}
return done, getErrorFromOp(pollOp)
return done, mc.Observe(getErrorFromOp(pollOp))
})
}
@ -86,20 +89,20 @@ func getErrorFromOp(op *compute.Operation) error {
return nil
}
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error {
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do()
})
}, mc)
}
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do()
})
}, mc)
}
func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error {
func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string, mc *metricContext) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
})
}, mc)
}

View File

@ -21,6 +21,7 @@ import (
"net/http"
"path"
"strings"
"time"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
@ -29,6 +30,13 @@ import (
compute "google.golang.org/api/compute/v1"
)
// newRoutesMetricContext starts timing a routes API request. The returned
// context carries the label values "routes_<request>" followed by the
// unused-label placeholder in both the region and zone positions.
func newRoutesMetricContext(request string) *metricContext {
	mc := new(metricContext)
	mc.start = time.Now()
	mc.attributes = []string{"routes_" + request, unusedMetricLabel, unusedMetricLabel}
	return mc
}
func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
var routes []*cloudprovider.Route
pageToken := ""
@ -80,6 +88,8 @@ func (gce *GCECloud) CreateRoute(clusterName string, nameHint string, route *clo
if err != nil {
return err
}
mc := newRoutesMetricContext("create")
insertOp, err := gce.service.Routes.Insert(gce.projectID, &compute.Route{
Name: routeName,
DestRange: route.DestinationCIDR,
@ -93,18 +103,19 @@ func (gce *GCECloud) CreateRoute(clusterName string, nameHint string, route *clo
glog.Info("Route %v already exists.")
return nil
} else {
return err
return mc.Observe(err)
}
}
return gce.waitForGlobalOp(insertOp)
return gce.waitForGlobalOp(insertOp, mc)
}
// DeleteRoute deletes the GCE route named route.Name from gce.projectID and
// waits for the resulting global operation to finish. clusterName is not used
// by this implementation. A failure of the initial Delete call is recorded in
// the API metrics before being returned; the wait records its own result
// through the same metric context.
func (gce *GCECloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
	// Label this request "delete" so it is not aggregated with CreateRoute's
	// "routes_create" series.
	mc := newRoutesMetricContext("delete")
	deleteOp, err := gce.service.Routes.Delete(gce.projectID, route.Name).Do()
	if err != nil {
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(deleteOp, mc)
}
func truncateClusterName(clusterName string) string {

View File

@ -16,33 +16,46 @@ limitations under the License.
package gce
import compute "google.golang.org/api/compute/v1"
import (
"time"
// Global static IP management
compute "google.golang.org/api/compute/v1"
)
// newStaticIPMetricContext starts timing a global static IP API request. The
// returned context carries the label values "staticip_<request>" followed by
// the unused-label placeholder in both the region and zone positions.
func newStaticIPMetricContext(request string) *metricContext {
	mc := new(metricContext)
	mc.start = time.Now()
	mc.attributes = []string{"staticip_" + request, unusedMetricLabel, unusedMetricLabel}
	return mc
}
// ReserveGlobalStaticIP creates a global static IP.
// Caller is allocated a random IP if they do not specify an ipAddress. If an
// ipAddress is specified, it must belong to the current project, eg: an
// ephemeral IP associated with a global forwarding rule.
//
// The insert request and the wait for its operation are recorded under the
// "staticip_reserve" metric; the final Get that reads back the address is not.
func (gce *GCECloud) ReserveGlobalStaticIP(name, ipAddress string) (address *compute.Address, err error) {
	mc := newStaticIPMetricContext("reserve")
	op, err := gce.service.GlobalAddresses.Insert(gce.projectID, &compute.Address{Name: name, Address: ipAddress}).Do()
	if err != nil {
		return nil, mc.Observe(err)
	}
	// Wait exactly once, with the metric context, so the operation's outcome is
	// observed a single time.
	if err := gce.waitForGlobalOp(op, mc); err != nil {
		return nil, err
	}
	// We have to get the address to know which IP was allocated for us.
	return gce.service.GlobalAddresses.Get(gce.projectID, name).Do()
}
// DeleteGlobalStaticIP deletes a global static IP by name and waits for the
// resulting global operation to finish. A failure of the initial Delete call
// is recorded under the "staticip_delete" metric before being returned; the
// wait records its own result through the same metric context.
func (gce *GCECloud) DeleteGlobalStaticIP(name string) error {
	mc := newStaticIPMetricContext("delete")
	op, err := gce.service.GlobalAddresses.Delete(gce.projectID, name).Do()
	if err != nil {
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(op, mc)
}
// GetGlobalStaticIP returns the global static IP by name.

View File

@ -18,11 +18,17 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// TargetHttpProxy management

// newTargetProxyMetricContext starts timing a target proxy API request. The
// returned context carries the label values "targetproxy_<request>" followed
// by the unused-label placeholder in both the region and zone positions.
func newTargetProxyMetricContext(request string) *metricContext {
	mc := new(metricContext)
	mc.start = time.Now()
	mc.attributes = []string{"targetproxy_" + request, unusedMetricLabel, unusedMetricLabel}
	return mc
}
// GetTargetHttpProxy returns the TargetHttpProxy by name.
func (gce *GCECloud) GetTargetHttpProxy(name string) (*compute.TargetHttpProxy, error) {
@ -35,11 +41,13 @@ func (gce *GCECloud) CreateTargetHttpProxy(urlMap *compute.UrlMap, name string)
Name: name,
UrlMap: urlMap.SelfLink,
}
mc := newTargetProxyMetricContext("create")
op, err := gce.service.TargetHttpProxies.Insert(gce.projectID, proxy).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetTargetHttpProxy(name)
@ -47,23 +55,26 @@ func (gce *GCECloud) CreateTargetHttpProxy(urlMap *compute.UrlMap, name string)
// SetUrlMapForTargetHttpProxy sets the given UrlMap for the given TargetHttpProxy
// and waits for the resulting global operation to finish. The request is issued
// once, after the metric context is created, so the recorded latency covers the
// API call as well as the wait. A failure of the initial call is recorded under
// the "targetproxy_set_url_map" metric before being returned.
func (gce *GCECloud) SetUrlMapForTargetHttpProxy(proxy *compute.TargetHttpProxy, urlMap *compute.UrlMap) error {
	mc := newTargetProxyMetricContext("set_url_map")
	op, err := gce.service.TargetHttpProxies.SetUrlMap(
		gce.projectID, proxy.Name, &compute.UrlMapReference{UrlMap: urlMap.SelfLink}).Do()
	if err != nil {
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(op, mc)
}
// DeleteTargetHttpProxy deletes the TargetHttpProxy by name and waits for the
// resulting global operation to finish. An HTTP 404 from the Delete call is
// treated as success and returns nil without recording a metric; any other
// failure of the call is recorded under the "targetproxy_delete" metric before
// being returned.
func (gce *GCECloud) DeleteTargetHttpProxy(name string) error {
	mc := newTargetProxyMetricContext("delete")
	op, err := gce.service.TargetHttpProxies.Delete(gce.projectID, name).Do()
	if err != nil {
		if isHTTPErrorCode(err, http.StatusNotFound) {
			return nil
		}
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(op, mc)
}
// ListTargetHttpProxies lists all TargetHttpProxies in the project.
@ -81,6 +92,7 @@ func (gce *GCECloud) GetTargetHttpsProxy(name string) (*compute.TargetHttpsProxy
// CreateTargetHttpsProxy creates and returns a TargetHttpsProxy with the given UrlMap and SslCertificate.
func (gce *GCECloud) CreateTargetHttpsProxy(urlMap *compute.UrlMap, sslCert *compute.SslCertificate, name string) (*compute.TargetHttpsProxy, error) {
mc := newTargetProxyMetricContext("delete")
proxy := &compute.TargetHttpsProxy{
Name: name,
UrlMap: urlMap.SelfLink,
@ -88,9 +100,9 @@ func (gce *GCECloud) CreateTargetHttpsProxy(urlMap *compute.UrlMap, sslCert *com
}
op, err := gce.service.TargetHttpsProxies.Insert(gce.projectID, proxy).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetTargetHttpsProxy(name)
@ -98,32 +110,38 @@ func (gce *GCECloud) CreateTargetHttpsProxy(urlMap *compute.UrlMap, sslCert *com
// SetUrlMapForTargetHttpsProxy sets the given UrlMap for the given TargetHttpsProxy
// and waits for the resulting global operation to finish. The request is issued
// once, after the metric context is created, so the recorded latency covers the
// API call as well as the wait. A failure of the initial call is recorded under
// the "targetproxy_set_url_map" metric before being returned.
func (gce *GCECloud) SetUrlMapForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, urlMap *compute.UrlMap) error {
	mc := newTargetProxyMetricContext("set_url_map")
	op, err := gce.service.TargetHttpsProxies.SetUrlMap(
		gce.projectID, proxy.Name, &compute.UrlMapReference{UrlMap: urlMap.SelfLink}).Do()
	if err != nil {
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(op, mc)
}
// SetSslCertificateForTargetHttpsProxy sets the given SslCertificate for the
// given TargetHttpsProxy and waits for the resulting global operation to
// finish. The request is issued once, after the metric context is created, so
// the recorded latency covers the API call as well as the wait. A failure of
// the initial call is recorded under the "targetproxy_set_ssl_cert" metric
// before being returned.
func (gce *GCECloud) SetSslCertificateForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, sslCert *compute.SslCertificate) error {
	mc := newTargetProxyMetricContext("set_ssl_cert")
	op, err := gce.service.TargetHttpsProxies.SetSslCertificates(
		gce.projectID, proxy.Name, &compute.TargetHttpsProxiesSetSslCertificatesRequest{SslCertificates: []string{sslCert.SelfLink}}).Do()
	if err != nil {
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(op, mc)
}
// DeleteTargetHttpsProxy deletes the TargetHttpsProxy by name and waits for the
// resulting global operation to finish. An HTTP 404 from the Delete call is
// treated as success and returns nil without recording a metric; any other
// failure of the call is recorded under the "targetproxy_delete" metric before
// being returned.
func (gce *GCECloud) DeleteTargetHttpsProxy(name string) error {
	mc := newTargetProxyMetricContext("delete")
	op, err := gce.service.TargetHttpsProxies.Delete(gce.projectID, name).Do()
	if err != nil {
		if isHTTPErrorCode(err, http.StatusNotFound) {
			return nil
		}
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(op, mc)
}
// ListTargetHttpsProxies lists all TargetHttpsProxies in the project.

View File

@ -18,11 +18,17 @@ package gce
import (
"net/http"
"time"
compute "google.golang.org/api/compute/v1"
)
// UrlMap management

// newUrlMapMetricContext starts timing a URL map API request. The returned
// context carries the label values "urlmap_<request>" followed by the
// unused-label placeholder in both the region and zone positions.
func newUrlMapMetricContext(request string) *metricContext {
	mc := new(metricContext)
	mc.start = time.Now()
	mc.attributes = []string{"urlmap_" + request, unusedMetricLabel, unusedMetricLabel}
	return mc
}
// GetUrlMap returns the UrlMap by name.
func (gce *GCECloud) GetUrlMap(name string) (*compute.UrlMap, error) {
@ -35,11 +41,12 @@ func (gce *GCECloud) CreateUrlMap(backend *compute.BackendService, name string)
Name: name,
DefaultService: backend.SelfLink,
}
mc := newUrlMapMetricContext("create")
op, err := gce.service.UrlMaps.Insert(gce.projectID, urlMap).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.GetUrlMap(name)
@ -47,11 +54,12 @@ func (gce *GCECloud) CreateUrlMap(backend *compute.BackendService, name string)
// UpdateUrlMap applies the given UrlMap as an update, and returns the new UrlMap.
func (gce *GCECloud) UpdateUrlMap(urlMap *compute.UrlMap) (*compute.UrlMap, error) {
mc := newUrlMapMetricContext("update")
op, err := gce.service.UrlMaps.Update(gce.projectID, urlMap.Name, urlMap).Do()
if err != nil {
return nil, err
return nil, mc.Observe(err)
}
if err = gce.waitForGlobalOp(op); err != nil {
if err = gce.waitForGlobalOp(op, mc); err != nil {
return nil, err
}
return gce.service.UrlMaps.Get(gce.projectID, urlMap.Name).Do()
@ -59,14 +67,15 @@ func (gce *GCECloud) UpdateUrlMap(urlMap *compute.UrlMap) (*compute.UrlMap, erro
// DeleteUrlMap deletes a url map by name and waits for the resulting global
// operation to finish. An HTTP 404 from the Delete call is treated as success
// and returns nil without recording a metric; any other failure of the call is
// recorded under the "urlmap_delete" metric before being returned.
func (gce *GCECloud) DeleteUrlMap(name string) error {
	mc := newUrlMapMetricContext("delete")
	op, err := gce.service.UrlMaps.Delete(gce.projectID, name).Do()
	if err != nil {
		if isHTTPErrorCode(err, http.StatusNotFound) {
			return nil
		}
		return mc.Observe(err)
	}
	return gce.waitForGlobalOp(op, mc)
}
// ListUrlMaps lists all UrlMaps in the project.

View File

@ -0,0 +1,80 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"time"
"github.com/prometheus/client_golang/prometheus"
)
// apiCallMetrics bundles the Prometheus collectors recorded for GCE API calls.
type apiCallMetrics struct {
// latency is the histogram that metricContext.Observe feeds with the elapsed
// time of a call, in seconds.
latency *prometheus.HistogramVec
// errors is the counter that metricContext.Observe increments when a call
// finishes with a non-nil error.
errors *prometheus.CounterVec
}
var (
// apiMetrics holds the collectors created and registered by
// registerAPIMetrics for the label names listed below. The attributes of a
// metricContext supply the label values in this same order.
apiMetrics = registerAPIMetrics(
"request", // API function that is being invoked.
"region", // region (optional).
"zone", // zone (optional).
)
)
// metricContext carries what Observe needs to record one API request.
type metricContext struct {
// start is the time the request began; Observe reports the time elapsed
// since it.
start time.Time
// attributes are the label values passed to the latency and error
// collectors.
attributes []string
}
// unusedMetricLabel is the value for an unused label in the metric dimension,
// for example the region or zone of a request that has none.
const unusedMetricLabel = "<n/a>"
// Observe records the result of an API call. It adds the seconds elapsed since
// mc.start to the latency histogram under mc.attributes and, when err is
// non-nil, increments the error counter for the same label values. err is
// returned unchanged so that a call site can wrap its return value directly.
func (mc *metricContext) Observe(err error) error {
	labels := mc.attributes

	histogram := apiMetrics.latency.WithLabelValues(labels...)
	histogram.Observe(time.Since(mc.start).Seconds())

	if err == nil {
		return nil
	}
	apiMetrics.errors.WithLabelValues(labels...).Inc()
	return err
}
// registerAPIMetrics adds metrics definitions for a category of API calls. It
// creates a latency histogram and an error counter that share the given label
// names, registers both with the default Prometheus registry (MustRegister
// panics on failure), and returns them.
func registerAPIMetrics(attributes ...string) *apiCallMetrics {
	latency := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name: "cloudprovider_gce_api_request_duration_seconds",
			Help: "Latency of a GCE API call",
		},
		attributes,
	)
	errorCount := prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Name: "cloudprovider_gce_api_request_errors",
			Help: "Number of errors for an API call",
		},
		attributes,
	)

	prometheus.MustRegister(latency)
	prometheus.MustRegister(errorCount)

	return &apiCallMetrics{latency: latency, errors: errorCount}
}