forked from grafana/mimir
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: query_scheduler_test.go
131 lines (105 loc) · 6.3 KB
/
query_scheduler_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
// SPDX-License-Identifier: AGPL-3.0-only
//go:build requires_docker
package integration
import (
"sort"
"testing"
"time"
"github.com/grafana/e2e"
e2edb "github.com/grafana/e2e/db"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/mimir/integration/e2emimir"
)
// TestQuerySchedulerWithMaxUsedInstances runs the max-used-instances scenario
// pushing and querying a float series.
func TestQuerySchedulerWithMaxUsedInstances(t *testing.T) {
	runTestQuerySchedulerWithMaxUsedInstances(t, "series_1", generateFloatSeries)
}
// TestQuerySchedulerWithMaxUsedInstancesHistogram runs the same
// max-used-instances scenario as TestQuerySchedulerWithMaxUsedInstances,
// but pushing and querying a native-histogram series.
func TestQuerySchedulerWithMaxUsedInstancesHistogram(t *testing.T) {
	runTestQuerySchedulerWithMaxUsedInstances(t, "hseries_1", generateHistogramSeries)
}
// runTestQuerySchedulerWithMaxUsedInstances exercises the query-scheduler
// ring-based service discovery with -query-scheduler.max-used-instances=1:
// it starts two schedulers, checks that the query-frontend connects only to
// the single in-use one, pushes and queries a series produced by genSeries
// under seriesName, then stops the in-use scheduler and verifies that
// querier and query-frontend connections fail over to the other instance,
// after which the same query still succeeds.
//
// NOTE: the statement order below matters — dependencies, schedulers and
// the remaining Mimir services are started and their rings awaited before
// any query is issued.
func runTestQuerySchedulerWithMaxUsedInstances(t *testing.T, seriesName string, genSeries generateSeriesFunc) {
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	flags := mergeFlags(
		BlocksStorageFlags(),
		BlocksStorageS3Flags(),
		map[string]string{
			"-query-scheduler.service-discovery-mode": "ring",
			"-query-scheduler.ring.store":             "consul",
			"-query-scheduler.max-used-instances":     "1",
			"-querier.max-concurrent":                 "8",
		},
	)

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	flags["-query-scheduler.ring.consul.hostname"] = consul.NetworkHTTPEndpoint()

	// Start 2 query-scheduler. We override the address registered in the ring so that we can easily predict it
	// when computing the expected in-use query-scheduler instance.
	queryScheduler1 := e2emimir.NewQueryScheduler("query-scheduler-1", mergeFlags(flags, map[string]string{"-query-scheduler.ring.instance-addr": e2e.NetworkContainerHost(s.NetworkName(), "query-scheduler-1")}))
	queryScheduler2 := e2emimir.NewQueryScheduler("query-scheduler-2", mergeFlags(flags, map[string]string{"-query-scheduler.ring.instance-addr": e2e.NetworkContainerHost(s.NetworkName(), "query-scheduler-2")}))
	require.NoError(t, s.StartAndWaitReady(queryScheduler1, queryScheduler2))

	// Start all other Mimir services.
	queryFrontend := e2emimir.NewQueryFrontend("query-frontend", consul.NetworkHTTPEndpoint(), flags)
	ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), flags)
	distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
	querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
	require.NoError(t, s.StartAndWaitReady(queryFrontend, querier, ingester, distributor))

	// Wait until distributor and querier have updated the ingesters ring.
	for _, service := range []*e2emimir.MimirService{distributor, querier} {
		require.NoError(t, service.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
			labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
			labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))),
			"Service: %s", service.Name())
	}

	// Wait until query-frontend and querier have updated the query-schedulers ring.
	require.NoError(t, queryFrontend.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "query-frontend-query-scheduler-client"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
	require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(2), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
		labels.MustNewMatcher(labels.MatchEqual, "name", "querier-query-scheduler-client"),
		labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

	// Compute which is the expected in-use query-scheduler.
	// NOTE(review): this sort implies the instance with the lexicographically
	// smallest gRPC endpoint is the one selected when max-used-instances=1 —
	// confirm against the scheduler ring selection logic if this assertion flakes.
	schedulers := []*e2emimir.MimirService{queryScheduler1, queryScheduler2}
	sort.Slice(schedulers, func(i, j int) bool { return schedulers[i].NetworkGRPCEndpoint() < schedulers[j].NetworkGRPCEndpoint() })
	inUseScheduler := schedulers[0]
	notInUseScheduler := schedulers[1]

	// The minimum number of connections per scheduler is 4 in order to avoid queue starvation
	// when the RequestQueue utilizes the querier-worker queue prioritization algorithm.
	// Although the max-concurrent is set to 8, the querier will create an extra 4 connections
	// per not-in-use scheduler to meet the minimum requirements per connected RequestQueue instance.
	require.NoError(t, inUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(8), []string{"cortex_query_scheduler_connected_querier_clients"}))
	require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(4), []string{"cortex_query_scheduler_connected_querier_clients"}))

	// We expect the query-frontend to only open connections to the in-use scheduler.
	require.NoError(t, inUseScheduler.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_query_scheduler_connected_frontend_clients"}))
	require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_query_scheduler_connected_frontend_clients"}))

	// Push some series to Mimir.
	now := time.Now()
	series, expectedVector, _ := genSeries(seriesName, now, prompb.Label{Name: "foo", Value: "bar"})

	c, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
	require.NoError(t, err)

	res, err := c.Push(series)
	require.NoError(t, err)
	require.Equal(t, 200, res.StatusCode)

	// Query the series.
	result, err := c.Query(seriesName, now)
	require.NoError(t, err)
	require.Equal(t, model.ValVector, result.Type())
	assert.Equal(t, expectedVector, result.(model.Vector))

	// Terminate the in-use query-scheduler.
	require.NoError(t, s.Stop(inUseScheduler))

	// We expect the querier to transfer all connections up to the configured max to the previously not-in-use scheduler.
	require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Equals(8), []string{"cortex_query_scheduler_connected_querier_clients"}))

	// We expect the query-frontend to open connections to the previously not-in-use scheduler.
	require.NoError(t, notInUseScheduler.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_query_scheduler_connected_frontend_clients"}))

	// Query the series again to prove the cluster still serves reads after failover.
	result, err = c.Query(seriesName, now)
	require.NoError(t, err)
	require.Equal(t, model.ValVector, result.Type())
	assert.Equal(t, expectedVector, result.(model.Vector))
}