Last active
March 12, 2023 14:56
-
-
Save bamchoh/a68ecf8db25168d231e27a11dff46401 to your computer and use it in GitHub Desktop.
My gopcua
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2018-2020 opcua authors. All rights reserved. | |
// Use of this source code is governed by a MIT-style license that can be | |
// found in the LICENSE file. | |
package main | |
import ( | |
"context" | |
"database/sql" | |
"flag" | |
"fmt" | |
"log" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
"github.com/google/uuid" | |
"github.com/gopcua/opcua" | |
"github.com/gopcua/opcua/debug" | |
"github.com/gopcua/opcua/ua" | |
_ "github.com/mattn/go-sqlite3" | |
) | |
type OpcUaConfig struct { | |
Endpoint string | |
Policy string | |
Mode string | |
CertFile string | |
KeyFile string | |
Interval time.Duration | |
} | |
type MyOpcUaClient struct { | |
*opcua.Client | |
IdCache map[string]*ua.NodeID | |
} | |
func NewMyOpcUaClient(ctx context.Context, config OpcUaConfig) *MyOpcUaClient { | |
endpoints, err := opcua.GetEndpoints(ctx, config.Endpoint) | |
if err != nil { | |
log.Fatal(err) | |
} | |
ep := opcua.SelectEndpoint(endpoints, config.Policy, ua.MessageSecurityModeFromString(config.Mode)) | |
if ep == nil { | |
log.Fatal("Failed to find suitable endpoint") | |
} | |
fmt.Println("*", ep.SecurityPolicyURI, ep.SecurityMode) | |
opts := []opcua.Option{ | |
opcua.SecurityPolicy(config.Policy), | |
opcua.SecurityModeString(config.Mode), | |
opcua.CertificateFile(config.CertFile), | |
opcua.PrivateKeyFile(config.KeyFile), | |
opcua.AuthAnonymous(), | |
opcua.SecurityFromEndpoint(ep, ua.UserTokenTypeAnonymous), | |
} | |
return &MyOpcUaClient{ | |
Client: opcua.NewClient(ep.EndpointURL, opts...), | |
IdCache: make(map[string]*ua.NodeID), | |
} | |
} | |
func (c *MyOpcUaClient) findOrParse(nodeId string) (*ua.NodeID, error) { | |
if id, ok := c.IdCache[nodeId]; ok { | |
return id, nil | |
} | |
id, err := ua.ParseNodeID(nodeId) | |
if err != nil { | |
return nil, err | |
} | |
c.IdCache[nodeId] = id | |
return id, nil | |
} | |
func (c *MyOpcUaClient) write(ctx context.Context, nodeIds []string, values []interface{}) { | |
for i, nodeId := range nodeIds { | |
id, err := c.findOrParse(nodeId) | |
if err != nil { | |
log.Fatalf("invalid node id: %v", err) | |
} | |
v, err := ua.NewVariant(values[i]) | |
if err != nil { | |
log.Fatalf("invalid value: %v", err) | |
} | |
req := &ua.WriteRequest{ | |
NodesToWrite: []*ua.WriteValue{ | |
{ | |
NodeID: id, | |
AttributeID: ua.AttributeIDValue, | |
Value: &ua.DataValue{ | |
EncodingMask: ua.DataValueValue, | |
Value: v, | |
}, | |
}, | |
}, | |
} | |
resp, err := c.WriteWithContext(ctx, req) | |
if err != nil { | |
log.Fatalf("Write failed: %s", err) | |
} | |
for _, result := range resp.Results { | |
_ = result | |
// log.Printf("%v", result) | |
} | |
} | |
} | |
func (c *MyOpcUaClient) read(ctx context.Context, nodeIds []string) { | |
nodesToRead := make([]*ua.ReadValueID, 0) | |
for _, nodeId := range nodeIds { | |
id, err := c.findOrParse(nodeId) | |
if err != nil { | |
log.Fatalf("invalid node id: %v", err) | |
} | |
nodesToRead = append(nodesToRead, &ua.ReadValueID{ | |
AttributeID: ua.AttributeIDValue, | |
NodeID: id, | |
}) | |
} | |
req := &ua.ReadRequest{ | |
MaxAge: 2000, | |
NodesToRead: nodesToRead, | |
TimestampsToReturn: ua.TimestampsToReturnBoth, | |
} | |
resp, err := c.ReadWithContext(ctx, req) | |
if err != nil { | |
log.Fatalf("Read failed: %s", err) | |
} | |
for i, result := range resp.Results { | |
if result.Status != ua.StatusOK { | |
log.Fatalf("Status not OK: %v", result.Status) | |
} | |
log.Printf("%v = %#v (%v)", nodesToRead[i].NodeID, result.Value.Value(), result.Value.Type().String()) | |
} | |
} | |
func (c *MyOpcUaClient) subscribe(ctx context.Context, nodeIds []string, interval time.Duration, db *sql.DB, isReady chan struct{}) error { | |
notifyCh := make(chan *opcua.PublishNotificationData) | |
sub, err := c.SubscribeWithContext(ctx, &opcua.SubscriptionParameters{ | |
Interval: interval, | |
}, notifyCh) | |
if err != nil { | |
return fmt.Errorf("subscription creation was failed: %w", err) | |
} | |
defer sub.Cancel(ctx) | |
log.Printf("Created subscription with id %v", sub.SubscriptionID) | |
IdMap := make(map[uint32]*ua.NodeID) | |
for _, nodeID := range nodeIds { | |
id, err := c.findOrParse(nodeID) | |
if err != nil { | |
return fmt.Errorf("node id parsing was failed: %w", err) | |
} | |
handle := uuid.New().ID() | |
IdMap[handle] = id | |
var miCreateRequest *ua.MonitoredItemCreateRequest | |
miCreateRequest = c.valueRequest(id, handle) | |
res, err := sub.Monitor(ua.TimestampsToReturnBoth, miCreateRequest) | |
if err != nil || res.Results[0].StatusCode != ua.StatusOK { | |
return fmt.Errorf("monitored item creation request was failed:%w", err) | |
} | |
} | |
tx, err := db.Begin() | |
if err != nil { | |
return fmt.Errorf("transaction could not begin: %w", err) | |
} | |
stmt, err := tx.Prepare("insert into variables(nodeid, value) values(?, ?)") | |
if err != nil { | |
return fmt.Errorf("transaction preparation was failed: %w", err) | |
} | |
defer stmt.Close() | |
for _, nodeId := range nodeIds { | |
_, err = stmt.Exec(nodeId, "0") | |
if err != nil { | |
return fmt.Errorf("nodeid cannot be inserted: %w", err) | |
} | |
} | |
err = tx.Commit() | |
if err != nil { | |
return fmt.Errorf("transaction could not commit: %w", err) | |
} | |
isReady <- struct{}{} | |
// read from subscription's notification channel until ctx is cancelled | |
for { | |
select { | |
case <-ctx.Done(): | |
return nil | |
case res := <-notifyCh: | |
if res.Error != nil { | |
log.Printf("subscription response error: %q", res.Error) | |
continue | |
} | |
switch x := res.Value.(type) { | |
case *ua.DataChangeNotification: | |
for _, item := range x.MonitoredItems { | |
data := item.Value.Value.Value() | |
id := IdMap[item.ClientHandle] | |
tx, err := db.Begin() | |
if err != nil { | |
return fmt.Errorf("update transaction begin was failed: %w", err) | |
} | |
stmt, err := tx.Prepare("update variables set value = ? where nodeid = ?") | |
if err != nil { | |
return fmt.Errorf("update transaction prepare was failed: %w", err) | |
} | |
defer stmt.Close() | |
_, err = stmt.Exec(fmt.Sprint(data), id.String()) | |
if err != nil { | |
return fmt.Errorf("update transaction exec was failed: %w", err) | |
} | |
err = tx.Commit() | |
if err != nil { | |
return fmt.Errorf("update transaction commit was failed: %w", err) | |
} | |
log.Printf("MonitoredItem with client handle %v(%v) = %v", id, item.ClientHandle, data) | |
} | |
default: | |
log.Printf("what's this publish result? %T", res.Value) | |
} | |
} | |
} | |
} | |
func (c *MyOpcUaClient) close(ctx context.Context) { | |
c.CloseSession() | |
c.CloseWithContext(ctx) | |
} | |
func (c *MyOpcUaClient) valueRequest(nodeID *ua.NodeID, handle uint32) *ua.MonitoredItemCreateRequest { | |
return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, handle) | |
} | |
func parseFlags() OpcUaConfig { | |
var ( | |
endpoint = flag.String("endpoint", "opc.tcp://localhost:4840", "OPC UA Endpoint URL") | |
policy = flag.String("policy", "None", "Security policy: None, Basic128Rsa15, Basic256, Basic256Sha256. Default: auto") | |
mode = flag.String("mode", "", "Security mode: None, Sign, SignAndEncrypt. Default: auto") | |
certFile = flag.String("cert", "", "Path to cert.pem. Required for security mode/policy != None") | |
keyFile = flag.String("key", "", "Path to private key.pem. Required for security mode/policy != None") | |
interval = flag.Duration("interval", opcua.DefaultSubscriptionInterval, "subscription interval") | |
) | |
flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging") | |
flag.Parse() | |
return OpcUaConfig{ | |
Endpoint: *endpoint, | |
Policy: *policy, | |
Mode: *mode, | |
CertFile: *certFile, | |
KeyFile: *keyFile, | |
Interval: *interval, | |
} | |
} | |
func runOpcUa(ctx context.Context, wg *sync.WaitGroup, db *sql.DB, msgCh chan string, isReady chan struct{}) error { | |
config := parseFlags() | |
log.SetFlags(0) | |
c := NewMyOpcUaClient(ctx, config) | |
if err := c.Connect(ctx); err != nil { | |
err = fmt.Errorf("opc ua connection was failed: %w", err) | |
return err | |
} | |
defer c.close(ctx) | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
case msg := <-msgCh: | |
switch msg { | |
case "write_control": | |
nodeIDsToWrite := []string{ | |
"ns=2;s=Root.Control", | |
} | |
values := []interface{}{ | |
uint16(0), | |
} | |
c.write(ctx, nodeIDsToWrite, values) | |
} | |
} | |
select { | |
case <-ctx.Done(): | |
return | |
case <-time.After(100 * time.Millisecond): | |
} | |
} | |
/* | |
nodeIDsToWrite := []string{ | |
"ns=2;s=Root.Code", | |
"ns=2;s=Root.Count", | |
"ns=2;s=Root.Message", | |
"ns=2;s=Root.Timestamp", | |
"ns=2;s=Root.Value", | |
} | |
values := []interface{}{ | |
float32(1.234), | |
float32(2.345), | |
float32(3.456), | |
} | |
for { | |
_ = nodeIDsToWrite | |
// c.write(ctx, nodeIDsToWrite, values) | |
for i := 0; i < len(values); i++ { | |
values[i] = values[i].(float32) + float32(0.1) | |
} | |
select { | |
case <-ctx.Done(): | |
return | |
case <-time.After(100 * time.Millisecond): | |
} | |
} | |
*/ | |
}() | |
nodeIDs := []string{ | |
"ns=2;s=Root.Code", | |
"ns=2;s=Root.Count", | |
"ns=2;s=Root.Message", | |
"ns=2;s=Root.Timestamp", | |
"ns=2;s=Root.Value", | |
} | |
/* | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
c.read(ctx, nodeIDs) | |
}() | |
*/ | |
err := c.subscribe(ctx, nodeIDs, config.Interval, db, isReady) | |
if err != nil { | |
return fmt.Errorf("subscription failed: %w", err) | |
} | |
return nil | |
} | |
func getCount(db *sql.DB) (int, error) { | |
countNodeID := "ns=2;s=Root.Count" | |
var count int | |
rows, err := db.Query("select value from variables where nodeid = '" + countNodeID + "'") | |
if err != nil { | |
return 0, fmt.Errorf("query failed: %w", err) | |
} | |
defer rows.Close() | |
for rows.Next() { | |
err = rows.Scan(&count) | |
if err != nil { | |
return 0, fmt.Errorf("row scan failed: %w", err) | |
} | |
} | |
err = rows.Err() | |
if err != nil { | |
return 0, fmt.Errorf("rows failed: %w", err) | |
} | |
return count, nil | |
} | |
func readAlarm(db *sql.DB) error { | |
codeNodeId := "ns=2;s=Root.Code" | |
messageNodeId := "ns=2;s=Root.Message" | |
timestampNodeId := "ns=2;s=Root.Timestamp" | |
valueNodeId := "ns=2;s=Root.Value" | |
rows, err := db.Query("select nodeid, value from variables where nodeid in ($1, $2, $3, $4)", | |
codeNodeId, messageNodeId, timestampNodeId, valueNodeId, | |
) | |
if err != nil { | |
return fmt.Errorf("query failed: %w", err) | |
} | |
defer rows.Close() | |
values := make(map[string]string) | |
for rows.Next() { | |
var nodeid string | |
var value string | |
err = rows.Scan(&nodeid, &value) | |
if err != nil { | |
return fmt.Errorf("row scan failed: %w", err) | |
} | |
values[nodeid] = value | |
} | |
err = rows.Err() | |
if err != nil { | |
return fmt.Errorf("rows failed: %w", err) | |
} | |
_, err = db.Exec(`insert into alarms(code, message, timestamp, value) values (?, ?, ?, ?)`, | |
values[codeNodeId], | |
values[messageNodeId], | |
values[timestampNodeId], | |
values[valueNodeId], | |
) | |
if err != nil { | |
return fmt.Errorf("nodeid cannot be inserted: %w", err) | |
} | |
return nil | |
} | |
func dbMain(ctx context.Context, db *sql.DB, msgCh chan string, isReady chan struct{}) error { | |
<-isReady | |
for { | |
count, err := getCount(db) | |
if err != nil { | |
return fmt.Errorf("getCount was failed: %w", err) | |
} | |
log.Println(count) | |
if count > 0 { | |
err = readAlarm(db) | |
if err != nil { | |
return fmt.Errorf("readAlarm was failed: %w", err) | |
} | |
msgCh <- "write_control" | |
} | |
select { | |
case <-ctx.Done(): | |
return nil | |
case <-time.After(500 * time.Millisecond): | |
} | |
} | |
} | |
func createSql() (*sql.DB, error) { | |
os.Remove("./foo.db") | |
db, err := sql.Open("sqlite3", "./foo.db") | |
if err != nil { | |
return nil, fmt.Errorf("db open failed: %w", err) | |
} | |
sqlStmt := ` | |
create table if not exists variables ( | |
nodeid text not null primary key, | |
value string | |
); | |
delete from variables; | |
` | |
_, err = db.Exec(sqlStmt) | |
if err != nil { | |
err = fmt.Errorf("%w: %s\n", err, sqlStmt) | |
return nil, err | |
} | |
sqlStmt = ` | |
create table if not exists alarms ( | |
id integer primary key, | |
code integer, | |
message text, | |
timestamp integer, | |
value integer | |
); | |
delete from alarms; | |
` | |
_, err = db.Exec(sqlStmt) | |
if err != nil { | |
err = fmt.Errorf("%w: %s\n", err, sqlStmt) | |
return nil, err | |
} | |
return db, nil | |
} | |
func main() { | |
db, err := createSql() | |
if err != nil { | |
log.Fatalf("db creation was failed: %v", err) | |
} | |
defer db.Close() | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
wg := new(sync.WaitGroup) | |
defer wg.Wait() | |
msgCh := make(chan string, 10) | |
isReady := make(chan struct{}, 1) | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
err := runOpcUa(ctx, wg, db, msgCh, isReady) | |
if err != nil { | |
err = fmt.Errorf("runOPCUa failed: %w", err) | |
log.Println(err) | |
} | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
err = dbMain(ctx, db, msgCh, isReady) | |
if err != nil { | |
err = fmt.Errorf("dbMain error: %w", err) | |
log.Println(err) | |
} | |
}() | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
sig := make(chan os.Signal, 1) | |
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT) | |
select { | |
case s := <-sig: | |
fmt.Printf("Signal received: %s \n", s.String()) | |
cancel() | |
case <-ctx.Done(): | |
} | |
}() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment