https://github.com/dobaosll/llsdk
Vladimir Shabunin
Table of Contents
_________________
1 Parallel Line Scanner and Sliding Window for KNX bus.
.. 1.1 Hardware used in this demo
.. 1.2 Software
.. 1.3 Purpose
.. 1.4 D programming language
.. 1.5 llsdk
.. 1.6 mprop
.. 1.7 cEMI
.. 1.8 Sliding window
..... 1.8.1 Transport Connections
..... 1.8.2 Automatic Repeat Requests
..... 1.8.3 Implementation
..... 1.8.4 Results
.. 1.9 Parallel scanner
..... 1.9.1 Implementation
..... 1.9.2 Results
.. 1.10 References
1 Parallel Line Scanner and Sliding Window for KNX bus.
=======================================================
1.1 Hardware used in this demo
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1. 1x KNX power supply.
2. 2x Weinzierl BAOS 83x KNX module with an implemented Application
   Layer stack (BAOS) and access to the Data Link Layer.
3. 1x NanoPi NEO Plus2 single-board computer with two UART ports.
4. For the scanner: a line with KNX devices.
1.2 Software
~~~~~~~~~~~~
1. GNU/Linux - Operating System.
2. ldc2 compiler, dub - D programming language.
3. llsdk - SDK for BAOS 83x LinkLayer.
1.3 Purpose
~~~~~~~~~~~
The purpose is to study the speed of data transmission on the KNX bus,
to test the efficiency of the existing Transport Connections and to
think about alternatives. The whole project started when I developed
a line scanner and noticed that scanning lines with couplers or
repeaters takes a lot of time, so the scanner was made to support
parallel operation.
1.4 D programming language
~~~~~~~~~~~~~~~~~~~~~~~~~~
The D programming language was chosen to implement the LinkLayer SDK.
It is a statically typed, natively compiled language with C-like
syntax. Advantages of developing in D: familiar syntax, dynamic
arrays, garbage collection and object-oriented programming support.
A native binary is faster and consumes fewer resources than
interpreters and byte-code virtual machines. Direct access to C APIs
allows the use of many existing libraries in D projects.
Hello World in D:
,----
| import std.stdio;
|
| void main() {
|     writeln("hello, world");
| }
`----
Disadvantages include the runtime overhead (in compiled binary size)
and the difficulty of using D libraries outside of D projects (it is
possible, but at the cost of garbage collection, dynamic arrays,
classes, etc).
1.5 llsdk
~~~~~~~~~
To access the KNX Data Link Layer, the BAOS 838 modules were connected
to the UART1 and UART2 ports of the NanoPi NEO Plus2. A BAOS module
accepts and sends back KNX telegrams in cEMI format. The dobaosll SDK
was developed to work with BAOS by means of cEMI frames.
,----
| +------------+
| | KNX BUS | Physical Layer
| +------------+
| |
| +------------+
| ------------- | BAOS 83x | -------------------------------
| +------------+ DataLink Layer
| | cEMI inside FT1.2
| | 0x68..0x68..2900..0x16
| +------------+
| | llsdk | -------------------------------
| ------------- | | cEMI frames
| +------------+
| |
| from and to bus
| |
| +-------------+
| | redis |
| | pubsub |
| +-------------+
| / | \
| / | \
| +----------+ +----------+ +----------+
| | Client_1 | | Client_2 | | Client_3 |
| +----------+ +----------+ +----------+
`----
The dobaosll SDK has a microservice architecture. The main service,
llpub, connects to the redis database where settings are stored, then
subscribes to a redis pubsub channel and listens for incoming
requests. When a request (a base64-encoded cEMI frame) is received,
llpub sends this frame to the UART. When it receives a cEMI frame from
the bus, it publishes the frame to the pubsub channel and all
connected clients are notified.
As an experiment, a KNXnet/IP gateway (llnet), a line scanner (llscan)
and a utility to set the device programming mode were implemented.
All these procedures (KNXnet/IP message flow, establishing transport
connections, reading and writing device properties) are described in
detail in the official specification "The KNX Standard v2.1".
1.6 mprop
~~~~~~~~~
The BAOS module provides access to local device properties with the
M_Prop service. This service is used to read device properties such
as the descriptor, serial number, object tables, etc. For example,
when the llscan process is started, it first reads the subnetwork
address property (57) and then starts scanning this line.
Example of M_PropRead.req from KnxBaos_UserGuide.pdf, 12.2.3:
,----
| 0xFC,       // cEMI service code: M_PropRead.req
| 0x00, 0x00, // Interface Object Type 0
| 0x01,       // Object Instance 1
| 0x36,       // PropertyId: 54(PROGMODE)
| 0x10,       // Num of Elements 1, Start idx 1 (high part)
| 0x01        // Start index (low part)
`----
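The last two bytes of this request pack two fields into one 16-bit
big-endian value: the upper 4 bits carry the number of elements and
the lower 12 bits carry the start index. A minimal sketch of that
packing follows. The helper names packNoeSix and unpackNoeSix and the
NoeSix struct are hypothetical and not part of llsdk.

```d
import std.stdio : writefln;

// Hypothetical helpers (not part of llsdk): pack the 4-bit number of
// elements and the 12-bit start index into one 16-bit field.
ushort packNoeSix(ubyte number, ushort startIndex) {
    return cast(ushort)(((number & 0x0F) << 12) | (startIndex & 0x0FFF));
}

struct NoeSix {
    ubyte number;
    ushort startIndex;
}

// Split the 16-bit field back into its two parts.
NoeSix unpackNoeSix(ushort raw) {
    return NoeSix(cast(ubyte)(raw >> 12), cast(ushort)(raw & 0x0FFF));
}

void main() {
    // Bytes 0x10, 0x01 from the request above: 1 element, start index 1.
    ushort raw = packNoeSix(1, 1);
    writefln("0x%04X", raw); // prints 0x1001
    auto f = unpackNoeSix(raw);
    writefln("number=%d start=%d", f.number, f.startIndex);
}
```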
Implementation of this byte structure in D:
,----
| struct MPropFrame {
| MC mc;
| ushort obj_type;
| ubyte obj_inst;
| ubyte prop_id;
| ubyte number;
| ushort start_index;
| ubyte[] data;
| this(MC mc) {
| this.mc = mc;
| // default values
| this.obj_type = 0;
| this.obj_inst = 1;
| this.number = 1;
| this.start_index = 1;
| }
| this(ubyte[] msg) {
| int offset = 0;
| // read first byte and transform to message code
| mc = cast(MC) msg.peek!ubyte(offset); offset += 1;
| obj_type = msg.peek!ushort(offset); offset += 2; // read ushort value
| obj_inst = msg.peek!ubyte(offset); offset += 1;
| prop_id = msg.peek!ubyte(offset); offset += 1;
| ushort noe_six = msg.peek!ushort(offset);
| number = noe_six >> 12;
| start_index = noe_six & 0b0000_1111_1111_1111;
| offset += 2;
| data = msg[offset..$].dup;
| }
| ubyte[] toUbytes() {
| ubyte[] res; res.length = 7;
| int offset = 0;
| res.write!ubyte(mc, offset); offset += 1; // write message code
| res.write!ushort(obj_type, offset); offset += 2;
| res.write!ubyte(obj_inst, offset); offset += 1;
| res.write!ubyte(prop_id, offset); offset += 1;
| ushort noe_six =to!ushort(
| (number << 12) | (start_index & 0b0000_1111_1111_1111));
| res.write!ushort(noe_six, offset); offset += 2;
| res ~= data;
| return res;
| }
| }
`----
Assume we have a LinkLayer connection named ll and are able to send
and receive data. The handler for incoming data (from BAOS) looks like
this:
,----
| private void onCemiFrame(ubyte[] cemi) {
| int offset = 0;
| MPropFrame _response = MPropFrame(cemi);
| if (_response.mc != MC.MPROPREAD_CON &&
| _response.mc != MC.MPROPWRITE_CON) {
| return;
| }
|
| if (last.obj_type == _response.obj_type &&
| last.obj_inst == _response.obj_inst &&
| last.prop_id == _response.prop_id) {
| resolved = true;
| response = _response;
| }
| }
`----
The BAOS module should respond to an MPropRead/MPropWrite request with
an MPropRead.con/MPropWrite.con message. So if the message code is
different, the message is not processed further. If it is an MProp
confirmation that matches the last request, the locally created
_response is assigned to the shared response variable and the resolved
flag is set to true.
MPropRead method:
,----
| public ubyte[] read(ubyte id, ubyte num = 1,
| ushort si = 0x0001, ushort iot = 0, ubyte instance = 0x01) {
|
| resolved = false;
|
| MPropFrame mf = MPropFrame(MC.MPROPREAD_REQ);
| mf.obj_type = iot;
| mf.obj_inst = instance;
| mf.prop_id = id;
| mf.start_index = si;
| mf.number = num;
| last = mf;
| ll.sendCemi(mf.toUbytes);
| bool timeout = false;
| StopWatch sw = StopWatch(AutoStart.yes);
| while (!resolved && !timeout) {
| timeout = sw.peek() > req_timeout;
| ll.processMessages();
| Thread.sleep(1.msecs);
| }
| if (timeout) {
| throw new Exception(ERR_MPROPREAD_TIMEOUT);
| }
| sw.stop();
| ubyte[] result;
| if (response.number > 0) {
| result = response.data.dup;
| } else {
| throw new Exception(ERR_LL ~ response.data.toHexString);
| }
| return result;
| }
`----
This method constructs an MProp frame, sends it over the LinkLayer
connection and waits for the confirmation frame which contains the
data. If no response is received in time, an exception with
ERR_MPROPREAD_TIMEOUT is thrown.
Example: reading local individual address:
,----
| ubyte local_sub = mprop.read(57)[0];
| ubyte local_addr = mprop.read(58)[0];
| ushort local_ia = to!ushort(local_sub << 8 | local_addr);
`----
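The resulting 16-bit individual address is usually shown in the
"area.line.device" notation: 4 bits for the area, 4 bits for the line
and 8 bits for the device. A small sketch of that conversion is given
below. The helper formatIndividualAddress is hypothetical; llsdk ships
its own ia2str, whose implementation is not shown here. The property
values are example numbers, not values read from a real module.

```d
import std.format : format;
import std.stdio : writeln;

// Hypothetical helper: split a 16-bit individual address into
// area (4 bits), line (4 bits) and device (8 bits).
string formatIndividualAddress(ushort ia) {
    return format("%d.%d.%d", ia >> 12, (ia >> 8) & 0x0F, ia & 0xFF);
}

void main() {
    ubyte local_sub = 0x11;  // example value for property 57: area 1, line 1
    ubyte local_addr = 0x05; // example value for property 58: device 5
    ushort local_ia = cast(ushort)(local_sub << 8 | local_addr);
    writeln(formatIndividualAddress(local_ia)); // prints 1.1.5
}
```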
Example: putting BAOS to programming mode
,----
| writeln("Writing progmode for BAOS module: ", mode);
|
| MProp mprop;
| try {
| mprop = new MProp(host, port, prefix);
| } catch(Exception e) {
| writeln("Exception while initializing MProp client.");
| writeln(e.message);
| return;
| }
| try {
| mprop.write(54, progmode);
| } catch (Exception e) {
| writeln("Error writing progmode property: ", e.message);
| }
`----
1.7 cEMI
~~~~~~~~
A cEMI frame has the following structure:
--------------------------------
 Field                Data type
--------------------------------
 Message Code         ubyte
--------------------------------
 Additional Info Len  ubyte
 Additional Info      ubyte[]
--------------------------------
 Control Field 1      ubyte
 Control Field 2      ubyte
 Source address       ushort
 Destination address  ushort
--------------------------------
 APDU+Data Len        ubyte
 TPDU                 ubyte
 APDU+Data            ubyte[]
--------------------------------
There are different cEMI message types, distinguished by the message
code (MC). Working with the LinkLayer in most cases means working with
LData.Req, LData.Con and LData.Ind.
To send a value to a group address, you send an LData.Req message.
BAOS answers with an LData.Con frame that confirms the transmission
(an error flag in it signals a failure). If another device sends data
on the bus, an LData.Ind frame is received.
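The field layout above can be sketched as a small parser. This is an
illustration only: the CemiSketch struct and the parseCemi and
readBE16 functions are hypothetical names, not the LData_cEMI type
from llsdk, and the control fields are kept as raw bytes. The message
code values are the standard cEMI codes for L_Data.req, L_Data.con
and L_Data.ind.

```d
import std.exception : enforce;
import std.stdio : writefln;

// Standard cEMI message codes for the L_Data services.
enum ubyte L_DATA_REQ = 0x11;
enum ubyte L_DATA_CON = 0x2E;
enum ubyte L_DATA_IND = 0x29;

// Hypothetical, simplified view of an L_Data cEMI frame.
struct CemiSketch {
    ubyte messageCode;
    ubyte[] additionalInfo;
    ubyte control1, control2;
    ushort source, dest;
    ubyte dataLen;
    ubyte[] tpdu; // transport/application control octets and payload
}

// Read a big-endian 16-bit value at the given offset.
ushort readBE16(const(ubyte)[] b, size_t at) {
    return cast(ushort)((b[at] << 8) | b[at + 1]);
}

CemiSketch parseCemi(const(ubyte)[] f) {
    // 9 fixed header bytes plus the variable additional info
    enforce(f.length >= 2 && f.length >= 9 + f[1], "frame too short");
    CemiSketch c;
    size_t o = 0;
    c.messageCode = f[o++];
    ubyte addLen = f[o++];
    c.additionalInfo = f[o .. o + addLen].dup;
    o += addLen;
    c.control1 = f[o++];
    c.control2 = f[o++];
    c.source = readBE16(f, o); o += 2;
    c.dest = readBE16(f, o); o += 2;
    c.dataLen = f[o++];
    c.tpdu = f[o .. $].dup;
    return c;
}

void main() {
    // Example L_Data.ind frame: source 0x1101, destination 0x0901.
    const(ubyte)[] raw = [0x29, 0x00, 0xBC, 0xE0, 0x11, 0x01,
                          0x09, 0x01, 0x01, 0x00, 0x81];
    auto c = parseCemi(raw);
    writefln("mc=0x%02X src=0x%04X dst=0x%04X len=%d",
             c.messageCode, c.source, c.dest, c.dataLen);
}
```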
The LData_cEMI structure was implemented in D in a similar way to
MPropFrame. Example of working with LData_cEMI: establishing a
transport connection:
,----
| LData_cEMI tconn = new LData_cEMI();
| tconn.message_code = MC.LDATA_REQ;
| tconn.address_type_group = false;
| tconn.source = 0x0000;
| tconn.dest = address;
| tconn.tservice = TService.TConnect;
| ll.sendCemi(tconn.toUbytes);
`----
1.8 Sliding window
~~~~~~~~~~~~~~~~~~
1.8.1 Transport Connections
---------------------------
The KNX standard describes the following layers: Physical, Data Link,
Network, Transport, Session, Presentation and Application.
The BAOS module provides access to the Data Link Layer, whose purpose
is to provide a reliable data flow between nodes in one network
segment. The Network Layer is responsible for transferring packets
between devices across one or more network parts. Its implementation
makes sense for complex network configurations and belongs in devices
such as line couplers and routers. This article discusses
communication within one physical segment, so the network layer stack
is omitted for simplicity. (Yet this is probably the most complex
layer of them all.)
The Transport Layer defines connection-oriented and connectionless
messages. By analogy: TCP connections and UDP datagrams. But KNX
transport connections differ from TCP.
(For a full description see 03_03_04 [13].)
To establish a transport connection, a device should send a cEMI frame
with address type individual and TConnect as the transport service,
without any apci_data bytes. If this frame is confirmed on the bus,
the connection is established.
(TCP uses a three-way handshake to create a connection.)
After the connection has been established, one device may send
TDataConnected to the other and wait for a TAck acknowledgement. If no
TAck is received within three seconds, the message is repeated up to
three times, and in case of failure the connection is closed. If the
TAck is received, the sequence number is increased and the next
message is ready to be sent.
This is a simple Stop-And-Wait protocol: no data is sent until the
previous frame has been acknowledged.
TCP, in contrast, uses a sliding window.
(For computer network protocols refer to Tanenbaum's book.)
I will implement a Selective Repeat Protocol (SRP) with a sliding
window for the KNX bus and compare it with Stop-And-Wait by means of
the BAOS connectionless UserMessage application service.
1.8.2 Automatic Repeat Requests
-------------------------------
Both Stop-And-Wait and Selective Repeat are Automatic Repeat reQuest
(ARQ) protocols. The purpose of ARQ is to ensure that a sequence of
information packets is delivered in order and without errors or
duplications despite transmission errors and losses.
In Stop-And-Wait the sender transmits one frame and waits for an
acknowledgement from the receiver. If no acknowledgement is received
in time, the sender transmits the same frame again and waits again.
,----
| | sender receiver |
| | 0 -------------------> |
| | <-ack0 |
| | 1 -------------------> |
| | <-ack1 |
| | 2 -------------------> |
| | <-ack2 |
`----
Compared with other ARQ protocols, Stop-And-Wait is inefficient
because of the wait time between packets. This time could be used to
transmit further frames instead.
The sliding window method, on the other hand, sends multiple messages
before waiting. Both sender and receiver need buffer arrays where sent
and received frames are cached. The sender buffers sent frames so that
it can resend them on ack timeout. The receiver buffers incoming
frames so that they can be put into the result message in the right
order.
Example 1: data flow without errors.
,----
| | sender receiver |
| | 0 -------------------> |
| | 1 -------------------> |
| | 2 -------------------> |
| | 3 -------------------> |
| | |
|
| sender is waiting
|
| | <-ack0 |
|
| sender received ack0
| moving sliding window
| to the right by one position
|
| | 4 -------------------> |
| | <-ack1 |
| | 5 -------------------> |
| | <-ack2 |
| | 6 -------------------> |
| | <-ack3 |
| | 7 -------------------> |
| | <-ack4 |
| | <-ack5 |
| | <-ack6 |
| | <-ack7 |
`----
Example 2: Acks received not in order.
,----
| | sender receiver |
| | 0 -------------------> |
| | 1 -------------------> |
| | 2 -------------------> |
| | 3 -------------------> |
| | |
|
| sender is waiting
|
| | <-ack1 |
|
| sender received ack1
| but smallest expected is ack0
| so sender marks frame 1 acknowledged
| and does nothing else
|
| | <-ack0 |
|
| sender received ack0
| moving sliding window
| to the right by two positions
|
| | 4 -------------------> |
| | 5 -------------------> |
| | <-ack2 |
| | 6 -------------------> |
| | <-ack3 |
| | 7 -------------------> |
| | <-ack4 |
| | <-ack5 |
| | <-ack6 |
| | <-ack7 |
`----
Example 3: Ack timeout
,----
| | sender receiver |
| | 0 -------------------> |
| | 1 -------------------> |
| | 2 -------------------> |
| | 3 -------------------> |
| | |
| | <-ack1 |
| | <-ack2 |
| | <-ack3 |
|
| sender didn't receive ack0 in time
| resending frame 0
|
| | 0 -------------------> |
| | <-ack0 |
|
| ack0 was received
| moving sliding window by four positions
|
| | 4 -------------------> |
| | 5 -------------------> |
| | 6 -------------------> |
| | 7 -------------------> |
| | <-ack4 |
| | <-ack5 |
| | <-ack6 |
| | <-ack7 |
`----
The sequence number should be included in the frame along with the
payload data. Let's define its maximum value in the constant MAX_SEQ.
The sliding window size for both sender and receiver should be
(MAX_SEQ+1)/2. For MAX_SEQ = 7 (sequence numbers from 0 to 7), the
sliding window size is 4.
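The window is limited to half of the sequence number space so that,
after the receiver slides its window, the new window never overlaps
the old one; otherwise a retransmitted old frame could be mistaken for
a new one. A short sketch of the rule, using the hypothetical helper
name windowSize and the MAX_SEQ values that appear later in the
results:

```d
import std.stdio : writefln;

// Window size for selective repeat: half of the sequence number space.
ubyte windowSize(ubyte maxSeq) {
    return cast(ubyte)((maxSeq + 1) / 2);
}

void main() {
    ubyte[] maxSeqValues = [1, 7, 15, 31];
    foreach (maxSeq; maxSeqValues) {
        writefln("MAX_SEQ = %2d -> window size = %2d",
                 maxSeq, windowSize(maxSeq));
    }
}
```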
For detailed information refer to Andrew S. Tanenbaum, Computer
Networks.
In short: on a link without losses, a sliding window can ideally be up
to N times faster than Stop-And-Wait, where N is the window size.
1.8.3 Implementation
--------------------
As a base for the implementation I took the protocol6 listing from
Tanenbaum's Computer Networks book.
First of all, the frame structure was implemented.
Listing: NetworkFrame structure.
,----
| module knx_cl;
|
| import core.thread;
| import std.bitmanip;
| import std.datetime.stopwatch;
| import std.functional;
|
| import std.stdio;
|
| import llsdk;
|
| public:
| enum FrameType: ubyte {
| unknown = 0x00,
| connect = 0x01,
| data = 0x02,
| ack = 0x03,
| nack = 0x04,
| disconnect = 0x05
| }
|
| struct NetworkFrame {
| FrameType type = FrameType.unknown;
| ubyte seq = 0x00;
| ubyte[] data = [];
| ushort source, dest;
| this(ubyte[] msg) {
| if (msg.length == 0) return;
| type = cast(FrameType) msg.read!ubyte();
| switch(type) {
| case FrameType.ack:
| case FrameType.nack:
| if (msg.length != 1) {
| type = FrameType.unknown;
| return;
| }
| seq = msg.read!ubyte();
| break;
| case FrameType.data:
| if (msg.length == 0) {
| type = FrameType.unknown;
| return;
| }
| seq = msg.read!ubyte();
| data = msg.dup;
| break;
| case FrameType.connect:
| case FrameType.disconnect:
| default:
| break;
| }
| }
| ubyte[] toUbytes() {
| ubyte[] res = [];
| res.length = 1;
| res.write!ubyte(type, 0);
| switch(type) {
| case FrameType.ack:
| case FrameType.nack:
| res.length = 2;
| res.write!ubyte(seq, 1);
| return res;
| case FrameType.data:
| res.length = 2;
| res.write!ubyte(seq, 1);
| res ~= data;
| return res;
| case FrameType.connect:
| case FrameType.disconnect:
| default:
| break;
| }
|
| return res;
| }
| }
|
`----
A network frame consists of at least one byte, the frame type, which
should be one of the following: connect, data, ack, nack, disconnect.
If the frame type is "data", "ack" or "nack", the following byte is
the sequence number. For a data frame the payload follows after that.
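To make the byte layout concrete, here is a stand-alone sketch that
encodes the three frame shapes. It mirrors the FrameType values from
the listing, but encodeFrame is a hypothetical free function written
for this illustration, not the NetworkFrame.toUbytes method itself.

```d
import std.stdio : writefln;

enum FrameType : ubyte {
    unknown = 0x00, connect = 0x01, data = 0x02,
    ack = 0x03, nack = 0x04, disconnect = 0x05
}

// Hypothetical encoder: type byte, then seq for data/ack/nack,
// then the payload for data frames only.
ubyte[] encodeFrame(FrameType type, ubyte seq = 0,
                    const(ubyte)[] payload = null) {
    ubyte[] res = [cast(ubyte) type];
    if (type == FrameType.data || type == FrameType.ack ||
        type == FrameType.nack) {
        res ~= seq;
    }
    if (type == FrameType.data) {
        res ~= payload;
    }
    return res;
}

void main() {
    ubyte[] chunk = [0xAA, 0xBB];
    // connect frame: the type byte only
    writefln("%(%02X %)", encodeFrame(FrameType.connect));
    // ack frame: type byte followed by the sequence number
    writefln("%(%02X %)", encodeFrame(FrameType.ack, 5));
    // data frame: type byte, sequence number, then the payload
    writefln("%(%02X %)", encodeFrame(FrameType.data, 2, chunk));
}
```

With the default msg_len of 45 used later in main, a data frame
therefore occupies 47 bytes of the UserMessage payload.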
In Tanenbaum's Computer Networks, protocol6 was implemented for the
Data Link Layer message flow, whereas we are implementing our own kind
of Transport Connection. There, protocol6.c calls
from_physical_layer/to_physical_layer and
from_network_layer/to_network_layer. We are not developing a fully
layered architecture, so for simplicity let's use just a network
socket abstraction (analogous to the physical layer in Tanenbaum's p6)
and drop to/from_network_layer.
If you want to follow protocol6 completely, you need to use
from_network_layer/to_network_layer (instead of physical in p6) and
from_application_layer/to_application_layer (instead of network in
p6).
Listing: Network socket abstraction.
,----
| class NetworkSocket {
| private LLClient ll;
| private LData_cEMI in_frame;
| private ushort self;
| private ushort client;
|
| public bool available = true;
| private LData_cEMI last_req;
| private StopWatch sw = StopWatch(AutoStart.no);
|
| private void onCemiFrame(ubyte[] frame) {
| ubyte mc = frame.peek!ubyte(0);
| if (mc == MC.LDATA_REQ ||
| mc == MC.LDATA_IND) {
| in_frame = new LData_cEMI(frame);
| }
| if ( mc == MC.LDATA_CON ) {
| auto conFrame = new LData_cEMI(frame);
| if (conFrame.message_code == MC.LDATA_CON &&
| conFrame.tservice == last_req.tservice &&
| conFrame.dest == last_req.dest) {
| available = true;
| sw.stop();
| sw.reset();
| }
| }
| }
| this(string host = "127.0.0.1",
| ushort port = 6379,
| string prefix = "dobaosll") {
| ll = new LLClient(host, port, prefix);
| ll.onCemi(toDelegate(&onCemiFrame));
| }
| NetworkFrame receive() {
| NetworkFrame res = NetworkFrame([0]);
|
| ll.processMessages();
| if (in_frame is null) return res;
| if (in_frame.tservice == TService.TDataIndividual &&
| in_frame.apci == APCI.AUserMessageReq) {
| res = NetworkFrame(in_frame.data);
| res.source = in_frame.source;
| res.dest = in_frame.dest;
| in_frame = null;
| return res;
| }
| in_frame = null;
|
| processConTimer();
|
| return res;
| }
| void processConTimer() {
| auto dur = sw.peek();
| if (dur > 100.msecs) {
| sw.stop();
| sw.reset();
| available = true;
| }
| }
| void send(NetworkFrame frame, Duration delay = 0.msecs) {
| Thread.sleep(delay);
| ubyte[] data = frame.toUbytes;
| LData_cEMI cl = new LData_cEMI();
| cl.message_code = MC.LDATA_REQ;
| cl.address_type_group = false;
| cl.source = 0x0000;
| cl.dest = frame.dest;
| cl.tservice = TService.TDataIndividual;
| cl.apci_data_len = (1 + data.length) & 0xff;
| cl.apci = APCI.AUserMessageReq;
| cl.data = data;
| last_req = cl;
| available = false;
| sw.reset();
| sw.start();
| ll.sendCemi(cl.toUbytes);
| }
| }
`----
When the send(frame) method is called, it sends data to the BAOS
LinkLayer and marks the socket as unavailable for sending (available
= false) until LData.Con arrives. If LData.Con doesn't arrive within
100 ms, the socket becomes available for sending again
(processConTimer method).
Now let's study the main.d source code:
Listing: functions inc and between, taken from protocol6 without
significant changes.
,----
| ubyte inc(ubyte k, ubyte max_seq) {
| if (k < max_seq)
| return (k + 1) & 0xff;
| else
| return 0;
| }
|
| bool between(ubyte a, ubyte b, ubyte c) {
| // a <= b < c
| // b < c < a
| // c < a <=b
| bool res = ((a <= b) && (b < c)) ||
| ((b < c) && (c < a)) ||
| ((c < a) && (a <= b));
|
| return res;
| }
`----
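The between function treats sequence numbers as positions on a
circle, so it also works when the window wraps around past max_seq.
The sketch below repeats the function from the listing and checks
every sequence number against the window [6, 7, 0, 1] (lower edge 6,
upper edge 2), the same example that is used in the comments of the
main function.

```d
import std.stdio : writefln;

// Same logic as in the listing: true if b lies in the circular
// half-open interval [a, c).
bool between(ubyte a, ubyte b, ubyte c) {
    return ((a <= b) && (b < c)) ||
           ((b < c) && (c < a)) ||
           ((c < a) && (a <= b));
}

void main() {
    // max_seq = 7, window size 4: frame_expected = 6, too_far = 2
    foreach (i; 0 .. 8) {
        auto seq = cast(ubyte) i;
        writefln("seq %d inside window: %s", seq, between(6, seq, 2));
    }
}
```

Only 6, 7, 0 and 1 are reported as inside the window; 2 to 5 are
rejected.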
Listing: main function. Some parts are omitted to keep it compact. I
will try to explain some things, but for a better explanation refer
to Tanenbaum's book.
,----
| void main(string[] args) {
| string prefix = "dobaosll";
| string host = "127.0.0.1";
| ushort port = 6379;
| ubyte max_seq = 7;
| ubyte nr_bufs = ((max_seq + 1)/2);
| int msg_len = 45;
| int ack_timeout_int = 1500;
| int ack_rate = 90;
| Duration ack_timeout;
| string device = "";
| string file_to_send = "";
|
| ....
| .... parse command line args
| ....
|
| // recalculate with new arg values
|
| nr_bufs = ((max_seq + 1)/2);
| ack_timeout = ack_timeout_int.msecs;
|
| ....
| ....
|
| // by default - listen as a server
|
| // if commandline args filename and remote device address was provided
| // then init client mode
| bool client_mode = false;
| if (device.length > 0 && file_to_send.length > 0)
| client_mode = true;
|
| NetworkFrame[] queue;
| ushort server_addr;
|
| // read file and divide it by chunks of size msg_len
| // put data NetworkFrames into queue
| if (client_mode) {
| server_addr = str2ia(device);
| writeln("trying to send [", file_to_send, "] to device ", device);
| if (!exists(file_to_send)) {
| writeln("File doesn't exist");
| return;
| }
| // read file
| ubyte[] data = cast(ubyte[]) read(file_to_send);
| // divide data by chunks
| queue.reserve(data.length/msg_len + 1);
| while(data.length > 0) {
| NetworkFrame q;
| q.type = FrameType.data;
| if (data.length > msg_len) {
| q.data ~= data[0..msg_len].dup;
| data = data[msg_len..$];
| } else {
| q.data ~= data.dup;
| data = [];
| }
| q.dest = server_addr;
| queue ~= q;
| }
| }
|
| // now, queue is filled with data frames that should be sent to server
|
| // initialize Network Socket instance
| NetworkSocket nl = new NetworkSocket(host, port, prefix);
| writeln("network socket created");
|
| // if we want to send file - send connect request to server first
| if (client_mode) {
| NetworkFrame c;
| c.type = FrameType.connect;
| c.dest = server_addr;
| nl.send(c, SEND_DELAY);
| }
|
| // protocol implementation begin
| ubyte ack_expected = 0, // seq number to check incoming ack numbers
| next_frame_to_send = 0, // seq number which is increased everytime new frame is sent
| frame_expected = 0, // window beginning
| too_far = nr_bufs; // window end
|
| // frame_expected and too_far are the start and end edges of the window
| // too_far is not necessarily greater than frame_expected
| // example: window size is 4, max_seq is 7.
| // frame_expected=6, too_far = 2.
| // window: [6, 7, 0, 1]
| //
| // function "between" checks all cases
|
| NetworkFrame[] out_buf; out_buf.length = nr_bufs;
| NetworkFrame[] in_buf; in_buf.length = nr_bufs;
| bool[] acknowledged; acknowledged.length = nr_bufs;
| StopWatch[] ack_timers; ack_timers.length = nr_bufs;
| int nbuffered = 0;
|
| bool[] arrived; arrived.length = nr_bufs;
|
| bool can_send = true;
|
| ubyte[] result;
| int lost_count, arrived_count; // to get stats
|
| bool done = false;
| auto idx = 0; auto total = queue.length;
| while(!done) {
| // if available to send and queue is not empty
| // send, then slide window
| if (can_send && queue.length > 0) {
| NetworkFrame q = queue[0];
|
| q.seq = next_frame_to_send;
| // save frame to resend it in case of timeout
| out_buf[q.seq % nr_bufs] = q;
| nl.send(q, SEND_DELAY);
| writeln("sending packet ", idx, "/", total);
| idx += 1;
| nbuffered += 1;
| // freshly sent packet is not acknowledged yet.
| // start acknowledge timer
| acknowledged[q.seq % nr_bufs] = false;
| ack_timers[q.seq % nr_bufs] = StopWatch(AutoStart.no);
| ack_timers[q.seq % nr_bufs].reset();
| ack_timers[q.seq % nr_bufs].start();
|
| // slide window
| next_frame_to_send = inc(next_frame_to_send, max_seq);
| // decrease queue size
| if (queue.length == 1) {
| queue = [];
| } else {
| queue = queue[1..$];
| }
| }
|
| // can send only while the window is not full and the socket is available
| can_send = (nbuffered < nr_bufs) && nl.available;
|
| NetworkFrame recvd = nl.receive();
|
| switch(recvd.type) {
| case FrameType.connect:
| // on connect set default values to all variables
| ack_expected = 0;
| next_frame_to_send = 0;
| frame_expected = 0;
| too_far = nr_bufs;
| lost_count = 0;
| arrived_count = 0;
| for (auto i = 0; i < nr_bufs; i += 1) {
| acknowledged[i] = false;
| arrived[i] = false;
| out_buf[i] = NetworkFrame([0]);
| }
| nbuffered = 0;
| result = [];
| writeln("client connected");
| break;
| case FrameType.disconnect:
| // on disconnect request - save result file
| writeln("client disconnected");
| auto f = File("file.out", "w");
| f.write(cast(string)result);
| writeln("saved output to file.out");
| writeln("lost frames: ", lost_count);
| writeln("arrived: ", arrived_count);
| break;
| case FrameType.data:
| // if data frame arrived - send ack,
| // buffer frame and slide window
| if (between(frame_expected, recvd.seq, too_far) &&
| !arrived[recvd.seq % nr_bufs]) {
|
| // emulate lost data frame
| auto rd = uniform(0, 100, rnd);
| if (rd > ack_rate) {
| lost_count += 1;
| break;
| } else {
| arrived_count += 1;
| }
|
| arrived[recvd.seq % nr_bufs] = true;
| in_buf[recvd.seq % nr_bufs] = recvd;
| NetworkFrame a;
| a.type = FrameType.ack;
| a.seq = recvd.seq;
| a.dest = recvd.source;
| nl.send(a, SEND_DELAY);
|
| // slide window
| // here is a while loop.
| // consider example: window size is 4, max_seq=7,
| // the sender transmitted four frames [f0, f1, f2, f3]
| // but only these arrived here       [  , f1, f2, f3]
| // f0 was re-sent and has now arrived.
| // the window should slide four positions further
| while(arrived[frame_expected % nr_bufs]) {
| result ~= in_buf[frame_expected % nr_bufs].data;
| arrived[frame_expected % nr_bufs] = false;
| frame_expected = inc(frame_expected, max_seq);
| too_far = inc(too_far, max_seq);
| }
| } else if (arrived[recvd.seq % nr_bufs]){
| //send duplicate ACK
| NetworkFrame a;
| a.type = FrameType.ack;
| a.seq = recvd.seq;
| a.dest = recvd.source;
| //nl.send(a, SEND_DELAY);
| }
| break;
| case FrameType.ack:
| if (between(ack_expected, recvd.seq, next_frame_to_send) &&
| !acknowledged[recvd.seq % nr_bufs]) {
| acknowledged[recvd.seq % nr_bufs] = true;
| ack_timers[recvd.seq % nr_bufs].stop();
| ack_timers[recvd.seq % nr_bufs].reset();
| while(acknowledged[ack_expected % nr_bufs]) {
| result ~= recvd.data;
| acknowledged[ack_expected % nr_bufs] = false;
| ack_expected = inc(ack_expected, max_seq);
| nbuffered -= 1;
| }
|
| // nothing is awaiting ack and queue is empty
| // client should end his work
| if (nbuffered == 0 && queue.length == 0 && client_mode)
| done = true;
| }
| break;
| case FrameType.nack:
| break;
| default:
| break;
| }
| // process timers
| for (auto i = 0; i < nr_bufs; i += 1) {
| if (acknowledged[i]) continue;
| Duration d = ack_timers[i].peek();
| if (d > ack_timeout) {
| writeln("ack timeout, resending");
| // resend and reset timer
| nl.send(out_buf[i], SEND_DELAY);
| ack_timers[i].reset();
| ack_timers[i].start();
| }
| }
| }
|
| // finally, send disconnect
| if (client_mode) {
| // disconnect
| NetworkFrame d;
| d.type = FrameType.disconnect;
| d.dest = server_addr;
| nl.send(d, SEND_DELAY);
| }
| }
|
`----
Further explanations are left as comments in the source code: it is
easier to read code and comments as they flow.
1.8.4 Results
-------------
Packet size = 45 bytes.
--------------------------------------------------------------------------
 MAX_SEQ  Window size  ACK_TIMEOUT, ms  File size, kB  Lost frames  Time
--------------------------------------------------------------------------
       1            1             1500             22       11/495  3m8s
       1            1             1500             22       17/495  3m17s
       1            1             1500             22       19/495  3m20s
--------------------------------------------------------------------------
       7            4             1500             22       20/495  2m2s
       7            4             1500             22       17/495  2m19s
       7            4             1500             22       16/495  2m14s
--------------------------------------------------------------------------
      15            8             1500             22       20/495  1m49s
      15            8             1500             22       16/495  1m44s
      15            8             1500             22       16/495  1m43s
--------------------------------------------------------------------------
      15            8             3000             22       21/495  2m23s
      15            8             3000             22       20/495  2m27s
      15            8             3000             22       20/495  2m25s
--------------------------------------------------------------------------
      31           16             1500             22       17/495  2m13s
      31           16             1500             22       15/495  3m22s
      31           16             1500             22       17/495  9m47s
--------------------------------------------------------------------------
      31           16             3000             22       16/495  1m44s
      31           16             3000             22       18/495  1m42s
      31           16             3000             22       19/495  1m45s
--------------------------------------------------------------------------
Something went wrong in one case (MAX_SEQ 31, 9m47s): ACKs got lost on
their way back and it took almost 10 minutes for the file to arrive
completely.
When the MAX_SEQ parameter is 1, the window size is 1 too and the
algorithm acts the same as the Stop-And-Wait protocol. A smaller
ACK_TIMEOUT reduces the transfer time, but it is important to keep the
balance between this value and the window size, because the more
messages are sent at the same time, the more time is required to
receive the acknowledgements.
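To put the timings into perspective, the payload throughput can be
estimated from the table. The sketch below takes the fastest run of
four configurations and assumes that all 495 frames carry the full 45
bytes, which is an upper bound because the last frame may be shorter.
The function name throughput is hypothetical.

```d
import std.stdio : writefln;

// Payload throughput in bytes per second for one run from the table.
double throughput(int frames, int payloadBytes, int seconds) {
    return cast(double)(frames * payloadBytes) / seconds;
}

void main() {
    enum frames = 495; // frames per file, from the "Lost frames" column
    enum payload = 45; // packet size in bytes (upper bound per frame)

    // Fastest run of each configuration, times converted to seconds.
    writefln("window  1, 1500 ms: %.0f B/s",
             throughput(frames, payload, 188)); // 3m8s
    writefln("window  4, 1500 ms: %.0f B/s",
             throughput(frames, payload, 122)); // 2m2s
    writefln("window  8, 1500 ms: %.0f B/s",
             throughput(frames, payload, 103)); // 1m43s
    writefln("window 16, 3000 ms: %.0f B/s",
             throughput(frames, payload, 102)); // 1m42s
    writefln("speed-up, window 8 vs 1: %.2f", 188.0 / 103.0);
}
```

In these measurements the gain from window size 1 to window size 8 is
a factor of about 1.8, well below the ideal factor of N.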
1.9 Parallel scanner
~~~~~~~~~~~~~~~~~~~~
1.9.1 Implementation
--------------------
Scanning a KNX line is pretty straightforward: you send a TConnect
request and, if LData.Con arrives without the error flag, you may send
TDataConnected requests to read the descriptor, serial number, etc.
So the algorithm is as follows:
,----
| for i = start to end
|   try:
|     connect;
|     read data;
|     save result;
|   catch(error):
|     continue loop;
`----
I noticed that scanning some lines takes a lot of time. I don't know
where exactly the problem lies, but when you send a TConnect request
to an absent device, on some lines the confirmation comes as it should
(with the error flag set) and on other lines it comes without it, as
if an actual transport connection was established. It may be because
line couplers/repeaters acknowledge transport requests, or because of
some unknown reason. The application then sends the first
TDataConnected request and gets no ACK frame back. The ACK timeout is
three seconds, and after that the transport connection should resend
the frame up to three times and then close the connection. When you
scan a line device after device and the app waits for an ACK for
every absent device (if the TConnect confirmation came without error),
a large amount of time is spent just waiting for acknowledgements.
What if we wait for multiple devices at the same time?
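A rough worst-case estimate shows how much waiting is at stake. The
sketch below rests on several assumptions that are not measurements:
every absent device costs the initial transmission plus three
repetitions, each waiting the full three-second ACK timeout; all
scanned addresses are absent but still confirm TConnect without
error; and parallel workers wait independently of each other. The
function names worstCaseWait and scanTime are hypothetical.

```d
import core.time : Duration, seconds;
import std.stdio : writefln;

// Assumed worst case per absent device: the first attempt plus
// `repetitions` retries, each waiting one full ACK timeout.
Duration worstCaseWait(Duration ackTimeout, int repetitions) {
    return ackTimeout * (1 + repetitions);
}

// Total waiting time when `threads` devices are awaited at once.
Duration scanTime(int absentDevices, int threads, Duration perDevice) {
    int rounds = (absentDevices + threads - 1) / threads; // ceiling
    return perDevice * rounds;
}

void main() {
    auto perDevice = worstCaseWait(3.seconds, 3);
    writefln("per absent device: %s", perDevice);
    foreach (threads; [1, 4, 8]) {
        writefln("%d thread(s): %s", threads,
                 scanTime(254, threads, perDevice));
    }
}
```

Under these assumptions 254 absent addresses cost about 51 minutes of
waiting when scanned one by one and about 6.4 minutes with 8 parallel
workers.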
Listing: llscan/app.d full source code.
,----
| import core.thread;
| import std.bitmanip;
| import std.conv;
| import std.digest;
| import std.file;
| import std.json;
| import std.datetime.stopwatch;
| import std.getopt;
| import std.parallelism;
| import std.stdio;
| import std.string;
|
| import llsdk;
|
| void main(string[] args) {
|   string prefix = "dobaosll";
|   string host = "127.0.0.1";
|   ushort port = 6379;
|   string line = "";
|   ubyte start = 1;
|   ubyte end = 255;
|   string fname = "";
|   int thread_num = 1;
|
|   // help text, shown on --help and on argument errors
|   string info = "SDK for Weinzierl BAOS 83x Data Link Layer - scan line.\n";
|   info ~= "Scan given line. ";
|   info ~= "If no --line argument was provided, scan BAOS subnetwork.";
|
|   GetoptResult getoptResult;
|   try {
|     getoptResult = getopt(args,
|         "prefix|c",
|         "Prefix for redis config and stream keys. Default: dobaosll",
|         &prefix,
|
|         "host|h",
|         "Host with llpub service running. Default: 127.0.0.1",
|         &host,
|
|         "port|p",
|         "Redis port. Default: 6379",
|         &port,
|
|         "line|l",
|         "Line to scan. If empty, scan BAOS subnetwork.",
|         &line,
|
|         "start|s",
|         "Start scanning from device. Default: 1",
|         &start,
|
|         "end|e",
|         "Stop at given device. Default: 255",
|         &end,
|
|         "threads|t",
|         "Number of threads for parallel operation. Default: 1",
|         &thread_num,
|
|         "output|o",
|         "Filename to save output in JSON format.",
|         &fname);
|   } catch(Exception e) {
|     defaultGetoptPrinter(info, getoptResult.options);
|     writeln(e.message);
|     return;
|   }
|
|   if (getoptResult.helpWanted) {
|     defaultGetoptPrinter(info, getoptResult.options);
|     return;
|   }
|
|   // --line may hold several space-separated line addresses
|   ubyte[] lines;
|   string[] lines_str = line.split(" ");
|   foreach(string line_addr; lines_str) {
|     try {
|       if (line_addr == "") continue;
|       lines ~= str2subnetwork(line_addr);
|     } catch(Exception e) {
|       writeln(e.message);
|       return;
|     }
|   }
|
|   // properties of the local BAOS module
|   MProp mprop = new MProp(host, port, prefix);
|   ubyte[] local_descr = mprop.read(83);
|   ubyte[] local_sn = mprop.read(11);
|   ubyte[] local_manu = mprop.read(12);
|   ubyte local_sub = mprop.read(57)[0];
|   ubyte local_addr = mprop.read(58)[0];
|   ushort local_ia = to!ushort(local_sub << 8 | local_addr);
|
|   if (lines.length == 0) {
|     // if no line was provided in args, use BAOS module line
|     lines ~= local_sub;
|   }
|
|   JSONValue jout = parseJSON("[]");
|   // jout is appended to from several worker threads, so guard it with a lock
|   auto joutLock = new Object;
|
|   foreach(ubyte sub; lines) {
|     string lineStr = subnetwork2str(sub);
|     writeln("Scanning line ", lineStr);
|     ushort[] ia2scan;
|     for (int i = start; i <= end; i += 1) {
|       ubyte a = i & 0xff;
|       ushort addr = sub*256 + a;
|       ia2scan ~= addr;
|     }
|     // number of addresses a worker takes at a time; must be at least 1
|     auto workUnitSize = ia2scan.length/thread_num;
|     if (workUnitSize == 0) workUnitSize = 1;
|
|     auto taskPool = new TaskPool(thread_num);
|     scope(exit) taskPool.stop();
|     foreach(i, addr; taskPool.parallel(ia2scan, workUnitSize)) {
|       JSONValue jo = parseJSON("{}");
|       writeln("Scanning address ", ia2str(addr));
|       if (addr == local_ia) {
|         // the BAOS module itself: report the locally read properties
|         jo["addr_ushort"] = local_ia;
|         jo["addr_string"] = ia2str(local_ia);
|         jo["sn"] = local_sn.toHexString;
|         jo["descr"] = local_descr.toHexString;
|         jo["manufacturer"] = local_manu.toHexString;
|         synchronized (joutLock) jout.array ~= jo;
|
|         writefln("..Local [%s Descr: %s. SN: %s. Manuf-r: %s.]",
|             ia2str(local_ia), toHexString(local_descr), toHexString(local_sn),
|             local_manu.toHexString);
|         continue;
|       }
|       auto tc = new TConn(addr, host, port, prefix);
|       tc.connect();
|       if (!tc.connected) {
|         writeln("..TConnect.req confirmation error ", ia2str(addr));
|         continue;
|       }
|       writeln("..TConnect.req confirmed ", ia2str(addr));
|       ubyte[] descr;
|       ubyte[] serial;
|       ubyte[] manufacturer;
|       try {
|         descr = tc.deviceDescriptorRead();
|       } catch(Exception e) {
|         // no descriptor: treat the device as absent and skip it
|         writefln("..Error reading device %s descriptor: %s",
|             ia2str(addr), e.message);
|         continue;
|       }
|       try {
|         // manufacturer code: Obj 0, PID 12, num 1, start 01
|         manufacturer = tc.propertyRead(0x0c, 0x00, 0x01, 0x01);
|       } catch(Exception e) {
|         writefln("..Error reading device %s manufacturer code: %s",
|             ia2str(addr), e.message);
|       }
|       try {
|         // serialnum: Obj 0, PID 11, num 1, start 1
|         serial = tc.propertyRead(0x0b, 0x00, 0x01, 0x01);
|         Thread.sleep(50.msecs);
|       } catch(Exception e) {
|         writefln("..Error reading device %s serial number: %s",
|             ia2str(addr), e.message);
|       } finally {
|         tc.disconnect();
|         jo["addr_ushort"] = addr;
|         jo["addr_string"] = ia2str(addr);
|         jo["descr"] = descr.toHexString;
|         jo["sn"] = serial.toHexString;
|         jo["manufacturer"] = manufacturer.toHexString;
|         synchronized (joutLock) jout.array ~= jo;
|         writefln("..[%s Descr: %s. SN: %s. Manuf-r: %s. ]",
|             ia2str(addr), toHexString(descr),
|             toHexString(serial), toHexString(manufacturer));
|       }
|     }
|   }
|
|   writeln("=================");
|   if (fname.length > 0) {
|     writeln("saving to file <", fname, ">");
|     auto file = File(fname, "w");
|     file.writeln(jout.toPrettyString);
|   }
|   writeln(jout.toPrettyString);
| }
`----
Thanks to the D standard library Phobos and its std.parallelism
module, I was able to make the application parallel with almost no
effort.
Listing: multi-thread initialization.
,----
| auto workUnitSize = ia2scan.length/thread_num;
|
| auto taskPool = new TaskPool(thread_num);
| scope(exit) taskPool.stop();
| foreach(i, addr; taskPool.parallel(ia2scan, workUnitSize)) {
|   ....
|   .... scan device addr
|   ....
| }
`----
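The same pattern can be tried without KNX hardware. The following is a
minimal sketch, not the llscan code: `probe` is a hypothetical stand-in
that merely sleeps (longer for "absent" addresses), and `scan` with its
made-up presence rule exists only for the demonstration. It shows two
details that matter once the loop body runs on several threads: results
shared between iterations are appended under a lock, and a work unit
size of 1 lets an idle worker take the next address as soon as it is
free.

```d
import core.sync.mutex : Mutex;
import core.thread : Thread;
import core.time : msecs;
import std.algorithm : sort;
import std.parallelism : TaskPool;
import std.stdio : writeln;

// Hypothetical stand-in for scanning one device: "absent" addresses
// cost a long wait, "present" ones answer quickly.
bool probe(ushort addr)
{
    immutable present = (addr % 4 == 0); // made-up rule, demo only
    Thread.sleep(present ? 5.msecs : 50.msecs);
    return present;
}

// Probe all addresses on `threadNum` worker threads and return the
// addresses that answered, in ascending order.
ushort[] scan(ushort[] ia2scan, uint threadNum)
{
    ushort[] found;
    auto lock = new Mutex; // guards `found`

    auto pool = new TaskPool(threadNum);
    scope(exit) pool.stop();

    // Work unit size 1: addresses are handed out one at a time.
    foreach (addr; pool.parallel(ia2scan, 1))
    {
        if (!probe(addr)) continue;
        synchronized (lock) found ~= addr;
    }
    // Iterations finish in arbitrary order, so sort before returning.
    sort(found);
    return found;
}

void main()
{
    ushort[] addrs;
    foreach (a; 1 .. 33) addrs ~= cast(ushort) a;
    writeln(scan(addrs, 4));
}
```

Smaller work units balance uneven per-address durations better, at the
cost of more scheduling overhead. The thread that runs the parallel
foreach also takes part in the work, so TaskPool(n) gives n workers
plus the caller.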
1.9.2 Results
-------------
Tests were run on a line where, for every TConnect request, LData.Con
arrives without error. 65 devices were present on the line.
--------------------------
 Threads  Time    Note
--------------------------
       1  23m15s
       2  19m56s
       4  10m02s
       8   5m01s
      12   3m31s
      16   2m44s
      18   2m35s
      20   2m25s
      22   2m19s
      24   2m16s
      32  ---     TIMEOUT
      36   2m10s
--------------------------
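Increasing the thread count reduces line scan time considerably (from
23m15s with one thread to 2m16s with 24 threads, roughly a tenfold
reduction), but too many messages sent simultaneously can cause
response timeouts, as in the 32-thread run.
If transport connection confirmations on the bus are reported
correctly (the error flag is set when a device is absent), then absent
devices are skipped right away instead of being waited on, and there
is no big difference between single- and multi-threaded modes.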
1.10 References
~~~~~~~~~~~~~~~
1. The KNX Standard v2.1
2. Weinzierl Engineering GmbH, KnxBAOS_Users_Guide.pdf
3. Andrew S. Tanenbaum, Computer Networks
4. Vladimir Shabunin, bb_digging_knx