8 changes: 4 additions & 4 deletions .github/actions/setup-keda/action.yml
@@ -14,22 +14,22 @@ runs:
- name: Setup Kind cluster
uses: helm/kind-action@0025e74a8c7512023d06dc019c617aa3cf561fde # v1.10.0
with:
node_image: kindest/node:v1.25.16
version: v0.23.0
node_image: kindest/node:v1.35.0
version: v0.31.0
cluster_name: kind

- name: Helm installation
uses: Azure/setup-helm@fe7b79cd5ee1e45176fcad797de68ecaf3ca4814 # v4.2.0
with:
version: v3.15.2
version: v4.0.4

- name: Install Keda
shell: bash
run: |
kubectl create namespace keda
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda --namespace keda --version 2.14.0
helm install keda kedacore/keda --namespace keda --version 2.18.3

- name: Setup Ko
uses: ko-build/setup-ko@3aebd0597dc1e9d1a26bcfdb7cbeb19c131d3037 # v0.7
2 changes: 2 additions & 0 deletions .github/workflows/ci.yml
@@ -147,6 +147,8 @@ jobs:
helm repo add localstack-charts https://localstack.github.io/helm-charts
helm repo update
helm install localstack localstack-charts/localstack
# Wait for localstack pod to be ready
kubectl wait pod -l=app.kubernetes.io/name=localstack --for=condition=ready

- name: Deploy SQS Keda Connector, Keda ScaledObject and SQS test queue
run: |
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
@@ -10,7 +10,7 @@ on:
workflow_dispatch:

env:
GOLANGCI_LINT_VERSION: v2.1.6
GOLANGCI_LINT_VERSION: v2.7.2
GOLANGCI_LINT_TIMEOUT: 10m

concurrency:
4 changes: 0 additions & 4 deletions .golangci.yaml
@@ -10,10 +10,6 @@ linters:
- prealloc
- unconvert
- unparam
settings:
errcheck:
exclude-functions:
- (*go.uber.org/zap.Logger).Sync
exclusions:
generated: lax
presets:
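The errcheck exclusion for `(*go.uber.org/zap.Logger).Sync` can be dropped because the connector now handles the Sync error explicitly (see the main.go change below). A minimal sketch of that pattern, using only the standard zap API:

```go
package main

import "go.uber.org/zap"

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	// Discard Sync's error explicitly; errcheck is then satisfied without
	// the per-function exclusion that this PR removes from .golangci.yaml.
	defer func() {
		_ = logger.Sync()
	}()

	logger.Info("connector starting")
}
```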
69 changes: 34 additions & 35 deletions aws-kinesis-http-connector/main.go
@@ -10,8 +10,9 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/aws/aws-sdk-go-v2/service/kinesis/types"

"github.com/fission/keda-connectors/common"

@@ -20,23 +21,23 @@

type pullFunc func(*record) error
type record struct {
*kinesis.Record
*types.Record
shardID string
millisBehindLatest *int64
}
type awsKinesisConnector struct {
ctx context.Context
client *kinesis.Kinesis
client *kinesis.Client
connectordata common.ConnectorMetadata
logger *zap.Logger
shardc chan *kinesis.Shard
maxRecords int64
shardc chan *types.Shard
maxRecords int32
}

// listShards gets called every 30 seconds to fetch all the shards
func (conn *awsKinesisConnector) listShards() ([]*kinesis.Shard, error) {
func (conn *awsKinesisConnector) listShards(ctx context.Context) ([]types.Shard, error) {
// call DescribeStream to get updated shards
stream, err := conn.client.DescribeStream(&kinesis.DescribeStreamInput{
stream, err := conn.client.DescribeStream(ctx, &kinesis.DescribeStreamInput{
StreamName: &conn.connectordata.Topic,
})
if err != nil {
Expand All @@ -56,7 +57,7 @@ func (conn *awsKinesisConnector) findNewShards() {
return
case <-ticker.C:
// check if new shards are available every 30 seconds
shardList, err := conn.listShards()
shardList, err := conn.listShards(conn.ctx)
if err != nil {
return
}
@@ -65,7 +66,7 @@
// send only new shards
_, loaded := shards.LoadOrStore(*s.ShardId, s)
if !loaded {
conn.shardc <- s
conn.shardc <- &s
}
}
}
@@ -82,26 +83,26 @@ func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string)
if checkpoint != "" {
// Start from where we left off
params.StartingSequenceNumber = aws.String(checkpoint)
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAfterSequenceNumber)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(conn.ctx, params)
params.ShardIteratorType = types.ShardIteratorTypeAfterSequenceNumber
iteratorOutput, err := conn.client.GetShardIterator(conn.ctx, params)
if err != nil {
return nil, err
}
return iteratorOutput, err
}
// Start from the oldest record in the shard
params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeTrimHorizon)
iteratorOutput, err := conn.client.GetShardIteratorWithContext(conn.ctx, params)
params.ShardIteratorType = types.ShardIteratorTypeTrimHorizon
iteratorOutput, err := conn.client.GetShardIterator(conn.ctx, params)
if err != nil {
return nil, err
}
return iteratorOutput, err
}

// getRecords gets the data for the specific shard
func (conn *awsKinesisConnector) getRecords(shardIterator *string) (*kinesis.GetRecordsOutput, error) {
func (conn *awsKinesisConnector) getRecords(ctx context.Context, shardIterator *string) (*kinesis.GetRecordsOutput, error) {
// use the shard iterator to request records from the shard
records, err := conn.client.GetRecords(&kinesis.GetRecordsInput{
records, err := conn.client.GetRecords(ctx, &kinesis.GetRecordsInput{
ShardIterator: shardIterator,
Limit: &conn.maxRecords,
})
@@ -148,7 +149,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
}
iterator := iteratorOutput.ShardIterator
if iterator != nil {
resp, err := conn.getRecords(iterator)
resp, err := conn.getRecords(conn.ctx, iterator)
if err != nil {
conn.logger.Error("error in getting records",
zap.String("shardID", shardID),
@@ -158,7 +159,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {

for _, r := range resp.Records {
// send records
err := fn(&record{r, shardID, resp.MillisBehindLatest})
err := fn(&record{&r, shardID, resp.MillisBehindLatest})
checkpoints.Store(shardID, *r.SequenceNumber)
if err != nil {
conn.logger.Error("error in processing records",
@@ -201,17 +202,17 @@ func (conn *awsKinesisConnector) consumeMessage(r *record) {
conn.logger.Error("error processing message",
zap.String("shardID", r.shardID),
zap.Error(err))
conn.errorHandler(r, err.Error())
conn.errorHandler(conn.ctx, r, err.Error())
} else {
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
conn.logger.Error("error processing message",
zap.String("shardID", r.shardID),
zap.Error(err))
conn.errorHandler(r, err.Error())
conn.errorHandler(conn.ctx, r, err.Error())
} else {
if err := conn.responseHandler(r, string(body)); err != nil {
if err := conn.responseHandler(conn.ctx, r, string(body)); err != nil {
conn.logger.Error("failed to publish response body from http request to topic",
zap.Error(err),
zap.String("topic", conn.connectordata.ResponseTopic),
@@ -225,23 +226,23 @@
}
}

func (conn *awsKinesisConnector) responseHandler(r *record, response string) error {
func (conn *awsKinesisConnector) responseHandler(ctx context.Context, r *record, response string) error {
if len(conn.connectordata.ResponseTopic) > 0 {
params := &kinesis.PutRecordInput{
Data: []byte(response), // Required
PartitionKey: aws.String(*r.PartitionKey), // Required
StreamName: aws.String(conn.connectordata.ResponseTopic), // Required
SequenceNumberForOrdering: aws.String(*r.SequenceNumber),
}
_, err := conn.client.PutRecord(params)
_, err := conn.client.PutRecord(ctx, params)
if err != nil {
return err
}
}
return nil
}

func (conn *awsKinesisConnector) errorHandler(r *record, errMsg string) {
func (conn *awsKinesisConnector) errorHandler(ctx context.Context, r *record, errMsg string) {
if len(conn.connectordata.ErrorTopic) > 0 {
params := &kinesis.PutRecordInput{
Data: []byte(errMsg), // Required
@@ -250,7 +251,7 @@ func (conn *awsKinesisConnector) errorHandler(r *record, errMsg string) {
SequenceNumberForOrdering: aws.String(*r.SequenceNumber),
}

_, err := conn.client.PutRecord(params)
_, err := conn.client.PutRecord(ctx, params)
if err != nil {
conn.logger.Error("failed to publish message to error topic",
zap.Error(err),
@@ -272,29 +273,27 @@ func main() {
if err != nil {
log.Fatalf("can't initialize zap logger: %v", err)
}
defer logger.Sync()
defer func() {
_ = logger.Sync()
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config, err := common.GetAwsConfig()
config, err := common.GetAwsConfig(ctx)
if err != nil {
logger.Error("failed to fetch aws config", zap.Error(err))
return
}

s, err := common.CreateValidatedSession(config)
if err != nil {
logger.Error("not able to create the session", zap.Error(err))
return
}
kc := kinesis.New(s)
kc := kinesis.NewFromConfig(config)
connectordata, err := common.ParseConnectorMetadata()
if err != nil {
logger.Error("error while parsing metadata", zap.Error(err))
return
}
if err := kc.WaitUntilStreamExists(&kinesis.DescribeStreamInput{StreamName: &connectordata.Topic}); err != nil {
waiter := kinesis.NewStreamExistsWaiter(kc)
if err := waiter.Wait(ctx, &kinesis.DescribeStreamInput{StreamName: &connectordata.Topic}, 5*time.Minute); err != nil {
logger.Error("not able to connect to kinesis stream", zap.Error(err))
return
}
@@ -306,7 +305,7 @@ func main() {
cancel() // call cancellation
}()

shardc := make(chan *kinesis.Shard, 1)
shardc := make(chan *types.Shard, 1)

conn := awsKinesisConnector{
ctx: ctx,
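Taken together, the main.go changes follow the standard aws-sdk-go v1 to v2 migration: the client is built from an aws.Config via NewFromConfig instead of a session, every API call takes a context as its first argument, shard iterator types become typed enums from the types package, and WaitUntilStreamExists is replaced by an explicit StreamExistsWaiter. A minimal, self-contained sketch of that v2 call pattern; the stream name and shard ID below are hypothetical placeholders, not values from this PR:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/kinesis"
	"github.com/aws/aws-sdk-go-v2/service/kinesis/types"
)

func main() {
	ctx := context.Background()

	// In SDK v2 the shared config is loaded once and passed to NewFromConfig;
	// there is no session.Session any more.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatalf("load aws config: %v", err)
	}
	client := kinesis.NewFromConfig(cfg)

	// "demo-stream" is a placeholder stream name for illustration only.
	stream := aws.String("demo-stream")

	// WaitUntilStreamExists from v1 becomes an explicit waiter with a
	// maximum wait duration.
	waiter := kinesis.NewStreamExistsWaiter(client)
	if err := waiter.Wait(ctx, &kinesis.DescribeStreamInput{StreamName: stream}, 5*time.Minute); err != nil {
		log.Fatalf("stream not ready: %v", err)
	}

	// Iterator types are typed enums in v2 instead of *string constants,
	// and every call takes a context. The shard ID is a placeholder; the
	// connector discovers real shard IDs via DescribeStream.
	out, err := client.GetShardIterator(ctx, &kinesis.GetShardIteratorInput{
		StreamName:        stream,
		ShardId:           aws.String("shardId-000000000000"),
		ShardIteratorType: types.ShardIteratorTypeTrimHorizon,
	})
	if err != nil {
		log.Fatalf("get shard iterator: %v", err)
	}

	// GetRecords now takes a *int32 limit, which is why maxRecords in the
	// connector changed from int64 to int32.
	records, err := client.GetRecords(ctx, &kinesis.GetRecordsInput{
		ShardIterator: out.ShardIterator,
		Limit:         aws.Int32(100),
	})
	if err != nil {
		log.Fatalf("get records: %v", err)
	}
	log.Printf("fetched %d records", len(records.Records))
}
```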
2 changes: 1 addition & 1 deletion aws-kinesis-http-connector/version
@@ -1 +1 @@
v0.18
v0.19