Fixed e2e tests

pull/6/head
Piotr Szczesniak 2016-01-25 14:10:24 +01:00
parent 331318b3fe
commit 8aebecc232
2 changed files with 50 additions and 49 deletions

View File

@ -20,7 +20,6 @@ import (
"fmt"
"time"
influxdb "github.com/influxdb/influxdb/client"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/kubernetes/pkg/api"
@ -30,14 +29,7 @@ var _ = Describe("Initial Resources [Skipped] ", func() {
f := NewFramework("initial-resources")
It("should set initial resources based on historical data", func() {
// Cleanup data in InfluxDB that left from previous tests.
influxdbClient, err := getInfluxdbClient(f.Client)
expectNoError(err, "failed to create influxdb client")
_, err = influxdbClient.Query("drop series autoscaling.cpu.usage.2m", influxdb.Second)
expectNoError(err)
_, err = influxdbClient.Query("drop series autoscaling.memory.usage.2m", influxdb.Second)
expectNoError(err)
// TODO(piosz): Add cleanup data in InfluxDB that left from previous tests.
cpu := 100
mem := 200
for i := 0; i < 10; i++ {

View File

@ -17,6 +17,8 @@ limitations under the License.
package e2e
import (
"bytes"
"encoding/json"
"fmt"
"net/url"
"time"
@ -49,10 +51,8 @@ var _ = Describe("[Flaky] Monitoring", func() {
const (
influxdbService = "monitoring-influxdb"
influxdbDatabaseName = "k8s"
influxdbUser = "root"
influxdbPW = "root"
podlistQuery = "select distinct(pod_id) from \"cpu/usage_ns_cumulative\""
nodelistQuery = "select distinct(hostname) from \"cpu/usage_ns_cumulative\""
podlistQuery = "show tag values from \"cpu/usage\" with key = pod_id"
nodelistQuery = "show tag values from \"cpu/usage\" with key = hostname"
sleepBetweenAttempts = 5 * time.Second
testTimeout = 5 * time.Minute
)
@ -65,6 +65,35 @@ var (
}
)
// Query sends a command to the server and returns the Response
func Query(c *client.Client, query string) (*influxdb.Response, error) {
result, err := c.Get().
Prefix("proxy").
Namespace("kube-system").
Resource("services").
Name(influxdbService+":api").
Suffix("query").
Param("q", query).
Param("db", influxdbDatabaseName).
Param("epoch", "s").
Do().
Raw()
if err != nil {
return nil, err
}
var response influxdb.Response
dec := json.NewDecoder(bytes.NewReader(result))
dec.UseNumber()
err = dec.Decode(&response)
if err != nil {
return nil, err
}
return &response, nil
}
func verifyExpectedRcsExistAndGetExpectedPods(c *client.Client) ([]string, error) {
expectedPods := []string{}
// Iterate over the labels that identify the replication controllers that we
@ -135,41 +164,24 @@ func getAllNodesInCluster(c *client.Client) ([]string, error) {
return result, nil
}
func getInfluxdbClient(c *client.Client) (*influxdb.Client, error) {
proxyUrl := fmt.Sprintf("%s/api/v1/proxy/namespaces/%s/services/%s:api/", getMasterHost(), api.NamespaceSystem, influxdbService)
config := &influxdb.ClientConfig{
Host: proxyUrl,
// TODO(vishh): Infer username and pw from the Pod spec.
Username: influxdbUser,
Password: influxdbPW,
Database: influxdbDatabaseName,
HttpClient: c.Client,
IsSecure: true,
}
return influxdb.NewClient(config)
}
func getInfluxdbData(c *influxdb.Client, query string) (map[string]bool, error) {
series, err := c.Query(query, influxdb.Second)
func getInfluxdbData(c *client.Client, query string, tag string) (map[string]bool, error) {
response, err := Query(c, query)
if err != nil {
return nil, err
}
if len(series) != 1 {
return nil, fmt.Errorf("expected only one series from Influxdb for query %q. Got %+v", query, series)
if len(response.Results) != 1 {
return nil, fmt.Errorf("expected only one result from Influxdb for query %q. Got %+v", query, response)
}
if len(series[0].GetColumns()) != 2 {
Failf("Expected two columns for query %q. Found %v", query, series[0].GetColumns())
if len(response.Results[0].Series) != 1 {
return nil, fmt.Errorf("expected exactly one series for query %q.", query)
}
if len(response.Results[0].Series[0].Columns) != 1 {
Failf("Expected one column for query %q. Found %v", query, response.Results[0].Series[0].Columns)
}
result := map[string]bool{}
for _, point := range series[0].GetPoints() {
if len(point) != 2 {
Failf("Expected only two entries in a point for query %q. Got %v", query, point)
}
name, ok := point[1].(string)
if !ok {
Failf("expected %v to be a string, but it is %T", point[1], point[1])
}
result[name] = false
for _, value := range response.Results[0].Series[0].Values {
name := value[0].(string)
result[name] = true
}
return result, nil
}
@ -186,14 +198,14 @@ func expectedItemsExist(expectedItems []string, actualItems map[string]bool) boo
return true
}
func validatePodsAndNodes(influxdbClient *influxdb.Client, expectedPods, expectedNodes []string) bool {
pods, err := getInfluxdbData(influxdbClient, podlistQuery)
func validatePodsAndNodes(c *client.Client, expectedPods, expectedNodes []string) bool {
pods, err := getInfluxdbData(c, podlistQuery, "pod_id")
if err != nil {
// We don't fail the test here because the influxdb service might still not be running.
Logf("failed to query list of pods from influxdb. Query: %q, Err: %v", podlistQuery, err)
return false
}
nodes, err := getInfluxdbData(influxdbClient, nodelistQuery)
nodes, err := getInfluxdbData(c, nodelistQuery, "hostname")
if err != nil {
Logf("failed to query list of nodes from influxdb. Query: %q, Err: %v", nodelistQuery, err)
return false
@ -222,14 +234,11 @@ func testMonitoringUsingHeapsterInfluxdb(c *client.Client) {
expectNoError(expectedServicesExist(c))
// TODO: Wait for all pods and services to be running.
influxdbClient, err := getInfluxdbClient(c)
expectNoError(err, "failed to create influxdb client")
expectedNodes, err := getAllNodesInCluster(c)
expectNoError(err)
startTime := time.Now()
for {
if validatePodsAndNodes(influxdbClient, expectedPods, expectedNodes) {
if validatePodsAndNodes(c, expectedPods, expectedNodes) {
return
}
if time.Since(startTime) >= testTimeout {