From 0f64036875ed4aae7e533de2891076fdf0e88eed Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 23 Feb 2026 20:24:15 -0800 Subject: [PATCH 1/3] add config to allow unknown remote write request with unknown content type and treat as remote write v1 Signed-off-by: yeya24 --- docs/configuration/config-file-reference.md | 6 +++ pkg/api/api.go | 8 ++-- pkg/distributor/distributor.go | 4 +- pkg/util/push/push.go | 27 +++++++----- pkg/util/push/push_test.go | 49 +++++++++++++++------ schemas/cortex-config-schema.json | 6 +++ 6 files changed, 71 insertions(+), 29 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 81b85fb018..e8578744bd 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3213,6 +3213,12 @@ ha_tracker: # CLI flag: -distributor.remote-writev2-enabled [remote_writev2_enabled: | default = false] +# If true, treat requests with unknown or invalid Content-Type header as remote +# write v1 (legacy behavior). If false, return 415 Unsupported Media Type for +# non-standard content types. +# CLI flag: -distributor.accept-unknown-remote-write-content-type +[accept_unknown_remote_write_content_type: | default = false] + ring: kvstore: # Backend storage to use for the ring. Supported values are: consul, diff --git a/pkg/api/api.go b/pkg/api/api.go index 5bf191f207..2971fe8776 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -292,7 +292,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib Help: "Total number of push requests by type.", }, []string{"type"}) - a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") + a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") @@ -304,7 +304,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET") // Legacy Routes - a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") + a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST") a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET") } @@ -337,12 +337,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overri a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST") a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET") a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST") - a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. + a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. // Legacy Routes a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST") a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST") - a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. + a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.AcceptUnknownRemoteWriteContentType, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging. } func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) { diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 424f81d21e..bc51d61c74 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -158,7 +158,8 @@ type Config struct { ExtendWrites bool `yaml:"extend_writes"` SignWriteRequestsEnabled bool `yaml:"sign_write_requests"` UseStreamPush bool `yaml:"use_stream_push"` - RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"` + RemoteWriteV2Enabled bool `yaml:"remote_writev2_enabled"` + AcceptUnknownRemoteWriteContentType bool `yaml:"accept_unknown_remote_write_content_type"` // Distributors ring DistributorRing RingConfig `yaml:"ring"` @@ -224,6 +225,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If zone awareness and this both enabled, when querying metadata APIs (labels names and values for now), only results from quorum number of zones will be included.") f.IntVar(&cfg.NumPushWorkers, "distributor.num-push-workers", 0, "EXPERIMENTAL: Number of go routines to handle push calls from distributors to ingesters. When no workers are available, a new goroutine will be spawned automatically. If set to 0 (default), workers are disabled, and a new goroutine will be created for each push request.") f.BoolVar(&cfg.RemoteWriteV2Enabled, "distributor.remote-writev2-enabled", false, "EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push request.") + f.BoolVar(&cfg.AcceptUnknownRemoteWriteContentType, "distributor.accept-unknown-remote-write-content-type", false, "If true, treat requests with unknown or invalid Content-Type header as remote write v1 (legacy behavior). If false, return 415 Unsupported Media Type for non-standard content types.") f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.") f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.") diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 4608f2c4ad..7ac1ab0819 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -45,7 +45,7 @@ const ( type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) // Handler is a http.Handler which accepts WriteRequests. -func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler { +func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() logger := log.WithContext(ctx, log.Logger) @@ -155,22 +155,27 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation } msgType, err := remote.ParseProtoMsg(contentType) - if err != nil { - level.Error(logger).Log("Error decoding remote write request", "err", err) - http.Error(w, err.Error(), http.StatusUnsupportedMediaType) - return + if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType { + if acceptUnknownRemoteWriteContentType { + msgType = remote.WriteV1MessageType + level.Debug(logger).Log("msg", "Treating unknown or invalid content-type as remote write v1", "content_type", contentType, "msgType", msgType, "err", err) + } else { + if err != nil { + level.Error(logger).Log("msg", "Content-type header invalid or message type not accepted", "content_type", contentType, "err", err) + http.Error(w, err.Error(), http.StatusUnsupportedMediaType) + } else { + errMsg := fmt.Sprintf("%v protobuf message is not accepted by this server; only accepts %v or %v", msgType, remote.WriteV1MessageType, remote.WriteV2MessageType) + level.Error(logger).Log("Not accepted msg type", "msgType", msgType) + http.Error(w, errMsg, http.StatusUnsupportedMediaType) + } + return + } } if requestTotal != nil { requestTotal.WithLabelValues(getTypeLabel(msgType)).Inc() } - if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType { - level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err) - http.Error(w, err.Error(), http.StatusUnsupportedMediaType) - return - } - enc := r.Header.Get("Content-Encoding") if enc == "" { } else if enc != compression.Snappy { diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index ac7c02a874..429ebad0b5 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -129,7 +129,7 @@ func Benchmark_Handler(b *testing.B) { testSeriesNums := []int{10, 100, 500, 1000} for _, seriesNum := range testSeriesNums { b.Run(fmt.Sprintf("PRW1 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, overrides, nil, mockHandler, nil) + handler := Handler(true, false, 1000000, overrides, nil, mockHandler, nil) req, err := createPRW1HTTPRequest(seriesNum) require.NoError(b, err) @@ -143,7 +143,7 @@ func Benchmark_Handler(b *testing.B) { } }) b.Run(fmt.Sprintf("PRW2 with %d series", seriesNum), func(b *testing.B) { - handler := Handler(true, 1000000, overrides, nil, mockHandler, nil) + handler := Handler(true, false, 1000000, overrides, nil, mockHandler, nil) req, err := createPRW2HTTPRequest(seriesNum) require.NoError(b, err) @@ -514,7 +514,7 @@ func TestHandler_remoteWrite(t *testing.T) { t.Run(test.name, func(t *testing.T) { ctx := context.Background() ctx = user.InjectOrgID(ctx, "user-1") - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil) + handler := Handler(true, false, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil) body, isV2 := test.createBody() req := createRequest(t, body, isV2) @@ -549,7 +549,7 @@ func TestHandler_remoteWrite(t *testing.T) { ctx := context.Background() ctx = user.InjectOrgID(ctx, "user-1") - handler := Handler(true, 100000, overrides, nil, pushFunc, nil) + handler := Handler(true, false, 100000, overrides, nil, pushFunc, nil) req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true) req = req.WithContext(ctx) resp := httptest.NewRecorder() @@ -572,11 +572,12 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") tests := []struct { - description string - reqHeaders map[string]string - expectedCode int - isV2 bool - remoteWrite2Enabled bool + description string + reqHeaders map[string]string + expectedCode int + isV2 bool + remoteWrite2Enabled bool + acceptUnknownRemoteWriteContentType bool }{ { description: "[RW 2.0] correct content-type", @@ -688,11 +689,33 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) { isV2: false, remoteWrite2Enabled: true, }, + { + description: "unknown content-type with acceptUnknownRemoteWriteContentType, treated as RW v1", + reqHeaders: map[string]string{ + "Content-Type": "yolo", + "Content-Encoding": "snappy", + }, + expectedCode: http.StatusOK, + isV2: false, + remoteWrite2Enabled: true, + acceptUnknownRemoteWriteContentType: true, + }, + { + description: "invalid proto param with acceptUnknownRemoteWriteContentType, treated as RW v1", + reqHeaders: map[string]string{ + "Content-Type": "application/x-protobuf;proto=yolo", + "Content-Encoding": "snappy", + }, + expectedCode: http.StatusOK, + isV2: false, + remoteWrite2Enabled: true, + acceptUnknownRemoteWriteContentType: true, + }, } for _, test := range tests { t.Run(test.description, func(t *testing.T) { - handler := Handler(test.remoteWrite2Enabled, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) + handler := Handler(test.remoteWrite2Enabled, test.acceptUnknownRemoteWriteContentType, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) if test.isV2 { ctx := context.Background() @@ -719,7 +742,7 @@ func TestHandler_cortexWriteRequest(t *testing.T) { overrides := validation.NewOverrides(limits, nil) sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)") - handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) + handler := Handler(true, false, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil) t.Run("remote write v1", func(t *testing.T) { req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false) @@ -749,7 +772,7 @@ func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) { createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false), } { resp := httptest.NewRecorder() - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil) + handler := Handler(true, false, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil) handler.ServeHTTP(resp, req) assert.Equal(t, 200, resp.Code) } @@ -765,7 +788,7 @@ func TestHandler_MetricCollection(t *testing.T) { Help: "test help", }, []string{"type"}) - handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter) + handler := Handler(true, false, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter) t.Run("counts v1 requests", func(t *testing.T) { req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false) diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index dfbd85f685..bb6a328ba9 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -3701,6 +3701,12 @@ "distributor_config": { "description": "The distributor_config configures the Cortex distributor.", "properties": { + "accept_unknown_remote_write_content_type": { + "default": false, + "description": "If true, treat requests with unknown or invalid Content-Type header as remote write v1 (legacy behavior). If false, return 415 Unsupported Media Type for non-standard content types.", + "type": "boolean", + "x-cli-flag": "distributor.accept-unknown-remote-write-content-type" + }, "extend_writes": { "default": true, "description": "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.", From 0da732c5f67baa034ab8a2d5afcefb443eca7ace Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 23 Feb 2026 20:37:40 -0800 Subject: [PATCH 2/3] fix push metric Signed-off-by: yeya24 --- pkg/util/push/push.go | 17 +++++++++++------ pkg/util/push/push_test.go | 14 ++++++++++++++ 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pkg/util/push/push.go b/pkg/util/push/push.go index 7ac1ab0819..cefb52e0ea 100644 --- a/pkg/util/push/push.go +++ b/pkg/util/push/push.go @@ -36,9 +36,10 @@ const ( rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written" rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written" - labelValuePRW1 = "prw1" - labelValuePRW2 = "prw2" - labelValueOTLP = "otlp" + labelValuePRW1 = "prw1" + labelValuePRW2 = "prw2" + labelValueOTLP = "otlp" + labelValueUnknown = "unknown" ) // Func defines the type of the push. It is similar to http.HandlerFunc. @@ -155,8 +156,10 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, } msgType, err := remote.ParseProtoMsg(contentType) + contentTypeUnknownOrInvalid := false if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType { if acceptUnknownRemoteWriteContentType { + contentTypeUnknownOrInvalid = true msgType = remote.WriteV1MessageType level.Debug(logger).Log("msg", "Treating unknown or invalid content-type as remote write v1", "content_type", contentType, "msgType", msgType, "err", err) } else { @@ -173,7 +176,7 @@ func Handler(remoteWrite2Enabled bool, acceptUnknownRemoteWriteContentType bool, } if requestTotal != nil { - requestTotal.WithLabelValues(getTypeLabel(msgType)).Inc() + requestTotal.WithLabelValues(getTypeLabel(msgType, contentTypeUnknownOrInvalid)).Inc() } enc := r.Header.Get("Content-Encoding") @@ -316,10 +319,12 @@ func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exempl return v1Exemplars, nil } -func getTypeLabel(msgType remote.WriteMessageType) string { +func getTypeLabel(msgType remote.WriteMessageType, unknownOrInvalidContentType bool) string { + if unknownOrInvalidContentType { + return labelValueUnknown + } if msgType == remote.WriteV1MessageType { return labelValuePRW1 } - return labelValuePRW2 } diff --git a/pkg/util/push/push_test.go b/pkg/util/push/push_test.go index 429ebad0b5..fc09241d66 100644 --- a/pkg/util/push/push_test.go +++ b/pkg/util/push/push_test.go @@ -813,6 +813,20 @@ func TestHandler_MetricCollection(t *testing.T) { val := testutil.ToFloat64(counter.WithLabelValues("prw2")) assert.Equal(t, 1.0, val) }) + + t.Run("counts unknown or invalid content-type as unknown when acceptUnknownRemoteWriteContentType is true", func(t *testing.T) { + handlerWithUnknown := Handler(true, true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter) + req := createRequestWithHeaders(t, map[string]string{ + "Content-Type": "yolo", + "Content-Encoding": "snappy", + }, createCortexWriteRequestProtobuf(t, false, cortexpb.API)) + resp := httptest.NewRecorder() + handlerWithUnknown.ServeHTTP(resp, req) + assert.Equal(t, http.StatusOK, resp.Code) + + val := testutil.ToFloat64(counter.WithLabelValues("unknown")) + assert.Equal(t, 1.0, val) + }) } func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { From 06f451b0d944a476fde1bf374fb5e1fb1b6eabc1 Mon Sep 17 00:00:00 2001 From: yeya24 Date: Mon, 23 Feb 2026 20:40:48 -0800 Subject: [PATCH 3/3] changelog Signed-off-by: yeya24 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index f7c526927d..fda089cbab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ * [ENHANCEMENT] Distributor: Validate metric name before removing empty labels. #7253 * [ENHANCEMENT] Make cortex_ingester_tsdb_sample_ooo_delta metric per-tenant #7278 * [ENHANCEMENT] Distributor: Add dimension `nhcb` to keep track of nhcb samples in `cortex_distributor_received_samples_total` and `cortex_distributor_samples_in_total` metrics. +* [ENHANCEMENT] Distributor: Add `-distributor.accept-unknown-remote-write-content-type` flag. When enabled, requests with unknown or invalid Content-Type header are treated as remote write v1 instead of returning 415 Unsupported Media Type. Default is false. #7293 * [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238 * [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088 * [BUGFIX] Ruler: Add XFunctions validation support. #7111