Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions gcp-pubsub-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ func main() {
defer func() {
_ = logger.Sync()
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

connectordata, err := common.ParseConnectorMetadata()
if err != nil {
logger.Error("Environment variable is missing ", zap.Error(err))
os.Exit(127)
return
}

pubsubInfo, err := GetGCPInfo()
Expand All @@ -57,13 +59,14 @@ func main() {
}

logger.Info("Conn: %s", zap.String("Response topic", conn.connectordata.ResponseTopic))
err = conn.consumeMessage()
err = conn.consumeMessage(ctx)
if err != nil {
logger.Error("Error in consuming message from pubsub", zap.Error(err))
return
}
}

func (conn pubsubConnector) consumeMessage() error {
func (conn pubsubConnector) consumeMessage(ctx context.Context) error {
creds := []byte(conn.pubsubInfo.Creds)
headers := http.Header{
"KEDA-Topic": {conn.connectordata.Topic},
Expand All @@ -72,8 +75,7 @@ func (conn pubsubConnector) consumeMessage() error {
"Content-Type": {conn.connectordata.ContentType},
"KEDA-Source-Name": {conn.connectordata.SourceName},
}
ctx := context.Background()
client, err := pubsub.NewClient(ctx, conn.pubsubInfo.ProjectID, option.WithCredentialsJSON(creds))
client, err := pubsub.NewClient(ctx, conn.pubsubInfo.ProjectID, option.WithAuthCredentialsJSON(option.ServiceAccount, creds))
if err != nil {
conn.logger.Error(err.Error())
return err
Expand All @@ -95,22 +97,22 @@ func (conn pubsubConnector) consumeMessage() error {
resp, err := common.HandleHTTPRequest(string(msg.Data), headers, conn.connectordata, conn.logger)
if err != nil {
if conn.connectordata.ErrorTopic != "" {
conn.responseOrErrorHandler(conn.connectordata.ErrorTopic, err.Error(), headers)
conn.responseOrErrorHandler(ctx, conn.connectordata.ErrorTopic, err.Error(), headers)
}
conn.logger.Error("Error sending the message to the endpoint %v", zap.Error(err))
} else {
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
if conn.connectordata.ErrorTopic != "" {
conn.responseOrErrorHandler(conn.connectordata.ErrorTopic, string(body), headers)
conn.responseOrErrorHandler(ctx, conn.connectordata.ErrorTopic, string(body), headers)
}
conn.logger.Error("Error reading body", zap.Error(err))

} else {
msg.Ack()
if conn.connectordata.ResponseTopic != "" {
conn.responseOrErrorHandler(conn.connectordata.ResponseTopic, string(body), headers)
conn.responseOrErrorHandler(ctx, conn.connectordata.ResponseTopic, string(body), headers)
}
conn.logger.Info("Success in sending the message", zap.Any("Messsage sent: ", msg))
}
Expand All @@ -123,10 +125,8 @@ func (conn pubsubConnector) consumeMessage() error {
return nil
}

func (conn pubsubConnector) responseOrErrorHandler(topicID string, response string, headers http.Header) {

ctx := context.Background()
client, err := pubsub.NewClient(ctx, conn.pubsubInfo.ProjectID, option.WithCredentialsJSON([]byte(conn.pubsubInfo.Creds)))
func (conn pubsubConnector) responseOrErrorHandler(ctx context.Context, topicID string, response string, headers http.Header) {
client, err := pubsub.NewClient(ctx, conn.pubsubInfo.ProjectID, option.WithAuthCredentialsJSON(option.ServiceAccount, []byte(conn.pubsubInfo.Creds)))
if err != nil {
conn.logger.Error("pubsub.NewClient: %v", zap.Error(err))
}
Expand Down
2 changes: 1 addition & 1 deletion gcp-pubsub-http-connector/version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.13
v0.14
41 changes: 20 additions & 21 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/fission/keda-connectors
go 1.25.5

require (
cloud.google.com/go/pubsub/v2 v2.0.0
cloud.google.com/go/pubsub/v2 v2.3.0
github.com/IBM/sarama v1.46.3
github.com/aws/aws-sdk-go-v2 v1.41.0
github.com/aws/aws-sdk-go-v2/config v1.32.6
Expand All @@ -16,15 +16,15 @@ require (
github.com/rs/xid v1.6.0
github.com/xdg/scram v1.0.5
go.uber.org/zap v1.27.1
google.golang.org/api v0.247.0
google.golang.org/api v0.258.0
sigs.k8s.io/controller-runtime v0.22.4
)

require (
cloud.google.com/go v0.121.6 // indirect
cloud.google.com/go/auth v0.16.4 // indirect
cloud.google.com/go/auth v0.17.0 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.8.0 // indirect
cloud.google.com/go/compute/metadata v0.9.0 // indirect
cloud.google.com/go/iam v1.5.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.4 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.19.6 // indirect
Expand Down Expand Up @@ -52,7 +52,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
Expand All @@ -68,26 +68,25 @@ require (
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/rcrowley/go-metrics v0.0.0-20250401214520-65e299d6c5c9 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.einride.tech/aip v0.73.0 // indirect
go.etcd.io/bbolt v1.3.11 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
go.opentelemetry.io/otel v1.36.0 // indirect
go.opentelemetry.io/otel/metric v1.36.0 // indirect
go.opentelemetry.io/otel/trace v1.36.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.43.0 // indirect
golang.org/x/net v0.46.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.17.0 // indirect
golang.org/x/sys v0.37.0 // indirect
golang.org/x/text v0.30.0 // indirect
golang.org/x/time v0.12.0 // indirect
golang.org/x/crypto v0.46.0 // indirect
golang.org/x/net v0.48.0 // indirect
golang.org/x/oauth2 v0.34.0 // indirect
golang.org/x/sync v0.19.0 // indirect
golang.org/x/sys v0.39.0 // indirect
golang.org/x/text v0.32.0 // indirect
golang.org/x/time v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250818200422-3122310a409c // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250811230008-5f3141c8851a // indirect
google.golang.org/grpc v1.74.2 // indirect
google.golang.org/protobuf v1.36.7 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251213004720-97cd9d5aeac2 // indirect
google.golang.org/grpc v1.77.0 // indirect
google.golang.org/protobuf v1.36.11 // indirect
)
Loading