Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/workspace-engine-router/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module workspace-engine-router

go 1.23
go 1.24.0

require (
github.com/charmbracelet/log v0.4.0
Expand All @@ -14,6 +14,7 @@ require (
)

require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
Expand Down Expand Up @@ -52,6 +53,7 @@ require (
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/oauth2 v0.35.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237 // indirect
Expand Down
5 changes: 5 additions & 0 deletions apps/workspace-engine-router/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
cloud.google.com/go/compute v1.25.1 h1:ZRpHJedLtTpKgr3RV1Fx23NuaAEN1Zfx9hw1u4aJdjU=
cloud.google.com/go/compute/metadata v0.3.0 h1:Tz+eQXMEqDIKRsmY3cHTL6FVaynIjX2QxYC4trgAKZc=
cloud.google.com/go/compute/metadata v0.3.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
Expand Down Expand Up @@ -408,6 +411,8 @@ golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8=
golang.org/x/oauth2 v0.35.0 h1:Mv2mzuHuZuY2+bkyWXIHMfhNdJAdwW3FuWeCPYN5GVQ=
golang.org/x/oauth2 v0.35.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
15 changes: 13 additions & 2 deletions apps/workspace-engine-router/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,19 @@ type Config struct {
ManagementPort int `envconfig:"MANAGEMENT_PORT" default:"9091"`

// Kafka configuration
KafkaBrokers string `envconfig:"KAFKA_BROKERS" default:"localhost:9092"`
KafkaTopic string `envconfig:"KAFKA_TOPIC" default:"workspace-events"`
KafkaBrokers string `envconfig:"KAFKA_BROKERS" default:"localhost:9092"`
KafkaTopic string `envconfig:"KAFKA_TOPIC" default:"workspace-events"`
KafkaSASLEnabled bool `envconfig:"KAFKA_SASL_ENABLED" default:"false"`
KafkaSecurityProtocol string `envconfig:"KAFKA_SECURITY_PROTOCOL" default:"SASL_SSL"`
KafkaSASLMechanism string `envconfig:"KAFKA_SASL_MECHANISM" default:"OAUTHBEARER"`
KafkaSASLUsername string `envconfig:"KAFKA_SASL_USERNAME" default:""`
KafkaSASLPassword string `envconfig:"KAFKA_SASL_PASSWORD" default:""`
KafkaSASLOAuthBearerMethod string `envconfig:"KAFKA_SASL_OAUTHBEARER_METHOD" default:"oidc"`
KafkaSASLOAuthBearerTokenURL string `envconfig:"KAFKA_SASL_OAUTHBEARER_TOKEN_URL" default:""`
KafkaSASLOAuthBearerClientID string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_ID" default:""`
KafkaSASLOAuthBearerClientSecret string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET" default:""`
KafkaSASLOAuthBearerScope string `envconfig:"KAFKA_SASL_OAUTHBEARER_SCOPE" default:""`
KafkaSASLOAuthBearerProvider string `envconfig:"KAFKA_SASL_OAUTHBEARER_PROVIDER" default:"oidc"`

// Worker health configuration
WorkerHeartbeatTimeout time.Duration `envconfig:"WORKER_HEARTBEAT_TIMEOUT" default:"30s"`
Expand Down
69 changes: 69 additions & 0 deletions apps/workspace-engine-router/pkg/kafka/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package kafka

import (
"fmt"
"strings"
"workspace-engine-router/pkg/config"

"github.com/charmbracelet/log"
kafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// BaseConfig returns the Kafka client configuration shared by every client in
// this service: the bootstrap brokers plus any SASL settings derived from the
// environment. It fails when the SASL environment variables are inconsistent.
func BaseConfig() (*kafka.ConfigMap, error) {
	base := kafka.ConfigMap{
		"bootstrap.servers": config.Global.KafkaBrokers,
	}
	if err := applySASL(&base); err != nil {
		return nil, fmt.Errorf("invalid Kafka SASL configuration: %w", err)
	}
	return &base, nil
}

// applySASL augments cfg with SASL settings when KAFKA_SASL_ENABLED is true.
// It validates that the credentials required by the selected mechanism are
// present, returning an error that lists every missing environment variable
// so misconfiguration is caught at startup rather than at connect time.
func applySASL(cfg *kafka.ConfigMap) error {
	if !config.Global.KafkaSASLEnabled {
		return nil
	}

	mechanism := config.Global.KafkaSASLMechanism
	log.Info("Enabling SASL for Kafka", "mechanism", mechanism)

	// SetKey only errors for malformed "key=value" strings, which cannot
	// happen with explicit key/value arguments, so the error is ignored.
	_ = cfg.SetKey("security.protocol", config.Global.KafkaSecurityProtocol)
	_ = cfg.SetKey("sasl.mechanisms", mechanism)

	switch mechanism {
	case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		var missing []string
		if config.Global.KafkaSASLUsername == "" {
			missing = append(missing, "KAFKA_SASL_USERNAME")
		}
		if config.Global.KafkaSASLPassword == "" {
			missing = append(missing, "KAFKA_SASL_PASSWORD")
		}
		if len(missing) > 0 {
			return fmt.Errorf("KAFKA_SASL_MECHANISM=%s requires: %s", mechanism, strings.Join(missing, ", "))
		}
		_ = cfg.SetKey("sasl.username", config.Global.KafkaSASLUsername)
		_ = cfg.SetKey("sasl.password", config.Global.KafkaSASLPassword)
	case "OAUTHBEARER":
		provider := config.Global.KafkaSASLOAuthBearerProvider
		switch provider {
		case "gcp":
			// Tokens are minted from Application Default Credentials by the
			// caller (see gcp_auth.go); nothing further to configure here.
			log.Info("Using GCP ADC for OAUTHBEARER tokens")
		case "oidc":
			// librdkafka's built-in "oidc" method additionally mandates the
			// client id and secret; collect every missing variable so the
			// operator can fix them all in one pass.
			var missing []string
			if config.Global.KafkaSASLOAuthBearerTokenURL == "" {
				missing = append(missing, "KAFKA_SASL_OAUTHBEARER_TOKEN_URL")
			}
			if config.Global.KafkaSASLOAuthBearerMethod == "oidc" {
				if config.Global.KafkaSASLOAuthBearerClientID == "" {
					missing = append(missing, "KAFKA_SASL_OAUTHBEARER_CLIENT_ID")
				}
				if config.Global.KafkaSASLOAuthBearerClientSecret == "" {
					missing = append(missing, "KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET")
				}
			}
			if len(missing) > 0 {
				return fmt.Errorf("KAFKA_SASL_MECHANISM=OAUTHBEARER with provider=oidc requires: %s", strings.Join(missing, ", "))
			}
			_ = cfg.SetKey("sasl.oauthbearer.method", config.Global.KafkaSASLOAuthBearerMethod)
			_ = cfg.SetKey("sasl.oauthbearer.client.id", config.Global.KafkaSASLOAuthBearerClientID)
			_ = cfg.SetKey("sasl.oauthbearer.client.secret", config.Global.KafkaSASLOAuthBearerClientSecret)
			_ = cfg.SetKey("sasl.oauthbearer.token.endpoint.url", config.Global.KafkaSASLOAuthBearerTokenURL)
			// Scope is optional; only forward it when explicitly configured.
			if scope := config.Global.KafkaSASLOAuthBearerScope; scope != "" {
				_ = cfg.SetKey("sasl.oauthbearer.scope", scope)
			}
		default:
			return fmt.Errorf("unknown KAFKA_SASL_OAUTHBEARER_PROVIDER: %s (supported: oidc, gcp)", provider)
		}
	default:
		return fmt.Errorf("unknown KAFKA_SASL_MECHANISM: %s (supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)", mechanism)
	}

	return nil
}
50 changes: 50 additions & 0 deletions apps/workspace-engine-router/pkg/kafka/gcp_auth.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package kafka

import (
"context"
"time"

"workspace-engine-router/pkg/config"

"github.com/charmbracelet/log"
confluentkafka "github.com/confluentinc/confluent-kafka-go/v2/kafka"
"golang.org/x/oauth2/google"
)

// IsGCPProvider reports whether the environment selects GCP-minted OAUTHBEARER
// tokens: SASL enabled, mechanism OAUTHBEARER, and provider "gcp".
func IsGCPProvider() bool {
	c := config.Global
	if !c.KafkaSASLEnabled {
		return false
	}
	return c.KafkaSASLMechanism == "OAUTHBEARER" && c.KafkaSASLOAuthBearerProvider == "gcp"
}

// setGCPToken fetches an access token from GCP Application Default Credentials
// and installs it on the given Kafka client as an OAuthBearer token. The scope
// defaults to cloud-platform when KAFKA_SASL_OAUTHBEARER_SCOPE is unset.
func setGCPToken(client interface {
	SetOAuthBearerToken(token confluentkafka.OAuthBearerToken) error
}) error {
	// Bound the ADC lookup + token fetch so a misconfigured environment
	// cannot hang the caller indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	scope := config.Global.KafkaSASLOAuthBearerScope
	if scope == "" {
		scope = "https://www.googleapis.com/auth/cloud-platform"
	}

	source, err := google.DefaultTokenSource(ctx, scope)
	if err != nil {
		return err
	}

	tok, err := source.Token()
	if err != nil {
		return err
	}

	bearer := confluentkafka.OAuthBearerToken{
		TokenValue: tok.AccessToken,
		Expiration: tok.Expiry,
	}
	if err := client.SetOAuthBearerToken(bearer); err != nil {
		return err
	}

	log.Info("GCP OAuthBearer token set for admin client", "expires", tok.Expiry.Format(time.RFC3339))
	return nil
}
14 changes: 11 additions & 3 deletions apps/workspace-engine-router/pkg/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,22 @@ func (pc *PartitionCounter) GetPartitionCount() (int32, error) {
// queryPartitionCount queries Kafka for the number of partitions
func (pc *PartitionCounter) queryPartitionCount() (int32, error) {
// Create an AdminClient to query metadata
adminClient, err := kafka.NewAdminClient(&kafka.ConfigMap{
"bootstrap.servers": pc.brokers,
})
cfg, err := BaseConfig()
if err != nil {
return 0, fmt.Errorf("failed to build kafka config: %w", err)
}
adminClient, err := kafka.NewAdminClient(cfg)
if err != nil {
return 0, fmt.Errorf("failed to create admin client: %w", err)
}
defer adminClient.Close()

if IsGCPProvider() {
if err := setGCPToken(adminClient); err != nil {
return 0, fmt.Errorf("failed to set GCP token on admin client: %w", err)
}
}

// Get metadata for the topic
metadata, err := adminClient.GetMetadata(&pc.topic, false, 5000)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions apps/workspace-engine/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,15 @@ func main() {
defer cancel()

// Initialize shared Kafka producer used by the ticker and all job agents
producer, err := kafka.NewProducer(kafka.Brokers)
producer, err := kafka.NewProducer()
if err != nil {
log.Fatal("Failed to create Kafka producer", "error", err)
panic(err)
}
messaging.InitProducer(producer)
defer messaging.CloseProducer()

consumer, err := kafka.NewConsumer(kafka.Brokers, kafka.Topic)
consumer, err := kafka.NewConsumer(kafka.Topic)
if err != nil {
log.Fatal("Failed to create Kafka consumer", "error", err)
panic(err)
Expand Down
11 changes: 11 additions & 0 deletions apps/workspace-engine/pkg/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ type Config struct {
KafkaGroupID string `envconfig:"KAFKA_GROUP_ID" default:"workspace-engine"`
KafkaTopic string `envconfig:"KAFKA_TOPIC" default:"workspace-events"`
KafkaConsumerTopic string `envconfig:"KAFKA_CONSUMER_TOPIC" default:"workspace-events"`
KafkaSASLEnabled bool `envconfig:"KAFKA_SASL_ENABLED" default:"false"`
KafkaSecurityProtocol string `envconfig:"KAFKA_SECURITY_PROTOCOL" default:"SASL_SSL"`
KafkaSASLMechanism string `envconfig:"KAFKA_SASL_MECHANISM" default:"OAUTHBEARER"`
KafkaSASLUsername string `envconfig:"KAFKA_SASL_USERNAME" default:""`
KafkaSASLPassword string `envconfig:"KAFKA_SASL_PASSWORD" default:""`
KafkaSASLOAuthBearerMethod string `envconfig:"KAFKA_SASL_OAUTHBEARER_METHOD" default:"oidc"`
KafkaSASLOAuthBearerTokenURL string `envconfig:"KAFKA_SASL_OAUTHBEARER_TOKEN_URL" default:""`
KafkaSASLOAuthBearerClientID string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_ID" default:""`
KafkaSASLOAuthBearerClientSecret string `envconfig:"KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET" default:""`
KafkaSASLOAuthBearerScope string `envconfig:"KAFKA_SASL_OAUTHBEARER_SCOPE" default:""`
KafkaSASLOAuthBearerProvider string `envconfig:"KAFKA_SASL_OAUTHBEARER_PROVIDER" default:"oidc"`

OTELServiceName string `envconfig:"OTEL_SERVICE_NAME" default:"ctrlplane/workspace-engine"`
OTELExporterOTLPEndpoint string `envconfig:"OTEL_EXPORTER_OTLP_ENDPOINT" default:"localhost:4318"`
Expand Down
21 changes: 8 additions & 13 deletions apps/workspace-engine/pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"workspace-engine/pkg/workspace/manager"

"github.com/charmbracelet/log"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// Configuration variables loaded from environment
Expand All @@ -23,18 +22,14 @@ var (
Brokers = config.Global.KafkaBrokers
)

func NewConsumer(brokers string, topic string) (messaging.Consumer, error) {
return confluent.NewConfluent(brokers).CreateConsumer(GroupID, topic, &kafka.ConfigMap{
"bootstrap.servers": Brokers,
"group.id": GroupID,
"auto.offset.reset": "latest",
"enable.auto.commit": false,
"partition.assignment.strategy": "cooperative-sticky",
"go.application.rebalance.enable": true,
"max.poll.interval.ms": 900_000, // 15 minutes

// "debug": "cgrp,broker,protocol",
})
// NewConsumer builds a Kafka consumer for topic on top of the shared confluent
// base configuration (brokers, group id, SASL). Engine consumers start from
// the newest offset and allow up to 15 minutes of per-message processing.
func NewConsumer(topic string) (messaging.Consumer, error) {
	base, err := confluent.BaseConsumerConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to build consumer config: %w", err)
	}
	// SetKey with explicit key/value arguments cannot fail; errors ignored.
	_ = base.SetKey("auto.offset.reset", "latest")
	_ = base.SetKey("max.poll.interval.ms", 900_000)
	return confluent.NewConfluent(Brokers).CreateConsumer(GroupID, topic, base)
}

// RunConsumerWithWorkspaceLoader starts the Kafka consumer with workspace-based offset resume
Expand Down
17 changes: 7 additions & 10 deletions apps/workspace-engine/pkg/kafka/producer.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package kafka

import (
"fmt"
"workspace-engine/pkg/messaging"
"workspace-engine/pkg/messaging/confluent"

"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func NewProducer(brokers string) (messaging.Producer, error) {
return confluent.NewConfluent(brokers).CreateProducer(Topic, &kafka.ConfigMap{
"bootstrap.servers": Brokers,
"enable.idempotence": true,
"compression.type": "snappy",
"message.send.max.retries": 10,
"retry.backoff.ms": 100,
})
// NewProducer builds the Kafka producer for the configured Topic using the
// shared confluent base configuration (brokers, idempotence, SASL).
func NewProducer() (messaging.Producer, error) {
	base, err := confluent.BaseProducerConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to build producer config: %w", err)
	}
	return confluent.NewConfluent(Brokers).CreateProducer(Topic, base)
}
89 changes: 89 additions & 0 deletions apps/workspace-engine/pkg/messaging/confluent/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package confluent

import (
"fmt"
"strings"
"workspace-engine/pkg/config"

"github.com/charmbracelet/log"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// BaseProducerConfig returns the producer configuration shared by all engine
// producers: idempotent, snappy-compressed writes with bounded retries, plus
// any SASL settings derived from the environment.
func BaseProducerConfig() (*kafka.ConfigMap, error) {
	base := kafka.ConfigMap{
		"bootstrap.servers":        config.Global.KafkaBrokers,
		"enable.idempotence":       true,
		"compression.type":         "snappy",
		"message.send.max.retries": 10,
		"retry.backoff.ms":         100,
	}
	if err := applySASL(&base); err != nil {
		return nil, fmt.Errorf("invalid Kafka SASL configuration: %w", err)
	}
	return &base, nil
}

// BaseConsumerConfig returns the consumer configuration shared by all engine
// consumers: manual commits, cooperative-sticky rebalancing, application-level
// rebalance callbacks, and any SASL settings derived from the environment.
func BaseConsumerConfig() (*kafka.ConfigMap, error) {
	base := kafka.ConfigMap{
		"bootstrap.servers":               config.Global.KafkaBrokers,
		"group.id":                        config.Global.KafkaGroupID,
		"enable.auto.commit":              false,
		"partition.assignment.strategy":   "cooperative-sticky",
		"session.timeout.ms":              3000,
		"heartbeat.interval.ms":           1000,
		"go.application.rebalance.enable": true,
	}
	if err := applySASL(&base); err != nil {
		return nil, fmt.Errorf("invalid Kafka SASL configuration: %w", err)
	}
	return &base, nil
}

// applySASL augments cfg with SASL settings when KAFKA_SASL_ENABLED is true.
// It validates that the credentials required by the selected mechanism are
// present, returning an error that lists every missing environment variable
// so misconfiguration is caught at startup rather than at connect time.
func applySASL(cfg *kafka.ConfigMap) error {
	if !config.Global.KafkaSASLEnabled {
		return nil
	}

	mechanism := config.Global.KafkaSASLMechanism
	log.Info("Enabling SASL for Kafka", "mechanism", mechanism)

	// SetKey only errors for malformed "key=value" strings, which cannot
	// happen with explicit key/value arguments, so the error is ignored.
	_ = cfg.SetKey("security.protocol", config.Global.KafkaSecurityProtocol)
	_ = cfg.SetKey("sasl.mechanisms", mechanism)

	switch mechanism {
	case "PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512":
		var missing []string
		if config.Global.KafkaSASLUsername == "" {
			missing = append(missing, "KAFKA_SASL_USERNAME")
		}
		if config.Global.KafkaSASLPassword == "" {
			missing = append(missing, "KAFKA_SASL_PASSWORD")
		}
		if len(missing) > 0 {
			return fmt.Errorf("KAFKA_SASL_MECHANISM=%s requires: %s", mechanism, strings.Join(missing, ", "))
		}
		_ = cfg.SetKey("sasl.username", config.Global.KafkaSASLUsername)
		_ = cfg.SetKey("sasl.password", config.Global.KafkaSASLPassword)
	case "OAUTHBEARER":
		provider := config.Global.KafkaSASLOAuthBearerProvider
		switch provider {
		case "gcp":
			// Tokens are minted from Application Default Credentials
			// elsewhere; nothing further to configure here.
			log.Info("Using GCP ADC for OAUTHBEARER tokens")
		case "oidc":
			// librdkafka's built-in "oidc" method additionally mandates the
			// client id and secret; collect every missing variable so the
			// operator can fix them all in one pass.
			var missing []string
			if config.Global.KafkaSASLOAuthBearerTokenURL == "" {
				missing = append(missing, "KAFKA_SASL_OAUTHBEARER_TOKEN_URL")
			}
			if config.Global.KafkaSASLOAuthBearerMethod == "oidc" {
				if config.Global.KafkaSASLOAuthBearerClientID == "" {
					missing = append(missing, "KAFKA_SASL_OAUTHBEARER_CLIENT_ID")
				}
				if config.Global.KafkaSASLOAuthBearerClientSecret == "" {
					missing = append(missing, "KAFKA_SASL_OAUTHBEARER_CLIENT_SECRET")
				}
			}
			if len(missing) > 0 {
				return fmt.Errorf("KAFKA_SASL_MECHANISM=OAUTHBEARER with provider=oidc requires: %s", strings.Join(missing, ", "))
			}
			_ = cfg.SetKey("sasl.oauthbearer.method", config.Global.KafkaSASLOAuthBearerMethod)
			_ = cfg.SetKey("sasl.oauthbearer.client.id", config.Global.KafkaSASLOAuthBearerClientID)
			_ = cfg.SetKey("sasl.oauthbearer.client.secret", config.Global.KafkaSASLOAuthBearerClientSecret)
			_ = cfg.SetKey("sasl.oauthbearer.token.endpoint.url", config.Global.KafkaSASLOAuthBearerTokenURL)
			// Scope is optional; only forward it when explicitly configured.
			if scope := config.Global.KafkaSASLOAuthBearerScope; scope != "" {
				_ = cfg.SetKey("sasl.oauthbearer.scope", scope)
			}
		default:
			return fmt.Errorf("unknown KAFKA_SASL_OAUTHBEARER_PROVIDER: %s (supported: oidc, gcp)", provider)
		}
	default:
		return fmt.Errorf("unknown KAFKA_SASL_MECHANISM: %s (supported: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, OAUTHBEARER)", mechanism)
	}

	return nil
}
Loading
Loading