Go: Creating Dynamic Kubernetes Informers

Introduction

Recently, I published v1.0.0 of Kollect, a dynamic Kubernetes informer that publishes changes in cluster resources to a configurable event bus. At the heart of this project is a dynamic informer, a method of handling add/update/delete notifications of arbitrary cluster resources (including those added as a CustomResourceDefinition).

This kind of tooling is quite powerful, as you can perform operations on arbitrary resources without knowing their structure. This is especially useful in situations where you cannot import the canonical types for those resources from public repositories, or they’re too large and complex to write your own types without a lot of time and effort.

For example, using Kollect (or your own informer), you can track changes to your resources in real time, or check that resources follow best practices as they change, using a tool like Open Policy Agent.

In this post, I’ll attempt to get you started writing a dynamic Kubernetes informer that will allow you to perform operations when any resources of your choosing change.

Getting Started

Every go project starts with initialising a new Go module:

go mod init github.com/myname/myinformer

And a main.go:

package main

func main() {
	
}

Cluster Authentication

In order to start handling notifications, we’re going to need to authenticate with the cluster that we’re running in or against. This means we have two separate methods of authentication:

  • Kubeconfig - Pointing directly to a kubeconfig file that our application accesses on startup. This would be used typically when your program is not running in the cluster it is watching.
  • In-cluster - Obtaining permissions based on the ServiceAccount associated with the Pod that our program is running in within the cluster we want to watch.

We’re going to use the k8s.io/client-go package, so you’ll need to run:

go get k8s.io/client-go

Now we’ve downloaded the dependency, let’s update our main.go to create a Kubernetes API cluster config based on where our application is running:

package main

import (
	"log"
	"os"

	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	kubeConfig := os.Getenv("KUBECONFIG")
	
	var clusterConfig *rest.Config
	var err error
	if kubeConfig != "" {
		clusterConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
	} else {
		clusterConfig, err = rest.InClusterConfig()
	}
	if err != nil {
		log.Fatalln(err)
	}

	clusterClient, err := dynamic.NewForConfig(clusterConfig)
	if err != nil {
		log.Fatalln(err)
	}
}

The code above checks for the presence of the KUBECONFIG environment variable, if it is present, we create our cluster configuration using the clientcmd package. Otherwise, we use the rest package to assume credentials from the Pod we’re running in. Then, we create a new dynamic client.

The dynamic package, allows us to query cluster resources as unstructured.Unstructured types. These are basically wrappers around map[string]interface{} that have helper methods for obtaining Kubernetes resource specifics such as the API version, group, kind, labels, annotations etc.

Monitoring Resources

Now that we’ve authenticated against the cluster, we can start monitoring resources. We’ll do this with the dynamicinformer package. We’re also going to need to decide which resources we want to watch and create an informer for each one. In this example, we’ll create a single informer that watches Deployment resources, but you can easily extend it to watch multiple resources.

package main

import (
	"log"
	"os"
	"time"

	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/apimachinery/pkg/runtime/schema"
	corev1 "k8s.io/api/core/v1"
)

func main() {
	kubeConfig := os.Getenv("KUBECONFIG")
	
	var clusterConfig *rest.Config
	var err error
	if kubeConfig != "" {
		clusterConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
	} else {
		clusterConfig, err = rest.InClusterConfig()
	}
	if err != nil {
		log.Fatalln(err)
	}

	clusterClient, err := dynamic.NewForConfig(clusterConfig)
	if err != nil {
		log.Fatalln(err)
	}
	
	resource := schema.GroupVersionResource{Group:"apps", Version:"v1", Resource: "deployments"}
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, time.Minute, corev1.NamespaceAll, nil)
	informer := factory.ForResource(resource).Informer()
}

Notice that when we call NewFilteredDynamicSharedInformerFactory, we pass in corev1.NamespaceAll as the namespace to watch resources in. This causes the informer to watch over all namespaces within the cluster. You can modify this to only a specific namespace, or filter by namespace in the handler methods.

Now that we’ve created a new informer that will watch for changes in Deployment resources, we need to register handler functions for add, update and delete events. This is done via the informer.AddEventHandler method:

package main

import (
	"log"
	"os"
	"time"
	"os/signal"
	"context"

	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	kubeConfig := os.Getenv("KUBECONFIG")
	
	var clusterConfig *rest.Config
	var err error
	if kubeConfig != "" {
		clusterConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
	} else {
		clusterConfig, err = rest.InClusterConfig()
	}
	if err != nil {
		log.Fatalln(err)
	}

	clusterClient, err := dynamic.NewForConfig(clusterConfig)
	if err != nil {
		log.Fatalln(err)
	}
	
	resource := schema.GroupVersionResource{Group:"apps", Version:"v1", Resource: "deployments"}
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, time.Minute, corev1.NamespaceAll, nil)
	informer := factory.ForResource(resource).Informer()
	
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			u := obj.(*unstructured.Unstructured)
        },
		UpdateFunc: func(oldObj, newObj interface{}) {},
		DeleteFunc: func(obj interface{}) {},
    })

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()
	
	informer.Run(ctx.Done())
}

Notice that for AddFunc, UpdateFunc and DeleteFunc that the parameters are passed as interface{}, because we’re using the dynamicinformer package, we can assume these are instances of *unstructured.Unstructured and safely cast them.

We’re also creating a context.Context that is cancelled on an os.Interrupt signal. This allows us to prevent the application from exiting until it receives an interrupt signal. Its Done channel is passed to informer.Run, to keep the informer alive until execution is cancelled.

From here, your handling logic is your own, do what you want when resources are added, updated or changed. Further sections in this post will cover additional considerations regarding cache syncing and using RBAC to give your Pod access to the Kubernetes API.

Cache Syncing

When an informer starts, it will build a cache of all resources it currently watches which is lost when the application restarts. This means that on startup, each of your handler functions will be invoked as the initial state is built. If this is not desirable for your use case, you can wait until the caches are synced before performing any updates using the cache.WaitForCacheSync function:

package main

import (
	"log"
	"os"
	"sync"
	"time"
	"os/signal"
	"context"

	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/apimachinery/pkg/runtime/schema"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	kubeConfig := os.Getenv("KUBECONFIG")

	var clusterConfig *rest.Config
	var err error
	if kubeConfig != "" {
		clusterConfig, err = clientcmd.BuildConfigFromFlags("", kubeConfig)
	} else {
		clusterConfig, err = rest.InClusterConfig()
	}
	if err != nil {
		log.Fatalln(err)
	}

	clusterClient, err := dynamic.NewForConfig(clusterConfig)
	if err != nil {
		log.Fatalln(err)
	}

	resource := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}
	factory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(clusterClient, time.Minute, corev1.NamespaceAll, nil)
	informer := factory.ForResource(resource).Informer()

	mux := &sync.RWMutex{}
	synced := false
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			mux.RLock()
			defer mux.RUnlock()
			if !synced {
				return
			}
			
			// Handler logic
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			mux.RLock()
			defer mux.RUnlock()
			if !synced {
				return
			}

			// Handler logic
		},
		DeleteFunc: func(obj interface{}) {
			mux.RLock()
			defer mux.RUnlock()
			if !synced {
				return
			}

			// Handler logic
		},
	})

	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	go informer.Run(ctx.Done())
	
	isSynced := cache.WaitForCacheSync(ctx.Done(), informer.HasSynced)
	mux.Lock()
	synced = isSynced
	mux.Unlock()
	
	if !isSynced {
		log.Fatal("failed to sync")
    }
	
	<-ctx.Done()
}

In the code above, we use a boolean synced to indicate that the caches are finished syncing and that our handler functions are only being invoked once the initial state of the watched resources has been built. We’ve had to make some modifications, like starting the informer asynchronously using a go statement, as the caches will not start building until informer.Run is called.

It may seem unintuitive at first, but we also don’t directly assign the return value of WaitForCacheSync to the synced variable within a mutex lock. This is because the handler functions are being invoked while the cache is syncing and will effectively be queued. If we lock that mutex initially, the updates that occurred while the cache was syncing will still trigger our handler functions. This means we need to only reassign synced once we’re sure the cache sync is complete.

RBAC

Finally, when running within a cluster, we’re going to need to use RBAC to provide the ServiceAccount the appropriate permissions to monitor resources of our choosing. This is done using the Role/RoleBinding resources (if you’re handling things at the namespace level) or the ClusteRole/ClusterRoleBinding resources (if you’re handling things at the cluster level). You can view full documentation for these resources here

Let’s create a ServiceAccount, ClusterRole and ClusterRoleBinding to match our code above. It will allow us to watch all Deployment resources in all namespaces:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: myinformer
  namespace: mynamespace
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: deployment-informer
rules:
- apiGroups: ["apps/v1"]
  resources: ["deployments"]
  verbs: ["get", "watch", "list"]
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: read-secrets-global
subjects:
- kind: ServiceAccount
  name: myinformer
  namespace: mynamespace
roleRef:
  kind: ClusterRole
  name: deployment-informer
  apiGroup: rbac.authorization.k8s.io

When you deploy your application within the cluster, use the serviceAccountName field in the pod specification to the myinformer one created above. This will provide the Pod with access to the Kubernetes API, specifically to perform get, list and watch request on Deployment resources.

Wrapping Up

Hopefully this post has given you enough insight into the world of Kubernetes informers to implement your own. As said at the start, I used code like this to implement Kollect, and it works as well as you would expect.