Skip to content

Instantly share code, notes, and snippets.

@h3xagn
Created June 11, 2023 10:07
Show Gist options
  • Save h3xagn/def6f51d23c73b98aafdaae03183ac72 to your computer and use it in GitHub Desktop.
Save h3xagn/def6f51d23c73b98aafdaae03183ac72 to your computer and use it in GitHub Desktop.
Build an OPC UA bridge in Python: https://h3xagn.com
# Import libraries
import asyncio
import json
# Import OPC UA library
from asyncua import Client, Node
# Create handler for subscriptions
class SubscriptionHandler:
def __init__(self, destination_client):
self.destination_client = destination_client
async def datachange_notification(self, node: Node, val, data):
"""Once a new value is received from the source, lookup the destination node id and write it to the destination server"""
if data.monitored_item.Value.Value.Value is not None:
# Get the source value
source_value = data.monitored_item.Value.Value.Value
# Get the source node id
source_nodeid = f"ns=3;s={node.nodeid.Identifier}"
# Lookup destination node id
destination_node_id = f'ns=2;s={tags[source_nodeid]["tagName"]}'
# Get the node in the destination server
destination_node = self.destination_client.get_node(destination_node_id)
# Write the value to the destination
await destination_node.write_value(source_value)
# Main function
async def read_and_write_value(source_server_url, source_node_id, destination_server_url):
"""Connects to both source and destination servers and subscribes to all the source node ids provided in tags.json"""
# Connect to source OPC UA server
source_client = Client(url=source_server_url)
await source_client.connect()
# Connect to destination OPC UA server
destination_client = Client(url=destination_server_url)
await destination_client.connect()
# Create a handler for the subscription
handler = SubscriptionHandler(destination_client)
# Create an instance of the custom subscription class
subscription = await source_client.create_subscription(5000, handler)
# Add the source node ids to the subscription
for source_node_id in source_node_ids:
source_node = source_client.get_node(source_node_id)
await subscription.subscribe_data_change(source_node)
try:
# Run the event loop
while True:
await asyncio.sleep(5)
finally:
# Disconnect open subscriptions
await source_client.disconnect()
await destination_client.disconnect()
# Set up the source OPC UA server
source_server_url = "opc.tcp://192.168.0.1:4840/"
# Set up the destination OPC UA server
destination_server_url = "opc.tcp://10.10.10.1:4940/HIST001"
# Read node ids from the JSON file
with open(f"tags.json", "r") as json_file:
tags = json.load(json_file)
# Add source node ids to a list
source_node_ids = []
for source_node_id in tags:
source_node_ids.append(source_node_id)
# Run the main function
asyncio.run(read_and_write_value(source_server_url, source_node_ids, destination_server_url))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment