@arwie
Last active October 20, 2024 06:08
Fast variable exchange between CODESYS and PYTHON via shared memory

A video of this working example is available on YouTube, and the complete PYTHON source code and CODESYS project are available on GitHub.

Setup on the CODESYS side

Start with the CODESYS project and define the variables to be exchanged in two structures. AppCmd is the data received from PYTHON and AppFbk contains the variables to be sent back to PYTHON.

TYPE AppCmd:
STRUCT
	axis_power:		BOOL;
	move_exec:		BOOL;
	move_distance:		LREAL;
	move_velocity:		LREAL;
	io:			ARRAY [0..128] OF BOOL;
END_STRUCT
END_TYPE

TYPE AppFbk:
STRUCT
	axis_powered:		BOOL;
	move_done:		BOOL;
	move_error:		BOOL;
	io:			ARRAY [0..128] OF BOOL;
END_STRUCT
END_TYPE

Next, instantiate the structures as cmd and fbk within a program named app. At init, create a semaphore and a shared memory block large enough to hold cmd and fbk.

PROGRAM app
VAR
	cmd:			AppCmd;
	fbk:			AppFbk;
	shm:			RTS_IEC_HANDLE;
	sem:			RTS_IEC_HANDLE;
	shm_size:		__UXINT;
	iec_result:		RTS_IEC_RESULT;
	conflicts:		UDINT := 0;
END_VAR

shm_size := SIZEOF(fbk) + SIZEOF(cmd);
sem := SysSemProcessCreate('codesys', ADR(iec_result));
shm := SysSharedMemoryCreate('codesys', 0, ADR(shm_size), ADR(iec_result));

Now cyclically call the next block of code to read cmd from shared memory and write fbk to shared memory.

IF SysSemProcessEnter(sem, 0) = 0 THEN
	SysSharedMemoryRead (shm,           0, ADR(cmd), SIZEOF(cmd), ADR(iec_result));
	SysSharedMemoryWrite(shm, SIZEOF(cmd), ADR(fbk), SIZEOF(fbk), ADR(iec_result));
	SysSemProcessLeave(sem);
ELSE
	conflicts := conflicts + 1;
END_IF

Protect the read/write operation with the semaphore. So that the CODESYS task never waits for non-real-time PYTHON, set the timeout of SysSemProcessEnter to zero. A cycle that finds the semaphore taken skips the exchange and increments the conflicts counter instead.
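
The zero-timeout pattern described above can be tried out in plain PYTHON. The following is only a sketch of the idea, not the CODESYS implementation: a threading.Lock stands in for the named process semaphore, and plc_cycle is a hypothetical name for the cyclic code.

```python
import threading

lock = threading.Lock()   # stand-in for the named process semaphore (assumption)
conflicts = 0

def plc_cycle(exchange):
    """Run exchange() only if the lock is free right now; never block."""
    global conflicts
    if lock.acquire(blocking=False):   # same idea as a timeout of zero
        try:
            exchange()
        finally:
            lock.release()
        return True
    conflicts += 1                     # lock was taken: skip this cycle
    return False

calls = []

# Lock is free: the exchange runs.
ok_free = plc_cycle(lambda: calls.append('exchanged'))

# Another party holds the lock: the cycle is skipped and counted.
with lock:
    ok_busy = plc_cycle(lambda: calls.append('exchanged'))
```

The skipped cycle costs one period of latency, but the cyclic task never stalls. A steadily growing conflicts value would suggest that the other side holds the semaphore for too long.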

Setup on the PYTHON side

Since CODESYS structures are binary compatible with C, use the ctypes module to recompose AppCmd and AppFbk in PYTHON. Only the _fields_ list is used at runtime; the type annotations preceding it in each class exist only for static type checkers. Note that ARRAY [0..128] OF BOOL has 129 elements, hence c_bool * 129. Instantiate the structures as cmd and fbk.

from ctypes import Structure, c_bool, c_double

class AppCmd(Structure):
	axis_power:		bool
	move_exec:		bool
	move_distance:		float
	move_velocity:		float
	io:			list[bool]
	_fields_ = [
		('axis_power',		c_bool),
		('move_exec',		c_bool),
		('move_distance',	c_double),
		('move_velocity',	c_double),
		('io',			c_bool * 129),
	]

class AppFbk(Structure):
	axis_powered:		bool
	move_done:		bool
	move_error:		bool
	io:			list[bool]
	_fields_ = [
		('axis_powered',	c_bool),
		('move_done',		c_bool),
		('move_error',		c_bool),
		('io',			c_bool * 129),
	]

cmd = AppCmd()
fbk = AppFbk()
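
Binary compatibility only holds if both sides agree on field offsets and padding, so it can be worth checking the layout that ctypes computes. The sketch below repeats the two structures and lists offset and size of every field. The helper name layout is hypothetical, and whether CODESYS pads the structures the same way depends on the target and its alignment settings, so compare the results against SIZEOF(cmd) and SIZEOF(fbk) on the CODESYS side.

```python
from ctypes import Structure, alignment, c_bool, c_double, sizeof

class AppCmd(Structure):
    _fields_ = [
        ('axis_power', c_bool),
        ('move_exec', c_bool),
        ('move_distance', c_double),
        ('move_velocity', c_double),
        ('io', c_bool * 129),
    ]

class AppFbk(Structure):
    _fields_ = [
        ('axis_powered', c_bool),
        ('move_done', c_bool),
        ('move_error', c_bool),
        ('io', c_bool * 129),
    ]

def layout(struct):
    """Map each field name to (offset, size) in bytes as laid out by ctypes."""
    return {
        name: (getattr(struct, name).offset, getattr(struct, name).size)
        for name, _ in struct._fields_
    }

cmd_layout = layout(AppCmd)
fbk_layout = layout(AppFbk)

# cmd sits at offset 0 of the shared memory block, fbk directly behind it.
fbk_offset = sizeof(AppCmd)
total_size = sizeof(AppCmd) + sizeof(AppFbk)

for name, (offset, size) in cmd_layout.items():
    print(f'AppCmd.{name}: offset {offset}, size {size}')
print(f'fbk starts at {fbk_offset}, block needs {total_size} bytes')
```

The two leading BOOL fields of AppCmd are followed by padding so that move_distance starts on a boundary suitable for a double. AppFbk contains only one-byte fields and therefore has no padding.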

The creation of the above structures can be automated using CODESYS scripting. Check out the following example.

Use the external posix_ipc module to open the semaphore and shared memory and create a mapfile using mmap. After calculating all source and destination addresses for the data transfer, periodically call memmove to copy cmd to shared memory and fbk from shared memory. Again, protect the read/write operation with the semaphore.

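import asyncio
import posix_ipc
from ctypes import addressof, c_byte, memmove, sizeof
from mmap import mmap

# The name "exchange" is illustrative; the coroutine wrapper makes "await" valid.
async def exchange(period):
	sem = posix_ipc.Semaphore('/codesys')
	shm = posix_ipc.SharedMemory('/codesys')
	with mmap(shm.fd, shm.size) as mapfile:

		# cmd is stored at offset 0, fbk directly behind it
		cmd_addr, cmd_size = addressof(cmd), sizeof(cmd)
		fbk_addr, fbk_size = addressof(fbk), sizeof(fbk)
		shm_cmd_addr = addressof(c_byte.from_buffer(mapfile))
		shm_fbk_addr = addressof(c_byte.from_buffer(mapfile, cmd_size))

		while True:
			with sem:
				memmove(shm_cmd_addr, cmd_addr, cmd_size)
				memmove(fbk_addr, shm_fbk_addr, fbk_size)
			await asyncio.sleep(period)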

Now the communication with a running CODESYS application takes place almost in real time: each side sees the other's data within roughly one exchange period plus one CODESYS task cycle.
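
The memmove transfer can be tried without CODESYS or posix_ipc. This is a minimal sketch under stated assumptions: an anonymous mmap stands in for the '/codesys' shared memory block, the structures Cmd and Fbk are shortened versions of AppCmd and AppFbk, and fake_plc_cycle is a hypothetical stand-in for the CODESYS task that reads cmd and writes fbk.

```python
from ctypes import Structure, addressof, c_bool, c_byte, c_double, memmove, sizeof
from mmap import mmap

class Cmd(Structure):
    _fields_ = [('move_exec', c_bool), ('move_distance', c_double)]

class Fbk(Structure):
    _fields_ = [('move_done', c_bool)]

cmd, fbk = Cmd(), Fbk()

# Anonymous mapping in place of the real shared memory block (assumption).
mapfile = mmap(-1, sizeof(Cmd) + sizeof(Fbk))
shm_cmd_addr = addressof(c_byte.from_buffer(mapfile))
shm_fbk_addr = addressof(c_byte.from_buffer(mapfile, sizeof(Cmd)))

def fake_plc_cycle():
    """Read cmd from offset 0, then write fbk directly behind it."""
    plc_cmd = Cmd.from_buffer_copy(mapfile[:sizeof(Cmd)])
    plc_fbk = Fbk(move_done=plc_cmd.move_exec)   # echo the command as feedback
    mapfile[sizeof(Cmd):sizeof(Cmd) + sizeof(Fbk)] = bytes(plc_fbk)
    return plc_cmd

# PYTHON side: publish cmd, let the fake PLC run once, fetch fbk.
cmd.move_exec = True
cmd.move_distance = 400.0
memmove(shm_cmd_addr, addressof(cmd), sizeof(cmd))
seen_by_plc = fake_plc_cycle()
memmove(addressof(fbk), shm_fbk_addr, sizeof(fbk))

print(seen_by_plc.move_distance, fbk.move_done)
```

Computing the addresses once and reusing them keeps the periodic part down to two memmove calls, which is what makes the exchange cheap.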

Execution of CODESYS SoftMotion function blocks from PYTHON

Create the following program as an example in the CODESYS project. It uses two function blocks to power and move an axis. Assign the variables from cmd to the inputs of the function blocks and use the outputs to set the variables in fbk.

PROGRAM main
VAR
	Power:		MC_Power;
	MoveRelative:	MC_MoveRelative;
END_VAR

Power(
	Axis := AxisMR,
	Enable := TRUE,
	bRegulatorOn := app.cmd.axis_power,
	bDriveStart  := app.cmd.axis_power,
);
app.fbk.axis_powered := Power.Status;

MoveRelative(
	Axis := AxisMR,
	Execute := app.cmd.move_exec,
	Distance := app.cmd.move_distance,
	BufferMode := MC_BUFFER_MODE.Aborting,
	Velocity     := app.cmd.move_velocity,
	Acceleration := AxisMR.fSWMaxAcceleration,
	Deceleration := AxisMR.fSWMaxDeceleration,
	Jerk         := AxisMR.fSWMaxJerk,
);
app.fbk.move_done  := MoveRelative.Done;
app.fbk.move_error := MoveRelative.Error;

Now define PYTHON functions to control CODESYS function blocks.

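import asyncio
import time

async def poll(condition, *, abort=None, timeout=None):
	# timeout is assumed to be in seconds; None waits indefinitely
	deadline = None if timeout is None else time.monotonic() + timeout
	while True:
		if condition():
			return True
		if abort is not None and abort():
			return False
		if deadline is not None and time.monotonic() >= deadline:
			return False
		await asyncio.sleep(poll_period)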

from contextlib import asynccontextmanager

@asynccontextmanager
async def power():
	cmd.axis_power = True
	try:
		if not await poll(lambda: fbk.axis_powered, timeout=1):
			raise Exception('Failed to power on axis')
		yield
	finally:
		cmd.axis_power = False

async def move(distance, velocity):
	cmd.move_distance = distance
	cmd.move_velocity = velocity
	cmd.move_exec = True
	try:
		if not await poll(lambda: fbk.move_done, abort=lambda: fbk.move_error):
			raise Exception('Failed to move axis')
	finally:
		cmd.move_exec = False
		await poll(lambda: not (fbk.move_done or fbk.move_error))
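
The handshake in move (set move_exec, wait for move_done, clear move_exec, wait until move_done is gone again) can be tried without a PLC. In the sketch below, fake_plc is a hypothetical, heavily simplified stand-in for MC_MoveRelative: it reacts to the rising edge of move_exec and keeps move_done set until move_exec drops. The SimpleNamespace objects replace the ctypes structures, and poll is reduced to its core.

```python
import asyncio
from types import SimpleNamespace

cmd = SimpleNamespace(move_exec=False, move_distance=0.0)
fbk = SimpleNamespace(move_done=False)

async def poll(condition, period=0.001):
    while not condition():
        await asyncio.sleep(period)

async def fake_plc(state):
    """Simplified function block: start on rising edge, hold done until exec drops."""
    last_exec = False
    while True:
        if cmd.move_exec and not last_exec:
            state.position += cmd.move_distance   # the "move" completes instantly
            fbk.move_done = True
        elif not cmd.move_exec:
            fbk.move_done = False
        last_exec = cmd.move_exec
        await asyncio.sleep(0.001)

async def move(distance):
    cmd.move_distance = distance
    cmd.move_exec = True
    try:
        await poll(lambda: fbk.move_done)
    finally:
        cmd.move_exec = False
        # Wait for the block to reset, otherwise the next rising edge may be missed.
        await poll(lambda: not fbk.move_done)

async def demo():
    state = SimpleNamespace(position=0.0)
    plc = asyncio.create_task(fake_plc(state))
    await move(+400)
    await move(-700)
    plc.cancel()
    return state.position

final_position = asyncio.run(demo())
print(final_position)  # -300.0
```

Without the second poll in the finally block, two moves in quick succession could look like a single long Execute signal to the function block, and the second move would never start.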

Finally, write a short demo program to move a CODESYS axis in a PYTHONic way.

async def run():
	async with power():
		await move(+400, 300)
		await move(-700, 400)
		await move(+900, 500)
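
One reason to wrap the power handling in a context manager is that axis_power is cleared even when a move fails. A minimal sketch of that behaviour, with a SimpleNamespace in place of the ctypes cmd structure and without the feedback polling:

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

cmd = SimpleNamespace(axis_power=False)

@asynccontextmanager
async def power():
    cmd.axis_power = True
    try:
        yield
    finally:
        cmd.axis_power = False   # runs on normal exit and on exceptions

async def demo():
    seen = []
    try:
        async with power():
            seen.append(cmd.axis_power)
            raise RuntimeError('simulated move failure')
    except RuntimeError:
        pass
    seen.append(cmd.axis_power)
    return seen

states = asyncio.run(demo())
print(states)  # [True, False]
```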