@ -16,6 +16,8 @@ package remote
import (
import (
"context"
"context"
"fmt"
"fmt"
"io/ioutil"
"os"
"reflect"
"reflect"
"sync"
"sync"
"sync/atomic"
"sync/atomic"
@ -49,7 +51,12 @@ func TestSampleDelivery(t *testing.T) {
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
cfg . MaxShards = 1
cfg . MaxShards = 1
var temp int64
var temp int64
m := NewQueueManager ( nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , cfg , nil , nil , c , defaultFlushDeadline , 0 )
dir , err := ioutil . TempDir ( "" , "TestSampleDeliver" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
m := NewQueueManager ( nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , cfg , nil , nil , c , defaultFlushDeadline , 0 )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . seriesLabels = refSeriesToLabelsProto ( series )
// These should be received by the client.
// These should be received by the client.
@ -73,7 +80,12 @@ func TestSampleDeliveryTimeout(t *testing.T) {
cfg . MaxShards = 1
cfg . MaxShards = 1
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
cfg . BatchSendDeadline = model . Duration ( 100 * time . Millisecond )
var temp int64
var temp int64
m := NewQueueManager ( nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , cfg , nil , nil , c , defaultFlushDeadline , 0 )
dir , err := ioutil . TempDir ( "" , "TestSampleDeliveryTimeout" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
m := NewQueueManager ( nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , cfg , nil , nil , c , defaultFlushDeadline , 0 )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . Start ( )
m . Start ( )
defer m . Stop ( )
defer m . Stop ( )
@ -109,7 +121,12 @@ func TestSampleDeliveryOrder(t *testing.T) {
c := NewTestStorageClient ( )
c := NewTestStorageClient ( )
c . expectSamples ( samples , series )
c . expectSamples ( samples , series )
var temp int64
var temp int64
m := NewQueueManager ( nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , config . DefaultQueueConfig , nil , nil , c , defaultFlushDeadline , 0 )
dir , err := ioutil . TempDir ( "" , "TestSampleDeliveryOrder" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
m := NewQueueManager ( nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , config . DefaultQueueConfig , nil , nil , c , defaultFlushDeadline , 0 )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . Start ( )
m . Start ( )
@ -124,7 +141,12 @@ func TestShutdown(t *testing.T) {
c := NewTestBlockedStorageClient ( )
c := NewTestBlockedStorageClient ( )
var temp int64
var temp int64
m := NewQueueManager ( nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , config . DefaultQueueConfig , nil , nil , c , deadline , 0 )
dir , err := ioutil . TempDir ( "" , "TestShutdown" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
m := NewQueueManager ( nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , config . DefaultQueueConfig , nil , nil , c , deadline , 0 )
samples , series := createTimeseries ( 2 * config . DefaultQueueConfig . MaxSamplesPerSend )
samples , series := createTimeseries ( 2 * config . DefaultQueueConfig . MaxSamplesPerSend )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . Start ( )
m . Start ( )
@ -157,7 +179,11 @@ func TestSeriesReset(t *testing.T) {
numSegments := 4
numSegments := 4
numSeries := 25
numSeries := 25
m := NewQueueManager ( nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , config . DefaultQueueConfig , nil , nil , c , deadline , 0 )
dir , err := ioutil . TempDir ( "" , "TestSeriesReset" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
m := NewQueueManager ( nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , config . DefaultQueueConfig , nil , nil , c , deadline , 0 )
for i := 0 ; i < numSegments ; i ++ {
for i := 0 ; i < numSegments ; i ++ {
series := [ ] tsdb . RefSeries { }
series := [ ] tsdb . RefSeries { }
for j := 0 ; j < numSeries ; j ++ {
for j := 0 ; j < numSeries ; j ++ {
@ -182,7 +208,12 @@ func TestReshard(t *testing.T) {
cfg . MaxShards = 1
cfg . MaxShards = 1
var temp int64
var temp int64
m := NewQueueManager ( nil , "" , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , cfg , nil , nil , c , defaultFlushDeadline , 0 )
dir , err := ioutil . TempDir ( "" , "TestReshard" )
testutil . Ok ( t , err )
defer os . RemoveAll ( dir )
m := NewQueueManager ( nil , dir , newEWMARate ( ewmaWeight , shardUpdateDuration ) , & temp , cfg , nil , nil , c , defaultFlushDeadline , 0 )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . seriesLabels = refSeriesToLabelsProto ( series )
m . Start ( )
m . Start ( )