Skip to content

Commit 584d10a

Browse files
authored
Merge pull request #15 from datum-cloud/14-migrate-from-resend-audiences-to-segments
Migrate from resend audiences to segments
2 parents aa4937d + a58b1f5 commit 584d10a

18 files changed

+494
-398
lines changed

README.md

Lines changed: 26 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,35 @@
1-
Complete Kubernetes operator implementation that integrates Resend email service with the Milo platform for automated email delivery.
1+
# Complete Kubernetes operator implementation that integrates Resend email service with the Milo platform for automated email delivery
22

3-
### Architecture
3+
## Architecture
44

5-
* **Controller Manager**: Reconciles Email CRDs by fetching templates, rendering content, and sending emails via Resend
6-
7-
* **Webhook Server**: Processes Resend delivery events (sent/delivered/bounced) and updates Email status
8-
9-
* **Email Provider Interface**: Abstracted email provider supporting multiple backends (currently Resend)
10-
5+
* **Controller Manager**: Reconciles Email CRDs by fetching templates, rendering content, and sending emails via Resend
6+
7+
* **Webhook Server**: Processes Resend delivery events (sent/delivered/bounced) and updates Email status
8+
9+
* **Email Provider Interface**: Abstracted email provider supporting multiple backends (currently Resend)
1110

1211
### Key Features
1312

14-
* **Template Rendering**: Dynamic email content using EmailTemplate CRDs with variable substitution
15-
16-
* **Idempotency**: Uses Email UID as idempotency key to prevent duplicate sends
17-
18-
* **Status Tracking**: Kubernetes conditions for email delivery states (Pending/Delivered/Failed)
19-
20-
* **Event Correlation**: Indexes emails by providerID for efficient webhook event processing
21-
22-
* **Priority Support**: Configurable retry delays based on email priority
23-
24-
* **Security**: SVIX webhook signature verification for event authenticity
25-
13+
* **Template Rendering**: Dynamic email content using EmailTemplate CRDs with variable substitution
14+
15+
* **Idempotency**: Uses Email UID as idempotency key to prevent duplicate sends
16+
17+
* **Status Tracking**: Kubernetes conditions for email delivery states (Pending/Delivered/Failed)
18+
19+
* **Event Correlation**: Indexes emails by providerID for efficient webhook event processing
20+
21+
* **Priority Support**: Configurable retry delays based on email priority
22+
23+
* **Security**: SVIX webhook signature verification for event authenticity
2624

2725
### Components
2826

29-
* Dual-mode deployment: controller manager + separate webhook server
30-
31-
* RBAC for Email, EmailTemplate, and User CRD access
32-
33-
* Metrics and health endpoints with TLS support
34-
35-
* Leader election for HA deployments
36-
27+
* Dual-mode deployment: controller manager + separate webhook server
28+
29+
* RBAC for Email, EmailTemplate, and User CRD access
30+
31+
* Metrics and health endpoints with TLS support
32+
33+
* Leader election for HA deployments
3734

38-
Provides reliable, observable email delivery for the Milo notification system.
35+
Provides reliable, observable email delivery for the Milo notification system.

cmd/manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ func runManager(
304304

305305
// Setup contact controller
306306
if err := (&controller.ContactController{
307-
Client: mgr.GetClient(),
307+
Client: mgr.GetClient(),
308+
EmailProvider: *emailProviderService,
308309
}).SetupWithManager(mgr); err != nil {
309310
setupLog.Error(err, "unable to create controller", "controller", "Contact")
310311
return fmt.Errorf("unable to create controller: %w", err)

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@ require (
1010
sigs.k8s.io/controller-runtime v0.21.0
1111
)
1212

13-
require go.miloapis.com/milo v0.4.5-0.20251003143746-b718a6ae1df6
14-
15-
require github.com/resend/resend-go/v2 v2.23.0
13+
require go.miloapis.com/milo v0.10.2
1614

1715
require github.com/svix/svix-webhooks v1.74.1
1816

17+
require github.com/resend/resend-go/v3 v3.0.0
18+
1919
require (
2020
cel.dev/expr v0.24.0 // indirect
2121
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect

go.sum

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2
110110
github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
111111
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
112112
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
113-
github.com/resend/resend-go/v2 v2.23.0 h1:zOMoKJUW0IKyzKU///ieyxUFcz576Y5l+Z6wUrur01Q=
114-
github.com/resend/resend-go/v2 v2.23.0/go.mod h1:3YCb8c8+pLiqhtRFXTyFwlLvfjQtluxOr9HEh2BwCkQ=
113+
github.com/resend/resend-go/v3 v3.0.0 h1:RCZgLuAFMUYH4ZByu+rncNvlOf69DCJwBdOH6q/aZCs=
114+
github.com/resend/resend-go/v3 v3.0.0/go.mod h1:iI7VA0NoGjWvsNii5iNC5Dy0llsI3HncXPejhniYzwE=
115115
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
116116
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
117117
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
@@ -139,8 +139,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
139139
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
140140
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
141141
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
142-
go.miloapis.com/milo v0.4.5-0.20251003143746-b718a6ae1df6 h1:JO9wCqB1O3S5iH+W4A7GCIajCeWXocCvqhOgN4qr19o=
143-
go.miloapis.com/milo v0.4.5-0.20251003143746-b718a6ae1df6/go.mod h1:84gyGl7t8qEBMVxBGTAQsKoP0zIZLG5l47voo5hXOCw=
142+
go.miloapis.com/milo v0.10.2 h1:z7h9CEtmU/X+PQr0wsbyyWZojwzpybxkRRZe6U3TX20=
143+
go.miloapis.com/milo v0.10.2/go.mod h1:xOFYvUsvSZV3z6eow5YdB5C/qRQf2s/5/arcfJs5XPg=
144144
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
145145
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
146146
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 h1:Hf9xI/XLML9ElpiHVDNwvqI0hIFlzV8dgIr35kV1kRU=

internal/controller/contact_controller.go

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222

23+
"go.miloapis.com/email-provider-resend/internal/emailprovider"
2324
notificationmiloapiscomv1alpha1 "go.miloapis.com/milo/pkg/apis/notification/v1alpha1"
2425
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2526

@@ -44,13 +45,15 @@ func buildContactNamespacedIndexKey(contactName, contactNamespace string) string
4445

4546
// ContactReconciler reconciles a Contact object
4647
type ContactController struct {
47-
Client client.Client
48-
Finalizers finalizer.Finalizers
48+
Client client.Client
49+
Finalizers finalizer.Finalizers
50+
EmailProvider emailprovider.Service
4951
}
5052

5153
// contactFinalizer is a finalizer for the Contact object
5254
type contactFinalizer struct {
53-
Client client.Client
55+
Client client.Client
56+
EmailProvider emailprovider.Service
5457
}
5558

5659
func (f *contactFinalizer) Finalize(ctx context.Context, obj client.Object) (finalizer.Result, error) {
@@ -64,9 +67,20 @@ func (f *contactFinalizer) Finalize(ctx context.Context, obj client.Object) (fin
6467
return finalizer.Result{}, fmt.Errorf("object is not a Contact")
6568
}
6669

70+
// Delete Contact on email provider
71+
deleted, err := f.EmailProvider.DeleteContact(ctx, *contact)
72+
if err != nil && !errors.IsNotFound(err) {
73+
log.Error(err, "Failed to delete Contact on email provider")
74+
return finalizer.Result{}, fmt.Errorf("failed to delete Contact on email provider: %w", err)
75+
}
76+
if err == nil && !deleted.Deleted {
77+
log.Error(fmt.Errorf("failed to delete Contact on email provider. Expected deleted to be true, got %t", deleted.Deleted), "Failed to delete Contact on email provider")
78+
return finalizer.Result{}, fmt.Errorf("failed to delete Contact on email provider. Expected deleted to be true, got %t", deleted.Deleted)
79+
}
80+
6781
// Get associated ContactGroupMemberships to contact name
6882
contactGroupMemberships := &notificationmiloapiscomv1alpha1.ContactGroupMembershipList{}
69-
err := f.Client.List(ctx, contactGroupMemberships, client.MatchingFields{contactNamespacedIndexKey: buildContactNamespacedIndexKey(contact.Name, contact.Namespace)})
83+
err = f.Client.List(ctx, contactGroupMemberships, client.MatchingFields{contactNamespacedIndexKey: buildContactNamespacedIndexKey(contact.Name, contact.Namespace)})
7084
if err != nil {
7185
log.Error(err, "Failed to list ContactGroupMemberships")
7286
return finalizer.Result{}, fmt.Errorf("failed to list ContactGroupMemberships: %w", err)
@@ -164,32 +178,59 @@ func (r *ContactController) Reconcile(ctx context.Context, req ctrl.Request) (ct
164178
switch {
165179
// First creation – condition not present yet
166180
case existingCond == nil:
181+
// Create Contact on email provider
182+
emailProviderContact, err := r.EmailProvider.CreateContactIdempotent(ctx, *contact)
183+
if err != nil {
184+
log.Error(err, "Failed to create Contact on email provider")
185+
return ctrl.Result{}, fmt.Errorf("failed to create Contact on email provider: %w", err)
186+
}
187+
contact.Status.ProviderID = emailProviderContact.ContactId
188+
167189
log.Info("Contact first creation")
168190
meta.SetStatusCondition(&contact.Status.Conditions, metav1.Condition{
169191
Type: notificationmiloapiscomv1alpha1.ContactReadyCondition,
192+
Status: metav1.ConditionFalse,
193+
Reason: notificationmiloapiscomv1alpha1.ContactCreatePendingReason,
194+
Message: "Contact created. Waiting for email provider webhook confirmation",
195+
LastTransitionTime: metav1.Now(),
196+
ObservedGeneration: contact.GetGeneration(),
197+
})
198+
199+
// Set ContactUpdatedCondition to true to indicate that the contact is ready to accept incoming updates
200+
meta.SetStatusCondition(&contact.Status.Conditions, metav1.Condition{
201+
Type: notificationmiloapiscomv1alpha1.ContactUpdatedCondition,
170202
Status: metav1.ConditionTrue,
171203
Reason: notificationmiloapiscomv1alpha1.ContactCreatedReason,
172-
Message: "Contact created",
204+
Message: "Contact created. Contact is ready to accept incoming updates",
173205
LastTransitionTime: metav1.Now(),
174206
ObservedGeneration: contact.GetGeneration(),
175207
})
176208

177209
// Update – generation changed since we last processed the object
178-
case updatedCond == nil || updatedCond.ObservedGeneration != contact.GetGeneration():
210+
case updatedCond != nil && updatedCond.ObservedGeneration != contact.GetGeneration():
179211
log.Info("Contact updated")
212+
213+
// Update Contact on email provider
214+
emailProviderContact, err := r.EmailProvider.UpdateContactIdempotent(ctx, *contact)
215+
if err != nil {
216+
log.Error(err, "Failed to update Contact on email provider")
217+
return ctrl.Result{}, fmt.Errorf("failed to update Contact on email provider: %w", err)
218+
}
219+
contact.Status.ProviderID = emailProviderContact.ContactId
220+
180221
// Update condition
181222
meta.SetStatusCondition(&contact.Status.Conditions, metav1.Condition{
182223
Type: notificationmiloapiscomv1alpha1.ContactUpdatedCondition,
183-
Status: metav1.ConditionTrue,
184-
Reason: notificationmiloapiscomv1alpha1.ContactUpdatedReason,
185-
Message: "Contact updated",
224+
Status: metav1.ConditionFalse,
225+
Reason: notificationmiloapiscomv1alpha1.ContactUpdatePendingReason,
226+
Message: "Contact updated. Waiting for email provider webhook confirmation",
186227
LastTransitionTime: metav1.Now(),
187228
ObservedGeneration: contact.GetGeneration(),
188229
})
189230

190231
// Get associated ContactGroupMemberships to contact
191232
contactGroupMemberships := &notificationmiloapiscomv1alpha1.ContactGroupMembershipList{}
192-
err := r.Client.List(ctx, contactGroupMemberships, client.MatchingFields{contactNamespacedIndexKey: buildContactNamespacedIndexKey(contact.Name, contact.Namespace)})
233+
err = r.Client.List(ctx, contactGroupMemberships, client.MatchingFields{contactNamespacedIndexKey: buildContactNamespacedIndexKey(contact.Name, contact.Namespace)})
193234
if err != nil {
194235
log.Error(err, "Failed to list ContactGroupMemberships")
195236
return ctrl.Result{}, fmt.Errorf("failed to list ContactGroupMemberships: %w", err)
@@ -250,7 +291,9 @@ func (r *ContactController) SetupWithManager(mgr ctrl.Manager) error {
250291
// Register finalizer
251292
r.Finalizers = finalizer.NewFinalizers()
252293
if err := r.Finalizers.Register(contactFinalizerKey, &contactFinalizer{
253-
Client: r.Client}); err != nil {
294+
Client: r.Client,
295+
EmailProvider: r.EmailProvider,
296+
}); err != nil {
254297
return fmt.Errorf("failed to register contact finalizer: %w", err)
255298
}
256299

internal/controller/contact_controller_test.go

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ import (
1414
"sigs.k8s.io/controller-runtime/pkg/client"
1515
"sigs.k8s.io/controller-runtime/pkg/client/fake"
1616
finalizerpkg "sigs.k8s.io/controller-runtime/pkg/finalizer"
17+
18+
"go.miloapis.com/email-provider-resend/internal/emailprovider"
19+
"go.miloapis.com/email-provider-resend/internal/emailprovider/mockprovider"
1720
)
1821

1922
var _ = ginkgo.Describe("ContactController", func() {
@@ -22,6 +25,7 @@ var _ = ginkgo.Describe("ContactController", func() {
2225
k8sClient client.Client
2326
controller *ContactController
2427
contact *notificationv1.Contact
28+
prov *mockprovider.MockEmailProvider
2529
)
2630

2731
ginkgo.BeforeEach(func() {
@@ -56,7 +60,13 @@ var _ = ginkgo.Describe("ContactController", func() {
5660
WithIndex(&notificationv1.ContactGroupMembership{}, contactNamespacedIndexKey, indexFn).
5761
Build()
5862

59-
controller = &ContactController{Client: k8sClient}
63+
prov = &mockprovider.MockEmailProvider{
64+
CreateContactOutput: emailprovider.CreateContactOutput{ContactId: "c-123"},
65+
DeleteContactOutput: emailprovider.DeleteContactOutput{Deleted: true},
66+
}
67+
svc := emailprovider.NewService(prov, "from", "reply")
68+
69+
controller = &ContactController{Client: k8sClient, EmailProvider: *svc}
6070
controller.Finalizers = finalizerpkg.NewFinalizers()
6171
})
6272

@@ -70,8 +80,12 @@ var _ = ginkgo.Describe("ContactController", func() {
7080

7181
cond := meta.FindStatusCondition(fetched.Status.Conditions, notificationv1.ContactReadyCondition)
7282
gomega.Expect(cond).NotTo(gomega.BeNil())
73-
gomega.Expect(cond.Reason).To(gomega.Equal(notificationv1.ContactCreatedReason))
74-
gomega.Expect(cond.Status).To(gomega.Equal(metav1.ConditionTrue))
83+
gomega.Expect(cond.Reason).To(gomega.Equal(notificationv1.ContactCreatePendingReason))
84+
gomega.Expect(cond.Status).To(gomega.Equal(metav1.ConditionFalse))
85+
86+
// provider call assertions
87+
gomega.Expect(prov.CreateContactCallCount).To(gomega.Equal(1))
88+
gomega.Expect(fetched.Status.ProviderID).To(gomega.Equal("c-123"))
7589
})
7690
})
7791

@@ -100,6 +114,10 @@ var _ = ginkgo.Describe("ContactController", func() {
100114
cond := meta.FindStatusCondition(fetched.Status.Conditions, notificationv1.ContactUpdatedCondition)
101115
gomega.Expect(cond).NotTo(gomega.BeNil())
102116
gomega.Expect(cond.ObservedGeneration).To(gomega.Equal(int64(2)))
117+
118+
// Provider should have been called to delete and recreate contact
119+
gomega.Expect(prov.DeleteContactCallCount).To(gomega.Equal(1))
120+
gomega.Expect(prov.CreateContactCallCount).To(gomega.Equal(2)) // initial plus recreate
103121
})
104122
})
105123
})
@@ -111,6 +129,7 @@ var _ = ginkgo.Describe("contactFinalizer", func() {
111129
finalizer *contactFinalizer
112130
contact *notificationv1.Contact
113131
cgm *notificationv1.ContactGroupMembership
132+
prov *mockprovider.MockEmailProvider
114133
)
115134

116135
ginkgo.BeforeEach(func() {
@@ -145,7 +164,10 @@ var _ = ginkgo.Describe("contactFinalizer", func() {
145164
return []string{buildContactNamespacedIndexKey(c.Spec.ContactRef.Name, c.Spec.ContactRef.Namespace)}
146165
}).
147166
Build()
148-
finalizer = &contactFinalizer{Client: k8sClient}
167+
168+
prov = &mockprovider.MockEmailProvider{DeleteContactOutput: emailprovider.DeleteContactOutput{Deleted: true}}
169+
svc := emailprovider.NewService(prov, "from", "reply")
170+
finalizer = &contactFinalizer{Client: k8sClient, EmailProvider: *svc}
149171
})
150172

151173
ginkgo.It("deletes associated membership in same namespace", func() {
@@ -156,6 +178,8 @@ var _ = ginkgo.Describe("contactFinalizer", func() {
156178
list := &notificationv1.ContactGroupMembershipList{}
157179
gomega.Expect(k8sClient.List(ctx, list)).To(gomega.Succeed())
158180
gomega.Expect(list.Items).To(gomega.BeEmpty())
181+
182+
gomega.Expect(prov.DeleteContactCallCount).To(gomega.Equal(1))
159183
})
160184

161185
ginkgo.It("ignores memberships in other namespaces", func() {
@@ -176,7 +200,7 @@ var _ = ginkgo.Describe("contactFinalizer", func() {
176200
return []string{buildContactNamespacedIndexKey(c.Spec.ContactRef.Name, c.Spec.ContactRef.Namespace)}
177201
}).
178202
Build()
179-
finalizer = &contactFinalizer{Client: k8sClient}
203+
finalizer = &contactFinalizer{Client: k8sClient, EmailProvider: *emailprovider.NewService(&mockprovider.MockEmailProvider{DeleteContactOutput: emailprovider.DeleteContactOutput{Deleted: true}}, "from", "reply")}
180204

181205
_, err := finalizer.Finalize(ctx, contact.DeepCopy())
182206
gomega.Expect(err).NotTo(gomega.HaveOccurred())

0 commit comments

Comments
 (0)