Skip to content

Instantly share code, notes, and snippets.

@M0nteCarl0
Last active July 21, 2023 09:12
Show Gist options
  • Save M0nteCarl0/f353e924c5c930ff99049187e5f99e7f to your computer and use it in GitHub Desktop.
Save M0nteCarl0/f353e924c5c930ff99049187e5f99e7f to your computer and use it in GitHub Desktop.
IPC plugin system on Golang

IPC plugin system on Golang

Here's an example of how you can implement a multi-process plugin system for Linux on Golang

Structure of files and folders

Sources structure

  • core_module.go - Core module of plugin system.
  • process_plugin.go - Plugin module.
  • watchdog.go - Watchdog module.

Folders structure

  • ./pipes/* - folder of pipes of active plugin with name name_of_plugin.
  • ./plugins/* - folder of bins of plugins.
  • ./ - root folder of system.

Files structure

  • ./pipes/name_of_plugin_pipe.pipe - pipe of active plugin with name name_of_plugin.
  • ./plugins/name_of_plugin.bin - Binary of plugin.
  • core_module.bin - Module for control plugins.
  • watchdog.bin - Module of watchdog plugin system.

How build?

  • go build -o core_module.bin ./core_module.go
  • go build -o process_plugin.bin ./process_plugin.go
  • go build -o watchdog.bin ./watchdog.go

How run?

Main case

  • Enter in Bash shell : ./core_module.bin
  • Enter in Bash shell : ./watchdog.bin

Separate run plugin binary

  • Enter in Bash shell : ./plugins/name_of_plugin.bin "path for named pipe of plugin"

What can upgrade in system of plugins?

  • Automatic detect plugins binary for pipe creating.
  • Log behavior of every module.
  • Rate limiter module.
  • Embeded NoSQL storage of requsets/responce from plugins(Fault tolerance mechanism).
  • Add Windows OS support.
  • Add active plugins dashboard with some metrics.
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
)
type Plugin interface {
Process(data string) (string, error)
}
func main() {
// Create a directory for the named pipes
pipeDir := "./pipes"
err := os.MkdirAll(pipeDir, 0755)
if err != nil {
log.Fatal(err)
}
// Start the plugin processes
pluginPaths := []string{"./plugins/plugin1", "./plugins/plugin2"}
var wg sync.WaitGroup
for _, pluginPath := range pluginPaths {
wg.Add(1)
go startPluginProcess(pluginPath, &wg)
}
// Wait for the plugin processes to start
wg.Wait()
// Send data to the plugins
data := "Hello, plugins!"
for _, pluginPath := range pluginPaths {
result, err := sendDataToPlugin(data, pluginPath)
if err != nil {
log.Printf("Error sending data to plugin %s: %v", pluginPath, err)
continue
}
fmt.Printf("Plugin %s processed data: %s\n", pluginPath, result)
}
// Cleanup the named pipes
err = os.RemoveAll(pipeDir)
if err != nil {
log.Fatal(err)
}
}
func startPluginProcess(pluginPath string, wg *sync.WaitGroup) {
defer wg.Done()
// Create a named pipe for the plugin process
pipePath := filepath.Join("./pipes", filepath.Base(pluginPath))
err := syscall.Mkfifo(pipePath, 0666)
if err != nil {
log.Printf("Error creating named pipe for plugin %s: %v", pluginPath, err)
return
}
// Start the plugin process
cmd := exec.Command(pluginPath, pipePath)
err = cmd.Start()
if err != nil {
log.Printf("Error starting plugin %s: %v", pluginPath, err)
return
}
log.Printf("Plugin %s started", pluginPath)
// Wait for the plugin process to exit
err = cmd.Wait()
if err != nil {
log.Printf("Plugin %s exited with error: %v", pluginPath, err)
} else {
log.Printf("Plugin %s exited successfully", pluginPath)
}
// Remove the named pipe
err = os.Remove(pipePath)
if err != nil {
log.Printf("Error removing named pipe for plugin %s: %v", pluginPath, err)
}
}
func sendDataToPlugin(data string, pluginPath string) (string, error) {
// Open the named pipe for writing
pipePath := filepath.Join("./pipes", filepath.Base(pluginPath))
pipe, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
return "", err
}
defer pipe.Close()
// Write the data to the named pipe
_, err = pipe.WriteString(data)
if err != nil {
return "", err
}
// Read the result from the named pipe
result, err := ioutil.ReadFile(pipePath)
if err != nil {
return "", err
}
return string(result), nil
}
package main
import (
"io/ioutil"
"log"
"os"
"path/filepath"
)
type Plugin struct {
pipePath string
}
func NewPlugin(pipePath string) *Plugin {
return &Plugin{
pipePath: pipePath,
}
}
func (p *Plugin) Process(data string) (string, error) {
// Read data from the named pipe
pipe, err := os.OpenFile(p.pipePath, os.O_RDONLY, os.ModeNamedPipe)
if err != nil {
return "", err
}
defer pipe.Close()
inputData, err := ioutil.ReadAll(pipe)
if err != nil {
return "", err
}
// Process the data
processedData := processData(string(inputData))
// Write the processed data back to the named pipe
_, err = ioutil.WriteFile(p.pipePath, []byte(processedData), 0666)
if err != nil {
return "", err
}
return processedData, nil
}
func processData(data string) string {
// Perform some processing on the data
return "Processed: " + data
}
func main() {
if len(os.Args) != 2 {
log.Fatal("Usage: plugin <pipe path>")
}
pipePath := os.Args[1]
plugin := NewPlugin(pipePath)
// Start processing data from the named pipe
for {
_, err := plugin.Process("")
if err != nil {
log.Printf("Error processing data: %v", err)
}
}
}
package main
import (
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
"sync"
"syscall"
"time"
)
type Watchdog struct {
pluginPaths []string
timeout time.Duration
}
func NewWatchdog(pluginPaths []string, timeout time.Duration) *Watchdog {
return &Watchdog{
pluginPaths: pluginPaths,
timeout: timeout,
}
}
func (w *Watchdog) Start() {
// Create a directory for the named pipes
pipeDir := "./pipes"
err := os.MkdirAll(pipeDir, 0755)
if err != nil {
log.Fatal(err)
}
// Start the plugin processes
var wg sync.WaitGroup
for _, pluginPath := range w.pluginPaths {
wg.Add(1)
go w.startPluginProcess(pluginPath, &wg)
}
// Wait for the plugin processes to start
wg.Wait()
// Monitor the plugin processes
for {
// Sleep for the specified timeout duration
time.Sleep(w.timeout)
// Check if any plugin process has exited
for _, pluginPath := range w.pluginPaths {
if !w.isPluginRunning(pluginPath) {
log.Printf("Plugin %s has exited, restarting...", pluginPath)
w.startPluginProcess(pluginPath, &wg)
}
}
}
}
func (w *Watchdog) startPluginProcess(pluginPath string, wg *sync.WaitGroup) {
defer wg.Done()
// Create a named pipe for the plugin process
pipePath := filepath.Join("./pipes", filepath.Base(pluginPath))
err := syscall.Mkfifo(pipePath, 0666)
if err != nil {
log.Printf("Error creating named pipe for plugin %s: %v", pluginPath, err)
return
}
// Start the plugin process
cmd := exec.Command(pluginPath, pipePath)
err = cmd.Start()
if err != nil {
log.Printf("Error starting plugin %s: %v", pluginPath, err)
return
}
log.Printf("Plugin %s started", pluginPath)
}
func (w *Watchdog) isPluginRunning(pluginPath string) bool {
// Check if the plugin process is still running
cmd := exec.Command("pgrep", "-f", pluginPath)
err := cmd.Run()
return err == nil
}
func main() {
// Specify the plugin paths and timeout duration
pluginPaths := []string{"./plugins/plugin1", "./plugins/plugin2"}
timeout := 5 * time.Second
// Create a new watchdog instance
watchdog := NewWatchdog(pluginPaths, timeout)
// Start the watchdog
watchdog.Start()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment