Created June 11, 2023 10:07
Build an OPC UA bridge in Python:
# Import libraries
import asyncio
import json
# Import OPC UA library
from asyncua import Client, Node
# Create handler for subscriptions
class SubscriptionHandler:
def __init__(self, destination_client):
self.destination_client = destination_client
async def datachange_notification(self, node: Node, val, data):
"""Once a new value is received from the source, lookup the destination node id and write it to the destination server"""
if data.monitored_item.Value.Value.Value is not None:
# Get the source value
source_value = data.monitored_item.Value.Value.Value
# Get the source node id
source_nodeid = f"ns=3;s={node.nodeid.Identifier}"
# Lookup destination node id
destination_node_id = f'ns=2;s={tags[source_nodeid]["tagName"]}'
# Get the node in the destination server
destination_node = self.destination_client.get_node(destination_node_id)
# Write the value to the destination
await destination_node.write_value(source_value)
# Main function
async def read_and_write_value(source_server_url, source_node_id, destination_server_url):
"""Connects to both source and destination servers and subscribes to all the source node ids provided in tags.json"""
# Connect to source OPC UA server
source_client = Client(url=source_server_url)
await source_client.connect()
# Connect to destination OPC UA server
destination_client = Client(url=destination_server_url)
await destination_client.connect()
# Create a handler for the subscription
handler = SubscriptionHandler(destination_client)
# Create an instance of the custom subscription class
subscription = await source_client.create_subscription(5000, handler)
# Add the source node ids to the subscription
for source_node_id in source_node_ids:
source_node = source_client.get_node(source_node_id)
await subscription.subscribe_data_change(source_node)
# Run the event loop
while True:
await asyncio.sleep(5)
# Disconnect open subscriptions
await source_client.disconnect()
await destination_client.disconnect()
# Set up the source OPC UA server
source_server_url = "opc.tcp://"
# Set up the destination OPC UA server
destination_server_url = "opc.tcp://"
# Read node ids from the JSON file
with open(f"tags.json", "r") as json_file:
tags = json.load(json_file)
# Add source node ids to a list
source_node_ids = []
for source_node_id in tags:
# Run the main function, source_node_ids, destination_server_url))
