A video of this working example is available on YouTube and complete PYTHON source code and CODESYS project are available on GitHub.
Start with the CODESYS project and define the variables to be exchanged in two structures. AppCmd is the data received from PYTHON and AppFbk contains the variables to be sent back to PYTHON.
TYPE AppCmd:
STRUCT
axis_power: BOOL;
move_exec: BOOL;
move_distance: LREAL;
move_velocity: LREAL;
io: ARRAY [0..128] OF BOOL;
END_STRUCT
END_TYPE
TYPE AppFbk:
STRUCT
axis_powered: BOOL;
move_done: BOOL;
move_error: BOOL;
io: ARRAY [0..128] OF BOOL;
END_STRUCT
END_TYPE
Next, instantiate the structures as cmd and fbk within a program named app. At init, create a semaphore and a shared memory block large enough to hold cmd and fbk.
PROGRAM app
VAR
cmd: AppCmd;
fbk: AppFbk;
shm: RTS_IEC_HANDLE;
sem: RTS_IEC_HANDLE;
shm_size: __UXINT;
iec_result: RTS_IEC_RESULT;
conflicts: UDINT := 0;
END_VAR
shm_size := SIZEOF(fbk) + SIZEOF(cmd);
sem := SysSemProcessCreate('codesys', ADR(iec_result));
shm := SysSharedMemoryCreate('codesys', 0, ADR(shm_size), ADR(iec_result));
Now cyclically call the next block of code to read cmd from shared memory and write fbk to shared memory.
IF SysSemProcessEnter(sem, 0) = 0 THEN
SysSharedMemoryRead (shm, 0, ADR(cmd), SIZEOF(cmd), ADR(iec_result));
SysSharedMemoryWrite(shm, SIZEOF(cmd), ADR(fbk), SIZEOF(fbk), ADR(iec_result));
SysSemProcessLeave(sem);
ELSE
conflicts := conflicts + 1;
END_IF
Protect the read/write operation with the semaphore. In order not to delay the CODESYS task and wait for non real-time PYTHON, set the timeout of SysSemProcessEnter to zero.
Since CODESYS structures are binary compatible with C, use the ctypes module to recompose AppCmd and AppFbk in PYTHON. Only the _fields_ list is used at runtime; the type annotations above are useful for static type checkers. Instantiate the structures as cmd and fbk.
class AppCmd(Structure):
axis_power: bool
move_exec: bool
move_distance: float
move_velocity: float
io: list[bool]
_fields_ = [
('axis_power', c_bool),
('move_exec', c_bool),
('move_distance', c_double),
('move_velocity', c_double),
('io', c_bool * 129),
]
class AppFbk(Structure):
axis_powered: bool
move_done: bool
move_error: bool
io: list[bool]
_fields_ = [
('axis_powered', c_bool),
('move_done', c_bool),
('move_error', c_bool),
('io', c_bool * 129),
]
cmd = AppCmd()
fbk = AppFbk()
The creation of the above structures can be automated using CODESYS scripting. Check out the following example.
Use the external posix_ipc module to open the semaphore and shared memory and create a mapfile using mmap. After calculating all source and destination addresses for the data transfer, periodically call memmove to copy cmd to shared memory and fbk from shared memory. Again, protect the read/write operation with the semaphore.
sem = posix_ipc.Semaphore('/codesys')
shm = posix_ipc.SharedMemory('/codesys')
with mmap(shm.fd, shm.size) as mapfile:
cmd_addr, cmd_size = addressof(cmd), sizeof(cmd)
fbk_addr, fbk_size = addressof(fbk), sizeof(fbk)
shm_cmd_addr = addressof(c_byte.from_buffer(mapfile))
shm_fbk_addr = addressof(c_byte.from_buffer(mapfile, cmd_size))
while True:
with sem:
memmove(shm_cmd_addr, cmd_addr, cmd_size)
memmove(fbk_addr, shm_fbk_addr, fbk_size)
await asyncio.sleep(period)
Now the communication with a running CODESYS application takes place almost in real time.
Create the following program as an example in the CODESYS project. It uses two function blocks to power and move an axis. Assign the variables from cmd to the inputs of the function blocks and use the outputs to set the variables in fbk.
PROGRAM main
VAR
Power: MC_Power;
MoveRelative: MC_MoveRelative;
END_VAR
Power(
Axis := AxisMR,
Enable := TRUE,
bRegulatorOn := app.cmd.axis_power,
bDriveStart := app.cmd.axis_power,
);
app.fbk.axis_powered := Power.Status;
MoveRelative(
Axis := AxisMR,
Execute := app.cmd.move_exec,
Distance := app.cmd.move_distance,
BufferMode := MC_BUFFER_MODE.Aborting,
Velocity := app.cmd.move_velocity,
Acceleration := AxisMR.fSWMaxAcceleration,
Deceleration := AxisMR.fSWMaxDeceleration,
Jerk := AxisMR.fSWMaxJerk,
);
app.fbk.move_done := MoveRelative.Done;
app.fbk.move_error := MoveRelative.Error;
Now define PYTHON functions to control COSDESYS function blocks.
async def poll(condition, *, abort=None, timeout=None)
while True:
if condition():
return True
if abort() or timeout(): #pseudocode
return False
await asyncio.sleep(poll_period)
@asynccontextmanager
async def power():
cmd.axis_power = True
try:
if not await poll(lambda: fbk.axis_powered, timeout=1):
raise Exception('Failed to power on axis')
yield
finally:
cmd.axis_power = False
async def move(distance, velocity):
cmd.move_distance = distance
cmd.move_velocity = velocity
cmd.move_exec = True
try:
if not await poll(lambda: fbk.move_done, abort=lambda: fbk.move_error):
raise Exception('Failed to move axis')
finally:
cmd.move_exec = False
await poll(lambda: not (fbk.move_done or fbk.move_error))
Finally, write a short demo program to move a CODESYS axis in a PYTHONic way.
async def run():
async with power():
await move(+400, 300)
await move(-700, 400)
await move(+900, 500)