Skip to content

Instantly share code, notes, and snippets.

@phoracek
Created March 4, 2018 19:22
Show Gist options
  • Save phoracek/473f1a3f92dcb85a2b184ecba373710c to your computer and use it in GitHub Desktop.
Save phoracek/473f1a3f92dcb85a2b184ecba373710c to your computer and use it in GitHub Desktop.
Device Plugin Manager

dpm

-- import "kubevirt.io/kubernetes-device-plugins/pkg/dpm"

This package provides a framework (Device Plugin Manager, DPM) that makes implementation of Device Plugins https://kubernetes.io/docs/concepts/cluster-administration/device-plugins/ easier. It provides abstraction of Plugins, thanks to it a user does not need to implement actual gRPC server. It also handles dynamic management of available resources and their respective plugins.

Usage

The framework contains two main interfaces which must be implemented by user. ListerInterface handles resource management, it notifies DPM about available resources. Plugin interface then represents a plugin that handles available devices of one resource.

See Also

Repository of this package and some plugins using it can be found on https://github.com/kubevirt/kubernetes-device-plugins/.

Example

Following code illustrates usage of Device Plugin Manager. Note that this is not complete working implementation of a plugin, but rather shows structure of it.

import (
    "kubevirt.io/kubernetes-device-plugins/pkg/dpm"
    pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
)

type Plugin struct{}

func (p *Plugin) Start() error {
    // Set up resources if needed, initialize custom channels etc
    return nil
}

func (p *Plugin) Stop() error {
    // Tear down resources if needed
    return nil
}

// Monitors available resource's devices and notifies Kubernetes
func (p *Plugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
    devs := make([]*pluginapi.Device, 0)

    // Set initial set of devices
    for _, deviceID := range ... { // Iterate initial list of resource's devices
        devs = append(devs, &pluginapi.Device{
            ID: deviceID,
            Health: pluginapi.Healthy,
        }
    }
    s.Send(&pluginapi.ListAndWatchResponse{Devices: devs})

    // Send new list of devices everytime it changes
    devicesUpdateCh = ... // User implemented channel sending list of new devices everytime it changes
    for {
        select {
        case newDevices<-devicesUpdateCh:
            devs = make([]*pluginapi.Device, 0)
            for _, deviceID := range ... { // Iterate initial list of resource's devices
                devs = append(devs, &pluginapi.Device{
                    ID: deviceID,
                    Health: pluginapi.Healthy,
                }
            }
            s.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
        case ...:
            // Handle stop channel, could be passed from Stop
        }
    }
}

// Allocates a device requested by one of Pods
func (p *Plugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
    var response pluginapi.AllocateResponse
    for _, nic := range r.DevicesIDs {
        dev := new(pluginapi.DeviceSpec)
        dev.HostPath = ...
        dev.ContainerPath = ...
        dev.Permissions = "r"
        response.Devices = append(response.Devices, dev)
    }
    return &response, nil
}

type Lister struct{}

func (l *Lister) GetResourceNamespace() string {
    return "color.example.com"
}

// Monitors available resources
func (l *Lister) Discover(pluginListCh chan dpm.ResourceLastNamesList) {
    resourcesUpdateCh = ... // User implemented channel notifing about new resources
    for {
        select {
        case newResourcesList := <-resourcesUpdateCh: // New resources found
            pluginListCh <- dpm.ResourceLastNamesList(newResourceList)
        case <-pluginListCh: // Stop message received
            ... // Stop resourceUpdateCh
            return
        }
    }
}

func (l *Lister) NewPlugin(resourceLastName string) dpm.PluginInterface {
    return &Plugin{}
}

func main() {
    manager := dpm.NewManager(Lister{})
    manager.Run()
}

Usage

type ListerInterface

type ListerInterface interface {
	// GetResourceNamespace must return namespace (vendor ID) of implemented Lister. e.g. for
	// resources in format "color.example.com/<color>" that would be "color.example.com".
	GetResourceNamespace() string
	// Discover notifies manager with a list of currently available resources in its namespace.
	// e.g. if "color.example.com/red" and "color.example.com/blue" are available in the system,
	// it would pass ResourceLastNamesList{"red", "blue"} to given channel. In case list of
	// resources is static, it would use the channel only once and then return. In case the list is
	// dynamic, it could block and pass a new list each times resources changed. If blocking is
	// used, it should check whether the channel is closed, i.e. Discover should stop.
	Discover(chan ResourceLastNamesList)
	// NewPlugin instantiates a plugin implementation. It is given the last name of the resource,
	// e.g. for resource name "color.example.com/red" that would be "red". It must return valid
	// implementation of a PluginInterface.
	NewPlugin(string) PluginInterface
}

ListerInterface serves as an interface between imlementation and Manager machinery. User passes implementation of this interface to NewManager function. Manager will use it to obtain resource namespace, monitor available resources and instantate a new plugin for them.

type Manager

type Manager struct {
}

Manager contains the main machinery of this framework. It uses user defined lister to monitor available resources and start/stop plugins accordingly. It also handles system signals and unexpected kubelet events.

func NewManager

func NewManager(lister ListerInterface) *Manager

NewManager is the canonical way of initializing Manager. User must provide ListerInterface implementation. Lister will provide information about handled resources, monitor their availability and provide method to spawn plugins that will handle found resources.

func (*Manager) Run

func (dpm *Manager) Run()

Run starts the Manager. It sets up the infrastructure and handles system signals, Kubelet socket watch and monitoring of available resources as well as starting and stoping of plugins.

type PluginInterface

type PluginInterface interface {
	pluginapi.DevicePluginServer
}

PluginInterface is a mandatory interface that must be implemented by all plugins. It is identical to DevicePluginServer interface of device plugin API. In version v1alpha this interface contains methods Allocate and ListAndWatch. For more information see https://godoc.org/k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha#DevicePluginServer

type PluginInterfaceStart

type PluginInterfaceStart interface {
	Start() error
}

PluginInterfaceStart is an optional interface that could be implemented by plugin. If case Start is implemented, it will be executed by Manager after plugin instantiation and before its registartion to kubelet. This method could be used to prepare resources before they are offered to Kubernetes.

type PluginInterfaceStop

type PluginInterfaceStop interface {
	Stop() error
}

PluginInterfaceStop is an optional interface that could be implemented by plugin. If case Stop is implemented, it will be executed by Manager after the plugin is unregistered from kubelet. This method could be used to tear down resources.

type ResourceLastNamesList

type ResourceLastNamesList []string

ResourceLastNamesList contains last names of discovered resources. This type is used by Discover implementation to inform manager about changes in found resources, e.g. last name of resource "color.example.com/red" is "red".

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment