Skip to content

Commit 9a47f1c

Browse files
authored
Single endpoint api (#1009)
* Add subscribe and register to process (#1010) * Task.Fulfill (#1012) * Actually task.suspend (#1013)
1 parent 7b86264 commit 9a47f1c

38 files changed

+2687
-260
lines changed

cmd/dst/run.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,13 +180,17 @@ func RunDSTCmd(cfg *config.Config, vip *viper.Viper) *cobra.Command {
180180
system.AddOnRequest(t_api.PromiseCreate, coroutines.CreatePromise)
181181
system.AddOnRequest(t_api.TaskCreate, coroutines.CreatePromiseAndTask)
182182
system.AddOnRequest(t_api.PromiseComplete, coroutines.CompletePromise)
183-
system.AddOnRequest(t_api.PromiseRegister, coroutines.CreateCallback)
183+
system.AddOnRequest(t_api.PromiseRegister, coroutines.PromiseRegister)
184+
system.AddOnRequest(t_api.PromiseSubscribe, coroutines.PromiseSubscribe)
185+
system.AddOnRequest(t_api.CallbackCreate, coroutines.CreateCallback)
184186
system.AddOnRequest(t_api.ScheduleRead, coroutines.ReadSchedule)
185187
system.AddOnRequest(t_api.ScheduleSearch, coroutines.SearchSchedules)
186188
system.AddOnRequest(t_api.ScheduleCreate, coroutines.CreateSchedule)
187189
system.AddOnRequest(t_api.ScheduleDelete, coroutines.DeleteSchedule)
188190
system.AddOnRequest(t_api.TaskAcquire, coroutines.ClaimTask)
189191
system.AddOnRequest(t_api.TaskComplete, coroutines.CompleteTask)
192+
system.AddOnRequest(t_api.TaskFulfill, coroutines.TaskFulfill)
193+
system.AddOnRequest(t_api.TaskSuspend, coroutines.TaskSuspend)
190194
system.AddOnRequest(t_api.TaskRelease, coroutines.DropTask)
191195
system.AddOnRequest(t_api.TaskHeartbeat, coroutines.HeartbeatTasks)
192196

cmd/serve/serve.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,14 +189,18 @@ func Serve(cfg *config.Config) error {
189189
system.AddOnRequest(t_api.PromiseSearch, coroutines.SearchPromises)
190190
system.AddOnRequest(t_api.PromiseCreate, coroutines.CreatePromise)
191191
system.AddOnRequest(t_api.TaskCreate, coroutines.CreatePromiseAndTask)
192-
system.AddOnRequest(t_api.PromiseRegister, coroutines.CreateCallback)
192+
system.AddOnRequest(t_api.PromiseRegister, coroutines.PromiseRegister)
193+
system.AddOnRequest(t_api.PromiseSubscribe, coroutines.PromiseSubscribe)
194+
system.AddOnRequest(t_api.CallbackCreate, coroutines.CreateCallback)
193195
system.AddOnRequest(t_api.PromiseComplete, coroutines.CompletePromise)
194196
system.AddOnRequest(t_api.ScheduleRead, coroutines.ReadSchedule)
195197
system.AddOnRequest(t_api.ScheduleSearch, coroutines.SearchSchedules)
196198
system.AddOnRequest(t_api.ScheduleCreate, coroutines.CreateSchedule)
197199
system.AddOnRequest(t_api.ScheduleDelete, coroutines.DeleteSchedule)
198200
system.AddOnRequest(t_api.TaskAcquire, coroutines.ClaimTask)
199201
system.AddOnRequest(t_api.TaskComplete, coroutines.CompleteTask)
202+
system.AddOnRequest(t_api.TaskFulfill, coroutines.TaskFulfill)
203+
system.AddOnRequest(t_api.TaskSuspend, coroutines.TaskSuspend)
200204
system.AddOnRequest(t_api.TaskRelease, coroutines.DropTask)
201205
system.AddOnRequest(t_api.TaskHeartbeat, coroutines.HeartbeatTasks)
202206
system.AddOnRequest(t_api.Noop, coroutines.Noop)

internal/api/auth_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ func TestAuthenticate(t *testing.T) {
104104
t.Run("missing authorization header", func(t *testing.T) {
105105
req := &t_api.Request{
106106
Head: map[string]string{},
107-
Data: &t_api.EchoRequest{Data: "test"},
107+
Data: &t_api.EchoRequest{Data: "test"},
108108
}
109109
_, err := auth.authenticate(req)
110110
if err == nil {
@@ -127,7 +127,7 @@ func TestAuthenticate(t *testing.T) {
127127

128128
req := &t_api.Request{
129129
Head: map[string]string{"authorization": token},
130-
Data: &t_api.EchoRequest{Data: "test"},
130+
Data: &t_api.EchoRequest{Data: "test"},
131131
}
132132
returnedClaims, err := auth.authenticate(req)
133133
if err != nil {
@@ -157,7 +157,7 @@ func TestAuthenticate(t *testing.T) {
157157

158158
req := &t_api.Request{
159159
Head: map[string]string{"authorization": token},
160-
Data: &t_api.EchoRequest{Data: "test"},
160+
Data: &t_api.EchoRequest{Data: "test"},
161161
}
162162
_, err = auth.authenticate(req)
163163
if err == nil {
@@ -264,7 +264,7 @@ func TestMatchPromisePrefix(t *testing.T) {
264264
// Callback requests
265265
{
266266
name: "PromiseRegisterRequest - matching prefix",
267-
payload: &t_api.PromiseRegisterRequest{PromiseId: "test.123"},
267+
payload: &t_api.CallbackCreateRequest{PromiseId: "test.123"},
268268
prefix: "test",
269269
shouldMatch: true,
270270
},
@@ -387,7 +387,7 @@ func TestProcess(t *testing.T) {
387387

388388
req := &t_api.Request{
389389
Head: map[string]string{"authorization": token},
390-
Data: &t_api.PromiseGetRequest{Id: "app.promise.1"},
390+
Data: &t_api.PromiseGetRequest{Id: "app.promise.1"},
391391
}
392392
apiErr := auth.Process(req)
393393
if apiErr != nil {
@@ -409,7 +409,7 @@ func TestProcess(t *testing.T) {
409409

410410
req := &t_api.Request{
411411
Head: map[string]string{"authorization": token},
412-
Data: &t_api.PromiseGetRequest{Id: "any.prefix.promise"},
412+
Data: &t_api.PromiseGetRequest{Id: "any.prefix.promise"},
413413
}
414414
apiErr := auth.Process(req)
415415
if apiErr != nil {
@@ -432,7 +432,7 @@ func TestProcess(t *testing.T) {
432432

433433
req := &t_api.Request{
434434
Head: map[string]string{"authorization": token},
435-
Data: &t_api.PromiseGetRequest{Id: "other.promise.1"},
435+
Data: &t_api.PromiseGetRequest{Id: "other.promise.1"},
436436
}
437437
apiErr := auth.Process(req)
438438
if apiErr == nil {
@@ -443,7 +443,7 @@ func TestProcess(t *testing.T) {
443443
t.Run("full flow - missing token", func(t *testing.T) {
444444
req := &t_api.Request{
445445
Head: map[string]string{},
446-
Data: &t_api.PromiseGetRequest{Id: "app:promise:1"},
446+
Data: &t_api.PromiseGetRequest{Id: "app:promise:1"},
447447
}
448448
apiErr := auth.Process(req)
449449
if apiErr == nil {
@@ -506,7 +506,7 @@ func TestProcessMultipleRequestTypes(t *testing.T) {
506506
},
507507
{
508508
name: "PromiseRegisterRequest - authorized",
509-
payload: &t_api.PromiseRegisterRequest{PromiseId: "test:promise"},
509+
payload: &t_api.CallbackCreateRequest{PromiseId: "test:promise"},
510510
shouldPass: true,
511511
},
512512
{
@@ -525,7 +525,7 @@ func TestProcessMultipleRequestTypes(t *testing.T) {
525525
t.Run(tt.name, func(t *testing.T) {
526526
req := &t_api.Request{
527527
Head: metadata,
528-
Data: tt.payload,
528+
Data: tt.payload,
529529
}
530530
apiErr := auth.Process(req)
531531
if tt.shouldPass && apiErr != nil {

internal/api/middleware.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ func matchPromisePrefix(req *t_api.Request, prefix string) error {
132132
id = r.Promise.Id
133133
case *t_api.PromiseCompleteRequest:
134134
id = r.Id
135-
case *t_api.PromiseRegisterRequest:
135+
case *t_api.CallbackCreateRequest:
136136
id = r.PromiseId
137137
case *t_api.ScheduleGetRequest:
138138
id = r.Id

internal/app/coroutines/claimTask.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,8 +171,8 @@ func ClaimTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r
171171
util.Assert(status != t_api.StatusCreated || t != nil, "task must be non nil if status created")
172172

173173
return &t_api.Response{
174-
Status: status,
175-
Head: r.Head,
174+
Status: status,
175+
Head: r.Head,
176176
Data: &t_api.TaskAcquireResponse{
177177
Task: t,
178178
RootPromise: rp,

internal/app/coroutines/completePromise.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ func CompletePromise(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, an
8787
}
8888

8989
res = &t_api.Response{
90-
Status: status,
91-
Head: r.Head,
90+
Status: status,
91+
Head: r.Head,
9292
Data: &t_api.PromiseCompleteResponse{
9393
Promise: &promise.Promise{
9494
Id: p.Id,
@@ -104,18 +104,18 @@ func CompletePromise(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, an
104104
}
105105
} else {
106106
res = &t_api.Response{
107-
Status: t_api.StatusOK,
108-
Head: r.Head,
107+
Status: t_api.StatusOK,
108+
Head: r.Head,
109109
Data: &t_api.PromiseCompleteResponse{
110110
Promise: p,
111111
},
112112
}
113113
}
114114
} else {
115115
res = &t_api.Response{
116-
Status: t_api.StatusPromiseNotFound,
117-
Head: r.Head,
118-
Data: &t_api.PromiseCompleteResponse{},
116+
Status: t_api.StatusPromiseNotFound,
117+
Head: r.Head,
118+
Data: &t_api.PromiseCompleteResponse{},
119119
}
120120
}
121121

internal/app/coroutines/completeTask.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,8 @@ func CompleteTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any],
114114
util.Assert(status != t_api.StatusCreated || t != nil, "task must be non nil if status created")
115115

116116
return &t_api.Response{
117-
Status: status,
118-
Head: r.Head,
117+
Status: status,
118+
Head: r.Head,
119119
Data: &t_api.TaskCompleteResponse{
120120
Task: t,
121121
},

internal/app/coroutines/createCallback.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,14 @@ import (
1313
)
1414

1515
func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any], r *t_api.Request) (*t_api.Response, error) {
16-
req := r.Data.(*t_api.PromiseRegisterRequest)
16+
req := r.Data.(*t_api.CallbackCreateRequest)
1717
util.Assert(req != nil, "create callback must not be nil")
1818
util.Assert(req.Mesg.Type == "resume" || req.Mesg.Type == "notify", "message type must be resume or notify")
1919
util.Assert(req.Mesg.Type == "resume" || req.PromiseId == req.Mesg.Root, "if notify, root promise id must equal leaf promise id")
2020
util.Assert(req.Mesg.Type == "notify" || (req.PromiseId == req.Mesg.Leaf && req.PromiseId != req.Mesg.Root), "if resume, root promise id must not equal leaf promise id")
2121

2222
var res *t_api.Response
2323

24-
// read the promise to see if it exists
2524
completion, err := gocoro.YieldAndAwait(c, &t_aio.Submission{
2625
Kind: t_aio.Store,
2726
Tags: r.Head,
@@ -114,18 +113,18 @@ func CreateCallback(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
114113

115114
res = &t_api.Response{
116115
// Status could be StatusOk or StatusCreated if the Callback Id was already present
117-
Status: status,
118-
Head: r.Head,
119-
Data: &t_api.PromiseRegisterResponse{
116+
Status: status,
117+
Head: r.Head,
118+
Data: &t_api.CallbackCreateResponse{
120119
Callback: cb,
121120
Promise: p,
122121
},
123122
}
124123
} else {
125124
res = &t_api.Response{
126-
Status: t_api.StatusPromiseNotFound,
127-
Head: r.Head,
128-
Data: &t_api.PromiseRegisterResponse{},
125+
Status: t_api.StatusPromiseNotFound,
126+
Head: r.Head,
127+
Data: &t_api.CallbackCreateResponse{},
129128
}
130129
}
131130

internal/app/coroutines/createPromise.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,9 @@ func CreatePromise(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any]
5050
}
5151

5252
return &t_api.Response{
53-
Status: status,
54-
Head: r.Head,
55-
Data: &t_api.PromiseCreateResponse{Promise: completion.promise},
53+
Status: status,
54+
Head: r.Head,
55+
Data: &t_api.PromiseCreateResponse{Promise: completion.promise},
5656
}, nil
5757
}
5858

@@ -108,9 +108,9 @@ func CreatePromiseAndTask(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completio
108108
status = t_api.StatusOK
109109
}
110110
return &t_api.Response{
111-
Status: status,
112-
Head: r.Head,
113-
Data: &t_api.TaskCreateResponse{Promise: completion.promise, Task: completion.task},
111+
Status: status,
112+
Head: r.Head,
113+
Data: &t_api.TaskCreateResponse{Promise: completion.promise, Task: completion.task},
114114
}, nil
115115
}
116116

internal/app/coroutines/createSchedule.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,8 @@ func CreateSchedule(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
9393

9494
if result.RowsAffected == 1 {
9595
res = &t_api.Response{
96-
Status: t_api.StatusCreated,
97-
Head: r.Head,
96+
Status: t_api.StatusCreated,
97+
Head: r.Head,
9898
Data: &t_api.ScheduleCreateResponse{
9999
Schedule: &schedule.Schedule{
100100
Id: req.Id,
@@ -128,8 +128,8 @@ func CreateSchedule(c gocoro.Coroutine[*t_aio.Submission, *t_aio.Completion, any
128128
}
129129

130130
res = &t_api.Response{
131-
Status: t_api.StatusOK,
132-
Head: r.Head,
131+
Status: t_api.StatusOK,
132+
Head: r.Head,
133133
Data: &t_api.ScheduleCreateResponse{
134134
Schedule: s,
135135
},

0 commit comments

Comments
 (0)