Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [FEATURE] StoreGateway: Add optional limit `-blocks-storage.bucket-store.max-concurrent-bytes` on the number of series bytes processed concurrently per store gateway, to protect it from being OOM-killed. When the limit is reached, an error is returned that is retryable at the querier level. #7271
* [ENHANCEMENT] Distributor: Add `cortex_distributor_push_requests_total` metric to track the number of push requests by type. #7239
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] HATracker: Add `-distributor.ha-tracker.enable-startup-sync` flag. If enabled, the ha-tracker fetches all tracked keys on startup to populate the local cache. #7213
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Max number of bytes being processed concurrently across all queries. When
# the limit is reached, new requests are rejected with HTTP 503. 0 to
# disable.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-bytes
[max_concurrent_bytes: <int> | default = 0]

# Maximum number of concurrent tenants syncing blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
6 changes: 6 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Max number of bytes being processed concurrently across all queries. When
# the limit is reached, new requests are rejected with HTTP 503. 0 to
# disable.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-bytes
[max_concurrent_bytes: <int> | default = 0]

# Maximum number of concurrent tenants syncing blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1325,6 +1325,11 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Max number of bytes being processed concurrently across all queries. When
# the limit is reached, new requests are rejected with HTTP 503. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.max-concurrent-bytes
[max_concurrent_bytes: <int> | default = 0]

# Maximum number of concurrent tenants syncing blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
13 changes: 10 additions & 3 deletions pkg/frontend/transport/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/querier/tripperware"
"github.com/cortexproject/cortex/pkg/storegateway"
)

type Retry struct {
Expand Down Expand Up @@ -77,9 +78,15 @@ func (r *Retry) Do(ctx context.Context, f func() (*httpgrpc.HTTPResponse, error)
}

// isBodyRetryable reports whether a failed response with the given body may
// be retried at the query frontend. Errors caused by store-gateway resource
// exhaustion (chunk pool exhausted, concurrent bytes limit exceeded) are not
// retried here, since hammering an already-saturated store gateway from the
// frontend would make things worse; retries for those are left to the
// querier level only.
func isBodyRetryable(body string) bool {
	nonRetryableMessages := []string{
		pool.ErrPoolExhausted.Error(),
		storegateway.ErrMaxConcurrentBytesLimitExceeded.Error(),
	}
	for _, msg := range nonRetryableMessages {
		if strings.Contains(body, msg) {
			return false
		}
	}
	return true
}

func yoloString(b []byte) string {
Expand Down
23 changes: 23 additions & 0 deletions pkg/frontend/transport/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/weaveworks/common/httpgrpc"
Expand Down Expand Up @@ -52,3 +53,25 @@ func TestNoRetryOnChunkPoolExhaustion(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int32(500), res.Code)
}

// TestNoRetryOnMaxConcurrentBytesLimitExceeded verifies that the frontend
// retry loop gives up immediately when the store gateway reports that its
// concurrent bytes limit was exceeded: the first (failing) response must be
// returned as-is instead of being retried into a success.
func TestNoRetryOnMaxConcurrentBytesLimitExceeded(t *testing.T) {
	remaining := atomic.NewInt64(3)
	retrier := NewRetry(3, nil)

	res, err := retrier.Do(context.Background(), func() (*httpgrpc.HTTPResponse, error) {
		if remaining.Dec() > 1 {
			// First attempt: fail with the non-retryable limit error.
			return &httpgrpc.HTTPResponse{
				Code: 500,
				Body: []byte(storegateway.ErrMaxConcurrentBytesLimitExceeded.Error()),
			}, nil
		}
		// Later attempts would succeed, but must never be reached.
		return &httpgrpc.HTTPResponse{Code: 200}, nil
	})

	require.NoError(t, err)
	require.Equal(t, int32(500), res.Code)
}
5 changes: 3 additions & 2 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1239,8 +1239,9 @@ func isRetryableError(err error) bool {
case codes.Canceled:
return strings.Contains(err.Error(), "grpc: the client connection is closing")
case codes.Unknown:
// Catch chunks pool exhaustion error only.
return strings.Contains(err.Error(), pool.ErrPoolExhausted.Error())
// Catch chunks pool exhaustion error or concurrent bytes limit exceeded error.
return strings.Contains(err.Error(), pool.ErrPoolExhausted.Error()) ||
strings.Contains(err.Error(), storegateway.ErrMaxConcurrentBytesLimitExceeded.Error())
default:
return false
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1404,6 +1404,35 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
},
"multiple store-gateways has the block, but one of them fails to return due to max concurrent bytes limit exceeded": {
finderResult: bucketindex.Blocks{
&bucketindex.Block{ID: block1},
},
storeSetResponses: []any{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1",
mockedSeriesErr: status.Error(codes.Unknown, storegateway.ErrMaxConcurrentBytesLimitExceeded.Error()),
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.FromStrings(metricNameLabel.Name, metricNameLabel.Value, series1Label.Name, series1Label.Value), []cortexpb.Sample{{Value: 2, TimestampMs: minT}}, nil, nil),
mockHintsResponse(block1),
}}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedSeries: []seriesResult{
{
lbls: labels.New(metricNameLabel, series1Label),
values: []valueResult{
{t: minT, v: 2},
},
},
},
},
"all store-gateways return PermissionDenied": {
finderResult: bucketindex.Blocks{
&bucketindex.Block{ID: block1},
Expand Down Expand Up @@ -2603,6 +2632,7 @@ func TestBlocksStoreQuerier_isRetryableError(t *testing.T) {
require.True(t, isRetryableError(limiter.ErrResourceLimitReached))
require.True(t, isRetryableError(status.Error(codes.Canceled, "grpc: the client connection is closing")))
require.True(t, isRetryableError(errors.New("pool exhausted")))
require.True(t, isRetryableError(errors.New("max concurrent bytes limit exceeded")))

require.False(t, isRetryableError(status.Error(codes.ResourceExhausted, "some other error")))
require.False(t, isRetryableError(status.Error(codes.Canceled, "some other error")))
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode")
ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0")
ErrInvalidBucketStoreType = errors.New("invalid bucket store type")
ErrInvalidMaxConcurrentBytes = errors.New("max concurrent bytes must be non-negative")
)

// BlocksStorageConfig holds the config information for the blocks storage.
Expand Down Expand Up @@ -281,6 +282,7 @@ type BucketStoreConfig struct {
SyncInterval time.Duration `yaml:"sync_interval"`
MaxConcurrent int `yaml:"max_concurrent"`
MaxInflightRequests int `yaml:"max_inflight_requests"`
MaxConcurrentBytes int64 `yaml:"max_concurrent_bytes"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
Expand Down Expand Up @@ -365,6 +367,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.")
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 0 to disable.")
f.Int64Var(&cfg.MaxConcurrentBytes, "blocks-storage.bucket-store.max-concurrent-bytes", 0, "Max number of bytes being processed concurrently across all queries. When the limit is reached, new requests are rejected with HTTP 503. 0 to disable.")
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants syncing blocks.")
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks syncing per tenant.")
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")
Expand Down Expand Up @@ -429,6 +432,9 @@ func (cfg *BucketStoreConfig) Validate() error {
if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 {
return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio
}
if cfg.MaxConcurrentBytes < 0 {
return ErrInvalidMaxConcurrentBytes
}
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions pkg/storage/tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,24 @@ func TestConfig_Validate(t *testing.T) {
},
expectedErr: errUnSupportedWALCompressionType,
},
"should fail on negative max concurrent bytes": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = -1
},
expectedErr: ErrInvalidMaxConcurrentBytes,
},
"should pass on zero max concurrent bytes (disabled)": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = 0
},
expectedErr: nil,
},
"should pass on positive max concurrent bytes": {
setup: func(cfg *BlocksStorageConfig) {
cfg.BucketStore.MaxConcurrentBytes = 1024 * 1024 * 1024 // 1GB
},
expectedErr: nil,
},
}

for testName, testData := range tests {
Expand Down
46 changes: 31 additions & 15 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ type ThanosBucketStores struct {
// Keeps number of inflight requests
inflightRequests *util.InflightRequestTracker

// Concurrent bytes tracker for limiting bytes being processed across all queries.
concurrentBytesTracker ConcurrentBytesTracker

// Holder for per-request bytes trackers. The BytesLimiterFactory (created
// once per user store) reads from this to find the current request's tracker.
requestBytesTrackerHolder *requestBytesTrackerHolder

// Metrics.
syncTimes prometheus.Histogram
syncLastSuccess prometheus.Gauge
Expand Down Expand Up @@ -133,20 +140,22 @@ func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy Shardi
}).Set(float64(cfg.BucketStore.MaxConcurrent))

u := &ThanosBucketStores{
logger: logger,
cfg: cfg,
limits: limits,
bucket: cachingBucket,
shardingStrategy: shardingStrategy,
stores: map[string]*store.BucketStore{},
storesErrors: map[string]error{},
logLevel: logLevel,
bucketStoreMetrics: NewBucketStoreMetrics(),
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
inflightRequests: util.NewInflightRequestTracker(),
logger: logger,
cfg: cfg,
limits: limits,
bucket: cachingBucket,
shardingStrategy: shardingStrategy,
stores: map[string]*store.BucketStore{},
storesErrors: map[string]error{},
logLevel: logLevel,
bucketStoreMetrics: NewBucketStoreMetrics(),
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
userTokenBuckets: make(map[string]*util.TokenBucket),
inflightRequests: util.NewInflightRequestTracker(),
concurrentBytesTracker: NewConcurrentBytesTracker(uint64(cfg.BucketStore.MaxConcurrentBytes), reg),
requestBytesTrackerHolder: &requestBytesTrackerHolder{},
syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_bucket_stores_blocks_sync_seconds",
Help: "The total time it takes to perform a sync stores",
Expand Down Expand Up @@ -381,6 +390,13 @@ func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Stor
defer u.inflightRequests.Dec()
}

reqTracker := newRequestBytesTracker(u.concurrentBytesTracker)
u.requestBytesTrackerHolder.Set(reqTracker)
defer func() {
u.requestBytesTrackerHolder.Clear()
reqTracker.ReleaseAll()
}()

err = store.Series(req, spanSeriesServer{
Store_SeriesServer: srv,
ctx: spanCtx,
Expand Down Expand Up @@ -697,7 +713,7 @@ func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve),
newBytesLimiterFactory(u.limits, userID, u.getUserTokenBucket(userID), u.instanceTokenBucket, u.cfg.BucketStore.TokenBucketBytesLimiter, u.getTokensToRetrieve, u.requestBytesTrackerHolder),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
Expand Down
Loading