Skip to content

Commit c67c063

Browse files
committed
feat: process compass sink concurrently
1 parent 208cb97 commit c67c063

File tree

3 files changed

+29
-17
lines changed

3 files changed

+29
-17
lines changed

.github/workflows/test.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
- name: Upload coverage artifact
2424
uses: actions/upload-artifact@v4
2525
with:
26-
name: coverage
26+
name: coverage-${{ strategy.job-index }}
2727
path: coverage.out
2828

2929
pretest:
@@ -62,7 +62,7 @@ jobs:
6262
- name: Upload coverage artifact
6363
uses: actions/upload-artifact@v4
6464
with:
65-
name: coverage-plugins
65+
name: coverage-plugins-${{ strategy.job-index }}
6666
path: coverage-plugins*.out
6767

6868
coverage:
@@ -79,11 +79,11 @@ jobs:
7979
- name: Download coverage
8080
uses: actions/download-artifact@v4
8181
with:
82-
name: coverage
82+
name: coverage-${{ strategy.job-index }}
8383
- name: Download plugins coverage
8484
uses: actions/download-artifact@v4
8585
with:
86-
name: coverage-plugins
86+
name: coverage-plugins-${{ strategy.job-index }}
8787
- name: Install goveralls and send coverage
8888
env:
8989
COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }}

plugins/sinks/compass/sink.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/goto/meteor/utils"
2121
"github.com/goto/salt/log"
2222
"github.com/pkg/errors"
23+
"golang.org/x/sync/errgroup"
2324
"google.golang.org/protobuf/encoding/protojson"
2425
"google.golang.org/protobuf/types/known/anypb"
2526
)
@@ -91,22 +92,33 @@ func (s *Sink) Init(ctx context.Context, config plugins.Config) error {
9192
}
9293

9394
func (s *Sink) Sink(ctx context.Context, batch []models.Record) error {
94-
for _, record := range batch {
95-
asset := record.Data()
96-
s.logger.Info("sinking record to compass", "record", asset.GetUrn())
95+
if len(batch) == 0 {
96+
return nil
97+
}
9798

98-
compassPayload, err := s.buildCompassPayload(asset)
99-
if err != nil {
100-
return fmt.Errorf("build compass payload: %w", err)
101-
}
102-
if err = s.send(ctx, compassPayload); err != nil {
103-
return fmt.Errorf("send data: %w", err)
104-
}
99+
errCh := errgroup.Group{}
100+
errCh.SetLimit(len(batch))
105101

106-
s.logger.Info("successfully sinked record to compass", "record", asset.GetUrn())
102+
for _, record := range batch {
103+
record := record
104+
errCh.Go(func() error {
105+
asset := record.Data()
106+
s.logger.Info("sinking record to compass", "record", asset.GetUrn())
107+
108+
compassPayload, err := s.buildCompassPayload(asset)
109+
if err != nil {
110+
return err
111+
}
112+
if err := s.send(ctx, compassPayload); err != nil {
113+
return err
114+
}
115+
116+
s.logger.Info("successfully sinked record to compass", "record", asset.GetUrn())
117+
return nil
118+
})
107119
}
108120

109-
return nil
121+
return errCh.Wait()
110122
}
111123

112124
func (*Sink) Close() error { return nil }

plugins/sinks/compass/sink_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func TestInit(t *testing.T) {
5353
func TestSink(t *testing.T) {
5454
t.Run("should return error if compass host returns error", func(t *testing.T) {
5555
compassError := `{"reason":"no asset found"}`
56-
errMessage := "send data: compass returns 404: {\"reason\":\"no asset found\"}"
56+
errMessage := "compass returns 404: {\"reason\":\"no asset found\"}"
5757

5858
// setup mock client
5959
url := fmt.Sprintf("%s/v1beta1/assets", host)

0 commit comments

Comments
 (0)