Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/cmd/headlamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
auth "github.com/kubernetes-sigs/headlamp/backend/pkg/auth"
"github.com/kubernetes-sigs/headlamp/backend/pkg/cache"
cfg "github.com/kubernetes-sigs/headlamp/backend/pkg/config"
"github.com/kubernetes-sigs/headlamp/backend/pkg/k8cache"
"github.com/kubernetes-sigs/headlamp/backend/pkg/serviceproxy"

headlampcfg "github.com/kubernetes-sigs/headlamp/backend/pkg/headlampconfig"
Expand Down Expand Up @@ -444,6 +445,8 @@ func createHeadlampHandler(config *HeadlampConfig) http.Handler {
pluginEventChan,
config.cache,
)
// Set the function to stop kubeconfig context watchers
kubeconfig.StopContextWatcher = k8cache.StopWatcher
// in-cluster mode is unlikely to want reloading kubeconfig.
go kubeconfig.LoadAndWatchFiles(config.KubeConfigStore, kubeConfigPath, kubeconfig.KubeConfig, skipFunc)
}
Expand Down
40 changes: 40 additions & 0 deletions backend/cmd/headlamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"net/url"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"testing"
Expand All @@ -41,6 +42,7 @@ import (
"github.com/kubernetes-sigs/headlamp/backend/pkg/cache"
"github.com/kubernetes-sigs/headlamp/backend/pkg/config"
"github.com/kubernetes-sigs/headlamp/backend/pkg/headlampconfig"
"github.com/kubernetes-sigs/headlamp/backend/pkg/k8cache"
"github.com/kubernetes-sigs/headlamp/backend/pkg/kubeconfig"
"github.com/kubernetes-sigs/headlamp/backend/pkg/telemetry"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -322,6 +324,44 @@ func TestDynamicClustersKubeConfig(t *testing.T) {
assert.Equal(t, "default", minikubeCluster.Metadata["namespace"])
}

func TestWiresStopContextWatcher(t *testing.T) {
orig := kubeconfig.StopContextWatcher
defer func() {
kubeconfig.StopContextWatcher = orig
}()

kubeconfig.StopContextWatcher = nil
cache := cache.New[interface{}]()
kubeConfigStore := kubeconfig.NewContextStore()

c := HeadlampConfig{
HeadlampCFG: &headlampconfig.HeadlampCFG{
UseInCluster: false,
KubeConfigPath: "",
EnableDynamicClusters: true,
KubeConfigStore: kubeConfigStore,
},
cache: cache,
telemetryConfig: GetDefaultTestTelemetryConfig(),
telemetryHandler: &telemetry.RequestHandler{},
}

handler := createHeadlampHandler(&c)
require.NotNil(t, handler)

require.NotNil(
t,
kubeconfig.StopContextWatcher,
"StopContextWatcher should be wired during handler creation",
)
require.Equal(
t,
reflect.ValueOf(k8cache.StopWatcher).Pointer(),
reflect.ValueOf(kubeconfig.StopContextWatcher).Pointer(),
"StopContextWatcher should be wired to k8cache.StopWatcher",
)
}

func TestInvalidKubeConfig(t *testing.T) {
cache := cache.New[interface{}]()
kubeConfigStore := kubeconfig.NewContextStore()
Expand Down
11 changes: 11 additions & 0 deletions backend/pkg/k8cache/cacheInvalidation.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,17 @@ func CheckForChanges(
go runWatcher(ctx, k8scache, contextKey, kContext)
}

// StopWatcher stops and cleans up the watcher for a given contextKey.
func StopWatcher(contextKey string) {
logger.Log(logger.LevelInfo, nil, nil, "stopping watcher for context: "+contextKey)

if cancelAny, ok := contextCancel.Load(contextKey); ok {
cancelAny.(context.CancelFunc)()
contextCancel.Delete(contextKey)
watcherRegistry.Delete(contextKey)
}
}

// runWatcher is a long-lived goroutine that sets up and runs Kubernetes informers.
// It watches for resource changes and invalidates corresponding cache entries.
// This function will only exit when its context is cancelled.
Expand Down
6 changes: 6 additions & 0 deletions backend/pkg/kubeconfig/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (

const watchInterval = 10 * time.Second

var StopContextWatcher func(contextName string)

// LoadAndWatchFiles loads kubeconfig files and watches them for changes.
func LoadAndWatchFiles(kubeConfigStore ContextStore, paths string, source int, ignoreFunc shouldBeSkippedFunc) {
// create ticker
Expand Down Expand Up @@ -138,6 +140,10 @@ func syncContexts(kubeConfigStore ContextStore, paths string, source int, ignore
}

if !found {
if StopContextWatcher != nil {
StopContextWatcher(existingCtx.Name)
}

err := kubeConfigStore.RemoveContext(existingCtx.Name)
if err != nil {
logger.Log(logger.LevelError, nil, err, "error removing context")
Expand Down
91 changes: 91 additions & 0 deletions backend/pkg/kubeconfig/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kubeconfig_test
import (
"runtime"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -95,7 +96,97 @@ func TestWatchAndLoadFiles(t *testing.T) {

require.True(t, removed, "Context should have been removed")
})
t.Run("StopContextWatcher callback", func(t *testing.T) {
// Sleep to ensure watcher is ready
time.Sleep(2 * time.Second)

stoppedContexts := make(map[string]int)

var mu sync.Mutex

callbackInvoked := make(chan string, 10)

kubeconfig.StopContextWatcher = func(contextName string) {
mu.Lock()
stoppedContexts[contextName]++
count := stoppedContexts[contextName]
mu.Unlock()
select {
case callbackInvoked <- contextName:
default:
}

t.Logf("StopContextWatcher called for context: %s (count: %d)", contextName, count)
}
defer func() {
kubeconfig.StopContextWatcher = nil
}()

config, err := clientcmd.LoadFromFile("./test_data/kubeconfig1")
require.NoError(t, err)

testContextName := "test-context-for-callback"
config.Contexts[testContextName] = &clientcmdapi.Context{
Cluster: "docker-desktop",
AuthInfo: "docker-desktop",
}

err = clientcmd.WriteToFile(*config, "./test_data/kubeconfig1")
require.NoError(t, err)

// Wait for context to be added
found := false

for i := 0; i < 20; i++ {
context, err := kubeConfigStore.GetContext(testContextName)
if err == nil && context != nil {
found = true

t.Logf("Context %s found in store after %d attempts", testContextName, i+1)

break
}

time.Sleep(500 * time.Millisecond)
}

require.True(t, found, "Context should have been added")

time.Sleep(1 * time.Second)

for len(callbackInvoked) > 0 {
<-callbackInvoked
}

t.Logf("Removing context %s from kubeconfig file", testContextName)

config, err = clientcmd.LoadFromFile("./test_data/kubeconfig1")
require.NoError(t, err)
delete(config.Contexts, testContextName)

err = clientcmd.WriteToFile(*config, "./test_data/kubeconfig1")
require.NoError(t, err)

var removedContext string
select {
case removedContext = <-callbackInvoked:
t.Logf("Received callback for context: %s", removedContext)
case <-time.After(10 * time.Second):
t.Fatal("Timeout waiting for StopContextWatcher callback")
}

require.Equal(t, testContextName,
removedContext, "StopContextWatcher should have been called for the removed context")

_, err = kubeConfigStore.GetContext(testContextName)
require.Error(t, err, "Context should have been removed from store")

mu.Lock()
callCount := stoppedContexts[testContextName]
mu.Unlock()
t.Logf("StopContextWatcher was called %d time(s) for %s", callCount, testContextName)
require.Greater(t, callCount, 0, "StopContextWatcher should have been called at least once")
})
// Cleanup in case test fails
defer func() {
config, err := clientcmd.LoadFromFile("./test_data/kubeconfig1")
Expand Down
Loading