Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
* [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253
* [ENHANCEMENT] Make cortex_ingester_tsdb_sample_ooo_delta metric per-tenant #7278
* [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics.
* [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3213,6 +3213,12 @@ ha_tracker:
# CLI flag: -distributor.remote-writev2-enabled
[remote_writev2_enabled: <boolean> | default = false]

# If true, treat requests with unknown or invalid Content-Type header as remote
# write v1 (legacy behavior). If false, return 415 Unsupported Media Type for
# non-standard content types.
# CLI flag: -distributor.accept-unknown-remote-write-content-type
[accept_unknown_remote_write_content_type: <boolean> | default = false]

ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul,
Expand Down
8 changes: 4 additions & 4 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
Help: "Total number of push requests by type.",
}, []string{"type"})

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST")
a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
Expand All @@ -304,7 +304,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST")
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
}
Expand Down Expand Up @@ -337,12 +337,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overri
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
Expand Down
4 changes: 3 additions & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ type Config struct {
ExtendWrites bool `yaml:"extend_writes"`
SignWriteRequestsEnabled bool `yaml:"sign_write_requests"`
UseStreamPush bool `yaml:"use_stream_push"`
RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"`
RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"`
AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"`

// Distributors ring
DistributorRing RingConfig `yaml:"ring"`
Expand Down Expand Up @@ -224,6 +225,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If zone awareness and this both enabled, when querying metadata APIs (labels names and values for now), only results from quorum number of zones will be included.")
f.IntVar(&cfg.NumPushWorkers, "distributor.num-push-workers", 0, "EXPERIMENTAL: Number of go routines to handle push calls from distributors to ingesters. When no workers are available, a new goroutine will be spawned automatically. If set to 0 (default), workers are disabled, and a new goroutine will be created for each push request.")
f.BoolVar(&cfg.RemoteWriteV2Enabled, "distributor.remote-writev2-enabled", false, "EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push request.")
f.BoolVar(&cfg.AcceptUnknownRemoteWriteContentType, "distributor.accept-unknown-remote-write-content-type", false, "If true, treat requests with unknown or invalid Content-Type header as remote write v1 (legacy behavior). If false, return 415 Unsupported Media Type for non-standard content types.")

f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
Expand Down
44 changes: 27 additions & 17 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,17 @@ const (
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"

labelValuePRW1 = "prw1"
labelValuePRW2 = "prw2"
labelValueOTLP = "otlp"
labelValuePRW1 = "prw1"
labelValuePRW2 = "prw2"
labelValueOTLP = "otlp"
labelValueUnknown = "unknown"
)

// Func defines the type of the push. It is similar to http.HandlerFunc.
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler {
func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
Expand Down Expand Up @@ -155,20 +156,27 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation
}

msgType, err := remote.ParseProtoMsg(contentType)
if err != nil {
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
contentTypeUnknownOrInvalid := false
if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType {
if acceptUnknownRemoteWriteContentType {
contentTypeUnknownOrInvalid = true
msgType = remote.WriteV1MessageType
level.Debug(logger).Log("msg", "Treating unknown or invalid content-type as remote write v1", "content_type", contentType, "msgType", msgType, "err", err)
} else {
if err != nil {
level.Error(logger).Log("msg", "Content-type header invalid or message type not accepted", "content_type", contentType, "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
} else {
errMsg := fmt.Sprintf("%v protobuf message is not accepted by this server; only accepts %v or %v", msgType, remote.WriteV1MessageType, remote.WriteV2MessageType)
level.Error(logger).Log("Not accepted msg type", "msgType", msgType)
http.Error(w, errMsg, http.StatusUnsupportedMediaType)
}
return
}
}

if requestTotal != nil {
requestTotal.WithLabelValues(getTypeLabel(msgType)).Inc()
}

if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType {
level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
requestTotal.WithLabelValues(getTypeLabel(msgType, contentTypeUnknownOrInvalid)).Inc()
}

enc := r.Header.Get("Content-Encoding")
Expand Down Expand Up @@ -311,10 +319,12 @@ func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exempl
return v1Exemplars, nil
}

func getTypeLabel(msgType remote.WriteMessageType) string {
func getTypeLabel(msgType remote.WriteMessageType, unknownOrInvalidContentType bool) string {
if unknownOrInvalidContentType {
return labelValueUnknown
}
if msgType == remote.WriteV1MessageType {
return labelValuePRW1
}

return labelValuePRW2
}
63 changes: 50 additions & 13 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func Benchmark_Handler(b *testing.B) {
testSeriesNums := []int{10, 100, 500, 1000}
for _, seriesNum := range testSeriesNums {
b.Run(fmt.Sprintf("PRW1 with %d series", seriesNum), func(b *testing.B) {
handler := Handler(true, 1000000, overrides, nil, mockHandler, nil)
handler := Handler(true, false, 1000000, overrides, nil, mockHandler, nil)
req, err := createPRW1HTTPRequest(seriesNum)
require.NoError(b, err)

Expand All @@ -143,7 +143,7 @@ func Benchmark_Handler(b *testing.B) {
}
})
b.Run(fmt.Sprintf("PRW2 with %d series", seriesNum), func(b *testing.B) {
handler := Handler(true, 1000000, overrides, nil, mockHandler, nil)
handler := Handler(true, false, 1000000, overrides, nil, mockHandler, nil)
req, err := createPRW2HTTPRequest(seriesNum)
require.NoError(b, err)

Expand Down Expand Up @@ -514,7 +514,7 @@ func TestHandler_remoteWrite(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil)
handler := Handler(true, false, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil)

body, isV2 := test.createBody()
req := createRequest(t, body, isV2)
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestHandler_remoteWrite(t *testing.T) {

ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
handler := Handler(true, 100000, overrides, nil, pushFunc, nil)
handler := Handler(true, false, 100000, overrides, nil, pushFunc, nil)
req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true)
req = req.WithContext(ctx)
resp := httptest.NewRecorder()
Expand All @@ -572,11 +572,12 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) {
sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")

tests := []struct {
description string
reqHeaders map[string]string
expectedCode int
isV2 bool
remoteWrite2Enabled bool
description string
reqHeaders map[string]string
expectedCode int
isV2 bool
remoteWrite2Enabled bool
acceptUnknownRemoteWriteContentType bool
}{
{
description: "[RW 2.0] correct content-type",
Expand Down Expand Up @@ -688,11 +689,33 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) {
isV2: false,
remoteWrite2Enabled: true,
},
{
description: "unknown content-type with acceptUnknownRemoteWriteContentType, treated as RW v1",
reqHeaders: map[string]string{
"Content-Type": "yolo",
"Content-Encoding": "snappy",
},
expectedCode: http.StatusOK,
isV2: false,
remoteWrite2Enabled: true,
acceptUnknownRemoteWriteContentType: true,
},
{
description: "invalid proto param with acceptUnknownRemoteWriteContentType, treated as RW v1",
reqHeaders: map[string]string{
"Content-Type": "application/x-protobuf;proto=yolo",
"Content-Encoding": "snappy",
},
expectedCode: http.StatusOK,
isV2: false,
remoteWrite2Enabled: true,
acceptUnknownRemoteWriteContentType: true,
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
handler := Handler(test.remoteWrite2Enabled, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil)
handler := Handler(test.remoteWrite2Enabled, test.acceptUnknownRemoteWriteContentType, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil)

if test.isV2 {
ctx := context.Background()
Expand All @@ -719,7 +742,7 @@ func TestHandler_cortexWriteRequest(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)

sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil)
handler := Handler(true, false, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil)

t.Run("remote write v1", func(t *testing.T) {
req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false)
Expand Down Expand Up @@ -749,7 +772,7 @@ func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) {
createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false),
} {
resp := httptest.NewRecorder()
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil)
handler := Handler(true, false, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil)
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
Expand All @@ -765,7 +788,7 @@ func TestHandler_MetricCollection(t *testing.T) {
Help: "test help",
}, []string{"type"})

handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter)
handler := Handler(true, false, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter)

t.Run("counts v1 requests", func(t *testing.T) {
req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false)
Expand All @@ -790,6 +813,20 @@ func TestHandler_MetricCollection(t *testing.T) {
val := testutil.ToFloat64(counter.WithLabelValues("prw2"))
assert.Equal(t, 1.0, val)
})

t.Run("counts unknown or invalid content-type as unknown when acceptUnknownRemoteWriteContentType is true", func(t *testing.T) {
handlerWithUnknown := Handler(true, true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter)
req := createRequestWithHeaders(t, map[string]string{
"Content-Type": "yolo",
"Content-Encoding": "snappy",
}, createCortexWriteRequestProtobuf(t, false, cortexpb.API))
resp := httptest.NewRecorder()
handlerWithUnknown.ServeHTTP(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)

val := testutil.ToFloat64(counter.WithLabelValues("unknown"))
assert.Equal(t, 1.0, val)
})
}

func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
Expand Down
6 changes: 6 additions & 0 deletions schemas/cortex-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -3701,6 +3701,12 @@
"distributor_config": {
"description": "The distributor_config configures the Cortex distributor.",
"properties": {
"accept_unknown_remote_write_content_type": {
"default": false,
"description": "If true, treat requests with unknown or invalid Content-Type header as remote write v1 (legacy behavior). If false, return 415 Unsupported Media Type for non-standard content types.",
"type": "boolean",
"x-cli-flag": "distributor.accept-unknown-remote-write-content-type"
},
"extend_writes": {
"default": true,
"description": "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.",
Expand Down
Loading