1 change: 1 addition & 0 deletions cmd/scheduler/main.go
@@ -121,6 +121,7 @@ func start() error {
// start monitor metrics
go sher.RegisterFromNodeAnnotations()
go sher.CleanupGPUBindingsLoop()
go sher.CleanupPodsWithMissingDevicesLoop()
go initMetrics(config.MetricsBindAddress)

// start http server
109 changes: 109 additions & 0 deletions pkg/scheduler/scheduler.go
@@ -29,6 +29,7 @@ import (
"github.com/Project-HAMi/HAMi/pkg/device/nvidia"

corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
@@ -396,6 +397,114 @@ func (s *Scheduler) CleanupGPUBindingsLoop() {
}
}

// CleanupPodsWithMissingDevicesLoop periodically cleans up pods that are assigned
// devices which no longer exist in the cluster.
func (s *Scheduler) CleanupPodsWithMissingDevicesLoop() {
klog.InfoS("Starting CleanupPodsWithMissingDevicesLoop")
defer klog.InfoS("Exiting CleanupPodsWithMissingDevicesLoop")
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

for {
select {
case <-ticker.C:
util.GPUManageLock.Lock()
func() {
defer util.GPUManageLock.Unlock()

// Get all scheduled pods with their device assignments FIRST
// This ordering prevents race conditions: if a new device is hot-plugged
// after we get the pods list, any pod scheduled to that device won't be
// in our list yet, so we won't incorrectly delete it.
scheduledPods := s.ListPodsInfo()
if len(scheduledPods) == 0 {
return
}

// Get all valid device UUIDs from nodes
nodes, err := s.ListNodes()
if err != nil {
klog.ErrorS(err, "CleanupPodsWithMissingDevicesLoop: failed to list nodes")
return
}
validUUIDs := make(map[string]struct{})
for _, n := range nodes {
for _, d := range n.Devices {
validUUIDs[d.ID] = struct{}{}
}
}

podsToDelete := make([]*podInfo, 0)
for _, pod := range scheduledPods {
if len(pod.Devices) == 0 {
continue
}

// Check if any assigned device no longer exists
hasMissingDevice := false
for _, deviceList := range pod.Devices {
for _, containerDevices := range deviceList {
for _, device := range containerDevices {
if device.UUID == "" {
continue
}
if _, exists := validUUIDs[device.UUID]; !exists {
klog.InfoS("CleanupPodsWithMissingDevicesLoop: pod has missing device",
"pod", klog.KRef(pod.Namespace, pod.Name),
"deviceUUID", device.UUID,
)
hasMissingDevice = true
break
}
}
if hasMissingDevice {
break
}
}
if hasMissingDevice {
break
}
}

if hasMissingDevice {
podsToDelete = append(podsToDelete, pod)
}
}

if len(podsToDelete) == 0 {
return
}

klog.InfoS("CleanupPodsWithMissingDevicesLoop: deleting pods with missing devices", "count", len(podsToDelete))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

for _, pod := range podsToDelete {

err := s.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
klog.InfoS("CleanupPodsWithMissingDevicesLoop: pod not found",
"pod", klog.KRef(pod.Namespace, pod.Name),
)
continue
}
klog.ErrorS(err, "CleanupPodsWithMissingDevicesLoop: failed to delete pod",
"pod", klog.KRef(pod.Namespace, pod.Name),
)
} else {
klog.InfoS("CleanupPodsWithMissingDevicesLoop: deleted pod with missing device",
"pod", klog.KRef(pod.Namespace, pod.Name),
)
}
}
}()
case <-s.stopCh:
return
}
}
}

// InspectAllNodesUsage is used by metrics monitor.
func (s *Scheduler) InspectAllNodesUsage() *map[string]*NodeUsage {
return &s.overviewstatus
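A minimal sketch of how the detection step could be factored into a standalone predicate, assuming only the shape implied by the diff's iteration (a map of per-container device slices whose elements carry a UUID field); the local types and the helper name podDevicesMissing are hypothetical stand-ins, not part of the PR:

package main

import "fmt"

// Local stand-ins for the device-assignment shape implied by the diff:
// pod.Devices iterates a map (per device type) of slices (per container)
// of devices that carry a UUID.
type containerDevice struct {
	UUID string
}

type podDevices map[string][][]containerDevice

// podDevicesMissing reports whether any assigned, non-empty device UUID
// is absent from the set of UUIDs currently advertised by the nodes.
func podDevicesMissing(devices podDevices, validUUIDs map[string]struct{}) bool {
	for _, perContainer := range devices {
		for _, containerDevs := range perContainer {
			for _, d := range containerDevs {
				if d.UUID == "" {
					continue // unset UUIDs are ignored, as in the merged loop
				}
				if _, ok := validUUIDs[d.UUID]; !ok {
					return true
				}
			}
		}
	}
	return false
}

func main() {
	valid := map[string]struct{}{"GPU-aaa": {}}
	assigned := podDevices{
		"NVIDIA": {{{UUID: "GPU-aaa"}, {UUID: "GPU-bbb"}}},
	}
	// GPU-bbb is no longer reported by any node, so this pod would be deleted
	// and rescheduled by the cleanup loop.
	fmt.Println(podDevicesMissing(assigned, valid)) // true
}

The early return replaces the hasMissingDevice flag and the chained breaks in the merged loop while keeping the same behaviour: empty UUIDs are skipped and the first unknown UUID marks the pod for deletion.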