@@ -10,8 +10,9 @@ import (
1010 "sync"
1111 "time"
1212
13- "github.com/aws/aws-sdk-go/aws"
14- "github.com/aws/aws-sdk-go/service/kinesis"
13+ "github.com/aws/aws-sdk-go-v2/aws"
14+ "github.com/aws/aws-sdk-go-v2/service/kinesis"
15+ "github.com/aws/aws-sdk-go-v2/service/kinesis/types"
1516
1617 "github.com/fission/keda-connectors/common"
1718
@@ -20,23 +21,23 @@ import (
2021
2122type pullFunc func (* record ) error
2223type record struct {
23- * kinesis .Record
24+ * types .Record
2425 shardID string
2526 millisBehindLatest * int64
2627}
2728type awsKinesisConnector struct {
2829 ctx context.Context
29- client * kinesis.Kinesis
30+ client * kinesis.Client
3031 connectordata common.ConnectorMetadata
3132 logger * zap.Logger
32- shardc chan * kinesis .Shard
33+ shardc chan * types .Shard
3334 maxRecords int64
3435}
3536
3637// listShards get called every 30sec to get all the shards
37- func (conn * awsKinesisConnector ) listShards () ([]* kinesis .Shard , error ) {
38+ func (conn * awsKinesisConnector ) listShards () ([]types .Shard , error ) {
3839 // call DescribeStream to get updated shards
39- stream , err := conn .client .DescribeStream (& kinesis.DescribeStreamInput {
40+ stream , err := conn .client .DescribeStream (context . Background (), & kinesis.DescribeStreamInput {
4041 StreamName : & conn .connectordata .Topic ,
4142 })
4243 if err != nil {
@@ -65,7 +66,7 @@ func (conn *awsKinesisConnector) findNewShards() {
6566 // send only new shards
6667 _ , loaded := shards .LoadOrStore (* s .ShardId , s )
6768 if ! loaded {
68- conn .shardc <- s
69+ conn .shardc <- & s
6970 }
7071 }
7172 }
@@ -82,16 +83,16 @@ func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string)
8283 if checkpoint != "" {
8384 // Start from, where we left
8485 params .StartingSequenceNumber = aws .String (checkpoint )
85- params .ShardIteratorType = aws . String ( kinesis . ShardIteratorTypeAfterSequenceNumber )
86- iteratorOutput , err := conn .client .GetShardIteratorWithContext (conn .ctx , params )
86+ params .ShardIteratorType = types . ShardIteratorTypeAfterSequenceNumber
87+ iteratorOutput , err := conn .client .GetShardIterator (conn .ctx , params )
8788 if err != nil {
8889 return nil , err
8990 }
9091 return iteratorOutput , err
9192 }
9293 // Start from, oldest record in the shard
93- params .ShardIteratorType = aws . String ( kinesis . ShardIteratorTypeTrimHorizon )
94- iteratorOutput , err := conn .client .GetShardIteratorWithContext (conn .ctx , params )
94+ params .ShardIteratorType = types . ShardIteratorTypeTrimHorizon
95+ iteratorOutput , err := conn .client .GetShardIterator (conn .ctx , params )
9596 if err != nil {
9697 return nil , err
9798 }
@@ -101,9 +102,9 @@ func (conn *awsKinesisConnector) getIterator(shardID string, checkpoint string)
101102// getRecords get the data for the specific shard
102103func (conn * awsKinesisConnector ) getRecords (shardIterator * string ) (* kinesis.GetRecordsOutput , error ) {
103104 // get records use shard iterator for making request
104- records , err := conn .client .GetRecords (& kinesis.GetRecordsInput {
105+ records , err := conn .client .GetRecords (context . Background (), & kinesis.GetRecordsInput {
105106 ShardIterator : shardIterator ,
106- Limit : & conn .maxRecords ,
107+ Limit : aws . Int32 ( int32 ( conn .maxRecords )) ,
107108 })
108109 if err != nil {
109110 return nil , err
@@ -158,7 +159,7 @@ func (conn *awsKinesisConnector) pullRecords(fn pullFunc) {
158159
159160 for _ , r := range resp .Records {
160161 // send records
161- err := fn (& record {r , shardID , resp .MillisBehindLatest })
162+ err := fn (& record {& r , shardID , resp .MillisBehindLatest })
162163 checkpoints .Store (shardID , * r .SequenceNumber )
163164 if err != nil {
164165 conn .logger .Error ("error in processing records" ,
@@ -233,7 +234,7 @@ func (conn *awsKinesisConnector) responseHandler(r *record, response string) err
233234 StreamName : aws .String (conn .connectordata .ResponseTopic ), // Required
234235 SequenceNumberForOrdering : aws .String (* r .SequenceNumber ),
235236 }
236- _ , err := conn .client .PutRecord (params )
237+ _ , err := conn .client .PutRecord (context . Background (), params )
237238 if err != nil {
238239 return err
239240 }
@@ -250,7 +251,7 @@ func (conn *awsKinesisConnector) errorHandler(r *record, errMsg string) {
250251 SequenceNumberForOrdering : aws .String (* r .SequenceNumber ),
251252 }
252253
253- _ , err := conn .client .PutRecord (params )
254+ _ , err := conn .client .PutRecord (context . Background (), params )
254255 if err != nil {
255256 conn .logger .Error ("failed to publish message to error topic" ,
256257 zap .Error (err ),
@@ -277,24 +278,21 @@ func main() {
277278 ctx , cancel := context .WithCancel (context .Background ())
278279 defer cancel ()
279280
280- config , err := common .GetAwsConfig ( )
281+ cfg , err := common .GetAwsV2Config ( ctx )
281282 if err != nil {
282283 logger .Error ("failed to fetch aws config" , zap .Error (err ))
283284 return
284285 }
285286
286- s , err := common .CreateValidatedSession (config )
287- if err != nil {
288- logger .Error ("not able to create the session" , zap .Error (err ))
289- return
290- }
291- kc := kinesis .New (s )
287+ kc := kinesis .NewFromConfig (cfg )
292288 connectordata , err := common .ParseConnectorMetadata ()
293289 if err != nil {
294290 logger .Error ("error while parsing metadata" , zap .Error (err ))
295291 return
296292 }
297- if err := kc .WaitUntilStreamExists (& kinesis.DescribeStreamInput {StreamName : & connectordata .Topic }); err != nil {
293+ // Test stream exists by describing it
294+ _ , err = kc .DescribeStream (ctx , & kinesis.DescribeStreamInput {StreamName : & connectordata .Topic })
295+ if err != nil {
298296 logger .Error ("not able to connect to kinesis stream" , zap .Error (err ))
299297 return
300298 }
@@ -306,7 +304,7 @@ func main() {
306304 cancel () // call cancellation
307305 }()
308306
309- shardc := make (chan * kinesis .Shard , 1 )
307+ shardc := make (chan * types .Shard , 1 )
310308
311309 conn := awsKinesisConnector {
312310 ctx : ctx ,
0 commit comments