-- import "kubevirt.io/kubernetes-device-plugins/pkg/dpm"
This package provides a framework (Device Plugin Manager, DPM) that makes implementation of Device Plugins https://kubernetes.io/docs/concepts/cluster-administration/device-plugins/ easier. It provides abstraction of Plugins, thanks to it a user does not need to implement actual gRPC server. It also handles dynamic management of available resources and their respective plugins.
The framework contains two main interfaces which must be implemented by user. ListerInterface handles resource management, it notifies DPM about available resources. Plugin interface then represents a plugin that handles available devices of one resource.
Repository of this package and some plugins using it can be found on https://github.com/kubevirt/kubernetes-device-plugins/.
Following code illustrates usage of Device Plugin Manager. Note that this is not complete working implementation of a plugin, but rather shows structure of it.
import (
"kubevirt.io/kubernetes-device-plugins/pkg/dpm"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
)
type Plugin struct{}
func (p *Plugin) Start() error {
// Set up resources if needed, initialize custom channels etc
return nil
}
func (p *Plugin) Stop() error {
// Tear down resources if needed
return nil
}
// Monitors available resource's devices and notifies Kubernetes
func (p *Plugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.DevicePlugin_ListAndWatchServer) error {
devs := make([]*pluginapi.Device, 0)
// Set initial set of devices
for _, deviceID := range ... { // Iterate initial list of resource's devices
devs = append(devs, &pluginapi.Device{
ID: deviceID,
Health: pluginapi.Healthy,
}
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
// Send new list of devices everytime it changes
devicesUpdateCh = ... // User implemented channel sending list of new devices everytime it changes
for {
select {
case newDevices<-devicesUpdateCh:
devs = make([]*pluginapi.Device, 0)
for _, deviceID := range ... { // Iterate initial list of resource's devices
devs = append(devs, &pluginapi.Device{
ID: deviceID,
Health: pluginapi.Healthy,
}
}
s.Send(&pluginapi.ListAndWatchResponse{Devices: devs})
case ...:
// Handle stop channel, could be passed from Stop
}
}
}
// Allocates a device requested by one of Pods
func (p *Plugin) Allocate(ctx context.Context, r *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
var response pluginapi.AllocateResponse
for _, nic := range r.DevicesIDs {
dev := new(pluginapi.DeviceSpec)
dev.HostPath = ...
dev.ContainerPath = ...
dev.Permissions = "r"
response.Devices = append(response.Devices, dev)
}
return &response, nil
}
type Lister struct{}
func (l *Lister) GetResourceNamespace() string {
return "color.example.com"
}
// Monitors available resources
func (l *Lister) Discover(pluginListCh chan dpm.ResourceLastNamesList) {
resourcesUpdateCh = ... // User implemented channel notifing about new resources
for {
select {
case newResourcesList := <-resourcesUpdateCh: // New resources found
pluginListCh <- dpm.ResourceLastNamesList(newResourceList)
case <-pluginListCh: // Stop message received
... // Stop resourceUpdateCh
return
}
}
}
func (l *Lister) NewPlugin(resourceLastName string) dpm.PluginInterface {
return &Plugin{}
}
func main() {
manager := dpm.NewManager(Lister{})
manager.Run()
}
type ListerInterface interface {
// GetResourceNamespace must return namespace (vendor ID) of implemented Lister. e.g. for
// resources in format "color.example.com/<color>" that would be "color.example.com".
GetResourceNamespace() string
// Discover notifies manager with a list of currently available resources in its namespace.
// e.g. if "color.example.com/red" and "color.example.com/blue" are available in the system,
// it would pass ResourceLastNamesList{"red", "blue"} to given channel. In case list of
// resources is static, it would use the channel only once and then return. In case the list is
// dynamic, it could block and pass a new list each times resources changed. If blocking is
// used, it should check whether the channel is closed, i.e. Discover should stop.
Discover(chan ResourceLastNamesList)
// NewPlugin instantiates a plugin implementation. It is given the last name of the resource,
// e.g. for resource name "color.example.com/red" that would be "red". It must return valid
// implementation of a PluginInterface.
NewPlugin(string) PluginInterface
}
ListerInterface serves as an interface between imlementation and Manager machinery. User passes implementation of this interface to NewManager function. Manager will use it to obtain resource namespace, monitor available resources and instantate a new plugin for them.
type Manager struct {
}
Manager contains the main machinery of this framework. It uses user defined lister to monitor available resources and start/stop plugins accordingly. It also handles system signals and unexpected kubelet events.
func NewManager(lister ListerInterface) *Manager
NewManager is the canonical way of initializing Manager. User must provide ListerInterface implementation. Lister will provide information about handled resources, monitor their availability and provide method to spawn plugins that will handle found resources.
func (dpm *Manager) Run()
Run starts the Manager. It sets up the infrastructure and handles system signals, Kubelet socket watch and monitoring of available resources as well as starting and stoping of plugins.
type PluginInterface interface {
pluginapi.DevicePluginServer
}
PluginInterface is a mandatory interface that must be implemented by all plugins. It is identical to DevicePluginServer interface of device plugin API. In version v1alpha this interface contains methods Allocate and ListAndWatch. For more information see https://godoc.org/k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha#DevicePluginServer
type PluginInterfaceStart interface {
Start() error
}
PluginInterfaceStart is an optional interface that could be implemented by plugin. If case Start is implemented, it will be executed by Manager after plugin instantiation and before its registartion to kubelet. This method could be used to prepare resources before they are offered to Kubernetes.
type PluginInterfaceStop interface {
Stop() error
}
PluginInterfaceStop is an optional interface that could be implemented by plugin. If case Stop is implemented, it will be executed by Manager after the plugin is unregistered from kubelet. This method could be used to tear down resources.
type ResourceLastNamesList []string
ResourceLastNamesList contains last names of discovered resources. This type is used by Discover implementation to inform manager about changes in found resources, e.g. last name of resource "color.example.com/red" is "red".