Skip to content

Instantly share code, notes, and snippets.

@maxsei
Last active May 14, 2024 23:00
Show Gist options
  • Save maxsei/b5a56a8af7027ff1cee893eebd75b248 to your computer and use it in GitHub Desktop.
Save maxsei/b5a56a8af7027ff1cee893eebd75b248 to your computer and use it in GitHub Desktop.
// go version go1.20.7 linux/amd64
package main
import (
"context"
"database/sql"
"errors"
"log"
"sync"
"time"
)
// DeviceRemove is the event asking the server to forget a device.
type DeviceRemove struct {
	// SerialNumber identifies the device to remove.
	SerialNumber string
	// Error receives the outcome of the removal: nil when the device was
	// removed, or an error when no device with that serial number is known.
	Error chan error
}
// DeviceAdd is the event announcing that a device has appeared.
type DeviceAdd struct {
	// Path is the path under which the device is reachable.
	Path string
	// SerialNumber identifies the device; the server ignores an add whose
	// serial number is already in its list.
	SerialNumber string
}
// DeviceBaud is the event assigning a baud rate to an already known device.
type DeviceBaud struct {
	// SerialNumber identifies the device to configure.
	SerialNumber string
	// Baud is the baud rate to store for the device.
	Baud int
	// Error receives the outcome: nil when the baud rate was stored, or an
	// error when no device with that serial number is known.
	Error chan error
}
// DeviceInfo is the server's record of a single device. Copies of it are
// broadcast to clients and sent on the ready-devices channel.
type DeviceInfo struct {
	// Path is the path under which the device is reachable.
	Path string
	// SerialNumber identifies the device within the server's list.
	SerialNumber string
	// Baud is the device's baud rate. A device added with a zero Baud is not
	// announced as ready until a baud rate is assigned to it.
	Baud int
}
// ServerEvents bundles the channels through which requests reach the server's
// event loop.
type ServerEvents struct {
	// deviceAdds carries announcements of newly appeared devices.
	deviceAdds chan DeviceAdd
	// deviceRemoves carries requests to drop a device from the list.
	deviceRemoves chan DeviceRemove
	// deviceBauds carries requests to assign a baud rate to a device.
	deviceBauds chan DeviceBaud
	// clients carries newly subscribing clients.
	clients chan *Client
}
// NewServer returns a Server whose event channels and ready-devices channel
// are all allocated and unbuffered. The returned server does nothing until
// its event loop is started.
func NewServer() *Server {
	srv := new(Server)
	srv.events.deviceAdds = make(chan DeviceAdd)
	srv.events.deviceRemoves = make(chan DeviceRemove)
	srv.events.deviceBauds = make(chan DeviceBaud)
	srv.events.clients = make(chan *Client)
	srv.readyDevices = make(chan DeviceInfo)
	return srv
}
// Server keeps a list of devices that come and go and broadcasts a copy of the
// full list to its clients whenever that list changes. Clients listening to
// the broadcast must receive each message within a hard-coded amount of time
// (100ms). A single asynchronous consumer of ready device info must also be
// provided by reading from the channel returned by Server.ReadyDevices().
// Downstream consumers do not need to be notified when a device is removed
// because the OS will report an error when the path does not exist, or the
// read will end with an io.EOF error.
type Server struct {
	// events holds the inbound channels drained by the event loop.
	events ServerEvents
	// deviceInfos is the current list of known devices.
	deviceInfos []DeviceInfo
	// clients is the list of subscribers that receive list broadcasts.
	clients []*Client
	// readyDevices carries the info of devices that have a baud rate.
	readyDevices chan DeviceInfo
}
// Start runs the server's event loop on the calling goroutine and never
// returns. It is the only place that mutates the device and client lists:
//
//   - a new client is appended to the subscriber list;
//   - a device add is ignored (and logged) when the serial number is already
//     known, otherwise the device is appended and, if it already has a baud
//     rate, announced on the ready-devices channel;
//   - a device remove drops the device, or reports "device not found";
//   - a baud assignment stores the baud rate and announces the device on the
//     ready-devices channel, or reports "device not found".
//
// Every change to the device list is broadcast to all clients. Replies and
// ready-device announcements are sent from short-lived goroutines so that a
// slow receiver cannot stall the loop.
func (s *Server) Start() {
	for {
		select {
		case client := <-s.events.clients:
			s.clients = append(s.clients, client)

		case deviceAdd := <-s.events.deviceAdds:
			if s.getDeviceInfo(deviceAdd.SerialNumber) != nil {
				log.Print("found duplicate serial number addition")
				continue
			}
			deviceInfo := lookupDeviceInfo(nil, deviceAdd.SerialNumber)
			if deviceInfo == nil {
				deviceInfo = &DeviceInfo{SerialNumber: deviceAdd.SerialNumber}
			}
			deviceInfo.Path = deviceAdd.Path
			// Work with a value copy from here on so the announcing goroutine
			// does not share memory with whatever lookupDeviceInfo returned.
			added := *deviceInfo
			s.deviceInfos = append(s.deviceInfos, added)
			if added.Baud != 0 {
				go func() { s.readyDevices <- added }()
			}
			s.publishDeviceInfosToClients()

		case deviceRemove := <-s.events.deviceRemoves:
			i := s.getDeviceInfoIndex(deviceRemove.SerialNumber)
			if i == -1 {
				err := errors.New("device not found")
				log.Printf("removing device %s: %s", deviceRemove.SerialNumber, err)
				go func() { deviceRemove.Error <- err }()
				continue
			}
			s.deviceInfos = append(s.deviceInfos[:i], s.deviceInfos[i+1:]...)
			go func() { deviceRemove.Error <- nil }()
			s.publishDeviceInfosToClients()

		case deviceBaud := <-s.events.deviceBauds:
			device := s.getDeviceInfo(deviceBaud.SerialNumber)
			if device == nil {
				err := errors.New("device not found")
				log.Printf("setting device baud %s: %s", deviceBaud.SerialNumber, err)
				go func() { deviceBaud.Error <- err }()
				continue
			}
			device.Baud = deviceBaud.Baud
			// device points into s.deviceInfos, which later iterations shift
			// in place when a device is removed. Copy the value now so the
			// goroutine below neither races with the loop nor announces
			// whichever device ends up in that slot afterwards.
			ready := *device
			go func() { deviceBaud.Error <- nil }()
			go func() { s.readyDevices <- ready }()
			s.publishDeviceInfosToClients()
		}
	}
}
// getDeviceInfoIndex returns the position in s.deviceInfos of the first device
// whose serial number equals sn, or -1 when there is none.
func (s *Server) getDeviceInfoIndex(sn string) int {
	for idx, info := range s.deviceInfos {
		if info.SerialNumber == sn {
			return idx
		}
	}
	return -1
}
// getDeviceInfo returns a pointer to the entry in s.deviceInfos whose serial
// number equals sn, or nil when there is none. The pointer aliases the slice
// element, so writes through it update the server's list.
func (s *Server) getDeviceInfo(sn string) *DeviceInfo {
	if idx := s.getDeviceInfoIndex(sn); idx != -1 {
		return &s.deviceInfos[idx]
	}
	return nil
}
// publishDeviceInfosToClients sends every client its own copy of the current
// device list and blocks until each delivery has either succeeded, timed out
// after 100ms, or been abandoned because the client's context is done.
// Clients whose context is done are dropped from the subscriber list; clients
// that merely time out stay subscribed and simply miss this update.
func (s *Server) publishDeviceInfosToClients() {
	const clientTimeout = 100 * time.Millisecond

	// The workers only read this snapshot and write to their own slot of
	// closed; s.clients itself is rewritten once, after all of them finished.
	// Re-slicing s.clients from inside the workers would race with the other
	// workers' indexing and could drop the wrong client or index out of range.
	clients := s.clients
	closed := make([]bool, len(clients))

	var wg sync.WaitGroup
	wg.Add(len(clients))
	for i, client := range clients {
		// Each client gets a private copy so it may keep or modify the slice.
		snapshot := make([]DeviceInfo, len(s.deviceInfos))
		copy(snapshot, s.deviceInfos)

		go func(i int, client *Client, snapshot []DeviceInfo) {
			defer wg.Done()
			ctx, cancel := context.WithTimeout(client.ctx, clientTimeout)
			defer cancel()
			select {
			case client.state <- snapshot:
			case <-ctx.Done():
				// ctx is derived from client.ctx, so it fires both on timeout
				// and when the client is closed; only the latter unsubscribes.
				closed[i] = client.ctx.Err() != nil
			}
		}(i, client, snapshot)
	}
	wg.Wait()

	// Filter in place, then clear the unused tail so dropped clients can be
	// garbage collected.
	live := clients[:0]
	for i, client := range clients {
		if !closed[i] {
			live = append(live, client)
		}
	}
	for i := len(live); i < len(clients); i++ {
		clients[i] = nil
	}
	s.clients = live
}
// lookupDeviceInfo is a placeholder for fetching the stored info of the device
// with the given serial number. It currently ignores both arguments and always
// returns nil, meaning "nothing known about this device".
func lookupDeviceInfo(db *sql.DB, serialNumber string) *DeviceInfo {
	// TODO: this will be implemented as a database lookup <14-05-24, Max Schulte> //
	return nil
}
// AssignDeviceBaud asks the server's event loop to set the baud rate of the
// device identified by serialNumber and waits for the outcome.
//
// It returns nil once the baud rate has been stored, the server's error when
// the device is unknown, or ctx.Err() when ctx is done before the request
// could be handed over or before the reply arrived. In the latter case the
// request may still be applied by the server afterwards.
func (s *Server) AssignDeviceBaud(ctx context.Context, serialNumber string, baud int) error {
	event := DeviceBaud{
		SerialNumber: serialNumber,
		Baud:         baud,
		// Buffered so the server's reply never blocks, even when this call
		// has already returned because ctx was cancelled.
		Error: make(chan error, 1),
	}

	select {
	case <-ctx.Done():
		// The request was never delivered, so there is no reply to wait for.
		return ctx.Err()
	case s.events.deviceBauds <- event:
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-event.Error:
		return err
	}
}
// func (s *Server) Subscribe(ctx context.Context) chan<- []DeviceInfo {
// res := make(chan<- []DeviceInfo)
// go func() {
// defer close(res)
// c := Client{state: make(chan []DeviceInfo)}
// c.ctx, c.cancel = context.WithCancel(ctx)
// defer c.cancel()
// select {
// case s.events.clients <- c:
// case <-ctx.Done():
// }
// <-ctx.Done()
// }()
// return res
// }
// Subscribe hands client to the server's event loop so that it receives
// future device list broadcasts. It blocks until the loop has accepted the
// client and then returns nil, or returns ctx.Err() if ctx is done first.
func (s *Server) Subscribe(ctx context.Context, client *Client) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.events.clients <- client:
		return nil
	}
}
// ReadyDevices returns the receive-only channel on which the server announces
// devices that have a baud rate. Every call returns the same channel.
func (s *Server) ReadyDevices() <-chan DeviceInfo {
	return s.readyDevices
}
// RemoveDevice asks the server's event loop to drop the device identified by
// serialNumber and waits for the outcome.
//
// It returns nil once the device has been removed, the server's error when
// the device is unknown, or ctx.Err() when ctx is done before the request
// could be handed over or before the reply arrived. In the latter case the
// request may still be applied by the server afterwards.
func (s *Server) RemoveDevice(ctx context.Context, serialNumber string) error {
	// Buffered so the server's reply never blocks, even when this call has
	// already returned because ctx was cancelled.
	reply := make(chan error, 1)

	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.events.deviceRemoves <- DeviceRemove{SerialNumber: serialNumber, Error: reply}:
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-reply:
		// Propagate the server's verdict instead of discarding it; otherwise
		// removing an unknown device would be reported as success.
		return err
	}
}
// NewClient returns a Client with an unbuffered state channel and a
// cancellable context derived from ctx. The client stays usable until ctx is
// done or its Close method is called.
func NewClient(ctx context.Context) *Client {
	clientCtx, cancel := context.WithCancel(ctx)
	return &Client{
		ctx:    clientCtx,
		cancel: cancel,
		state:  make(chan []DeviceInfo),
	}
}
// Client is a subscriber to the server's device list broadcasts.
type Client struct {
	// ctx bounds the client's lifetime; once it is done the client stops
	// receiving and the server drops it on its next broadcast.
	ctx context.Context
	// cancel cancels ctx; it is invoked by Close.
	cancel context.CancelFunc
	// state delivers device list snapshots from the server to Next.
	state chan []DeviceInfo
}
// Next blocks until the server delivers the next device list snapshot and
// returns it. If the client's context is done first, it returns a nil slice
// together with the context's error.
func (c *Client) Next() ([]DeviceInfo, error) {
	select {
	case <-c.ctx.Done():
		return nil, c.ctx.Err()
	case infos := <-c.state:
		return infos, nil
	}
}
// Close cancels the client's context. It returns nil when the context ended
// by cancellation; if the context had already ended for another reason (for
// example a deadline on the parent context), that error is returned instead.
func (c *Client) Close() error {
	c.cancel()
	err := c.ctx.Err()
	if errors.Is(err, context.Canceled) {
		return nil
	}
	return err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment