Last active
May 14, 2024 23:00
-
-
Save maxsei/b5a56a8af7027ff1cee893eebd75b248 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// go version go1.20.7 linux/amd64 | |
package main | |
import ( | |
"context" | |
"database/sql" | |
"errors" | |
"log" | |
"sync" | |
"time" | |
) | |
// DeviceRemove is the event that asks the server loop to forget a device.
//
// The server loop reports the outcome on Error: nil once the device has been
// dropped from its list, or a non-nil error when no device with that serial
// number is known.
type DeviceRemove struct {
	// SerialNumber identifies the device to remove.
	SerialNumber string
	// Error receives the outcome of the removal.
	Error chan error
}
// DeviceAdd is the event that announces a newly appeared device to the
// server loop.
type DeviceAdd struct {
	// Path is copied onto the stored DeviceInfo (presumably the OS path of
	// the device node; confirm against the producer of these events).
	// SerialNumber identifies the device; the server loop ignores an add
	// whose serial number is already in its list.
	Path, SerialNumber string
}
// DeviceBaud is the event that asks the server loop to set the baud rate of
// an already known device.
//
// The server loop reports the outcome on Error: nil once the baud has been
// stored, or a non-nil error when no device with that serial number is known.
type DeviceBaud struct {
	// SerialNumber identifies the device to configure.
	SerialNumber string
	// Baud is the baud rate to store for the device.
	Baud int
	// Error receives the outcome of the assignment.
	Error chan error
}
// DeviceInfo is the server's record of a single device. Values of this type
// are what clients receive in every broadcast and what the ready-devices
// channel carries.
type DeviceInfo struct {
	// Path is taken from the DeviceAdd event that introduced the device.
	// SerialNumber is the key the server uses to find the device.
	Path, SerialNumber string
	// Baud is the configured baud rate. On add, a device whose Baud is 0 is
	// not announced as ready.
	Baud int
}
type ServerEvents struct { | |
deviceAdds chan DeviceAdd | |
deviceRemoves chan DeviceRemove | |
deviceBauds chan DeviceBaud | |
clients chan *Client | |
} | |
func NewServer() *Server { | |
return &Server{ | |
events: ServerEvents{ | |
deviceAdds: make(chan DeviceAdd), | |
deviceRemoves: make(chan DeviceRemove), | |
deviceBauds: make(chan DeviceBaud), | |
clients: make(chan *Client), | |
}, | |
readyDevices: make(chan DeviceInfo), | |
} | |
} | |
// Server is keeps a list of a devices that come and go and broadcasts a copy of | |
// the full state of this list when it changes. Clients can listen to this | |
// broadcast and must receive these messages within a hard coded amount of time | |
// (100ms). A single asynchronous consumer of device info that is ready must | |
// also be provided via calling Server.ReadyDevices(). Downstream devices do not | |
// need to be notified when a device is removed because the OS will throw an | |
// exception when the path does not exist or with an io.EOF error. | |
type Server struct { | |
events ServerEvents | |
deviceInfos []DeviceInfo | |
clients []*Client | |
readyDevices chan DeviceInfo | |
} | |
func (s *Server) Start() { | |
for { | |
select { | |
case client := <-s.events.clients: | |
s.clients = append(s.clients, client) | |
case deviceAdd := <-s.events.deviceAdds: | |
if s.getDeviceInfo(deviceAdd.SerialNumber) != nil { | |
log.Print("found duplicate serial number addition") | |
continue | |
} | |
deviceInfo := lookupDeviceInfo(nil, deviceAdd.SerialNumber) | |
if deviceInfo == nil { | |
deviceInfo = &DeviceInfo{SerialNumber: deviceAdd.SerialNumber} | |
} | |
deviceInfo.Path = deviceAdd.Path | |
s.deviceInfos = append(s.deviceInfos, *deviceInfo) | |
if deviceInfo.Baud != 0 { | |
go func() { s.readyDevices <- *deviceInfo }() | |
} | |
s.publishDeviceInfosToClients() | |
case deviceRemove := <-s.events.deviceRemoves: | |
i := s.getDeviceInfoIndex(deviceRemove.SerialNumber) | |
if i == -1 { | |
err := errors.New("device not found") | |
log.Printf("removing device %s: %s", deviceRemove.SerialNumber, err) | |
go func() { deviceRemove.Error <- err }() | |
continue | |
} | |
s.deviceInfos = append(s.deviceInfos[:i], s.deviceInfos[i+1:]...) | |
go func() { deviceRemove.Error <- nil }() | |
s.publishDeviceInfosToClients() | |
case deviceBaud := <-s.events.deviceBauds: | |
device := s.getDeviceInfo(deviceBaud.SerialNumber) | |
if device == nil { | |
err := errors.New("device not found") | |
log.Printf("setting device baud %s: %s", deviceBaud.SerialNumber, err) | |
go func() { deviceBaud.Error <- err }() | |
continue | |
} | |
device.Baud = deviceBaud.Baud | |
go func() { deviceBaud.Error <- nil }() | |
go func() { s.readyDevices <- *device }() | |
s.publishDeviceInfosToClients() | |
} | |
} | |
} | |
func (s *Server) getDeviceInfoIndex(sn string) int { | |
for i := range s.deviceInfos { | |
if s.deviceInfos[i].SerialNumber != sn { | |
continue | |
} | |
return i | |
} | |
return -1 | |
} | |
func (s *Server) getDeviceInfo(sn string) *DeviceInfo { | |
i := s.getDeviceInfoIndex(sn) | |
if i == -1 { | |
return nil | |
} | |
return &s.deviceInfos[i] | |
} | |
func (s *Server) publishDeviceInfosToClients() { | |
var wg sync.WaitGroup | |
wg.Add(len(s.clients)) | |
for i := range s.clients { | |
go func(i int) { | |
defer wg.Done() | |
client := s.clients[i] | |
const ClientTimeout = 100 * time.Millisecond | |
ctx, cancel := context.WithTimeout(client.ctx, ClientTimeout) | |
defer cancel() | |
deviceInfosClone := make([]DeviceInfo, len(s.deviceInfos)) | |
copy(deviceInfosClone, s.deviceInfos) | |
select { | |
case <-client.ctx.Done(): | |
s.clients = append(s.clients[:i], s.clients[i+1:]...) | |
case <-ctx.Done(): | |
case client.state <- deviceInfosClone: | |
} | |
}(i) | |
} | |
wg.Wait() | |
} | |
func lookupDeviceInfo(db *sql.DB, serialNumber string) *DeviceInfo { | |
// TODO: this will be implemented as a database lookup <14-05-24, Max Schulte> // | |
return nil | |
} | |
func (s *Server) AssignDeviceBaud(ctx context.Context, serialNumber string, baud int) error { | |
event := DeviceBaud{ | |
SerialNumber: serialNumber, | |
Baud: baud, | |
Error: make(chan error), | |
} | |
select { | |
case <-ctx.Done(): | |
case s.events.deviceBauds <- event: | |
} | |
select { | |
case <-ctx.Done(): | |
case err := <-event.Error: | |
return err | |
} | |
return nil | |
} | |
// func (s *Server) Subscribe(ctx context.Context) chan<- []DeviceInfo { | |
// res := make(chan<- []DeviceInfo) | |
// go func() { | |
// defer close(res) | |
// c := Client{state: make(chan []DeviceInfo)} | |
// c.ctx, c.cancel = context.WithCancel(ctx) | |
// defer c.cancel() | |
// select { | |
// case s.events.clients <- c: | |
// case <-ctx.Done(): | |
// } | |
// <-ctx.Done() | |
// }() | |
// return res | |
// } | |
func (s *Server) Subscribe(ctx context.Context, client *Client) error { | |
select { | |
case s.events.clients <- client: | |
case <-ctx.Done(): | |
return ctx.Err() | |
} | |
return nil | |
} | |
func (s *Server) ReadyDevices() <-chan DeviceInfo { return s.readyDevices } | |
func (s *Server) RemoveDevice(ctx context.Context, serialNumber string) error { | |
err := make(chan error) | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case s.events.deviceRemoves <- DeviceRemove{SerialNumber: serialNumber, Error: err}: | |
} | |
select { | |
case <-ctx.Done(): | |
return ctx.Err() | |
case <-err: | |
return nil | |
} | |
} | |
func NewClient(ctx context.Context) *Client { | |
res := Client{state: make(chan []DeviceInfo)} | |
res.ctx, res.cancel = context.WithCancel(ctx) | |
return &res | |
} | |
type Client struct { | |
ctx context.Context | |
cancel context.CancelFunc | |
state chan []DeviceInfo | |
} | |
func (c *Client) Next() ([]DeviceInfo, error) { | |
select { | |
case res := <-c.state: | |
return res, nil | |
case <-c.ctx.Done(): | |
return nil, c.ctx.Err() | |
} | |
} | |
func (c *Client) Close() error { | |
c.cancel() | |
if err := c.ctx.Err(); !errors.Is(err, context.Canceled) { | |
return err | |
} | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment