Skip to content

Instantly share code, notes, and snippets.

@pfpmeijers
Forked from anonymous/client.c
Last active August 29, 2015 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pfpmeijers/e35e1bd74fbef2d165c9 to your computer and use it in GitHub Desktop.
Save pfpmeijers/e35e1bd74fbef2d165c9 to your computer and use it in GitHub Desktop.
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#ifdef _WIN32
#include <windows.h>
#endif
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#define MAX(a, b) (((a) > (b)) ? (a) : (b))
// Select between supported communication frameworks:
#ifndef OPC
#define EPICS
#endif
// Include communication framework header.
#if defined(OPC)
#include "open62541.h"
#elif defined(EPICS)
#include "cadef.h"
#endif
// Framework related globals.
// Exactly one of the two sections below is compiled, depending on whether OPC or EPICS is defined.
#if defined(OPC)
// Read request that is initialised once in main() and re-sent for every transaction.
static UA_ReadRequest req;
// Client handle created and connected in main(), used by run_transactions().
static UA_Client* client;
#elif defined(EPICS)
#define CA_TIMEOUT_TIME (20.0) // seconds timeout for EPICS
#define MAX_T_DISCONN (10) // total accepted disconnects
#define MAX_CONN_RETRIES (20) // retries after 1 disconnect (a 1 second)
#define MAX_BENCH_ARRAY (1<<20) // max size of benchmark array in longs of 4 bytes
#define MAX_BENCH_CHANNELS (1<<20) // max number of benchmark channels
// One EPICS channel: the heap-allocated process variable name (see makename())
// and the channel id filled in by ca_create_channel().
typedef struct {
char * pvname;
chid chid;
} channel_struct;
// Channel table; only the first transaction_size entries are created and used.
static channel_struct channels [MAX_BENCH_CHANNELS];
#endif
// Define parameters/defaults.
// All of these are overwritten by parse_arguments() when the matching option is given.
// Name of the variable to read (-var); when NULL, parse_arguments() derives it from array_size.
static char* variable_name = NULL;
// Number of items in the variable that is read (-size).
static int array_size = 1;
static int array_item_size = 4; // int32 assumed. Not configurable yet.
// Number of variables read per transaction (-vars).
static int transaction_size = 1;
// Number of transactions in the measured run (-repeat).
static int transaction_count = 1000;
// Duration of the extra, unreported run that main() starts after the measured run (-runout); 0 disables it.
static float runout_duration = 0.0;
// Set by -verify: walk through the received values after each measured read (OPC build).
static int verify = 0;
// Set by -v: print progress and the effective settings.
static int verbose = 0;
// Set by -sync_begin: start measuring only once the trigger file exists.
static int sync_begin = 0;
// Set by -sync_end: stop measuring once the trigger file has been removed.
static int sync_end = 0;
#if defined(OPC)
// Pieces of the server URL that main() concatenates as protocol + server_address + ":" + port_nr.
static char* protocol = "opc.tcp://";
static char* server_address = "localhost";
static char* port_nr = "48010";
// Namespace index used for the node id of the variable that is read (-ns).
static int namespace_index = 1;
#endif
// Name of the trigger file whose presence is tested with access() in run_transactions().
const char* measure_trigger = "measure";
//---------------------------------------------------------------------------------------------------------------------
// Argument parsing and usage.
//
/*
 * Print an optional title line followed by the option synopsis, filled in with
 * the current values of the parameter globals, and terminate with exit(-1).
 *
 * argv  - program arguments; argv[0] is printed as the program name.
 * title - extra first line, or NULL for none.
 *
 * Does not return.
 */
static void print_usage(char** argv, char *title)
{
    if(title != NULL)
    {   printf ("%s\n", title);
    }

    printf("usage: %s\n", argv[0]);

#if defined(OPC)
    // Connection options exist only in the OPC build.
    printf(" [-server server_address ] (default: %s)\n"
           " [-port port_nr ] (default: %s)\n"
           " [-ns namespace_index ] (default: %d)\n",
           server_address, port_nr, namespace_index);
#endif

    printf(" [-var variable_name ] (overrules name constructed from array size)\n"
           " [-size array_size ] (default: %d, unit: # int32 items, to construct variable name)\n"
           " [-vars transaction_size] (default: %d, unit: # variables)\n"
           " [-repeat nr_transactions ] (default: %d)\n"
           " [-runout duration ] (default: %g)\n"
           " [-verify ] (verify results)\n"
           " [-sync_begin ] (wait on '%s' trigger file to sync)\n"
           " [-sync_end ] (continue as long as '%s' trigger file present)\n"
           " [-v ] (verbose)\n"
           " [-h ] (help)\n",
           array_size, transaction_size, transaction_count, runout_duration,
           measure_trigger, measure_trigger);

    exit(-1);
}
/*
 * Parse the command line into the file-level parameter globals.
 *
 * Flags (-h, -v, -verify, -sync_begin, -sync_end) take no value. Every other
 * recognised option takes the next argument as its value; that value is
 * consumed, so it is never interpreted as an option itself. Unrecognised
 * arguments are ignored.
 *
 * After parsing, variable_name always points to heap memory (either a name
 * derived from array_size or a copy of the -var value) so that the caller can
 * free() it unconditionally.
 *
 * In the EPICS build the array size and the number of channels are validated
 * and the process exits with status 1 when they are out of range.
 */
static void parse_arguments(int argc, char** argv)
{
    for(int i = 1; i < argc; i++)
    {
        const char* option = argv[i];

        if(strcmp(option, "-h") == 0)
        {   print_usage(argv, NULL);
        }
        else if(strcmp(option, "-v") == 0)
        {   verbose = 1;
        }
        else if(strcmp(option, "-verify") == 0)
        {   verify = 1;
        }
        else if(strcmp(option, "-sync_begin") == 0)
        {   sync_begin = 1;
        }
        else if(strcmp(option, "-sync_end") == 0)
        {   sync_end = 1;
        }
        else if(i < argc - 1)
        {
            char* value = argv[i + 1];
            int consumed = 1;
#if defined(OPC)
            if(strcmp(option, "-server") == 0)
            {   server_address = value;
            }
            else if(strcmp(option, "-port") == 0)
            {   port_nr = value;
            }
            else if(strcmp(option, "-ns") == 0)
            {   namespace_index = atoi(value);
            }
            else
#endif
            if(strcmp(option, "-var") == 0)
            {   variable_name = value;
            }
            else if(strcmp(option, "-size") == 0)
            {   array_size = atoi(value);
            }
            else if(strcmp(option, "-vars") == 0)
            {   transaction_size = atoi(value);
            }
            else if(strcmp(option, "-repeat") == 0)
            {   transaction_count = atoi(value);
            }
            else if(strcmp(option, "-runout") == 0)
            {   runout_duration = atof(value);
            }
            else
            {   // Unknown option: leave the next argument for the next iteration.
                consumed = 0;
            }
            // Skip the value of a recognised option.
            i += consumed;
        }
    }

    if(variable_name == NULL)
    {
        // No explicit name given: derive it from the array size.
        const size_t name_capacity = 128;
        char* name = malloc(name_capacity);
        if(name == NULL)
        {   printf("ERROR: Out of memory while constructing the variable name.\n");
            exit(1);
        }
        name[0] = '\0';
#if defined(OPC)
        snprintf(name, name_capacity, "var%d", array_size);
#elif defined(EPICS)
        snprintf(name, name_capacity, "VAR%d.VAL", array_size);
#endif
        variable_name = name;
    }
    else
    {
        // Make a string copy, such that it can be freed during cleanup. Needed to align with the case above.
        size_t name_size = strlen(variable_name) + 1;
        char* name = malloc(name_size);
        if(name == NULL)
        {   printf("ERROR: Out of memory while copying the variable name.\n");
            exit(1);
        }
        memcpy(name, variable_name, name_size);
        variable_name = name;
    }

    if(verbose)
    {   printf("%s %s %s %s "
#if defined(OPC)
               "-server %s -port %s -ns %d "
#endif
               "-var %s -size %d -vars %d -repeat %d -runout %g\n",
               argv[0], verify ? "-verify" : "", sync_begin ? "-sync_begin" : "", sync_end ? "-sync_end" : "",
#if defined(OPC)
               server_address, port_nr, namespace_index,
#endif
               variable_name, array_size, transaction_size, transaction_count, runout_duration);
    }

#if defined(EPICS)
    // Check the upper limit first; otherwise a too large size is always
    // reported as "not a power of 2" and this message can never be printed.
    if (array_size > MAX_BENCH_ARRAY) {
        printf ("EPICS Array size cannot exceed %d\n", MAX_BENCH_ARRAY); exit (1);
    }
    // array size must be a power of 2 (exactly one bit set)
    if (array_size < 1 || (array_size & (array_size - 1)) != 0) {
        printf ("EPICS Array size must be a power of 2\n"); exit (1);
    }
    if (transaction_size > MAX_BENCH_CHANNELS) {
        printf ("EPICS Number of channels cannot exceed %d\n", MAX_BENCH_CHANNELS); exit (1);
    }
#endif
}
//---------------------------------------------------------------------------------------------------------------------
#if defined(EPICS)
// Reconnect a channel if 'status' indicates a disconnect or a timeout.
//
// status - ECA_* status code that triggered the call; any value other than
//          ECA_DISCONN or ECA_TIMEOUT is returned unchanged.
// pch    - channel to reconnect; its chid is overwritten by ca_create_channel().
//
// Returns ECA_NORMAL after a successful reconnect. When the reconnect fails,
// either because MAX_CONN_RETRIES attempts were used up or because the number
// of disconnects over the whole run reached MAX_T_DISCONN, an error is printed
// and the process exits with status 1.
static int handle_disconnect (int status, channel_struct *pch)
{
    // Counts disconnects over the lifetime of the process, for all channels together.
    static int total_disconnects = 0;
    int tries = 0;

    if ((status != ECA_DISCONN) && (status != ECA_TIMEOUT)) {
        return status;
    }

    total_disconnects++;
    while ((status != ECA_NORMAL) &&
           (total_disconnects < MAX_T_DISCONN) &&
           (tries++ < MAX_CONN_RETRIES) ) {
        status = ca_create_channel(pch->pvname, NULL, NULL, 10, &pch->chid);
        if (status == ECA_NORMAL) {
            status = ca_pend_io (CA_TIMEOUT_TIME);
        }
    }

    if (status != ECA_NORMAL) {
        printf ("ERROR: Failed to reconnect after disconnect; pv %s retries = %d; total disconnects = %d\n",
                pch->pvname, tries, total_disconnects);
        // Give up only when the reconnect really failed; a successful
        // reconnect must return to the caller so the benchmark can continue.
        exit (1);
    }
    return status;
}
#endif
//---------------------------------------------------------------------------------------------------------------------
// Portable interval timing.
//
// Clock_t              - opaque time stamp type.
// get_clock(c)         - store the current time stamp in *c.
// get_clock_diff(a, b) - return (a - b) in seconds.
//
// Three implementations are provided: the Windows performance counter, POSIX
// clock_gettime(), and the C standard clock() as a fallback.
#if defined(_WIN32)
typedef LARGE_INTEGER Clock_t;
static void get_clock(Clock_t* c)
{   QueryPerformanceCounter(c);
}
static float get_clock_diff(Clock_t* c1, Clock_t* c2)
{   LARGE_INTEGER f;
    QueryPerformanceFrequency(&f);
    // Divide in double so that large tick counts are not rounded to float before the division.
    return (float)((double)(c1->QuadPart - c2->QuadPart) / (double)f.QuadPart);
}
#elif _POSIX_C_SOURCE >= 199309L
typedef struct timespec Clock_t;
static void get_clock(Clock_t* c)
{
#if defined(CLOCK_MONOTONIC)
    // Intervals must not be affected by wall-clock adjustments (NTP, manual changes).
    clock_gettime(CLOCK_MONOTONIC, c);
#else
    clock_gettime(CLOCK_REALTIME, c);
#endif
}
static float get_clock_diff(Clock_t* c1, Clock_t* c2)
{   double seconds = (double)(c1->tv_sec - c2->tv_sec);
    double nanoseconds = (double)(c1->tv_nsec - c2->tv_nsec);
    return (float)(seconds + nanoseconds / 1e9);
}
#else
typedef clock_t Clock_t;
static void get_clock(Clock_t* c)
{   *c = clock();
}
static float get_clock_diff(Clock_t* c1, Clock_t* c2)
{   return (*c1 - *c2)/(float)CLOCKS_PER_SEC;
}
#endif
//---------------------------------------------------------------------------------------------------------------------
// Execute the read transactions.
//
/*
 * Execute read transactions in a loop and optionally report the results.
 *
 * max_transaction_count    - stop after this many measured transactions; 0 means no limit.
 * max_measurement_duration - stop once the measurement has lasted this many seconds; 0.0 means no limit.
 * sync_begin               - when non-zero, start measuring only once the trigger file exists.
 * sync_end                 - when non-zero, stop measuring once the trigger file is gone.
 * print_output             - when non-zero, print the result line after a successful run.
 *
 * Transactions are already executed while waiting for the trigger file; they
 * are only counted while measuring. The parameters sync_begin and sync_end and
 * the local transaction_count deliberately hide the globals of the same name.
 */
void run_transactions(int max_transaction_count, float max_measurement_duration,
                      int sync_begin, int sync_end, int print_output)
{
    Clock_t measurement_begin_clock;
    Clock_t measurement_end_clock;
    int measurement_begin_clocked = 0;
    int measurement_end_clocked = 0;
    int measurement_ok = 1;
    int measuring = 0;
    float measurement_duration = 0.0;
    int transaction_count = 0;
    Clock_t transaction_begin_clock;
    Clock_t transaction_end_clock;
    float transaction_duration_max = 0.0;
    float transaction_duration_min = 1e12;
    float transaction_duration_sum = 0.0;
#if defined(OPC)
    UA_Int32 value = -1;
#elif defined(EPICS)
    // Receive buffer shared by all reads: 4 bytes for each of MAX_BENCH_ARRAY items.
    long* pl = malloc(4*MAX_BENCH_ARRAY);
    if(pl == NULL)
    {   printf("ERROR: Failed to allocate the receive buffer.\n");
        return;
    }
#endif

    while(!measurement_end_clocked &&
          (max_transaction_count == 0 || transaction_count < max_transaction_count) &&
          (max_measurement_duration == 0.0 || measurement_duration < max_measurement_duration))
    {
        if(verbose)
        {   if(measuring && !verify)
            {   printf("%d \r", transaction_count);
            }
            else if(!measuring)
            {   printf("Waiting ...\r");
            }
        }

        get_clock(&transaction_begin_clock);
#if defined(OPC)
        // Do the read transaction.
        UA_ReadResponse resp = UA_Client_read(client, &req);
        get_clock(&transaction_end_clock);

        // Validate the response. The first failing check ends the run, but only
        // after the response has been released below.
        if(resp.responseHeader.serviceResult != UA_STATUSCODE_GOOD)
        {   measurement_ok = 0;
            printf("ERROR: Read failed. Service result is not good: 0x%X\n", resp.responseHeader.serviceResult);
        }
        else if(resp.resultsSize != transaction_size)
        {   measurement_ok = 0;
            printf("ERROR: Read failed. Result size is: %d, instead of: %d\n", resp.resultsSize, transaction_size);
        }
        else if(!resp.results[0].hasValue)
        {   measurement_ok = 0;
            printf("ERROR: Read failed. Result has no value.\n");
        }
        else if(resp.results[0].value.type != &UA_TYPES[UA_TYPES_INT32])
        {   measurement_ok = 0;
            printf("ERROR: Read failed. Result has wrong type: %d\n",
                   resp.results[0].value.type->typeId.identifier.numeric);
        }
        else
        {   UA_Variant* first_variant = &resp.results[0].value;
            int first_length = UA_Variant_isScalar(first_variant) ? 1 : first_variant->arrayLength;
            if(first_length != array_size)
            {   measurement_ok = 0;
                printf("ERROR: Read failed. Wrong array size: %d\n", first_length);
            }
        }

        if(measurement_ok)
        {   value = 0;
            if(verify && measuring)
            {   // Retrieve the individual values.
                int j;
                for(j = 0; j < resp.resultsSize; j++)
                {   if(resp.results[j].hasValue && resp.results[j].value.type == &UA_TYPES[UA_TYPES_INT32])
                    {   UA_Variant* variant = &resp.results[j].value;
                        int variant_length = UA_Variant_isScalar(variant) ? 1 : variant->arrayLength;
                        int k;
                        for(k = 0; k < variant_length; k++)
                        {   value = ((UA_Int32*)variant->data)[k];
                            if(verbose)
                            {   printf("%d ", value);
                            }
                        }
                        if(verbose)
                        {   printf("\r");
                        }
                    }
                }
            }
        }

        // Cleanup, also for a failed read.
        UA_ReadResponse_deleteMembers(&resp);
        if(!measurement_ok)
        {   break;
        }
#elif defined(EPICS)
        int retries = 0;
        int status = ECA_NORMAL;
        int j = 0;
        // Loop through all channels per transaction
        for (j = 0; j < transaction_size && status == ECA_NORMAL; j++) {
            channel_struct *pch = &channels [j];
            int ec = 0;
            // An element count of 0 is treated as a disconnected channel.
            while ((ec == 0) && (retries++ < MAX_CONN_RETRIES)) {
                ec = ca_element_count (pch -> chid);
                if (ec == 0) handle_disconnect (ECA_DISCONN, pch);
            }
            status = ca_array_get(DBR_LONG, array_size, pch -> chid, (void *) pl);
        }
        SEVCHK (status, "ca_array_get failure in loop");
        status = ca_pend_io(CA_TIMEOUT_TIME);
        get_clock(&transaction_end_clock);
        SEVCHK(status, "ca_pend_io failure in loop");
        if (status != ECA_NORMAL) {
            printf ("Premature exit after %d reads\n", transaction_count);
            break;
        }
#endif
        float transaction_duration = get_clock_diff(&transaction_end_clock, &transaction_begin_clock);
        transaction_duration_min = MIN(transaction_duration, transaction_duration_min);
        transaction_duration_max = MAX(transaction_duration, transaction_duration_max);
        transaction_duration_sum += transaction_duration;

        // Wait with time measurement until trigger file present, that acts as synchronization between multiple clients.
        if(!measurement_begin_clocked && !measuring && (!sync_begin || access(measure_trigger, F_OK) != -1))
        {   get_clock(&measurement_begin_clock);
            measurement_begin_clocked = 1;
            measuring = 1;
        }
        // End the measurement when trigger file removed, or when max duration reached (used for run-out).
        // Thus determine the duration of the measurement so far.
        else if(!measurement_end_clocked && measuring)
        {   get_clock(&measurement_end_clock);
            measurement_duration = get_clock_diff(&measurement_end_clock, &measurement_begin_clock);
            if(sync_end && access(measure_trigger, F_OK) == -1)
            {   measurement_end_clocked = 1;
                measuring = 0;
            }
        }
        transaction_count += measuring;
    }

#if defined(EPICS)
    // The buffer is allocated per call, so it must be released per call.
    free(pl);
#endif

    int bytes_per_transaction = array_size * array_item_size * transaction_size;
    // Report 0 instead of inf/nan when nothing was measured.
    float bandwidth = 0.0;
    if(measurement_duration > 0.0)
    {   bandwidth = (float)bytes_per_transaction * transaction_count / measurement_duration;
    }
    if(measurement_ok && print_output)
    {   printf("%d transactions, %g seconds, %d B/transaction, %g MB/s",
               transaction_count, measurement_duration, bytes_per_transaction, bandwidth * 1e-6);
        if(transaction_duration_sum > 0.0)
        {   float transaction_duration_avg = 0.0;
            if(transaction_count > 0)
            {   transaction_duration_avg = transaction_duration_sum / transaction_count;
            }
            printf(", latency min/avg/max: %g/%g/%g seconds",
                   transaction_duration_min, transaction_duration_avg, transaction_duration_max);
        }
        printf("\n");
    }
}
//---------------------------------------------------------------------------------------------------------------------
#if defined(EPICS)
// Build the process variable name "benchlongarray:<size>.VAL" and return a
// heap-allocated copy of it in '*the_name'. The caller owns the copy and must
// free() it. Prints an error and exits with status 1 when the allocation fails.
static void makename (int size, char **the_name)
{
    char pv [100];
    int length = sprintf (pv, "benchlongarray:%d.VAL", size);
    char *copy = (char *) malloc ((size_t) length + 1);

    *the_name = copy;
    if (copy == NULL) {
        printf ("ERROR: Failed to allocate %d bytes of memory for variable name %s\n", length + 1, pv);
        exit (1);
    }
    // Copy the name including its terminating NUL.
    memcpy (copy, pv, (size_t) length + 1);
}
// Create all EPICS channels ---------------------------------------
// Fills the first transaction_size entries of 'channels': each entry gets a
// name from makename(array_size, ...) and a channel created for that name.
// The creation requests are completed with a single ca_pend_io() afterwards.
// Failures are printed and passed to SEVCHK.
static void make_channels (void)
{
    channel_struct *pcs = &channels [0];
    int status;
    int index = 0;

    while (index < transaction_size) {
        pcs = &channels [index];
        makename (array_size, &pcs->pvname);
        status = ca_create_channel (pcs->pvname, NULL, NULL, 10, &pcs->chid);
        if (status != ECA_NORMAL) {
            printf ("make_channels: ca_create_channel failure, channel %d\n", index);
        }
        SEVCHK(status,"ca_create_channel failure");
        index++;
    }

    // 'pcs' still refers to the last channel created; it is used for the message only.
    status = ca_pend_io(CA_TIMEOUT_TIME);
    if (status != ECA_NORMAL) {
        printf ("ca_pend_io failure creating channel %s; offline?\n", pcs -> pvname);
    }
    SEVCHK (status, "ca_pend_io failure creating channel");
}
// Destroy all created EPICS channels ---------------------------------------
// For each of the first transaction_size entries of 'channels': clear the
// channel, wait with ca_pend_io(), and free the channel name. Failures are
// printed and passed to SEVCHK.
static void destroy_channels (void)
{
    channel_struct *pcs = &channels [0];
    channel_struct *end = pcs + (transaction_size > 0 ? transaction_size : 0);

    for (; pcs < end; pcs++) {
        int status = ca_clear_channel(pcs->chid);
        if (status != ECA_NORMAL) {
            printf ("ca_clear_channel failure, channel %s\n", pcs -> pvname);
        }
        SEVCHK(status,"ca_clear_channel failure");

        status = ca_pend_io(CA_TIMEOUT_TIME);
        if (status != ECA_NORMAL) {
            printf ("ca_pend_io failure destroying channels, channel %s\n", pcs -> pvname);
        }
        SEVCHK (status, "ca_pend_io failure destroying channels");

        free (pcs -> pvname);
    }
}
#endif
//---------------------------------------------------------------------------------------------------------------------
// Main app.
//
/*
 * Benchmark client entry point.
 *
 * Parses the command line, sets up the connection for the selected framework
 * (OPC UA client and read request, or EPICS context and channels), runs the
 * measured transactions followed by the optional run-out, and cleans up.
 *
 * Returns 0 on success. In the OPC build the connect status code is returned
 * when the connection fails, and 1 when the server URL cannot be allocated.
 */
int main(int argc, char** argv)
{
    // Handle command line arguments.
    parse_arguments(argc, argv);

    // Setup the connection.
#if defined(OPC)
    // URL layout: protocol + server_address + ":" + port_nr + terminating NUL.
    size_t server_url_size = strlen(protocol) + strlen(server_address) + 1 + strlen(port_nr) + 1;
    char* server_url = malloc(server_url_size);
    if(server_url == NULL)
    {   printf("ERROR: Out of memory while building the server URL.\n");
        free(variable_name);
        return 1;
    }
    snprintf(server_url, server_url_size, "%s%s:%s", protocol, server_address, port_nr);

    // The open62541 stack does not support message chunking yet.
    // Hence single buffer to be specified that is big enough to contain all transaction data, including overhead.
    int buffer_overhead = 1024; // Extra space for non-payload.
    int buffer_size = array_size * sizeof(UA_Int32) * transaction_size + buffer_overhead;
    if(buffer_size < 65536)
    {   buffer_size = 65536;
    }
    UA_ClientConfig config = {
        .timeout = 5, // sync response timeout in ms
        .secureChannelLifeTime = 1000000, // lifetime in ms (then the channel needs to be renewed)
        .timeToRenewSecureChannel = 2000, // time in ms before expiration to renew the secure channel
        {.protocolVersion = 0, .sendBufferSize = buffer_size, .recvBufferSize = buffer_size,
         .maxMessageSize = buffer_size, .maxChunkCount = 1
        }
    };
    client = UA_Client_new(config /*UA_ClientConfig_standard*/, Logger_Stdout_new());
    UA_StatusCode retval = UA_Client_connect(client, ClientNetworkLayerTCP_connect, server_url);
    if(retval != UA_STATUSCODE_GOOD)
    {   printf("Aborted.\n");
        free(server_url);
        free(variable_name);
        return retval;
    }

    // Prepare one read request that asks for the same variable transaction_size times.
    UA_ReadRequest_init(&req);
    req.nodesToReadSize = transaction_size;
    req.nodesToRead = UA_Array_new(&UA_TYPES[UA_TYPES_READVALUEID], req.nodesToReadSize);
    int i;
    for(i = 0; i < req.nodesToReadSize; i++)
    {   UA_ReadValueId_init(&(req.nodesToRead[i]));
        UA_NodeId_init(&(req.nodesToRead[i].nodeId));
        req.nodesToRead[i].nodeId = UA_NODEID_STRING_ALLOC(namespace_index, variable_name); // nodeId string deleted with req
        req.nodesToRead[i].attributeId = UA_ATTRIBUTEID_VALUE;
    }
#elif defined(EPICS)
    // The context has to exist before any other Channel Access call is made,
    // so create it before the channels.
    int status = ca_context_create(ca_disable_preemptive_callback);
    SEVCHK(status,"ca_context_create failure");
    make_channels ();
#endif

    // Run the transactions.
    run_transactions(transaction_count, 0.0, sync_begin, sync_end, 1);
    if(runout_duration > 0.0)
    {   // Keep loading the server for a while so that slower clients can finish their measurement.
        if(verbose)
        {   printf("Run-out ...\n");
        }
        run_transactions(0, runout_duration, 0, 0, 0);
    }

    // Cleanup.
#if defined(OPC)
    UA_ReadRequest_deleteMembers(&req);
    UA_Client_disconnect(client);
    UA_Client_delete(client);
    free(server_url);
#elif defined (EPICS)
    destroy_channels ();
    SEVCHK(ca_task_exit(),"ca_task_exit failure");
#endif
    free(variable_name);
    return 0;
}
//---------------------------------------------------------------------------------------------------------------------
gcc -O3 -DOPC -D_WIN32 -std=c11 client.c open62541.c -lws2_32 -o client.exe
gcc -O3 -std=c11 server.c open62541.c -lws2_32 -o server.exe
#!/bin/sh
# Build the benchmark client and server together with the open62541 source.
# -DOPC selects the OPC code paths of client.c; -D_POSIX_C_SOURCE=199309L
# selects its clock_gettime() based timing.
gcc -O3 -DOPC -D_POSIX_C_SOURCE=199309L -std=c11 client.c open62541.c -o client.bin
# The server is built without extra defines.
gcc -O3 -std=c11 server.c open62541.c -o server.bin
import os
import sys
import subprocess
import threading
import time
from datetime import datetime
# Defaults. Most of them can be overridden with the command line options parsed below.
client_executable = "client"
server_address = "localhost"
port_nr = "48010"
namespace_index = 1
variable_name = None
client_count = 1
array_size = 10
transaction_size = 1
max_transaction_count = 1000000
max_measure_duration = 1  # seconds
runout_duration = 0.5  # seconds
out_csv = "measurements.csv"
max_retries = 2
verify = False
sync_begin = True
sync_end = True
verbose = False

# Command line parsing: "-v" and "-verify" are flags, every other recognised
# option takes the following argument as its value. Unknown arguments are ignored.
args = sys.argv
for i, option in enumerate(args[1:], 1):
    if option == "-v":
        verbose = True
    elif option == "-verify":
        verify = True
    elif i + 1 < len(args):
        value = args[i + 1]
        if option == "-client":
            client_executable = value
        elif option == "-server":
            server_address = value
        elif option == "-port":
            port_nr = value
        elif option == "-ns":
            namespace_index = int(value)
        elif option == "-clients":
            client_count = int(value)
        elif option == "-var":
            variable_name = value
        elif option == "-size":
            array_size = int(value)
        elif option == "-vars":
            transaction_size = int(value)
        elif option == "-repeat":
            max_transaction_count = int(value)
        elif option == "-duration":
            max_measure_duration = float(value)
        elif option == "-runout":
            runout_duration = float(value)
        elif option == "-csv":
            out_csv = value
# Build the command line that is used to start every client process.
arg_parts = [
    client_executable,
    "-server", server_address,
    "-port", port_nr,
    "-ns", str(namespace_index),
    "-clients", str(client_count),
    "-size", str(array_size),
    "-vars", str(transaction_size),
    "-repeat", str(max_transaction_count),
    "-runout", str(runout_duration),
]
args = " ".join(arg_parts)

# Optional parts, appended in a fixed order.
if variable_name is not None:
    args += " -var " + variable_name
if sync_begin:
    args += " -sync_begin"
if sync_end:
    args += " -sync_end"
if verify:
    args += " -verify"

if verbose:
    print(args)
def run(*command, **call_options):
    """Run `command` (program followed by its arguments) and wait for it to finish.

    Keyword arguments are passed on to subprocess.call(). The exit code is discarded.
    """
    subprocess.call(command, **call_options)
def start_client(args, client_idx, out_file):
    """Start one client in a background thread and return that thread.

    args       -- complete command line as one string; it is split on whitespace.
    client_idx -- number of the client, used in the verbose message only.
    out_file   -- open file that receives the standard output of the client.
    """
    command = tuple(args.split())
    worker = threading.Thread(target=run, args=command, kwargs={"stdout": out_file})
    worker.start()
    if verbose:
        print("Client " + str(client_idx) + " started.")
    return worker
# Run the measurement: start all clients, create the trigger file to start the
# measurement, remove it after max_measure_duration seconds to stop it, and
# collect the result line of every client from its log file. A failed
# measurement is repeated up to max_retries times with a doubled run-out.
measuring = False
measurement_done = False
retries = 0
while not measurement_done:
    measure_trigger = "measure"
    if os.path.exists(measure_trigger):
        os.remove(measure_trigger)

    # Start the clients; each one writes its output to its own log file.
    clients = []
    for i in range(client_count):
        if not os.path.exists("logs"):
            os.mkdir("logs")
        log_name = "logs/" + str(i) + ".log"
        log_file = open(log_name, "w")
        clients += [(start_client(args, i, log_file), i, log_name, log_file)]
    time.sleep(0.1 * client_count)

    # Creating the trigger file starts the measurement in all clients.
    open(measure_trigger, "w").close()
    measuring = True
    if verbose:
        print("Measuring started.")
    begin_time = datetime.now()

    first_client_duration = None
    transaction_count_sum = 0
    transaction_count_min = 1000000000
    transaction_count_max = 0
    transaction_duration_min = 1e12
    transaction_duration_sum = 0
    transaction_duration_max = 0
    measurement_duration_sum = 0
    measurement_duration_min = 1e12
    measurement_duration_max = 0
    bandwidth_sum = 0
    bandwidth_min = 1e12
    bandwidth_max = 0.0
    measurement_ok = True

    while clients:
        for client in clients:
            thread, i, log_name, log_file = client
            if not thread.is_alive():
                if first_client_duration is None:
                    first_client_duration = (datetime.now() - begin_time).total_seconds()

                # Read the result line back from the log of the finished client.
                log_file.close()
                with open(log_name, "r") as finished_log:
                    lines = finished_log.readlines()
                # A client that failed before printing anything leaves an empty log.
                line = lines[0].strip() if lines else ""
                line_items = line.split()
                try:
                    transaction_count, measurement_duration, transaction_duration_items = \
                        int(line_items[0]), float(line_items[2]), line_items[10].split("/")
                except (IndexError, ValueError):
                    # Not a result line: count it as a failed client below.
                    transaction_count = measurement_duration = 0
                    transaction_duration_items = [0, 0, 0]
                if transaction_count == 0 or measurement_duration == 0:
                    print("ERROR: Client " + str(i) + " transactions failed: " + line)
                    measurement_ok = False

                array_item_size = 4  # Int32
                if measurement_duration > 0:
                    bandwidth = array_size * array_item_size * transaction_size * transaction_count / measurement_duration
                else:
                    bandwidth = 0

                transaction_duration_sum += float(transaction_duration_items[1])
                transaction_duration_min = min(transaction_duration_min, float(transaction_duration_items[0]))
                transaction_duration_max = max(transaction_duration_max, float(transaction_duration_items[2]))
                measurement_duration_sum += measurement_duration
                measurement_duration_min = min(measurement_duration_min, measurement_duration)
                measurement_duration_max = max(measurement_duration_max, measurement_duration)
                transaction_count_sum += transaction_count
                transaction_count_min = min(transaction_count_min, transaction_count)
                transaction_count_max = max(transaction_count_max, transaction_count)
                bandwidth_sum += bandwidth
                bandwidth_min = min(bandwidth_min, bandwidth)
                bandwidth_max = max(bandwidth_max, bandwidth)
                if verbose:
                    print("Client " + str(i) + " done: " + line)

                # The list changed, so restart the scan instead of continuing the iteration.
                clients.remove(client)
                break

        if sync_end and measuring and (datetime.now() - begin_time).total_seconds() > max_measure_duration:
            # Removing the trigger file ends the measurement in all clients.
            # Keep trying for at most 2 seconds when the removal fails.
            while True:
                try:
                    os.remove(measure_trigger)
                    measuring = False
                    break
                except OSError:
                    assert((datetime.now() - begin_time).total_seconds() < max_measure_duration + 2)
            if verbose:
                print("Measuring done. Runout ...")
        time.sleep(0.01)

    if verbose:
        print("All done.")

    # Every client has to keep running until the slowest one has finished its measurement.
    if client_count > 1 and measurement_duration_max > first_client_duration:
        print("ERROR: Run-out too short. "
              "Max measurement duration: " + str(measurement_duration_max) + " sec, "
              "First client duration: " + str(first_client_duration) + " sec")
        measurement_ok = False

    if measurement_ok:
        measurement_done = True
    else:
        if retries == max_retries:
            break
        print("Retrying ...")
        retries += 1
        # The client command line already contains the old run-out value, so
        # it has to be updated as well for the doubling to have any effect.
        previous_runout_option = " -runout " + str(runout_duration)
        runout_duration *= 2
        args = args.replace(previous_runout_option, " -runout " + str(runout_duration), 1)
# Aggregate the per-client results.
transaction_bytes = array_size * array_item_size * transaction_size
transaction_count_avg = transaction_count_sum / client_count
transaction_duration_avg = transaction_duration_sum / client_count
measurement_duration_avg = measurement_duration_sum / client_count
bandwidth_avg = bandwidth_sum / client_count

# Three alternative estimates of the total bandwidth; all totals are reported
# as 0 when the measurement failed or nothing was measured.
if not measurement_ok or measurement_duration_max == 0:
    bandwidth_sum = 0
    bandwidth_sum_alt1 = 0
    bandwidth_sum_alt2 = 0
    bandwidth_sum_alt3 = 0
else:
    bandwidth_sum_alt1 = client_count * transaction_bytes * transaction_count_avg / measurement_duration_avg
    bandwidth_sum_alt2 = client_count * transaction_bytes * transaction_count_avg / measurement_duration_max
    bandwidth_sum_alt3 = transaction_bytes * transaction_count_sum / measurement_duration_max

# Names of the module-level variables that are reported, in output order.
out_keys = ["client_executable", "client_count", "array_size", "transaction_size", "transaction_bytes",
            "measurement_duration_min", "measurement_duration_avg", "measurement_duration_max",
            "transaction_count_min", "transaction_count_avg", "transaction_count_max",
            "transaction_duration_min", "transaction_duration_avg", "transaction_duration_max",
            "bandwidth_min", "bandwidth_avg", "bandwidth_max",
            "bandwidth_sum", "bandwidth_sum_alt1", "bandwidth_sum_alt2", "bandwidth_sum_alt3"]
out_data = {key: globals()[key] for key in out_keys}

s = "".join(key + ": " + str(out_data[key]) + ", " for key in out_keys)
print(s)

# Append one row to the CSV file, unless the last client reported no transactions.
if transaction_count > 0:
    with open(out_csv, "a") as csv:
        csv.write("".join(str(out_data[key]) + "," for key in out_keys))
        csv.write("\n")
import os
# Run multi_client.py for every combination of client program, number of
# clients, variables per transaction and array size (all powers of two).
client_counts = [1 << i for i in range(8)]
transaction_sizes = [1 << i for i in range(10)]
array_sizes = [1 << i for i in range(20)]

for client in ["./opcClient", "./caClient"]:
    for client_count in client_counts:
        for transaction_size in transaction_sizes:
            for array_size in array_sizes:
                # Combinations that are skipped for the OPC client.
                too_big = array_size * transaction_size * 4 > 65536
                too_many = transaction_size >= 64
                if client == "./opcClient" and (too_big or too_many):
                    print("Skipping: ", array_size, transaction_size)
                    continue
                command = " ".join([
                    "python multi_client.py",
                    "-client", client,
                    "-server 192.168.0.100",
                    "-clients", str(client_count),
                    "-size", str(array_size),
                    "-vars", str(transaction_size),
                ])
                os.system(command)
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <signal.h>
#include "open62541.h"
// Application arguments/defaults
// Port number passed to the TCP network layer in main() (-port).
static int port_nr = 48010;
// Value passed to UA_Server_run() in main() (-threads).
static int thread_count = 4;
// Set by -v: print the effective settings and every value that is read.
static int verbose = 0;
// Argument parsing and usage.
// Print an optional title line and the option synopsis with the current
// defaults, then terminate with exit(-1). Does not return.
void print_usage(char *title)
{
    if(title != NULL)
    {   printf("%s\n", title);
    }
    printf("usage: \n");
    printf(" [-port port_nr ] (default: %d)\n", port_nr);
    printf(" [-threads thread_count] (default: %d)\n", thread_count);
    printf(" [-v ] (verbose)\n"
           " [-h ] (help)\n");
    exit(-1);
}
// Parse the command line into port_nr, thread_count and verbose.
// "-h" prints the usage and exits, "-v" enables verbose output, "-port" and
// "-threads" take the next argument as their value. The value of a recognised
// option is consumed; unknown arguments are ignored.
void parse_arguments(int argc, char** argv)
{
    for(int i = 1; i < argc; i++)
    {
        if(strcmp(argv[i], "-h") == 0)
        {   print_usage(NULL);
        }
        else if(strcmp(argv[i], "-v") == 0)
        {   verbose = 1;
        }
        else if(i < argc - 1)
        {
            // Compare the current argument; comparing argv[1] would honour
            // "-port" only as the very first argument and would then apply it
            // to every following argument pair as well.
            if(strcmp(argv[i], "-port") == 0)
            {   port_nr = atoi(argv[i + 1]);
                i++;
            }
            else if(strcmp(argv[i], "-threads") == 0)
            {   thread_count = atoi(argv[i + 1]);
                i++;
            }
        }
    }
    if(verbose)
    {   printf("%s -port %d -threads %d\n", argv[0], port_nr, thread_count);
    }
}
// Define arrays to provide data to the variable nodes.
// One served variable; filled in by setup_variables().
typedef struct
{ int size; // number of items in 'data'
char* name; // heap-allocated name, "var" followed by the size
UA_Int32* data; // heap-allocated array of 'size' items
}
Variable_t;
// Array sizes for which a variable is created.
static int variable_sizes[] =
{1, 2, 4, 5, 8, 10, 13, 16, 20, 32, 50, 64, 100, 128, 200, 256, 500, 512, 1000, 1024,
2000, 2048, 4096, 5000, 8192, 10000, 16384, 20000, 25000, 32768, 50000, 65536,
100000, 131072, 200000, 262144, 500000, 524288, 1000000, 1048576, 2097152};
// Largest entry of variable_sizes; computed by setup_variables().
static int max_variable_size;
// Number of entries in variable_sizes and in 'variables'; computed by setup_variables().
static int variable_count;
// Table of pointers to the variables; allocated by setup_variables(), released by cleanup_variables().
static Variable_t** variables;
void setup_variables()
{
variable_count = sizeof(variable_sizes)/sizeof(int);
variables = malloc(variable_count * sizeof(Variable_t));
if(variables == NULL)
{ printf("ERROR: Memory allocation of variables data failed.\n");
exit(-1);
}
for(int i = 0; i < variable_count; i++)
{ int size = variable_sizes[i];
max_variable_size = size > max_variable_size ? size : max_variable_size;
variables[i] = malloc(sizeof(Variable_t));
variables[i]->size = size;
variables[i]->data = malloc(size * sizeof(UA_Int32));
for(int j = 0; j < size; j++)
{ variables[i]->data[j] = j;
}
variables[i]->name = malloc(5 + i);
sprintf(variables[i]->name, "var%d", size);
}
}
void cleanup_variables()
{ for(int i = 0; i < variable_count; i++)
{ free(variables[i]->data);
free(variables[i]->name);
free(variables[i]);
}
free(variables);
}
// DataSource callbacks
// Read callback: 'handle' is the Variable_t that was registered for the node.
// The variable's data is handed out by reference (scalar for size 1, array
// otherwise) with status good and, when requested, the current time as source
// timestamp. Afterwards the last item of the data is incremented.
// Always returns UA_STATUSCODE_GOOD.
static UA_StatusCode read(void *handle, UA_Boolean includeSourceTimeStamp, const UA_NumericRange *range, UA_DataValue *dataValue)
{
    Variable_t* var = handle;
    UA_Int32* last_item = &var->data[var->size - 1];

    if(var->size != 1)
    {   UA_Variant_setArray(&dataValue->value, var->data, var->size, &UA_TYPES[UA_TYPES_INT32]);
    }
    else
    {   UA_Variant_setScalar(&dataValue->value, var->data, &UA_TYPES[UA_TYPES_INT32]);
    }
    dataValue->hasValue = UA_TRUE;
    dataValue->status = UA_STATUSCODE_GOOD;
    dataValue->hasStatus = UA_TRUE;

    if(includeSourceTimeStamp)
    {   dataValue->hasSourceTimestamp = UA_TRUE;
        dataValue->sourceTimestamp = UA_DateTime_now();
    }

    if(verbose)
    {   printf("%s: %d", var->name, *last_item);
        if(dataValue->hasSourceTimestamp)
        {   UA_ByteString timestamp_text;
            UA_DateTime_toString(dataValue->sourceTimestamp, &timestamp_text);
            printf(" %s", timestamp_text.data);
            UA_ByteString_deleteMembers(&timestamp_text);
        }
        printf(" \r");
    }

    // Increase last array element, in order to give feedback to the client that the data is really updated.
    (*last_item)++;
    return UA_STATUSCODE_GOOD;
}
// Release callback: intentionally releases nothing.
static void release(void *handle, UA_DataValue *dataValue)
{
    if(!dataValue->hasValue)
    {   return;
    }
    // UA_Variant_deleteMembers(&dataValue->value);
    // Not to be deleted because data is not copied into the variant. Just referenced.
}
static UA_StatusCode write(void *handle, const UA_Variant *data, const UA_NumericRange *range)
{
}
// Callback to handle ctrl-c in order to stop the server with cleanup.
// 'running' is handed to UA_Server_run() in main(); the handler clears it.
static UA_Boolean running = UA_TRUE;
static void stop_handler(int sign)
{
    (void)sign; // the signal number is not used
    running = UA_FALSE;
}
// Main app.
int main(int argc, char** argv)
{
// Handle command line arguments.main.c
parse_arguments(argc, argv);
// Setup variable node data.
setup_variables();
// Initialize server.
int buffer_overhead = 1024; // Extra space for non-payload.
int buffer_size = max_variable_size + buffer_overhead;
UA_ConnectionConfig config =
{.protocolVersion = 0, .sendBufferSize = buffer_size, .recvBufferSize = buffer_size,
.maxMessageSize = buffer_size, .maxChunkCount = 1};
UA_Server *server = UA_Server_new(UA_ServerConfig_standard);
UA_Server_addNetworkLayer(server, ServerNetworkLayerTCP_new(config, port_nr));
// Add the variable nodes.
for(int i = 0; i < variable_count; i++)
{ printf("Variable: %s\n", variables[i]->name);
UA_QualifiedName nodeName = UA_QUALIFIEDNAME(1, variables[i]->name);
UA_NodeId nodeId = UA_NODEID_STRING(1, variables[i]->name);
UA_NodeId parentNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_OBJECTSFOLDER);
UA_NodeId parentReferenceNodeId = UA_NODEID_NUMERIC(0, UA_NS0ID_ORGANIZES);
UA_DataSource dataSource = {variables[i], read, release, write};
UA_Server_addDataSourceVariableNode(server, dataSource, nodeName, nodeId, parentNodeId, parentReferenceNodeId);
}
// Run the server loop.
printf("Running ...\n");
signal(SIGINT, stop_handler); /* Catches ctrl-c */
UA_StatusCode retval = UA_Server_run(server, thread_count, &running);
// Cleanup.
UA_Server_delete(server);
cleanup_variables();
printf("\rDone.\n");
return retval;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment