diff --git a/apps/workspace-engine-router/go.mod b/apps/workspace-engine-router/go.mod index 8a6ad73b4..d2e7130da 100644 --- a/apps/workspace-engine-router/go.mod +++ b/apps/workspace-engine-router/go.mod @@ -1,6 +1,6 @@ module workspace-engine-router -go 1.23 +go 1.24.0 require ( github.com/charmbracelet/log v0.4.0 @@ -14,6 +14,7 @@ require ( ) require ( + cloud.google.com/go/compute/metadata v0.3.0 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect @@ -52,6 +53,7 @@ require ( golang.org/x/crypto v0.27.0 // indirect golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect golang.org/x/net v0.29.0 // indirect + golang.org/x/oauth2 v0.35.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/text v0.18.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect diff --git a/apps/workspace-engine-router/go.sum b/apps/workspace-engine-router/go.sum index 8dfe82def..382aad78e 100644 --- a/apps/workspace-engine-router/go.sum +++ b/apps/workspace-engine-router/go.sum @@ -1,3 +1,6 @@ +cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU= +cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc= +cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= @@ -408,6 +411,8 @@ golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= +golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ= +golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/apps/workspace-engine-router/pkg/config/config.go b/apps/workspace-engine-router/pkg/config/config.go index c0ccf908d..0830479a9 100644 --- a/apps/workspace-engine-router/pkg/config/config.go +++ b/apps/workspace-engine-router/pkg/config/config.go @@ -15,8 +15,19 @@ type Config struct { ManagementPort int `envconfig:"MANAGEMENT_PORT" default:"9091"` // Kafka configuration - KafkaBrokers string `envconfig:"KAFKA_BROKERS" default:"localhost:9092"` - KafkaTopic string `envconfig:"KAFKA_TOPIC" default:"workspace-events"` + KafkaBrokers string `envconfig:"KAFKA_BROKERS" default:"localhost:9092"` + KafkaTopic string `envconfig:"KAFKA_TOPIC" default:"workspace-events"` + KafkaSASLEnabled bool `envconfig:"KAFKA_SASL_ENABLED" default:"false"` + KafkaSecurityProtocol string `envconfig:"KAFKA_SECURITY_PROTOCOL" default:"SASL_SSL"` + KafkaSASLMechanism string `envconfig:"KAFKA_SASL_MECHANISM" default:"OAUTHBEARER"` + KafkaSASLUsername string `envconfig:"KAFKA_SASL_USERNAME" default:""` + KafkaSASLPassword string `envconfig:"KAFKA_SASL_PASSWORD" default:""` + KafkaSASLOAuthBearerMethod string `envconfig:"KAFKA_SASL_OAUTHBEARER_METHOD" default:"oidc"` + KafkaSASLOAuthBearerTokenURL string `envconfig:"KAFKA_SASL_OAUTHBEARER_TOKEN_URL" default:""` + KafkaSASLOAuthBearerClientID string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_ID" default:""` + KafkaSASLOAuthBearerClientSecret string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET" default:""` + KafkaSASLOAuthBearerScope string `envconfig:"KAFKA_SASL_OAUTHBEARER_SCOPE" default:""` + KafkaSASLOAuthBearerProvider string `envconfig:"KAFKA_SASL_OAUTHBEARER_PROVIDER" default:"oidc"` // Worker health configuration WorkerHeartbeatTimeout time.Duration `envconfig:"WORKER_HEARTBEAT_TIMEOUT" default:"30s"` diff --git a/apps/workspace-engine-router/pkg/kafka/config.go b/apps/workspace-engine-router/pkg/kafka/config.go new file mode 100644 index 000000000..7c92799be --- /dev/null +++ b/apps/workspace-engine-router/pkg/kafka/config.go @@ -0,0 +1,69 @@ +package kafka + +import ( + "fmt" + "strings" + "workspace-engine-router/pkg/config" + + "github.com/charmbracelet/log" + kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func BaseConfig() (*kafka.ConfigMap, error) { + cfg := &kafka.ConfigMap{ + "bootstrap.servers": config.Global.KafkaBrokers, + } + if err := applySASL(cfg); err != nil { + return nil, fmt.Errorf("invalid Kafka SASL configuration: %w", err) + } + return cfg, nil +} + +func applySASL(cfg *kafka.ConfigMap) error { + if !config.Global.KafkaSASLEnabled { + return nil + } + + mechanism := config.Global.KafkaSASLMechanism + log.Info("Enabling SASL for Kafka", "mechanism", mechanism) + + _ = cfg.SetKey("security.protocol", config.Global.KafkaSecurityProtocol) + _ = cfg.SetKey("sasl.mechanisms", mechanism) + + switch mechanism { + case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512": + var missing []string + if config.Global.KafkaSASLUsername == "" { + missing = append(missing, "KAFKA_SASL_USERNAME") + } + if config.Global.KafkaSASLPassword == "" { + missing = append(missing, "KAFKA_SASL_PASSWORD") + } + if len(missing) > 0 { + return fmt.Errorf("KAFKA_SASL_MECHANISM=%s requires: %s", mechanism, strings.Join(missing, ", ")) + } + _ = cfg.SetKey("sasl.username", config.Global.KafkaSASLUsername) + _ = cfg.SetKey("sasl.password", config.Global.KafkaSASLPassword) + case "OAUTHBEARER": + provider := config.Global.KafkaSASLOAuthBearerProvider + switch provider { + case "gcp": + log.Info("Using GCP ADC for OAUTHBEARER tokens") + case "oidc": + if config.Global.KafkaSASLOAuthBearerTokenURL == "" { + return fmt.Errorf("KAFKA_SASL_MECHANISM=OAUTHBEARER with provider=oidc requires: KAFKA_SASL_OAUTHBEARER_TOKEN_URL") + } + _ = cfg.SetKey("sasl.oauthbearer.method", config.Global.KafkaSASLOAuthBearerMethod) + _ = cfg.SetKey("sasl.oauthbearer.client.id", config.Global.KafkaSASLOAuthBearerClientID) + _ = cfg.SetKey("sasl.oauthbearer.client.secret", config.Global.KafkaSASLOAuthBearerClientSecret) + _ = cfg.SetKey("sasl.oauthbearer.token.endpoint.url", config.Global.KafkaSASLOAuthBearerTokenURL) + _ = cfg.SetKey("sasl.oauthbearer.scope", config.Global.KafkaSASLOAuthBearerScope) + default: + return fmt.Errorf("unknown KAFKA_SASL_OAUTHBEARER_PROVIDER: %s (supported: oidc, gcp)", provider) + } + default: + return fmt.Errorf("unknown KAFKA_SASL_MECHANISM: %s (supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)", mechanism) + } + + return nil +} diff --git a/apps/workspace-engine-router/pkg/kafka/gcp_auth.go b/apps/workspace-engine-router/pkg/kafka/gcp_auth.go new file mode 100644 index 000000000..8154f67a2 --- /dev/null +++ b/apps/workspace-engine-router/pkg/kafka/gcp_auth.go @@ -0,0 +1,50 @@ +package kafka + +import ( + "context" + "time" + + "workspace-engine-router/pkg/config" + + "github.com/charmbracelet/log" + confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "golang.org/x/oauth2/google" +) + +func IsGCPProvider() bool { + return config.Global.KafkaSASLEnabled && + config.Global.KafkaSASLMechanism == "OAUTHBEARER" && + config.Global.KafkaSASLOAuthBearerProvider == "gcp" +} + +func setGCPToken(client interface { + SetOAuthBearerToken(token confluentkafka.OAuthBearerToken) error +}) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + scope := config.Global.KafkaSASLOAuthBearerScope + if scope == "" { + scope = "https://www.googleapis.com/auth/cloud-platform" + } + + ts, err := google.DefaultTokenSource(ctx, scope) + if err != nil { + return err + } + + token, err := ts.Token() + if err != nil { + return err + } + + if err := client.SetOAuthBearerToken(confluentkafka.OAuthBearerToken{ + TokenValue: token.AccessToken, + Expiration: token.Expiry, + }); err != nil { + return err + } + + log.Info("GCP OAuthBearer token set for admin client", "expires", token.Expiry.Format(time.RFC3339)) + return nil +} diff --git a/apps/workspace-engine-router/pkg/kafka/metadata.go b/apps/workspace-engine-router/pkg/kafka/metadata.go index ed5e513dc..5b4e892b9 100644 --- a/apps/workspace-engine-router/pkg/kafka/metadata.go +++ b/apps/workspace-engine-router/pkg/kafka/metadata.go @@ -59,14 +59,22 @@ func (pc *PartitionCounter) GetPartitionCount() (int32, error) { // queryPartitionCount queries Kafka for the number of partitions func (pc *PartitionCounter) queryPartitionCount() (int32, error) { // Create an AdminClient to query metadata - adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{ - "bootstrap.servers": pc.brokers, - }) + cfg, err := BaseConfig() + if err != nil { + return 0, fmt.Errorf("failed to build kafka config: %w", err) + } + adminClient, err := kafka.NewAdminClient(cfg) if err != nil { return 0, fmt.Errorf("failed to create admin client: %w", err) } defer adminClient.Close() + if IsGCPProvider() { + if err := setGCPToken(adminClient); err != nil { + return 0, fmt.Errorf("failed to set GCP token on admin client: %w", err) + } + } + // Get metadata for the topic metadata, err := adminClient.GetMetadata(&pc.topic, false, 5000) if err != nil { diff --git a/apps/workspace-engine/main.go b/apps/workspace-engine/main.go index a9da8622e..832feebe1 100644 --- a/apps/workspace-engine/main.go +++ b/apps/workspace-engine/main.go @@ -244,7 +244,7 @@ func main() { defer cancel() // Initialize shared Kafka producer used by the ticker and all job agents - producer, err := kafka.NewProducer(kafka.Brokers) + producer, err := kafka.NewProducer() if err != nil { log.Fatal("Failed to create Kafka producer", "error", err) panic(err) @@ -252,7 +252,7 @@ func main() { messaging.InitProducer(producer) defer messaging.CloseProducer() - consumer, err := kafka.NewConsumer(kafka.Brokers, kafka.Topic) + consumer, err := kafka.NewConsumer(kafka.Topic) if err != nil { log.Fatal("Failed to create Kafka consumer", "error", err) panic(err) diff --git a/apps/workspace-engine/pkg/config/env.go b/apps/workspace-engine/pkg/config/env.go index 717583760..0a691e2f3 100644 --- a/apps/workspace-engine/pkg/config/env.go +++ b/apps/workspace-engine/pkg/config/env.go @@ -24,6 +24,17 @@ type Config struct { KafkaGroupID string `envconfig:"KAFKA_GROUP_ID" default:"workspace-engine"` KafkaTopic string `envconfig:"KAFKA_TOPIC" default:"workspace-events"` KafkaConsumerTopic string `envconfig:"KAFKA_CONSUMER_TOPIC" default:"workspace-events"` + KafkaSASLEnabled bool `envconfig:"KAFKA_SASL_ENABLED" default:"false"` + KafkaSecurityProtocol string `envconfig:"KAFKA_SECURITY_PROTOCOL" default:"SASL_SSL"` + KafkaSASLMechanism string `envconfig:"KAFKA_SASL_MECHANISM" default:"OAUTHBEARER"` + KafkaSASLUsername string `envconfig:"KAFKA_SASL_USERNAME" default:""` + KafkaSASLPassword string `envconfig:"KAFKA_SASL_PASSWORD" default:""` + KafkaSASLOAuthBearerMethod string `envconfig:"KAFKA_SASL_OAUTHBEARER_METHOD" default:"oidc"` + KafkaSASLOAuthBearerTokenURL string `envconfig:"KAFKA_SASL_OAUTHBEARER_TOKEN_URL" default:""` + KafkaSASLOAuthBearerClientID string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_ID" default:""` + KafkaSASLOAuthBearerClientSecret string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET" default:""` + KafkaSASLOAuthBearerScope string `envconfig:"KAFKA_SASL_OAUTHBEARER_SCOPE" default:""` + KafkaSASLOAuthBearerProvider string `envconfig:"KAFKA_SASL_OAUTHBEARER_PROVIDER" default:"oidc"` OTELServiceName string `envconfig:"OTEL_SERVICE_NAME" default:"ctrlplane/workspace-engine"` OTELExporterOTLPEndpoint string `envconfig:"OTEL_EXPORTER_OTLP_ENDPOINT" default:"localhost:4318"` diff --git a/apps/workspace-engine/pkg/kafka/kafka.go b/apps/workspace-engine/pkg/kafka/kafka.go index 8cdd4ef89..0c351822e 100644 --- a/apps/workspace-engine/pkg/kafka/kafka.go +++ b/apps/workspace-engine/pkg/kafka/kafka.go @@ -13,7 +13,6 @@ import ( "workspace-engine/pkg/workspace/manager" "github.com/charmbracelet/log" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) // Configuration variables loaded from environment @@ -23,18 +22,14 @@ var ( Brokers = config.Global.KafkaBrokers ) -func NewConsumer(brokers string, topic string) (messaging.Consumer, error) { - return confluent.NewConfluent(brokers).CreateConsumer(GroupID, topic, &kafka.ConfigMap{ - "bootstrap.servers": Brokers, - "group.id": GroupID, - "auto.offset.reset": "latest", - "enable.auto.commit": false, - "partition.assignment.strategy": "cooperative-sticky", - "go.application.rebalance.enable": true, - "max.poll.interval.ms": 900_000, // 15 minutes - - // "debug": "cgrp,broker,protocol", - }) +func NewConsumer(topic string) (messaging.Consumer, error) { + cfg, err := confluent.BaseConsumerConfig() + if err != nil { + return nil, fmt.Errorf("failed to build consumer config: %w", err) + } + _ = cfg.SetKey("auto.offset.reset", "latest") + _ = cfg.SetKey("max.poll.interval.ms", 900_000) + return confluent.NewConfluent(Brokers).CreateConsumer(GroupID, topic, cfg) } // RunConsumerWithWorkspaceLoader starts the Kafka consumer with workspace-based offset resume diff --git a/apps/workspace-engine/pkg/kafka/producer.go b/apps/workspace-engine/pkg/kafka/producer.go index 6632134ba..bc0a81dd7 100644 --- a/apps/workspace-engine/pkg/kafka/producer.go +++ b/apps/workspace-engine/pkg/kafka/producer.go @@ -1,18 +1,15 @@ package kafka import ( + "fmt" "workspace-engine/pkg/messaging" "workspace-engine/pkg/messaging/confluent" - - "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) -func NewProducer(brokers string) (messaging.Producer, error) { - return confluent.NewConfluent(brokers).CreateProducer(Topic, &kafka.ConfigMap{ - "bootstrap.servers": Brokers, - "enable.idempotence": true, - "compression.type": "snappy", - "message.send.max.retries": 10, - "retry.backoff.ms": 100, - }) +func NewProducer() (messaging.Producer, error) { + cfg, err := confluent.BaseProducerConfig() + if err != nil { + return nil, fmt.Errorf("failed to build producer config: %w", err) + } + return confluent.NewConfluent(Brokers).CreateProducer(Topic, cfg) } diff --git a/apps/workspace-engine/pkg/messaging/confluent/config.go b/apps/workspace-engine/pkg/messaging/confluent/config.go new file mode 100644 index 000000000..5c426c1e6 --- /dev/null +++ b/apps/workspace-engine/pkg/messaging/confluent/config.go @@ -0,0 +1,89 @@ +package confluent + +import ( + "fmt" + "strings" + "workspace-engine/pkg/config" + + "github.com/charmbracelet/log" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" +) + +func BaseProducerConfig() (*kafka.ConfigMap, error) { + cfg := &kafka.ConfigMap{ + "bootstrap.servers": config.Global.KafkaBrokers, + "enable.idempotence": true, + "compression.type": "snappy", + "message.send.max.retries": 10, + "retry.backoff.ms": 100, + } + if err := applySASL(cfg); err != nil { + return nil, fmt.Errorf("invalid Kafka SASL configuration: %w", err) + } + return cfg, nil +} + +func BaseConsumerConfig() (*kafka.ConfigMap, error) { + cfg := &kafka.ConfigMap{ + "bootstrap.servers": config.Global.KafkaBrokers, + "group.id": config.Global.KafkaGroupID, + "enable.auto.commit": false, + "partition.assignment.strategy": "cooperative-sticky", + "session.timeout.ms": 3000, + "heartbeat.interval.ms": 1000, + "go.application.rebalance.enable": true, + } + if err := applySASL(cfg); err != nil { + return nil, fmt.Errorf("invalid Kafka SASL configuration: %w", err) + } + return cfg, nil +} + +func applySASL(cfg *kafka.ConfigMap) error { + if !config.Global.KafkaSASLEnabled { + return nil + } + + mechanism := config.Global.KafkaSASLMechanism + log.Info("Enabling SASL for Kafka", "mechanism", mechanism) + + _ = cfg.SetKey("security.protocol", config.Global.KafkaSecurityProtocol) + _ = cfg.SetKey("sasl.mechanisms", mechanism) + + switch mechanism { + case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512": + var missing []string + if config.Global.KafkaSASLUsername == "" { + missing = append(missing, "KAFKA_SASL_USERNAME") + } + if config.Global.KafkaSASLPassword == "" { + missing = append(missing, "KAFKA_SASL_PASSWORD") + } + if len(missing) > 0 { + return fmt.Errorf("KAFKA_SASL_MECHANISM=%s requires: %s", mechanism, strings.Join(missing, ", ")) + } + _ = cfg.SetKey("sasl.username", config.Global.KafkaSASLUsername) + _ = cfg.SetKey("sasl.password", config.Global.KafkaSASLPassword) + case "OAUTHBEARER": + provider := config.Global.KafkaSASLOAuthBearerProvider + switch provider { + case "gcp": + log.Info("Using GCP ADC for OAUTHBEARER tokens") + case "oidc": + if config.Global.KafkaSASLOAuthBearerTokenURL == "" { + return fmt.Errorf("KAFKA_SASL_MECHANISM=OAUTHBEARER with provider=oidc requires: KAFKA_SASL_OAUTHBEARER_TOKEN_URL") + } + _ = cfg.SetKey("sasl.oauthbearer.method", config.Global.KafkaSASLOAuthBearerMethod) + _ = cfg.SetKey("sasl.oauthbearer.client.id", config.Global.KafkaSASLOAuthBearerClientID) + _ = cfg.SetKey("sasl.oauthbearer.client.secret", config.Global.KafkaSASLOAuthBearerClientSecret) + _ = cfg.SetKey("sasl.oauthbearer.token.endpoint.url", config.Global.KafkaSASLOAuthBearerTokenURL) + _ = cfg.SetKey("sasl.oauthbearer.scope", config.Global.KafkaSASLOAuthBearerScope) + default: + return fmt.Errorf("unknown KAFKA_SASL_OAUTHBEARER_PROVIDER: %s (supported: oidc, gcp)", provider) + } + default: + return fmt.Errorf("unknown KAFKA_SASL_MECHANISM: %s (supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)", mechanism) + } + + return nil +} diff --git a/apps/workspace-engine/pkg/messaging/confluent/consumer.go b/apps/workspace-engine/pkg/messaging/confluent/consumer.go index f3ff6a9c1..369f61398 100644 --- a/apps/workspace-engine/pkg/messaging/confluent/consumer.go +++ b/apps/workspace-engine/pkg/messaging/confluent/consumer.go @@ -197,6 +197,9 @@ func (c *Consumer) waitForPartitionAssignment(ctx context.Context) ([]int32, err assignedPartitions = remaining log.Info("Partitions after revoke", "partitions", assignedPartitions) + case kafka.OAuthBearerTokenRefresh: + handleOAuthBearerTokenRefresh(c.consumer) + case kafka.Error: log.Error("Received Kafka error while waiting for assignment", "error", e) return nil, fmt.Errorf("consumer error while waiting for assignment: %w", e) @@ -214,32 +217,54 @@ func extractPartitionNumbers(assignment []kafka.TopicPartition) []int32 { return partitions } -// ReadMessage reads the next message with a timeout -// Returns ErrTimeout if no message is available within the timeout duration +// ReadMessage reads the next message with a timeout. +// Uses Poll() internally so we can handle OAuthBearerTokenRefresh events +// that would otherwise be swallowed by the library's ReadMessage(). func (c *Consumer) ReadMessage(timeout time.Duration) (*messaging.Message, error) { if c.closed { return nil, fmt.Errorf("consumer is closed") } - msg, err := c.consumer.ReadMessage(timeout) - if err != nil { - // Check if it's a timeout error - if kafkaErr, ok := err.(kafka.Error); ok { - if kafkaErr.Code() == kafka.ErrTimedOut { + deadline := time.Now().Add(timeout) + for { + remaining := time.Until(deadline) + if remaining <= 0 { + return nil, messaging.ErrTimeout + } + + pollMs := int(remaining.Milliseconds()) + if pollMs < 1 { + pollMs = 1 + } + + ev := c.consumer.Poll(pollMs) + if ev == nil { + return nil, messaging.ErrTimeout + } + + switch e := ev.(type) { + case *kafka.Message: + if e.TopicPartition.Error != nil { + return nil, fmt.Errorf("error reading message: %w", e.TopicPartition.Error) + } + return &messaging.Message{ + Key: e.Key, + Value: e.Value, + Partition: e.TopicPartition.Partition, + Offset: int64(e.TopicPartition.Offset), + Timestamp: e.Timestamp, + }, nil + case kafka.Error: + if e.Code() == kafka.ErrTimedOut { return nil, messaging.ErrTimeout } + return nil, fmt.Errorf("error reading message: %w", e) + case kafka.OAuthBearerTokenRefresh: + handleOAuthBearerTokenRefresh(c.consumer) + default: + continue } - return nil, fmt.Errorf("error reading message: %w", err) } - - // Convert kafka.Message to messaging.Message - return &messaging.Message{ - Key: msg.Key, - Value: msg.Value, - Partition: msg.TopicPartition.Partition, - Offset: int64(msg.TopicPartition.Offset), - Timestamp: msg.Timestamp, - }, nil } // CommitMessage commits the offset for a message diff --git a/apps/workspace-engine/pkg/messaging/confluent/gcp_auth.go b/apps/workspace-engine/pkg/messaging/confluent/gcp_auth.go new file mode 100644 index 000000000..f87fb65ef --- /dev/null +++ b/apps/workspace-engine/pkg/messaging/confluent/gcp_auth.go @@ -0,0 +1,57 @@ +package confluent + +import ( + "context" + "time" + + "workspace-engine/pkg/config" + + "github.com/charmbracelet/log" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "golang.org/x/oauth2/google" +) + +type oauthBearerClient interface { + SetOAuthBearerToken(token kafka.OAuthBearerToken) error + SetOAuthBearerTokenFailure(errstr string) error +} + +func IsGCPProvider() bool { + return config.Global.KafkaSASLEnabled && + config.Global.KafkaSASLMechanism == "OAUTHBEARER" && + config.Global.KafkaSASLOAuthBearerProvider == "gcp" +} + +func handleOAuthBearerTokenRefresh(client oauthBearerClient) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + scope := config.Global.KafkaSASLOAuthBearerScope + if scope == "" { + scope = "https://www.googleapis.com/auth/cloud-platform" + } + + ts, err := google.DefaultTokenSource(ctx, scope) + if err != nil { + log.Error("Failed to create GCP token source", "error", err) + _ = client.SetOAuthBearerTokenFailure(err.Error()) + return + } + + token, err := ts.Token() + if err != nil { + log.Error("Failed to get GCP access token", "error", err) + _ = client.SetOAuthBearerTokenFailure(err.Error()) + return + } + + if err := client.SetOAuthBearerToken(kafka.OAuthBearerToken{ + TokenValue: token.AccessToken, + Expiration: token.Expiry, + }); err != nil { + log.Error("Failed to set OAuthBearer token", "error", err) + return + } + + log.Info("GCP OAuthBearer token refreshed", "expires", token.Expiry.Format(time.RFC3339)) +} diff --git a/apps/workspace-engine/pkg/messaging/confluent/producer.go b/apps/workspace-engine/pkg/messaging/confluent/producer.go index 4067b40ed..9df18e05f 100644 --- a/apps/workspace-engine/pkg/messaging/confluent/producer.go +++ b/apps/workspace-engine/pkg/messaging/confluent/producer.go @@ -39,7 +39,6 @@ func NewProducer(brokers string, topic string, config *kafka.ConfigMap) (*Produc return nil, err } - // Handle delivery reports in the background go func() { for e := range p.Events() { switch ev := e.(type) { @@ -55,6 +54,8 @@ func NewProducer(brokers string, topic string, config *kafka.ConfigMap) (*Produc "partition", ev.TopicPartition.Partition, "offset", ev.TopicPartition.Offset) } + case kafka.OAuthBearerTokenRefresh: + handleOAuthBearerTokenRefresh(p) } } }() diff --git a/apps/workspace-engine/pkg/workspace/jobdispatch/argocd.go b/apps/workspace-engine/pkg/workspace/jobdispatch/argocd.go index 726710e65..c070a7a9f 100644 --- a/apps/workspace-engine/pkg/workspace/jobdispatch/argocd.go +++ b/apps/workspace-engine/pkg/workspace/jobdispatch/argocd.go @@ -22,7 +22,6 @@ import ( "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" "github.com/avast/retry-go" "github.com/charmbracelet/log" - confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -362,13 +361,11 @@ func (d *ArgoCDDispatcher) getKafkaProducer() (messaging.Producer, error) { if d.kafkaProducerFactory != nil { return d.kafkaProducerFactory() } - return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, &confluentkafka.ConfigMap{ - "bootstrap.servers": config.Global.KafkaBrokers, - "enable.idempotence": true, - "compression.type": "snappy", - "message.send.max.retries": 10, - "retry.backoff.ms": 100, - }) + cfg, err := confluent.BaseProducerConfig() + if err != nil { + return nil, fmt.Errorf("failed to build producer config: %w", err) + } + return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, cfg) } // sendJobFailureEvent sends a job update event with a failure status and message diff --git a/apps/workspace-engine/pkg/workspace/jobdispatch/terraformcloud.go b/apps/workspace-engine/pkg/workspace/jobdispatch/terraformcloud.go index 41b338679..b4a591ec3 100644 --- a/apps/workspace-engine/pkg/workspace/jobdispatch/terraformcloud.go +++ b/apps/workspace-engine/pkg/workspace/jobdispatch/terraformcloud.go @@ -15,7 +15,6 @@ import ( "workspace-engine/pkg/workspace/releasemanager/verification" "workspace-engine/pkg/workspace/store" - confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" "github.com/hashicorp/go-tfe" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" @@ -297,13 +296,11 @@ func (d *TerraformCloudDispatcher) createRunVerification(ctx context.Context, re } func (d *TerraformCloudDispatcher) getKafkaProducer() (messaging.Producer, error) { - return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, &confluentkafka.ConfigMap{ - "bootstrap.servers": config.Global.KafkaBrokers, - "enable.idempotence": true, - "compression.type": "snappy", - "message.send.max.retries": 10, - "retry.backoff.ms": 100, - }) + cfg, err := confluent.BaseProducerConfig() + if err != nil { + return nil, fmt.Errorf("failed to build producer config: %w", err) + } + return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, cfg) } func (d *TerraformCloudDispatcher) sendJobUpdateEvent(job *oapi.Job, run *tfe.Run, config *oapi.TerraformCloudJobAgentConfig, workspaceName string) error { diff --git a/apps/workspace-engine/pkg/workspace/jobdispatch/testrunner.go b/apps/workspace-engine/pkg/workspace/jobdispatch/testrunner.go index 2d300ac47..a2ad08840 100644 --- a/apps/workspace-engine/pkg/workspace/jobdispatch/testrunner.go +++ b/apps/workspace-engine/pkg/workspace/jobdispatch/testrunner.go @@ -12,7 +12,6 @@ import ( "workspace-engine/pkg/oapi" "workspace-engine/pkg/workspace/store" - confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" @@ -160,13 +159,11 @@ func (d *TestRunnerDispatcher) getKafkaProducer() (messaging.Producer, error) { if d.producerFactory != nil { return d.producerFactory() } - return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, &confluentkafka.ConfigMap{ - "bootstrap.servers": config.Global.KafkaBrokers, - "enable.idempotence": true, - "compression.type": "snappy", - "message.send.max.retries": 10, - "retry.backoff.ms": 100, - }) + cfg, err := confluent.BaseProducerConfig() + if err != nil { + return nil, fmt.Errorf("failed to build producer config: %w", err) + } + return confluent.NewConfluent(config.Global.KafkaBrokers).CreateProducer(config.Global.KafkaTopic, cfg) } func (d *TestRunnerDispatcher) sendJobUpdateEvent(job *oapi.Job, status oapi.JobStatus, message string) error { diff --git a/packages/events/package.json b/packages/events/package.json index b65ca4578..35db35db2 100644 --- a/packages/events/package.json +++ b/packages/events/package.json @@ -31,6 +31,7 @@ "@ctrlplane/workspace-engine-sdk": "workspace:*", "@t3-oss/env-core": "catalog:", "date-fns": "^4.1.0", + "google-auth-library": "^10.5.0", "kafkajs": "^2.2.4", "lodash": "catalog:", "ts-is-present": "catalog:", diff --git a/packages/events/src/config.ts b/packages/events/src/config.ts index 5025d9da9..4b92f83a3 100644 --- a/packages/events/src/config.ts +++ b/packages/events/src/config.ts @@ -1,9 +1,38 @@ import { createEnv } from "@t3-oss/env-core"; import { z } from "zod"; +const saslMechanisms = [ + "oauthbearer", + "plain", + "scram-sha-256", + "scram-sha-512", +] as const; + export const env = createEnv({ server: { KAFKA_BROKERS: z.string().default("localhost:9092"), + KAFKA_SSL_ENABLED: z + .string() + .default("false") + .transform((v) => v === "true"), + KAFKA_SASL_ENABLED: z + .string() + .default("false") + .transform((v) => v === "true"), + KAFKA_SASL_MECHANISM: z + .string() + .default("oauthbearer") + .transform((v) => v.toLowerCase()) + .pipe(z.enum(saslMechanisms)), + KAFKA_SASL_USERNAME: z.string().optional(), + KAFKA_SASL_PASSWORD: z.string().optional(), + KAFKA_SASL_OAUTHBEARER_TOKEN_URL: z.string().optional(), + KAFKA_SASL_OAUTHBEARER_CLIENT_ID: z.string().optional(), + KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET: z.string().optional(), + KAFKA_SASL_OAUTHBEARER_SCOPE: z.string().optional(), + KAFKA_SASL_OAUTHBEARER_PROVIDER: z + .enum(["oidc", "gcp"]) + .default("oidc"), }, runtimeEnv: process.env, emptyStringAsUndefined: true, @@ -12,3 +41,33 @@ export const env = createEnv({ !!process.env.SKIP_ENV_VALIDATION || process.env.npm_lifecycle_event === "lint", }); + +export function validateSaslConfig(): void { + if (!env.KAFKA_SASL_ENABLED) return; + + const mechanism = env.KAFKA_SASL_MECHANISM; + const missing: string[] = []; + + switch (mechanism) { + case "plain": + case "scram-sha-256": + case "scram-sha-512": + if (!env.KAFKA_SASL_USERNAME) missing.push("KAFKA_SASL_USERNAME"); + if (!env.KAFKA_SASL_PASSWORD) missing.push("KAFKA_SASL_PASSWORD"); + break; + case "oauthbearer": + if ( + env.KAFKA_SASL_OAUTHBEARER_PROVIDER === "oidc" && + !env.KAFKA_SASL_OAUTHBEARER_TOKEN_URL + ) + missing.push("KAFKA_SASL_OAUTHBEARER_TOKEN_URL"); + break; + } + + if (missing.length > 0) + throw new Error( + `KAFKA_SASL_MECHANISM=${mechanism} requires: ${missing.join(", ")}`, + ); +} + +validateSaslConfig(); diff --git a/packages/events/src/kafka/client.ts b/packages/events/src/kafka/client.ts index b2ee7b0fc..c9db877c6 100644 --- a/packages/events/src/kafka/client.ts +++ b/packages/events/src/kafka/client.ts @@ -1,5 +1,10 @@ import type { Span } from "@ctrlplane/logger"; -import type { Producer } from "kafkajs"; +import type { + OauthbearerProviderResponse, + Producer, + SASLOptions, +} from "kafkajs"; +import { GoogleAuth } from "google-auth-library"; import { Kafka } from "kafkajs"; import { logger, SpanStatusCode } from "@ctrlplane/logger"; @@ -10,14 +15,97 @@ import { createSpanWrapper } from "../span.js"; const log = logger.child({ component: "kafka-client" }); +const fetchOIDCToken = async (): Promise => { + const tokenUrl = env.KAFKA_SASL_OAUTHBEARER_TOKEN_URL!; + const body = new URLSearchParams({ grant_type: "client_credentials" }); + + if (env.KAFKA_SASL_OAUTHBEARER_CLIENT_ID) + body.set("client_id", env.KAFKA_SASL_OAUTHBEARER_CLIENT_ID); + if (env.KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET) + body.set("client_secret", env.KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET); + if (env.KAFKA_SASL_OAUTHBEARER_SCOPE) + body.set("scope", env.KAFKA_SASL_OAUTHBEARER_SCOPE); + + const res = await fetch(tokenUrl, { + method: "POST", + headers: { "Content-Type": "application/x-www-form-urlencoded" }, + body, + }); + + if (!res.ok) { + const text = await res.text(); + throw new Error(`OAuth token request failed (${res.status}): ${text}`); + } + + const data = (await res.json()) as { + access_token: string; + expires_in: number; + }; + return { value: data.access_token }; +}; + +const gcpAuth = new GoogleAuth({ + scopes: + env.KAFKA_SASL_OAUTHBEARER_SCOPE ?? + "https://www.googleapis.com/auth/cloud-platform", +}); + +const fetchGCPToken = async (): Promise => { + const client = await gcpAuth.getClient(); + const { token } = await client.getAccessToken(); + if (!token) throw new Error("Failed to get GCP access token via ADC"); + return { value: token }; +}; + +const buildSaslConfig = (): SASLOptions | undefined => { + if (!env.KAFKA_SASL_ENABLED) return undefined; + + const mechanism = env.KAFKA_SASL_MECHANISM; + switch (mechanism) { + case "plain": + return { + mechanism: "plain", + username: env.KAFKA_SASL_USERNAME!, + password: env.KAFKA_SASL_PASSWORD!, + }; + case "scram-sha-256": + return { + mechanism: "scram-sha-256", + username: env.KAFKA_SASL_USERNAME!, + password: env.KAFKA_SASL_PASSWORD!, + }; + case "scram-sha-512": + return { + mechanism: "scram-sha-512", + username: env.KAFKA_SASL_USERNAME!, + password: env.KAFKA_SASL_PASSWORD!, + }; + case "oauthbearer": + return { + mechanism: "oauthbearer", + oauthBearerProvider: + env.KAFKA_SASL_OAUTHBEARER_PROVIDER === "gcp" + ? fetchGCPToken + : fetchOIDCToken, + }; + } +}; + let kafka: Kafka | null = null; let producer: Producer | null = null; -const getKafka = () => - (kafka ??= new Kafka({ +const getKafka = () => { + if (kafka != null) return kafka; + + const sasl = buildSaslConfig(); + kafka = new Kafka({ clientId: "ctrlplane-events", brokers: env.KAFKA_BROKERS.split(","), - })); + ssl: env.KAFKA_SSL_ENABLED, + ...(sasl != null ? { sasl } : {}), + }); + return kafka; +}; const getProducer = async () => { if (producer == null) { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4e3d48d30..3c6d91deb 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -167,7 +167,7 @@ importers: version: 2.4.3 better-auth: specifier: ^1.4.6 - version: 1.4.6(next@15.2.4(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1))(react-dom@19.2.1(react@19.2.1))(react@19.2.1) + version: 1.4.6(next@15.2.4(@babel/core@7.28.4)(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1))(react-dom@19.2.1(react@19.2.1))(react@19.2.1) connect-rid: specifier: ^0.0.1 version: 0.0.1 @@ -624,7 +624,7 @@ importers: version: 0.11.1(typescript@5.9.3)(zod@3.24.2) better-auth: specifier: ^1.4.6 - version: 1.4.6(next@15.2.4(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1))(react-dom@19.2.1(react@19.2.1))(react@19.2.1) + version: 1.4.6(next@15.2.4(@babel/core@7.28.4)(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1))(react-dom@19.2.1(react@19.2.1))(react@19.2.1) lodash: specifier: 'catalog:' version: 4.17.21 @@ -826,6 +826,9 @@ importers: date-fns: specifier: ^4.1.0 version: 4.1.0 + google-auth-library: + specifier: ^10.5.0 + version: 10.5.0 kafkajs: specifier: ^2.2.4 version: 2.2.4 @@ -975,7 +978,7 @@ importers: version: 11.0.0-rc.364 better-auth: specifier: ^1.4.6 - version: 1.4.6(next@15.2.4(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1))(react-dom@19.2.1(react@19.2.1))(react@19.2.1) + version: 1.4.6(next@15.2.4(@babel/core@7.28.4)(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1))(react-dom@19.2.1(react@19.2.1))(react@19.2.1) lodash: specifier: 'catalog:' version: 4.17.21 @@ -4993,6 +4996,9 @@ packages: resolution: {integrity: sha512-3yVdyZhklTiNrtg+4WqHpJpFDd+WHTg2oM7UcR80GqL05AOV0xEJzc6qNvFYoEtE+hRp1n9MpN6/+4yhlGkDXQ==} engines: {node: 20.x || 22.x || 23.x || 24.x} + bignumber.js@9.3.1: + resolution: {integrity: sha512-Ko0uX15oIUS7wJ3Rb30Fs6SkVbLmPBAKdlm7q9+ak9bbIeFf0MwuBsQV6z7+X768/cHsfg+WlysDWJcmthjsjQ==} + binary-extensions@2.3.0: resolution: {integrity: sha512-Ceh+7ox5qe7LJuLHoY0feh3pHuUDHAcRUeyL2VYghZwfpkNIy/+8Ocg0a3UuSoYzavmylwuLWQOf3hl0jjMMIw==} engines: {node: '>=8'} @@ -5026,6 +5032,9 @@ packages: engines: {node: ^6 || ^7 || ^8 || ^9 || ^10 || ^11 || ^12 || >=13.7} hasBin: true + buffer-equal-constant-time@1.0.1: + resolution: {integrity: sha512-zRpUiDwd/xk6ADqPMATG8vc9VPrkck7T07OIx0gnjmJAnHnTVXNQG3vfvWNuiZIkwu9KrKdA1iJKfsfTVxE6NA==} + buffer-from@1.1.2: resolution: {integrity: sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==} @@ -5436,6 +5445,10 @@ packages: damerau-levenshtein@1.0.8: resolution: {integrity: sha512-sdQSFB7+llfUcQHUQO3+B8ERRj0Oa4w9POWMI/puGtuf7gFywGmkaLCElnudfTiKZV+NvHqL0ifzdrI8Ro7ESA==} + data-uri-to-buffer@4.0.1: + resolution: {integrity: sha512-0R9ikRb668HB7QDxT1vkpuUBtqc53YyAwMwGeUFKRojY/NWKvdZ+9UYtRfGmhqNbRkTSVpMbmyhXipFFv2cb/A==} + engines: {node: '>= 12'} + data-uri-to-buffer@6.0.2: resolution: {integrity: sha512-7hvf7/GW8e86rW0ptuwS3OcBGDjIi6SZva7hCyWC0yYry2cOPmLIjXAUHI6DK2HsnwJd9ifmt57i8eV2n4YNpw==} engines: {node: '>= 14'} @@ -5770,6 +5783,9 @@ packages: easy-table@1.1.0: resolution: {integrity: sha512-oq33hWOSSnl2Hoh00tZWaIPi1ievrD9aFG82/IgjlycAnW9hHx5PkJiXpxPsgEE+H7BsbVQXFVFST8TEXS6/pA==} + ecdsa-sig-formatter@1.0.11: + resolution: {integrity: sha512-nagl3RYrbNv6kQkeJIpt6NJZy8twLB/2vtz6yN9Z4vRKHN4/QZJIEbqohALSgwKdnksuY3k5Addp5lg8sVoVcQ==} + ee-first@1.1.1: resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==} @@ -6114,6 +6130,9 @@ packages: exsolve@1.0.8: resolution: {integrity: sha512-LmDxfWXwcTArk8fUEnOfSZpHOJ6zOMUJKOtFLFqJLoKJetuQG874Uc7/Kki7zFLzYybmZhp1M7+98pfMqeX8yA==} + extend@3.0.2: + resolution: {integrity: sha512-fjquC59cD7CyW6urNXK0FBufkZcoiGG80wTuPujX590cB5Ttln20E2UB4S/WARVqhXffZl2LNgS+gQdPIIim/g==} + external-editor@3.1.0: resolution: {integrity: sha512-hMQ4CX1p1izmuLYyZqLMO/qGNw10wSv9QDCPfzXfyFrOaCSSoRfqE1Kf1s5an66J5JZC62NewG+mK49jOCtQew==} engines: {node: '>=4'} @@ -6172,6 +6191,10 @@ packages: fecha@4.2.3: resolution: {integrity: sha512-OP2IUU6HeYKJi3i0z4A19kHMQoLVs4Hc+DPqqxI2h/DPZHTm/vjsfC6P0b4jCMy14XizLBqvndQ+UilD7707Jw==} + fetch-blob@3.2.0: + resolution: {integrity: sha512-7yAQpD2UMJzLi1Dqv7qFYnPbaPx7ZfFK6PiIxQ4PfkGPyNyl2Ugx+a/umUonmKqjhM4DnfbMvdX6otXq83soQQ==} + engines: {node: ^12.20 || >= 14.13} + fflate@0.8.2: resolution: {integrity: sha512-cPJU47OaAoCbg0pBvzsgpTPhmhqI5eJjh/JIu8tPj5q+T7iLvW/JAYUqmE7KOB4R1ZyEhzBaIQpQpardBF5z8A==} @@ -6243,6 +6266,10 @@ packages: resolution: {integrity: sha512-8RipRLol37bNs2bhoV67fiTEvdTrbMUYcFTiy3+wuuOnUog2QBHCZWXDRijWQfAkhBj2Uf5UnVaiWwA5vdd82w==} engines: {node: '>= 6'} + formdata-polyfill@4.0.10: + resolution: {integrity: sha512-buewHzMvYL29jdeQTVILecSaZKnt/RJWjoZCF5OW60Z67/GmSLBkOFM7qh1PI3zFNtJbaZL5eQu1vLfazOwj4g==} + engines: {node: '>=12.20.0'} + forwarded@0.2.0: resolution: {integrity: sha512-buRG0fpBtRHSTCOASe6hD258tEubFoRLb4ZNA6NxMVHNw2gOcwHo9wyablzMzOA5z9xA9L1KNjk/Nt6MT9aYow==} engines: {node: '>= 0.6'} @@ -6296,6 +6323,14 @@ packages: functions-have-names@1.2.3: resolution: {integrity: sha512-xckBUXyTIqT97tq2x2AMb+g163b5JFysYk0x4qxNFwbfQkmNZoiRHb6sPzI9/QV33WeuvVYBUIiD4NzNIyqaRQ==} + gaxios@7.1.3: + resolution: {integrity: sha512-YGGyuEdVIjqxkxVH1pUTMY/XtmmsApXrCVv5EU25iX6inEPbV+VakJfLealkBtJN69AQmh1eGOdCl9Sm1UP6XQ==} + engines: {node: '>=18'} + + gcp-metadata@8.1.2: + resolution: {integrity: sha512-zV/5HKTfCeKWnxG0Dmrw51hEWFGfcF2xiXqcA3+J90WDuP0SvoiSO5ORvcBsifmx/FoIjgQN3oNOGaQ5PhLFkg==} + engines: {node: '>=18'} + gel@2.0.2: resolution: {integrity: sha512-XTKpfNR9HZOw+k0Bl04nETZjuP5pypVAXsZADSdwr3EtyygTTe1RqvftU2FjGu7Tp9e576a9b/iIOxWrRBxMiQ==} engines: {node: '>= 18.0.0'} @@ -6399,6 +6434,14 @@ packages: globrex@0.1.2: resolution: {integrity: sha512-uHJgbwAMwNFf5mLst7IWLNg14x1CkeqglJb/K3doi4dw6q2IvAAmM/Y81kevy83wP+Sst+nutFTYOGg3d1lsxg==} + google-auth-library@10.5.0: + resolution: {integrity: sha512-7ABviyMOlX5hIVD60YOfHw4/CxOfBhyduaYB+wbFWCWoni4N7SLcV46hrVRktuBbZjFC9ONyqamZITN7q3n32w==} + engines: {node: '>=18'} + + google-logging-utils@1.1.3: + resolution: {integrity: sha512-eAmLkjDjAFCVXg7A1unxHsLf961m6y17QFqXqAXGj/gVkKFrEICfStRfwUlGNfeCEjNRa32JEWOUTlYXPyyKvA==} + engines: {node: '>=14'} + gopd@1.0.1: resolution: {integrity: sha512-d65bNlIadxvpb/A2abVdlqKqV563juRnZ1Wtk6s1sIR8uNsXR70xqIzVqxVf1eTqDunwT2MkczEeaezCKTZhwA==} @@ -6416,6 +6459,10 @@ packages: graphemer@1.4.0: resolution: {integrity: sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag==} + gtoken@8.0.0: + resolution: {integrity: sha512-+CqsMbHPiSTdtSO14O51eMNlrp9N79gmeqmXeouJOhfucAedHw9noVe/n5uJk3tbKE6a+6ZCQg3RPhVhHByAIw==} + engines: {node: '>=18'} + handlebars@4.7.8: resolution: {integrity: sha512-vafaFqs8MZkRrSX7sFVUdo3ap/eNiLnb4IakshzvP56X5Nr1iGKAIqdX6tMlm6HcNRIkr6AxO5jFEoJzzpT8aQ==} engines: {node: '>=0.4.7'} @@ -6863,6 +6910,9 @@ packages: engines: {node: '>=6'} hasBin: true + json-bigint@1.0.0: + resolution: {integrity: sha512-SiPv/8VpZuWbvLSMtTDU8hEfrZWg/mH/nV/b4o0CYbSxu1UIQPLdwKOCIyLQX+VIPO5vrLX3i8qtqFyhdPSUSQ==} + json-buffer@3.0.1: resolution: {integrity: sha512-4bV5BfR2mqfQTJm+V5tPPdf+ZpuhiIvTuAB5g8kcrXOZpTT/QwwVRWBywX1ozr6lEuPdbHxwaJlm9G6mI2sfSQ==} @@ -6895,6 +6945,12 @@ packages: resolution: {integrity: sha512-ZZow9HBI5O6EPgSJLUb8n2NKgmVWTwCvHGwFuJlMjvLFqlGG6pjirPhtdsseaLZjSibD8eegzmYpUZwoIlj2cQ==} engines: {node: '>=4.0'} + jwa@2.0.1: + resolution: {integrity: sha512-hRF04fqJIP8Abbkq5NKGN0Bbr3JxlQ+qhZufXVr0DvujKy93ZCbXZMHDL4EOtodSbCWxOqR8MS1tXA5hwqCXDg==} + + jws@4.0.1: + resolution: {integrity: sha512-EKI/M/yqPncGUUh44xz0PxSidXFr/+r0pA70+gIYhjv+et7yxM+s29Y+VGDkovRofQem0fs7Uvf4+YmAdyRduA==} + kafkajs@2.2.4: resolution: {integrity: sha512-j/YeapB1vfPT2iOIUn/vxdyKEuhuY2PxMBvf5JWux6iSaukAccrMtXEY/Lb7OvavDhOWME589bpLrEdnVHjfjA==} engines: {node: '>=14.0.0'} @@ -7315,7 +7371,6 @@ packages: next@15.2.4: resolution: {integrity: sha512-VwL+LAaPSxEkd3lU2xWbgEOtrM8oedmyhBqaVNmgKB+GvZlCy9rgaEc+y2on0wv+l0oSFqLtYD6dcC1eAedUaQ==} engines: {node: ^18.18.0 || ^19.8.0 || >= 20.0.0} - deprecated: This version has a security vulnerability. Please upgrade to a patched version. See https://nextjs.org/blog/CVE-2025-66478 for more details. hasBin: true peerDependencies: '@opentelemetry/api': ^1.1.0 @@ -7341,6 +7396,11 @@ packages: resolution: {integrity: sha512-zsFhmbkAzwhTft6nd3VxcG0cvJsT70rL+BIGHWVq5fi6MwGrHwzqKaxXE+Hl2GmnGItnDKPPkO5/LQqjVkIdFg==} engines: {node: '>=10'} + node-domexception@1.0.0: + resolution: {integrity: sha512-/jKZoMpw0F8GRwl4/eLROPA3cfcXtLApP0QzLmUT/HuPCZWyB7IY9ZrMeKw2O/nFIqPQB3PVM9aYm0F312AXDQ==} + engines: {node: '>=10.5.0'} + deprecated: Use your platform's native DOMException instead + node-fetch@2.7.0: resolution: {integrity: sha512-c4FRfUm/dbcWZ7U+1Wq0AwCyFL+3nt2bEw05wfxSz+DWpWsitgmSgYmy2dQdWyKC1694ELPqMs/YzUSNozLt8A==} engines: {node: 4.x || >=6.0.0} @@ -7350,6 +7410,10 @@ packages: encoding: optional: true + node-fetch@3.3.2: + resolution: {integrity: sha512-dRB78srN/l6gqWulah9SrxeYnxeddIG30+GOqK/9OlLVyLg3HPnr6SqOWTWOXKRwC2eGYCkZ59NNuSgvSrpgOA==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + node-plop@0.26.3: resolution: {integrity: sha512-Cov028YhBZ5aB7MdMWJEmwyBig43aGL5WT4vdoB28Oitau1zZAcHUn8Sgfk9HM33TqhtLJ9PlM/O0Mv+QpV/4Q==} engines: {node: '>=8.9.4'} @@ -7808,6 +7872,7 @@ packages: prebuild-install@7.1.3: resolution: {integrity: sha512-8Mf2cbV7x1cXPUILADGI3wuhfqWvtiLA1iclTDbFRZkgRQS0NqsPZphna9V+HyTEadheuPmjaJMsbzKQFOzLug==} engines: {node: '>=10'} + deprecated: No longer maintained. Please contact the author of the relevant native addon; alternatives are available. hasBin: true prelude-ls@1.2.1: @@ -8193,6 +8258,10 @@ packages: deprecated: Rimraf versions prior to v4 are no longer supported hasBin: true + rimraf@5.0.10: + resolution: {integrity: sha512-l0OE8wL34P4nJH/H2ffoaniAokM2qSmrtXHmlpvYr5AVVX8msAyW0l8NVJFDxlSK4u3Uh/f41cQheDVdnYijwQ==} + hasBin: true + rollup@4.44.2: resolution: {integrity: sha512-PVoapzTwSEcelaWGth3uR66u7ZRo6qhPHc0f2uRO9fX6XDVNrIiGYS0Pj9+R8yIIYSD/mCx2b16Ws9itljKSPg==} engines: {node: '>=18.0.0', npm: '>=8.0.0'} @@ -9215,6 +9284,10 @@ packages: wcwidth@1.0.1: resolution: {integrity: sha512-XHPEwS0q6TaxcvG85+8EYkbiCux2XtWG2mkc47Ng2A77BQu9+DqIOJldST4HgPkuea7dvKSj5VgX3P1d4rW8Tg==} + web-streams-polyfill@3.3.3: + resolution: {integrity: sha512-d2JWLCivmZYTSIoge9MsgFCZrt571BikcWGYkjC1khllbTeDlGqZ2D8vD8E/lJa8WGWbb7Plm8/XJYV7IJHZZw==} + engines: {node: '>= 8'} + webidl-conversions@3.0.1: resolution: {integrity: sha512-2JAn3z8AR6rjK8Sm8orRC0h/bcl/DqL7tRPdGZ4I1CjdF+EaMLmYxBHyXuKL849eucPFhvBoxMsflfOb8kxaeQ==} @@ -12987,26 +13060,6 @@ snapshots: react: 19.2.1 react-dom: 19.2.1(react@19.2.1) - better-auth@1.4.6(next@15.2.4(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1))(react-dom@19.2.1(react@19.2.1))(react@19.2.1): - dependencies: - '@better-auth/core': 1.4.6(@better-auth/utils@0.3.0)(@better-fetch/fetch@1.1.18)(better-call@1.1.5(zod@3.24.2))(jose@6.1.0)(kysely@0.28.8)(nanostores@1.0.1) - '@better-auth/telemetry': 1.4.6(@better-auth/core@1.4.6(@better-auth/utils@0.3.0)(@better-fetch/fetch@1.1.18)(better-call@1.1.5(zod@3.24.2))(jose@6.1.0)(kysely@0.28.8)(nanostores@1.0.1)) - '@better-auth/utils': 0.3.0 - '@better-fetch/fetch': 1.1.18 - '@noble/ciphers': 2.0.1 - '@noble/hashes': 2.0.1 - better-call: 1.1.5(zod@4.1.12) - defu: 6.1.4 - jose: 6.1.0 - kysely: 0.28.8 - ms: 4.0.0-nightly.202508271359 - nanostores: 1.0.1 - zod: 4.1.12 - optionalDependencies: - next: 15.2.4(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1) - react: 19.2.1 - react-dom: 19.2.1(react@19.2.1) - better-call@1.1.5(zod@4.1.12): dependencies: '@better-auth/utils': 0.3.0 @@ -13022,6 +13075,8 @@ snapshots: prebuild-install: 7.1.3 optional: true + bignumber.js@9.3.1: {} + binary-extensions@2.3.0: {} bindings@1.5.0: @@ -13086,6 +13141,8 @@ snapshots: node-releases: 2.0.19 update-browserslist-db: 1.1.3(browserslist@4.25.1) + buffer-equal-constant-time@1.0.1: {} + buffer-from@1.1.2: {} buffer@5.7.1: @@ -13519,6 +13576,8 @@ snapshots: damerau-levenshtein@1.0.8: {} + data-uri-to-buffer@4.0.1: {} + data-uri-to-buffer@6.0.2: {} data-urls@5.0.0: @@ -13757,6 +13816,10 @@ snapshots: optionalDependencies: wcwidth: 1.0.1 + ecdsa-sig-formatter@1.0.11: + dependencies: + safe-buffer: 5.2.1 + ee-first@1.1.1: {} electron-to-chromium@1.5.180: {} @@ -14485,6 +14548,8 @@ snapshots: exsolve@1.0.8: {} + extend@3.0.2: {} + external-editor@3.1.0: dependencies: chardet: 0.7.0 @@ -14537,6 +14602,11 @@ snapshots: fecha@4.2.3: {} + fetch-blob@3.2.0: + dependencies: + node-domexception: 1.0.0 + web-streams-polyfill: 3.3.3 + fflate@0.8.2: {} figures@3.2.0: @@ -14629,6 +14699,10 @@ snapshots: hasown: 2.0.2 mime-types: 2.1.35 + formdata-polyfill@4.0.10: + dependencies: + fetch-blob: 3.2.0 + forwarded@0.2.0: {} fraction.js@4.3.7: {} @@ -14680,6 +14754,23 @@ snapshots: functions-have-names@1.2.3: {} + gaxios@7.1.3: + dependencies: + extend: 3.0.2 + https-proxy-agent: 7.0.6(supports-color@10.2.2) + node-fetch: 3.3.2 + rimraf: 5.0.10 + transitivePeerDependencies: + - supports-color + + gcp-metadata@8.1.2: + dependencies: + gaxios: 7.1.3 + google-logging-utils: 1.1.3 + json-bigint: 1.0.0 + transitivePeerDependencies: + - supports-color + gel@2.0.2: dependencies: '@petamoriken/float16': 3.9.2 @@ -14821,6 +14912,20 @@ snapshots: globrex@0.1.2: {} + google-auth-library@10.5.0: + dependencies: + base64-js: 1.5.1 + ecdsa-sig-formatter: 1.0.11 + gaxios: 7.1.3 + gcp-metadata: 8.1.2 + google-logging-utils: 1.1.3 + gtoken: 8.0.0 + jws: 4.0.1 + transitivePeerDependencies: + - supports-color + + google-logging-utils@1.1.3: {} + gopd@1.0.1: dependencies: get-intrinsic: 1.2.4 @@ -14836,6 +14941,13 @@ snapshots: graphemer@1.4.0: {} + gtoken@8.0.0: + dependencies: + gaxios: 7.1.3 + jws: 4.0.1 + transitivePeerDependencies: + - supports-color + handlebars@4.7.8: dependencies: minimist: 1.2.8 @@ -15309,6 +15421,10 @@ snapshots: jsesc@3.1.0: {} + json-bigint@1.0.0: + dependencies: + bignumber.js: 9.3.1 + json-buffer@3.0.1: {} json-schema-traverse@0.4.1: {} @@ -15338,6 +15454,17 @@ snapshots: object.assign: 4.1.5 object.values: 1.2.1 + jwa@2.0.1: + dependencies: + buffer-equal-constant-time: 1.0.1 + ecdsa-sig-formatter: 1.0.11 + safe-buffer: 5.2.1 + + jws@4.0.1: + dependencies: + jwa: 2.0.1 + safe-buffer: 5.2.1 + kafkajs@2.2.4: {} keyv@4.5.4: @@ -15723,34 +15850,6 @@ snapshots: - babel-plugin-macros optional: true - next@15.2.4(@opentelemetry/api@1.9.0)(@playwright/test@1.53.2)(react-dom@19.2.1(react@19.2.1))(react@19.2.1): - dependencies: - '@next/env': 15.2.4 - '@swc/counter': 0.1.3 - '@swc/helpers': 0.5.15 - busboy: 1.6.0 - caniuse-lite: 1.0.30001760 - postcss: 8.4.31 - react: 19.2.1 - react-dom: 19.2.1(react@19.2.1) - styled-jsx: 5.1.6(@babel/core@7.24.5)(react@19.2.1) - optionalDependencies: - '@next/swc-darwin-arm64': 15.2.4 - '@next/swc-darwin-x64': 15.2.4 - '@next/swc-linux-arm64-gnu': 15.2.4 - '@next/swc-linux-arm64-musl': 15.2.4 - '@next/swc-linux-x64-gnu': 15.2.4 - '@next/swc-linux-x64-musl': 15.2.4 - '@next/swc-win32-arm64-msvc': 15.2.4 - '@next/swc-win32-x64-msvc': 15.2.4 - '@opentelemetry/api': 1.9.0 - '@playwright/test': 1.53.2 - sharp: 0.33.5 - transitivePeerDependencies: - - '@babel/core' - - babel-plugin-macros - optional: true - no-case@2.3.2: dependencies: lower-case: 1.1.4 @@ -15760,10 +15859,18 @@ snapshots: semver: 7.7.3 optional: true + node-domexception@1.0.0: {} + node-fetch@2.7.0: dependencies: whatwg-url: 5.0.0 + node-fetch@3.3.2: + dependencies: + data-uri-to-buffer: 4.0.1 + fetch-blob: 3.2.0 + formdata-polyfill: 4.0.10 + node-plop@0.26.3: dependencies: '@babel/runtime-corejs3': 7.28.4 @@ -16622,6 +16729,10 @@ snapshots: dependencies: glob: 7.2.3 + rimraf@5.0.10: + dependencies: + glob: 10.5.0 + rollup@4.44.2: dependencies: '@types/estree': 1.0.8 @@ -17839,6 +17950,8 @@ snapshots: dependencies: defaults: 1.0.4 + web-streams-polyfill@3.3.3: {} + webidl-conversions@3.0.1: {} webidl-conversions@4.0.2: {}