Skip to content

Instantly share code, notes, and snippets.

@Serhg94
Created June 16, 2021 15:18
Show Gist options
  • Save Serhg94/87cce0315d38688f1a8c57c3d006159c to your computer and use it in GitHub Desktop.
Save Serhg94/87cce0315d38688f1a8c57c3d006159c to your computer and use it in GitHub Desktop.
Simple example of fighting with pyads (the evil TwinCAT2 DLL triggers a segfault from deep inside its bowels)
import threading
import time
import calendar
import multiprocessing
import queue
# Seconds between PLC state checks in the worker process; also used as the
# pause before a dead worker process is replaced by a new one.
POLLING_INTERVAL = 5
# Seconds between checks of the inter-process queues (commands in the worker,
# events/logs in the supervising thread).
QUEUES_CHECK_INTERVAL = 0.1
def _poll(name: str, ams_addr: str, port: int, items: dict, events: queue.Queue,
          logs: queue.Queue, commands: queue.Queue):
    """Worker-process entry point: talk to one PLC through pyads.

    Opens an ADS connection, subscribes to every symbol in ``items`` and then
    runs an asyncio loop that periodically checks the PLC state and writes
    queued commands. The function returns (ending the process) as soon as a
    state check fails.

    Args:
        name: Human-readable device name, used only in log messages.
        ams_addr: Address passed to ``pyads.Connection``.
        port: Port passed to ``pyads.Connection``.
        items: Mapping ``symbol name -> (data_type, alias)``; only the alias
            (index 1) is read here.
        events: Queue receiving ``(unix_timestamp, alias, value)`` tuples for
            every notification.
        logs: Queue receiving plain-text status messages.
        commands: Queue of ``(symbol name, value)`` tuples to write to the PLC.
    """
    # Imported inside the function so the ADS library is only loaded in the
    # process that runs this function, not in the importing one.
    import asyncio

    import pyads

    loop = asyncio.new_event_loop()
    plc = pyads.Connection(ams_addr, port)

    @plc.notification(pyads.PLCTYPE_BOOL)
    def callback(handle, name, timestamp, value):
        # NOTE: ``name`` here is the symbol name of the notification and
        # deliberately shadows the device name of the enclosing function.
        t = calendar.timegm(timestamp.timetuple())
        events.put((t, items[name][1], value))

    async def poll_task():
        """Check the PLC state periodically; stop the loop on failure."""
        while True:
            try:
                plc.read_state()
            except Exception:
                # Any failure here is treated as a lost connection. Catching
                # Exception (not a bare except) keeps SystemExit and
                # KeyboardInterrupt working.
                loop.stop()
                return
            await asyncio.sleep(POLLING_INTERVAL)

    async def do_cmd():
        """Forward queued commands to the PLC, one per queue check."""
        while True:
            try:
                cmd = commands.get_nowait()
            except queue.Empty:
                pass
            else:
                try:
                    plc.write_by_name(cmd[0], cmd[1], pyads.PLCTYPE_BOOL)
                except Exception as exc:
                    # A failed write must not kill this task, otherwise every
                    # later command would be silently dropped.
                    logs.put('Beckhoff %s write %s failed: %s' % (name, cmd[0], exc))
            await asyncio.sleep(QUEUES_CHECK_INTERVAL)

    plc.open()
    try:
        for item in items:
            plc.add_device_notification(item, pyads.NotificationAttrib(1), callback)
        plc.read_state()
        logs.put('Beckhoff %s connected!' % name)
        tasks = [loop.create_task(poll_task()), loop.create_task(do_cmd())]
        try:
            loop.run_forever()
        finally:
            # Cancel whatever is still pending and let the loop process the
            # cancellations so no task is destroyed while pending.
            for task in tasks:
                task.cancel()
            loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True))
        logs.put('Beckhoff %s disconnected!' % name)
    finally:
        plc.close()
        loop.close()
class BeckhoffDevice:
    """Supervise a PLC poller that runs in a separate process.

    A background thread keeps one ``_poll`` worker process alive, restarts it
    after it exits, and forwards whatever the worker puts on the event and
    log queues to the callbacks given at construction time. Presumably the
    worker is isolated in its own process so that a crash inside the native
    ADS library only takes down that process (TODO confirm with author).

    Attributes read by callers in this file: ``items`` maps symbol name to
    ``(data_type, alias)``; ``alias_name`` maps alias back to symbol name.
    """

    def __init__(self, name: str, ams_addr: str, port: int, event_clb, log_clb):
        """Store connection parameters and create the inter-process queues.

        Args:
            name: Device name used in log and console messages.
            ams_addr: Address handed to the worker's ADS connection.
            port: Port handed to the worker's ADS connection.
            event_clb: Called as ``event_clb(*event)`` for each queued event;
                the worker queues ``(unix_timestamp, alias, value)`` tuples.
            log_clb: Called with each log message string from the worker.
        """
        self.ams_addr = ams_addr
        self.port = port
        self.name = name
        self.onEvent = event_clb
        self.systemEvent = log_clb
        self.manager = multiprocessing.Manager()
        self.events = self.manager.Queue()
        self.logs = self.manager.Queue()
        self.commands = self.manager.Queue()
        self.items = dict()
        self.alias_name = dict()
        self.run_flag = False
        self.__poll_thread = None
        self.__poll_process = None

    def start(self):
        """Start the supervising thread; do nothing if it is already running."""
        if self.__poll_thread is not None and self.__poll_thread.is_alive():
            # A second supervisor would compete for the same worker slot.
            return
        self.run_flag = True
        self.__poll_thread = threading.Thread(
            target=self.__process_control_loop,
            name='Beckhoff {} poll'.format(self.ams_addr),
        )
        self.__poll_thread.start()

    def stop(self):
        """Stop the supervising thread and kill the worker process.

        Safe to call when ``start`` was never called.
        """
        self.run_flag = False
        thread = self.__poll_thread
        if thread is not None:
            try:
                thread.join()
            except RuntimeError:
                # Raised when joining the current thread (stop() called from
                # a callback) or a thread that never started; nothing to wait
                # for in either case.
                pass
        process = self.__poll_process
        if process is not None and process.is_alive():
            process.kill()
            # Reap the killed process so it does not linger as a zombie.
            process.join()

    @staticmethod
    def _drain_queue(source, handler=None):
        """Empty ``source``, passing each item to ``handler`` when given."""
        try:
            while True:
                item = source.get_nowait()
                if handler is not None:
                    handler(item)
        except queue.Empty:
            pass

    def _sleep_while_running(self, duration):
        """Sleep up to ``duration`` seconds, returning early once stopped."""
        deadline = time.monotonic() + duration
        while self.run_flag:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(QUEUES_CHECK_INTERVAL, remaining))

    def __process_control_loop(self):
        """Thread body: dispatch worker output and respawn a dead worker."""
        while self.run_flag:
            try:
                process = self.__poll_process
                if process is not None and process.is_alive():
                    self._drain_queue(self.events, lambda event: self.onEvent(*event))
                    self._drain_queue(self.logs, self.systemEvent)
                else:
                    if process is not None:
                        # Back off before replacing a worker that died, but
                        # stay responsive to stop().
                        self._sleep_while_running(POLLING_INTERVAL)
                        if not self.run_flag:
                            break
                    # Commands queued while no worker was alive are stale;
                    # discard them rather than replaying them on reconnect.
                    self._drain_queue(self.commands)
                    self.__poll_process = multiprocessing.Process(
                        target=_poll,
                        args=[self.name, self.ams_addr, self.port, self.items,
                              self.events, self.logs, self.commands],
                    )
                    self.__poll_process.start()
            except Exception as e:
                # Keep the supervisor alive whatever a callback or the
                # process machinery raises.
                print('Polling %s error: %s' % (self.name, str(e)))
            time.sleep(QUEUES_CHECK_INTERVAL)
        print('Polling %s terminated!' % self.name)

    def addItem(self, name, data_type, alias):
        """Register PLC symbol ``name`` under ``alias``.

        ``data_type`` is stored alongside the alias; the worker function in
        this file reads only the alias and always uses a BOOL PLC type.
        """
        self.items[name] = (data_type, alias)
        self.alias_name[alias] = name

    def setValue(self, alias, value):
        """Queue a write of ``value`` to the symbol registered as ``alias``.

        Raises:
            KeyError: If ``alias`` was never registered with ``addItem``.
        """
        self.commands.put((self.alias_name[alias], value))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment