Created
June 16, 2021 15:18
-
-
Save Serhg94/87cce0315d38688f1a8c57c3d006159c to your computer and use it in GitHub Desktop.
Simple example of fighting with pyads (the evil TwinCAT2 DLL segfaults from deep within its bowels)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import time | |
import calendar | |
import multiprocessing | |
import queue | |
# Seconds between PLC state checks inside the worker process; also the
# back-off the supervisor waits before restarting a dead worker.
POLLING_INTERVAL = 5
# Seconds between queue checks (command queue in the worker, event/log
# queues in the supervisor thread).
QUEUES_CHECK_INTERVAL = 0.1
def _poll(name: str, ams_addr: str, port: int, items: dict, events: queue.Queue,
          logs: queue.Queue, commands: queue.Queue):
    """Worker entry point: talk to one PLC through pyads until the link fails.

    Intended to run as the target of a separate process, so that a crash
    inside the ADS library only takes this worker down.

    Args:
        name: Human-readable device name, used only in log messages.
        ams_addr: AMS address handed to ``pyads.Connection``.
        port: ADS port handed to ``pyads.Connection``.
        items: Mapping ``symbol name -> (data_type, alias)``. A device
            notification is registered for every key; the alias is what gets
            reported in events.
        events: Queue receiving ``(timestamp, alias, value)`` tuples, where
            ``timestamp`` comes from ``calendar.timegm``.
        logs: Queue receiving status strings.
        commands: Queue of ``(symbol name, value)`` pairs to write to the PLC.

    Note:
        Notifications and writes both use ``pyads.PLCTYPE_BOOL``; the
        ``data_type`` stored in ``items`` is not consulted here.
    """
    # Imported here so that only the worker process loads the ADS library.
    import pyads
    import asyncio

    loop = asyncio.new_event_loop()
    plc = pyads.Connection(ams_addr, port)

    @plc.notification(pyads.PLCTYPE_BOOL)
    def callback(handle, name, timestamp, value):
        # ``name`` here is the notified symbol and shadows the device name.
        t = calendar.timegm(timestamp.timetuple())
        events.put((t, items[name][1], value))

    async def poll_task():
        """Check the PLC state periodically; stop the loop on the first failure."""
        while True:
            try:
                plc.read_state()
            except Exception:
                # Link is gone: let run_forever() return so the worker exits
                # and the supervisor can start a fresh one.
                loop.stop()
                return
            await asyncio.sleep(POLLING_INTERVAL)

    async def do_cmd():
        """Forward queued commands to the PLC, one per wake-up."""
        while True:
            try:
                cmd = commands.get_nowait()
            except queue.Empty:
                pass
            else:
                try:
                    plc.write_by_name(cmd[0], cmd[1], pyads.PLCTYPE_BOOL)
                except Exception as err:
                    # A single failed write must not kill command handling;
                    # real connection loss is detected by poll_task().
                    logs.put('Beckhoff %s write to %s failed: %s' % (name, cmd[0], err))
            await asyncio.sleep(QUEUES_CHECK_INTERVAL)

    tasks = []
    plc.open()
    try:
        for item in items:
            plc.add_device_notification(item, pyads.NotificationAttrib(1), callback)
        plc.read_state()
        logs.put('Beckhoff %s connected!' % name)
        tasks.append(asyncio.ensure_future(poll_task(), loop=loop))
        tasks.append(asyncio.ensure_future(do_cmd(), loop=loop))
        loop.run_forever()
        logs.put('Beckhoff %s disconnected!' % name)
    finally:
        # Best-effort cleanup: the connection may already be unusable.
        try:
            plc.close()
        except Exception:
            pass
        for task in tasks:
            task.cancel()
        if tasks:
            # Let cancelled tasks finish so the loop can be closed cleanly.
            loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        loop.close()
class BeckhoffDevice:
    """Supervise a PLC polling worker that runs in its own process.

    A background thread keeps a ``_poll`` worker process alive, restarts it
    when it dies, and relays what the worker puts on the shared queues to the
    callbacks supplied by the caller.
    """

    def __init__(self, name, ams_addr, port, event_clb, log_clb):
        """Create the device and the manager-backed queues shared with the worker.

        Args:
            name: Device name used in log and console messages.
            ams_addr: AMS address passed through to the worker.
            port: ADS port passed through to the worker.
            event_clb: Called as ``event_clb(timestamp, alias, value)`` for
                every event tuple the worker reports.
            log_clb: Called as ``log_clb(text)`` for every worker log message.
        """
        self.ams_addr = ams_addr
        self.port = port
        self.name = name
        self.onEvent = event_clb
        self.systemEvent = log_clb
        self.manager = multiprocessing.Manager()
        self.events = self.manager.Queue()
        self.logs = self.manager.Queue()
        self.commands = self.manager.Queue()
        # symbol name -> (data_type, alias)
        self.items = dict()
        # alias -> symbol name
        self.alias_name = dict()
        self.run_flag = False
        self.__poll_thread = None
        self.__poll_process = None

    def start(self):
        """Start the supervising thread; does nothing if it is already running."""
        if self.__poll_thread is not None and self.__poll_thread.is_alive():
            return
        self.run_flag = True
        self.__poll_thread = threading.Thread(
            target=self.__process_control_loop,
            name='Beckhoff {} poll'.format(self.ams_addr),
        )
        self.__poll_thread.start()

    def stop(self):
        """Stop the supervising thread and kill the worker process if it is alive.

        Safe to call before ``start()`` and from inside a callback running on
        the supervising thread.
        """
        self.run_flag = False
        thread = self.__poll_thread
        # A thread cannot join itself (e.g. stop() called from a callback).
        if thread is not None and thread is not threading.current_thread():
            thread.join()
        process = self.__poll_process
        if process is not None and process.is_alive():
            process.kill()
            # Reap the killed worker; bounded so stop() cannot hang forever.
            process.join(POLLING_INTERVAL)

    @staticmethod
    def __drain(source, handler):
        """Pop every pending entry from *source*, passing each to *handler* if given."""
        try:
            while True:
                entry = source.get_nowait()
                if handler is not None:
                    handler(entry)
        except queue.Empty:
            pass

    def __process_control_loop(self):
        """Thread body: relay worker output and (re)start the worker as needed."""
        while self.run_flag:
            try:
                # Relay pending output even when the worker has just died, so
                # its last messages are not held back until the next reconnect.
                self.__drain(self.events, lambda event: self.onEvent(*event))
                self.__drain(self.logs, self.systemEvent)
                worker = self.__poll_process
                if worker is None or not worker.is_alive():
                    if worker is not None:
                        # Back off before restarting a worker that died.
                        time.sleep(POLLING_INTERVAL)
                        if not self.run_flag:
                            # stop() was requested during the back-off.
                            break
                    # Commands queued while no worker was running are discarded.
                    self.__drain(self.commands, None)
                    self.__poll_process = multiprocessing.Process(
                        target=_poll,
                        args=(self.name, self.ams_addr, self.port, self.items,
                              self.events, self.logs, self.commands),
                    )
                    self.__poll_process.start()
            except Exception as e:
                print('Polling %s error: %s' % (self.name, str(e)))
            time.sleep(QUEUES_CHECK_INTERVAL)
        print('Polling %s terminated!' % self.name)

    def addItem(self, name, data_type, alias):
        """Register PLC symbol *name* under *alias*.

        The item table is handed to the worker when it is spawned, so items
        added later are only seen by a worker started afterwards.
        """
        self.items[name] = (data_type, alias)
        self.alias_name[alias] = name

    def setValue(self, alias, value):
        """Queue a write of *value* to the symbol registered under *alias*.

        Raises:
            KeyError: If *alias* was never registered with ``addItem``.
        """
        self.commands.put((self.alias_name[alias], value))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment