Skip to content

Instantly share code, notes, and snippets.

@bamchoh
Last active March 12, 2023 14:56
Show Gist options
  • Save bamchoh/a68ecf8db25168d231e27a11dff46401 to your computer and use it in GitHub Desktop.
Save bamchoh/a68ecf8db25168d231e27a11dff46401 to your computer and use it in GitHub Desktop.
My gopcua
// Copyright 2018-2020 opcua authors. All rights reserved.
// Use of this source code is governed by a MIT-style license that can be
// found in the LICENSE file.
package main
import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/google/uuid"
"github.com/gopcua/opcua"
"github.com/gopcua/opcua/debug"
"github.com/gopcua/opcua/ua"
_ "github.com/mattn/go-sqlite3"
)
type OpcUaConfig struct {
Endpoint string
Policy string
Mode string
CertFile string
KeyFile string
Interval time.Duration
}
type MyOpcUaClient struct {
*opcua.Client
IdCache map[string]*ua.NodeID
}
func NewMyOpcUaClient(ctx context.Context, config OpcUaConfig) *MyOpcUaClient {
endpoints, err := opcua.GetEndpoints(ctx, config.Endpoint)
if err != nil {
log.Fatal(err)
}
ep := opcua.SelectEndpoint(endpoints, config.Policy, ua.MessageSecurityModeFromString(config.Mode))
if ep == nil {
log.Fatal("Failed to find suitable endpoint")
}
fmt.Println("*", ep.SecurityPolicyURI, ep.SecurityMode)
opts := []opcua.Option{
opcua.SecurityPolicy(config.Policy),
opcua.SecurityModeString(config.Mode),
opcua.CertificateFile(config.CertFile),
opcua.PrivateKeyFile(config.KeyFile),
opcua.AuthAnonymous(),
opcua.SecurityFromEndpoint(ep, ua.UserTokenTypeAnonymous),
}
return &MyOpcUaClient{
Client: opcua.NewClient(ep.EndpointURL, opts...),
IdCache: make(map[string]*ua.NodeID),
}
}
func (c *MyOpcUaClient) findOrParse(nodeId string) (*ua.NodeID, error) {
if id, ok := c.IdCache[nodeId]; ok {
return id, nil
}
id, err := ua.ParseNodeID(nodeId)
if err != nil {
return nil, err
}
c.IdCache[nodeId] = id
return id, nil
}
func (c *MyOpcUaClient) write(ctx context.Context, nodeIds []string, values []interface{}) {
for i, nodeId := range nodeIds {
id, err := c.findOrParse(nodeId)
if err != nil {
log.Fatalf("invalid node id: %v", err)
}
v, err := ua.NewVariant(values[i])
if err != nil {
log.Fatalf("invalid value: %v", err)
}
req := &ua.WriteRequest{
NodesToWrite: []*ua.WriteValue{
{
NodeID: id,
AttributeID: ua.AttributeIDValue,
Value: &ua.DataValue{
EncodingMask: ua.DataValueValue,
Value: v,
},
},
},
}
resp, err := c.WriteWithContext(ctx, req)
if err != nil {
log.Fatalf("Write failed: %s", err)
}
for _, result := range resp.Results {
_ = result
// log.Printf("%v", result)
}
}
}
func (c *MyOpcUaClient) read(ctx context.Context, nodeIds []string) {
nodesToRead := make([]*ua.ReadValueID, 0)
for _, nodeId := range nodeIds {
id, err := c.findOrParse(nodeId)
if err != nil {
log.Fatalf("invalid node id: %v", err)
}
nodesToRead = append(nodesToRead, &ua.ReadValueID{
AttributeID: ua.AttributeIDValue,
NodeID: id,
})
}
req := &ua.ReadRequest{
MaxAge: 2000,
NodesToRead: nodesToRead,
TimestampsToReturn: ua.TimestampsToReturnBoth,
}
resp, err := c.ReadWithContext(ctx, req)
if err != nil {
log.Fatalf("Read failed: %s", err)
}
for i, result := range resp.Results {
if result.Status != ua.StatusOK {
log.Fatalf("Status not OK: %v", result.Status)
}
log.Printf("%v = %#v (%v)", nodesToRead[i].NodeID, result.Value.Value(), result.Value.Type().String())
}
}
func (c *MyOpcUaClient) subscribe(ctx context.Context, nodeIds []string, interval time.Duration, db *sql.DB, isReady chan struct{}) error {
notifyCh := make(chan *opcua.PublishNotificationData)
sub, err := c.SubscribeWithContext(ctx, &opcua.SubscriptionParameters{
Interval: interval,
}, notifyCh)
if err != nil {
return fmt.Errorf("subscription creation was failed: %w", err)
}
defer sub.Cancel(ctx)
log.Printf("Created subscription with id %v", sub.SubscriptionID)
IdMap := make(map[uint32]*ua.NodeID)
for _, nodeID := range nodeIds {
id, err := c.findOrParse(nodeID)
if err != nil {
return fmt.Errorf("node id parsing was failed: %w", err)
}
handle := uuid.New().ID()
IdMap[handle] = id
var miCreateRequest *ua.MonitoredItemCreateRequest
miCreateRequest = c.valueRequest(id, handle)
res, err := sub.Monitor(ua.TimestampsToReturnBoth, miCreateRequest)
if err != nil || res.Results[0].StatusCode != ua.StatusOK {
return fmt.Errorf("monitored item creation request was failed:%w", err)
}
}
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("transaction could not begin: %w", err)
}
stmt, err := tx.Prepare("insert into variables(nodeid, value) values(?, ?)")
if err != nil {
return fmt.Errorf("transaction preparation was failed: %w", err)
}
defer stmt.Close()
for _, nodeId := range nodeIds {
_, err = stmt.Exec(nodeId, "0")
if err != nil {
return fmt.Errorf("nodeid cannot be inserted: %w", err)
}
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("transaction could not commit: %w", err)
}
isReady <- struct{}{}
// read from subscription's notification channel until ctx is cancelled
for {
select {
case <-ctx.Done():
return nil
case res := <-notifyCh:
if res.Error != nil {
log.Printf("subscription response error: %q", res.Error)
continue
}
switch x := res.Value.(type) {
case *ua.DataChangeNotification:
for _, item := range x.MonitoredItems {
data := item.Value.Value.Value()
id := IdMap[item.ClientHandle]
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("update transaction begin was failed: %w", err)
}
stmt, err := tx.Prepare("update variables set value = ? where nodeid = ?")
if err != nil {
return fmt.Errorf("update transaction prepare was failed: %w", err)
}
defer stmt.Close()
_, err = stmt.Exec(fmt.Sprint(data), id.String())
if err != nil {
return fmt.Errorf("update transaction exec was failed: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("update transaction commit was failed: %w", err)
}
log.Printf("MonitoredItem with client handle %v(%v) = %v", id, item.ClientHandle, data)
}
default:
log.Printf("what's this publish result? %T", res.Value)
}
}
}
}
func (c *MyOpcUaClient) close(ctx context.Context) {
c.CloseSession()
c.CloseWithContext(ctx)
}
func (c *MyOpcUaClient) valueRequest(nodeID *ua.NodeID, handle uint32) *ua.MonitoredItemCreateRequest {
return opcua.NewMonitoredItemCreateRequestWithDefaults(nodeID, ua.AttributeIDValue, handle)
}
func parseFlags() OpcUaConfig {
var (
endpoint = flag.String("endpoint", "opc.tcp://localhost:4840", "OPC UA Endpoint URL")
policy = flag.String("policy", "None", "Security policy: None, Basic128Rsa15, Basic256, Basic256Sha256. Default: auto")
mode = flag.String("mode", "", "Security mode: None, Sign, SignAndEncrypt. Default: auto")
certFile = flag.String("cert", "", "Path to cert.pem. Required for security mode/policy != None")
keyFile = flag.String("key", "", "Path to private key.pem. Required for security mode/policy != None")
interval = flag.Duration("interval", opcua.DefaultSubscriptionInterval, "subscription interval")
)
flag.BoolVar(&debug.Enable, "debug", false, "enable debug logging")
flag.Parse()
return OpcUaConfig{
Endpoint: *endpoint,
Policy: *policy,
Mode: *mode,
CertFile: *certFile,
KeyFile: *keyFile,
Interval: *interval,
}
}
func runOpcUa(ctx context.Context, wg *sync.WaitGroup, db *sql.DB, msgCh chan string, isReady chan struct{}) error {
config := parseFlags()
log.SetFlags(0)
c := NewMyOpcUaClient(ctx, config)
if err := c.Connect(ctx); err != nil {
err = fmt.Errorf("opc ua connection was failed: %w", err)
return err
}
defer c.close(ctx)
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case msg := <-msgCh:
switch msg {
case "write_control":
nodeIDsToWrite := []string{
"ns=2;s=Root.Control",
}
values := []interface{}{
uint16(0),
}
c.write(ctx, nodeIDsToWrite, values)
}
}
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
}
}
/*
nodeIDsToWrite := []string{
"ns=2;s=Root.Code",
"ns=2;s=Root.Count",
"ns=2;s=Root.Message",
"ns=2;s=Root.Timestamp",
"ns=2;s=Root.Value",
}
values := []interface{}{
float32(1.234),
float32(2.345),
float32(3.456),
}
for {
_ = nodeIDsToWrite
// c.write(ctx, nodeIDsToWrite, values)
for i := 0; i < len(values); i++ {
values[i] = values[i].(float32) + float32(0.1)
}
select {
case <-ctx.Done():
return
case <-time.After(100 * time.Millisecond):
}
}
*/
}()
nodeIDs := []string{
"ns=2;s=Root.Code",
"ns=2;s=Root.Count",
"ns=2;s=Root.Message",
"ns=2;s=Root.Timestamp",
"ns=2;s=Root.Value",
}
/*
wg.Add(1)
go func() {
defer wg.Done()
c.read(ctx, nodeIDs)
}()
*/
err := c.subscribe(ctx, nodeIDs, config.Interval, db, isReady)
if err != nil {
return fmt.Errorf("subscription failed: %w", err)
}
return nil
}
func getCount(db *sql.DB) (int, error) {
countNodeID := "ns=2;s=Root.Count"
var count int
rows, err := db.Query("select value from variables where nodeid = '" + countNodeID + "'")
if err != nil {
return 0, fmt.Errorf("query failed: %w", err)
}
defer rows.Close()
for rows.Next() {
err = rows.Scan(&count)
if err != nil {
return 0, fmt.Errorf("row scan failed: %w", err)
}
}
err = rows.Err()
if err != nil {
return 0, fmt.Errorf("rows failed: %w", err)
}
return count, nil
}
func readAlarm(db *sql.DB) error {
codeNodeId := "ns=2;s=Root.Code"
messageNodeId := "ns=2;s=Root.Message"
timestampNodeId := "ns=2;s=Root.Timestamp"
valueNodeId := "ns=2;s=Root.Value"
rows, err := db.Query("select nodeid, value from variables where nodeid in ($1, $2, $3, $4)",
codeNodeId, messageNodeId, timestampNodeId, valueNodeId,
)
if err != nil {
return fmt.Errorf("query failed: %w", err)
}
defer rows.Close()
values := make(map[string]string)
for rows.Next() {
var nodeid string
var value string
err = rows.Scan(&nodeid, &value)
if err != nil {
return fmt.Errorf("row scan failed: %w", err)
}
values[nodeid] = value
}
err = rows.Err()
if err != nil {
return fmt.Errorf("rows failed: %w", err)
}
_, err = db.Exec(`insert into alarms(code, message, timestamp, value) values (?, ?, ?, ?)`,
values[codeNodeId],
values[messageNodeId],
values[timestampNodeId],
values[valueNodeId],
)
if err != nil {
return fmt.Errorf("nodeid cannot be inserted: %w", err)
}
return nil
}
func dbMain(ctx context.Context, db *sql.DB, msgCh chan string, isReady chan struct{}) error {
<-isReady
for {
count, err := getCount(db)
if err != nil {
return fmt.Errorf("getCount was failed: %w", err)
}
log.Println(count)
if count > 0 {
err = readAlarm(db)
if err != nil {
return fmt.Errorf("readAlarm was failed: %w", err)
}
msgCh <- "write_control"
}
select {
case <-ctx.Done():
return nil
case <-time.After(500 * time.Millisecond):
}
}
}
func createSql() (*sql.DB, error) {
os.Remove("./foo.db")
db, err := sql.Open("sqlite3", "./foo.db")
if err != nil {
return nil, fmt.Errorf("db open failed: %w", err)
}
sqlStmt := `
create table if not exists variables (
nodeid text not null primary key,
value string
);
delete from variables;
`
_, err = db.Exec(sqlStmt)
if err != nil {
err = fmt.Errorf("%w: %s\n", err, sqlStmt)
return nil, err
}
sqlStmt = `
create table if not exists alarms (
id integer primary key,
code integer,
message text,
timestamp integer,
value integer
);
delete from alarms;
`
_, err = db.Exec(sqlStmt)
if err != nil {
err = fmt.Errorf("%w: %s\n", err, sqlStmt)
return nil, err
}
return db, nil
}
func main() {
db, err := createSql()
if err != nil {
log.Fatalf("db creation was failed: %v", err)
}
defer db.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := new(sync.WaitGroup)
defer wg.Wait()
msgCh := make(chan string, 10)
isReady := make(chan struct{}, 1)
wg.Add(1)
go func() {
defer wg.Done()
err := runOpcUa(ctx, wg, db, msgCh, isReady)
if err != nil {
err = fmt.Errorf("runOPCUa failed: %w", err)
log.Println(err)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
err = dbMain(ctx, db, msgCh, isReady)
if err != nil {
err = fmt.Errorf("dbMain error: %w", err)
log.Println(err)
}
}()
wg.Add(1)
go func() {
defer wg.Done()
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
select {
case s := <-sig:
fmt.Printf("Signal received: %s \n", s.String())
cancel()
case <-ctx.Done():
}
}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment