diff --git a/test/e2e/initial_resources.go b/test/e2e/initial_resources.go index 26a37c539d..2138137de8 100644 --- a/test/e2e/initial_resources.go +++ b/test/e2e/initial_resources.go @@ -20,7 +20,6 @@ import ( "fmt" "time" - influxdb "github.com/influxdb/influxdb/client" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" "k8s.io/kubernetes/pkg/api" @@ -30,14 +29,7 @@ var _ = Describe("Initial Resources [Skipped] ", func() { f := NewFramework("initial-resources") It("should set initial resources based on historical data", func() { - // Cleanup data in InfluxDB that left from previous tests. - influxdbClient, err := getInfluxdbClient(f.Client) - expectNoError(err, "failed to create influxdb client") - _, err = influxdbClient.Query("drop series autoscaling.cpu.usage.2m", influxdb.Second) - expectNoError(err) - _, err = influxdbClient.Query("drop series autoscaling.memory.usage.2m", influxdb.Second) - expectNoError(err) - + // TODO(piosz): Add cleanup data in InfluxDB that left from previous tests. cpu := 100 mem := 200 for i := 0; i < 10; i++ { diff --git a/test/e2e/monitoring.go b/test/e2e/monitoring.go index ae93b9f5e0..43f4953138 100644 --- a/test/e2e/monitoring.go +++ b/test/e2e/monitoring.go @@ -17,6 +17,8 @@ limitations under the License. package e2e import ( + "bytes" + "encoding/json" "fmt" "net/url" "time" @@ -49,10 +51,8 @@ var _ = Describe("[Flaky] Monitoring", func() { const ( influxdbService = "monitoring-influxdb" influxdbDatabaseName = "k8s" - influxdbUser = "root" - influxdbPW = "root" - podlistQuery = "select distinct(pod_id) from \"cpu/usage_ns_cumulative\"" - nodelistQuery = "select distinct(hostname) from \"cpu/usage_ns_cumulative\"" + podlistQuery = "show tag values from \"cpu/usage\" with key = pod_id" + nodelistQuery = "show tag values from \"cpu/usage\" with key = hostname" sleepBetweenAttempts = 5 * time.Second testTimeout = 5 * time.Minute ) @@ -65,6 +65,35 @@ var ( } ) +// Query sends a command to the server and returns the Response +func Query(c *client.Client, query string) (*influxdb.Response, error) { + result, err := c.Get(). + Prefix("proxy"). + Namespace("kube-system"). + Resource("services"). + Name(influxdbService+":api"). + Suffix("query"). + Param("q", query). + Param("db", influxdbDatabaseName). + Param("epoch", "s"). + Do(). + Raw() + + if err != nil { + return nil, err + } + + var response influxdb.Response + dec := json.NewDecoder(bytes.NewReader(result)) + dec.UseNumber() + err = dec.Decode(&response) + + if err != nil { + return nil, err + } + return &response, nil +} + func verifyExpectedRcsExistAndGetExpectedPods(c *client.Client) ([]string, error) { expectedPods := []string{} // Iterate over the labels that identify the replication controllers that we @@ -135,41 +164,24 @@ func getAllNodesInCluster(c *client.Client) ([]string, error) { return result, nil } -func getInfluxdbClient(c *client.Client) (*influxdb.Client, error) { - proxyUrl := fmt.Sprintf("%s/api/v1/proxy/namespaces/%s/services/%s:api/", getMasterHost(), api.NamespaceSystem, influxdbService) - config := &influxdb.ClientConfig{ - Host: proxyUrl, - // TODO(vishh): Infer username and pw from the Pod spec. - Username: influxdbUser, - Password: influxdbPW, - Database: influxdbDatabaseName, - HttpClient: c.Client, - IsSecure: true, - } - return influxdb.NewClient(config) -} - -func getInfluxdbData(c *influxdb.Client, query string) (map[string]bool, error) { - series, err := c.Query(query, influxdb.Second) +func getInfluxdbData(c *client.Client, query string, tag string) (map[string]bool, error) { + response, err := Query(c, query) if err != nil { return nil, err } - if len(series) != 1 { - return nil, fmt.Errorf("expected only one series from Influxdb for query %q. Got %+v", query, series) + if len(response.Results) != 1 { + return nil, fmt.Errorf("expected only one result from Influxdb for query %q. Got %+v", query, response) } - if len(series[0].GetColumns()) != 2 { - Failf("Expected two columns for query %q. Found %v", query, series[0].GetColumns()) + if len(response.Results[0].Series) != 1 { + return nil, fmt.Errorf("expected exactly one series for query %q.", query) + } + if len(response.Results[0].Series[0].Columns) != 1 { + Failf("Expected one column for query %q. Found %v", query, response.Results[0].Series[0].Columns) } result := map[string]bool{} - for _, point := range series[0].GetPoints() { - if len(point) != 2 { - Failf("Expected only two entries in a point for query %q. Got %v", query, point) - } - name, ok := point[1].(string) - if !ok { - Failf("expected %v to be a string, but it is %T", point[1], point[1]) - } - result[name] = false + for _, value := range response.Results[0].Series[0].Values { + name := value[0].(string) + result[name] = true } return result, nil } @@ -186,14 +198,14 @@ func expectedItemsExist(expectedItems []string, actualItems map[string]bool) boo return true } -func validatePodsAndNodes(influxdbClient *influxdb.Client, expectedPods, expectedNodes []string) bool { - pods, err := getInfluxdbData(influxdbClient, podlistQuery) +func validatePodsAndNodes(c *client.Client, expectedPods, expectedNodes []string) bool { + pods, err := getInfluxdbData(c, podlistQuery, "pod_id") if err != nil { // We don't fail the test here because the influxdb service might still not be running. Logf("failed to query list of pods from influxdb. Query: %q, Err: %v", podlistQuery, err) return false } - nodes, err := getInfluxdbData(influxdbClient, nodelistQuery) + nodes, err := getInfluxdbData(c, nodelistQuery, "hostname") if err != nil { Logf("failed to query list of nodes from influxdb. Query: %q, Err: %v", nodelistQuery, err) return false @@ -222,14 +234,11 @@ func testMonitoringUsingHeapsterInfluxdb(c *client.Client) { expectNoError(expectedServicesExist(c)) // TODO: Wait for all pods and services to be running. - influxdbClient, err := getInfluxdbClient(c) - expectNoError(err, "failed to create influxdb client") - expectedNodes, err := getAllNodesInCluster(c) expectNoError(err) startTime := time.Now() for { - if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) { + if validatePodsAndNodes(c, expectedPods, expectedNodes) { return } if time.Since(startTime) >= testTimeout {