Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/jobset/templates/controller/cluster_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ metadata:
{{- include "jobset.controller.labels" . | nindent 4 }}
rules:
- apiGroups:
- ""
- events.k8s.io
resources:
- events
verbs:
Expand Down
18 changes: 9 additions & 9 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,6 @@ kind: ClusterRole
metadata:
name: manager-role
rules:
- apiGroups:
- ""
resources:
- events
verbs:
- create
- patch
- update
- watch
- apiGroups:
- ""
resources:
Expand Down Expand Up @@ -83,6 +74,15 @@ rules:
- get
- patch
- update
- apiGroups:
- events.k8s.io
resources:
- events
verbs:
- create
- patch
- update
- watch
- apiGroups:
- jobset.x-k8s.io
resources:
Expand Down
42 changes: 38 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,26 @@ limitations under the License.
package main

import (
"context"
"crypto/tls"
"errors"
"flag"
"fmt"
"net/http"
"os"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
_ "k8s.io/client-go/plugin/pkg/client/auth"

eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/kubernetes"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
k8sevents "k8s.io/client-go/tools/events"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down Expand Up @@ -187,6 +193,18 @@ func main() {
}

ctx := ctrl.SetupSignalHandler()
eventBroadcaster, err := newEventBroadcaster(ctx, kubeConfig)
if err != nil {
setupLog.Error(err, "unable to initialize events broadcaster")
os.Exit(1)
}
go func() {
<-ctx.Done()
eventBroadcaster.Shutdown()
}()
jobSetRecorder := eventBroadcaster.NewRecorder(mgr.GetScheme(), "jobset")
podRecorder := eventBroadcaster.NewRecorder(mgr.GetScheme(), "pod")

if err := controllers.SetupJobSetIndexes(ctx, mgr.GetFieldIndexer()); err != nil {
setupLog.Error(err, "unable to setup jobset reconciler indexes")
os.Exit(1)
Expand All @@ -199,7 +217,7 @@ func main() {
// Cert won't be ready until manager starts, so start a goroutine here which
// will block until the cert is ready before setting up the controllers.
// Controllers who register after manager starts will start directly.
go setupControllers(mgr, certsReady)
go setupControllers(mgr, certsReady, jobSetRecorder, podRecorder)

setupHealthzAndReadyzCheck(mgr, certsReady)

Expand All @@ -210,22 +228,22 @@ func main() {
}
}

func setupControllers(mgr ctrl.Manager, certsReady chan struct{}) {
func setupControllers(mgr ctrl.Manager, certsReady chan struct{}, jobSetRecorder, podRecorder k8sevents.EventRecorder) {
// The controllers won't work until the webhooks are operating,
// and the webhook won't work until the certs are all in places.
setupLog.Info("waiting for the cert generation to complete")
<-certsReady
setupLog.Info("certs ready")

// Set up JobSet controller.
jobSetController := controllers.NewJobSetReconciler(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("jobset"))
jobSetController := controllers.NewJobSetReconciler(mgr.GetClient(), mgr.GetScheme(), jobSetRecorder)
if err := jobSetController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "JobSet")
os.Exit(1)
}

// Set up pod reconciler.
podController := controllers.NewPodReconciler(mgr.GetClient(), mgr.GetScheme(), mgr.GetEventRecorderFor("pod"))
podController := controllers.NewPodReconciler(mgr.GetClient(), mgr.GetScheme(), podRecorder)
if err := podController.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Pod")
os.Exit(1)
Expand Down Expand Up @@ -279,6 +297,22 @@ func setupHealthzAndReadyzCheck(mgr ctrl.Manager, certsReady <-chan struct{}) {
}
}

func newEventBroadcaster(ctx context.Context, cfg *rest.Config) (k8sevents.EventBroadcaster, error) {
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return nil, err
}
if _, err := clientset.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err != nil {
return nil, fmt.Errorf("events.k8s.io/v1 API not available: %w", err)
}

broadcaster := k8sevents.NewBroadcaster(&k8sevents.EventSinkImpl{Interface: clientset.EventsV1()})
if err := broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
return nil, err
}
return broadcaster, nil
}

func apply(configFile string) (ctrl.Options, configapi.Configuration, error) {
options, cfg, err := config.Load(scheme, configFile)
if err != nil {
Expand Down
58 changes: 58 additions & 0 deletions pkg/controllers/events_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package controllers

import (
"context"
"testing"

corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/events"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/constants"
)

type benchmarkEventSink struct{}

func (s *benchmarkEventSink) Create(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
return event, nil
}

func (s *benchmarkEventSink) Update(ctx context.Context, event *eventsv1.Event) (*eventsv1.Event, error) {
return event, nil
}

func (s *benchmarkEventSink) Patch(ctx context.Context, event *eventsv1.Event, _ []byte) (*eventsv1.Event, error) {
return event, nil
}

func BenchmarkEventRecorderV1(b *testing.B) {
scheme := runtime.NewScheme()
utilruntime.Must(jobset.AddToScheme(scheme))

ctx, cancel := context.WithCancel(context.Background())
b.Cleanup(cancel)

broadcaster := events.NewBroadcaster(&benchmarkEventSink{})
if err := broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
b.Fatalf("start recording: %v", err)
}
b.Cleanup(broadcaster.Shutdown)

recorder := broadcaster.NewRecorder(scheme, "jobset")
js := &jobset.JobSet{
ObjectMeta: metav1.ObjectMeta{
Name: "bench",
Namespace: "default",
},
}

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
recorder.Eventf(js, nil, corev1.EventTypeWarning, constants.JobCreationFailedReason, constants.JobCreationFailedReason, "benchmark")
}
}
14 changes: 7 additions & 7 deletions pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand All @@ -56,7 +56,7 @@ var apiGVStr = jobset.GroupVersion.String()
type JobSetReconciler struct {
client.Client
Scheme *runtime.Scheme
Record record.EventRecorder
Record events.EventRecorder
clock clock.Clock
}

Expand Down Expand Up @@ -91,11 +91,11 @@ type eventParams struct {
eventMessage string
}

func NewJobSetReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *JobSetReconciler {
func NewJobSetReconciler(client client.Client, scheme *runtime.Scheme, record events.EventRecorder) *JobSetReconciler {
return &JobSetReconciler{Client: client, Scheme: scheme, Record: record, clock: clock.RealClock{}}
}

//+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
//+kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;watch;update;patch
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=jobset.x-k8s.io,resources=jobsets/finalizers,verbs=update
Expand Down Expand Up @@ -332,7 +332,7 @@ func (r *JobSetReconciler) updateJobSetStatus(ctx context.Context, js *jobset.Jo
}
// If the status update was successful, emit any enqueued events.
for _, event := range updateStatusOpts.events {
r.Record.Eventf(event.object, event.eventType, event.eventReason, event.eventMessage)
r.Record.Eventf(event.object, nil, event.eventType, event.eventReason, event.eventReason, event.eventMessage)
}
}
return nil
Expand Down Expand Up @@ -605,7 +605,7 @@ func (r *JobSetReconciler) reconcileReplicatedJobs(ctx context.Context, js *jobs
// Create jobs as necessary.
if err := r.createJobs(ctx, js, jobs); err != nil {
log.Error(err, "creating jobs")
r.Record.Eventf(js, corev1.EventTypeWarning, constants.JobCreationFailedReason, err.Error())
r.Record.Eventf(js, nil, corev1.EventTypeWarning, constants.JobCreationFailedReason, constants.JobCreationFailedReason, err.Error())
return err
}

Expand Down Expand Up @@ -721,7 +721,7 @@ func (r *JobSetReconciler) createHeadlessSvcIfNecessary(ctx context.Context, js

// Create headless service.
if err := r.Create(ctx, &headlessSvc); err != nil {
r.Record.Eventf(js, corev1.EventTypeWarning, constants.HeadlessServiceCreationFailedReason, err.Error())
r.Record.Eventf(js, nil, corev1.EventTypeWarning, constants.HeadlessServiceCreationFailedReason, constants.HeadlessServiceCreationFailedReason, err.Error())
return err
}
log.V(2).Info("successfully created headless service", "service", klog.KObj(&headlessSvc))
Expand Down
5 changes: 2 additions & 3 deletions pkg/controllers/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"testing"
"time"

"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"

"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -1677,8 +1677,7 @@ func TestCreateHeadlessSvcIfNecessary(t *testing.T) {
}
fakeClient := fakeClientBuilder.Build()

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "jobset-test-reconciler"})
recorder := events.NewFakeRecorder(1)

// Create a JobSetReconciler instance with the fake client
r := &JobSetReconciler{
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -52,10 +52,10 @@ const (
type PodReconciler struct {
client.Client
Scheme *runtime.Scheme
Record record.EventRecorder
Record events.EventRecorder
}

func NewPodReconciler(client client.Client, scheme *runtime.Scheme, record record.EventRecorder) *PodReconciler {
func NewPodReconciler(client client.Client, scheme *runtime.Scheme, record events.EventRecorder) *PodReconciler {
return &PodReconciler{Client: client, Scheme: scheme, Record: record}
}

Expand Down Expand Up @@ -108,7 +108,7 @@ func IndexPodName(obj client.Object) []string {
return []string{podName}
}

// +kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=events.k8s.io,resources=events,verbs=create;watch;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch

Expand Down
7 changes: 3 additions & 4 deletions pkg/controllers/pod_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
Expand Down Expand Up @@ -355,7 +355,7 @@ func makePod(args *makePodArgs) *testutils.PodWrapper {
type fakeClient struct {
client client.WithWatch
scheme *runtime.Scheme
record record.EventRecorder
record events.EventRecorder
}

func makeFakeClient(interceptor interceptor.Funcs, initObjs ...client.Object) *fakeClient {
Expand All @@ -364,8 +364,7 @@ func makeFakeClient(interceptor interceptor.Funcs, initObjs ...client.Object) *f
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(batchv1.AddToScheme(scheme))

eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "jobset-test-reconciler"})
recorder := events.NewFakeRecorder(1)
fc := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(initObjs...).
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/volume_claim_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (r *JobSetReconciler) createPVCsIfNecessary(ctx context.Context, js *jobset
// Create PVC if it doesn't exist.
if err := r.Create(ctx, &pvc); err != nil {
if !apierrors.IsAlreadyExists(err) {
r.Record.Eventf(js, corev1.EventTypeWarning, constants.PVCCreationFailedReason, err.Error())
r.Record.Eventf(js, nil, corev1.EventTypeWarning, constants.PVCCreationFailedReason, constants.PVCCreationFailedReason, err.Error())
return err
}
} else {
Expand Down
5 changes: 2 additions & 3 deletions pkg/controllers/volume_claim_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2/ktesting"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -221,8 +221,7 @@ func TestReconcileVolumeClaimPolicies(t *testing.T) {
fakeClientBuilder.WithObjects(tc.existingPVC)
}
fakeClient := fakeClientBuilder.Build()
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: "jobset-test-reconciler"})
recorder := events.NewFakeRecorder(1)

// Create a JobSetReconciler instance with the fake client
r := &JobSetReconciler{
Expand Down
Loading
Loading