# Build an OPC UA bridge in Python: https://h3xagn.com
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Import libraries | |
import asyncio | |
import json | |
# Import OPC UA library | |
from asyncua import Client, Node | |
# Create handler for subscriptions
class SubscriptionHandler:
    """Forward data changes from source nodes to their destination nodes.

    An instance is meant to be passed as the handler when creating a
    subscription on the source client; each notification is translated to a
    destination node id through a tag mapping and written to the destination
    client.

    Args:
        destination_client: Client used to resolve and write destination
            nodes (must provide ``get_node``).
        tag_map: Optional mapping of source node id strings
            (``"ns=<index>;s=<identifier>"``) to dicts holding a
            ``"tagName"`` entry. When omitted, the module-level ``tags``
            mapping is looked up at notification time.
        destination_namespace: Namespace index used to build destination
            node ids. Defaults to 2.
    """

    def __init__(self, destination_client, tag_map=None, destination_namespace=2):
        self.destination_client = destination_client
        self.tag_map = tag_map
        self.destination_namespace = destination_namespace

    async def datachange_notification(self, node: "Node", val, data):
        """Write a newly received source value to the mapped destination node.

        Args:
            node: Source node whose value changed.
            val: The new value; ``None`` values are ignored.
            data: Notification details (unused; ``val`` already carries the
                value).

        Raises:
            KeyError: If the source node has no ``"tagName"`` mapping.
        """
        # Nothing to forward when the source reports no value
        if val is None:
            return

        # Rebuild the lookup key from the node's own namespace instead of
        # assuming a fixed one, so nodes from any namespace can be mapped
        source_nodeid = f"ns={node.nodeid.NamespaceIndex};s={node.nodeid.Identifier}"

        # Fall back to the module-level mapping when none was injected
        mapping = tags if self.tag_map is None else self.tag_map

        # Lookup destination node id
        try:
            tag_name = mapping[source_nodeid]["tagName"]
        except KeyError as err:
            raise KeyError(
                f"No destination tagName configured for source node {source_nodeid}"
            ) from err
        destination_node_id = f"ns={self.destination_namespace};s={tag_name}"

        # Get the node in the destination server and write the value to it
        destination_node = self.destination_client.get_node(destination_node_id)
        await destination_node.write_value(val)
# Main function
async def read_and_write_value(source_server_url, source_node_id, destination_server_url):
    """Bridge data changes from a source OPC UA server to a destination server.

    Connects to both servers, subscribes to the given source node ids with a
    ``SubscriptionHandler`` bound to the destination client, then idles so
    notifications can be forwarded. Both connections are closed when the
    coroutine exits, including when setup fails part-way.

    Args:
        source_server_url: Endpoint URL of the source server.
        source_node_id: A single node id string, or an iterable of node id
            strings, to subscribe to on the source server.
        destination_server_url: Endpoint URL of the destination server.
    """
    # Use the argument that was passed in (not a module global); accept
    # either one node id or a collection of them
    if isinstance(source_node_id, str):
        node_ids = [source_node_id]
    else:
        node_ids = list(source_node_id)

    # Connect to source OPC UA server
    source_client = Client(url=source_server_url)
    await source_client.connect()

    # Connect to destination OPC UA server; release the source connection
    # if this fails so it is not leaked
    try:
        destination_client = Client(url=destination_server_url)
        await destination_client.connect()
    except BaseException:
        await source_client.disconnect()
        raise

    try:
        # Create a handler for the subscription
        handler = SubscriptionHandler(destination_client)
        # Create the subscription on the source server (period of 5000)
        subscription = await source_client.create_subscription(5000, handler)
        # Add the source node ids to the subscription
        for node_id in node_ids:
            source_node = source_client.get_node(node_id)
            await subscription.subscribe_data_change(source_node)
        # Keep the coroutine alive so notifications keep being handled
        while True:
            await asyncio.sleep(5)
    finally:
        # Close the source first, and always close the destination even if
        # the source disconnect itself fails
        try:
            await source_client.disconnect()
        finally:
            await destination_client.disconnect()
# Set up the source OPC UA server
source_server_url = "opc.tcp://192.168.0.1:4840/"

# Set up the destination OPC UA server
destination_server_url = "opc.tcp://10.10.10.1:4940/HIST001"

# Read the tag mapping from the JSON file. Its keys are the source node ids
# and each entry holds the destination "tagName". Decode explicitly as UTF-8
# so the result does not depend on the platform's default encoding.
with open("tags.json", "r", encoding="utf-8") as json_file:
    tags = json.load(json_file)

# The mapping's keys are the source node ids to subscribe to
source_node_ids = list(tags)

# Run the main function
asyncio.run(read_and_write_value(source_server_url, source_node_ids, destination_server_url))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment