Skip to content

Instantly share code, notes, and snippets.

@jn0
Last active October 25, 2018 12:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jn0/1c7067bc686095c53507b1b7a43f67ec to your computer and use it in GitHub Desktop.
Save jn0/1c7067bc686095c53507b1b7a43f67ec to your computer and use it in GitHub Desktop.
My (failed) attempt to run a Python-driven, C++-powered TensorRT application on a Jetson
#!/usr/bin/python
"""bin2c.py -- emit a binary file (a UFF model) as a C source fragment.

Usage: bin2c.py INPUT_NODE OUTPUT_NODE < model.uff > model.uff.inc

The generated fragment defines ``uff_input`` / ``uff_output`` (the node
names as C strings), ``uff_data`` (the file contents as a byte array) and
``uff_size`` (the number of bytes).  Works with Python 2 and Python 3.
"""
import sys

CHUNK_SIZE = 1024   # bytes requested from stdin per read() call
LINE_LENGTH = 16    # array elements per generated source line


def _c_string(text):
    """Escape *text* so it can be placed inside a C string literal."""
    return text.replace('\\', '\\\\').replace('"', '\\"')


def _read_chunks(stream, chunk_size=CHUNK_SIZE):
    """Yield successive chunks read from the binary *stream* until EOF."""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            return
        yield chunk


def bin2c(chunks, input_name, output_name, line_length=LINE_LENGTH):
    """Yield the lines of the C fragment describing the bytes in *chunks*.

    *chunks* is any iterable of ``bytes`` (or Python 2 ``str``) pieces.
    Every byte is emitted -- including a final line shorter than
    *line_length* -- so ``uff_size`` always equals the array length.
    """
    yield '/* UFF Model */'
    # make the fragment self-contained: it uses uint8_t and size_t
    yield '#include <stddef.h>'
    yield '#include <stdint.h>'
    yield 'static const char uff_input[] = "%s";' % (_c_string(input_name),)
    yield 'static const char uff_output[] = "%s";' % (_c_string(output_name),)
    yield 'static uint8_t uff_data[] = {'
    size = 0
    row = []
    for chunk in chunks:
        # bytearray() yields ints for both Python 2 str and Python 3 bytes
        for byte in bytearray(chunk):
            # the very first element has no leading comma
            row.append('%c 0x%02x' % (' ' if size == 0 else ',', byte))
            size += 1
            if len(row) == line_length:
                yield ''.join(row)
                row = []
    if row:  # flush the last, partial line
        yield ''.join(row)
    yield '}; /* end of data */'
    yield 'static const size_t uff_size = %d;' % (size,)
    yield '/* EOF */'


def main(argv=None):
    """Command-line entry point; returns the process exit status."""
    if argv is None:
        argv = sys.argv
    if len(argv) < 3:
        # stderr, not stdout: stdout is normally redirected into the .c file
        sys.stderr.write('Usage: ' + argv[0] +
                         ' ${input_name} ${output_name} < ${bin_file} > ${c_file}\n')
        return 2
    # Read raw bytes: Python 3's text-mode sys.stdin would decode (and
    # corrupt) the binary model.
    stream = getattr(sys.stdin, 'buffer', sys.stdin)
    for line in bin2c(_read_chunks(stream), argv[1], argv[2]):
        print(line)
    return 0


if __name__ == '__main__':
    sys.exit(main())

# vim: et ts=4 sts=4 sw=4 ai
  1. I have a Jetson TX2 to run a model on (to detect things in images).
  2. I'm given a model, which is converted to UFF.
  3. I use bin2c.py to turn the UFF file into C source that is #included by my sampleUffMNIST.cpp (a heavily patched copy of the TensorRT sample).
  4. I use the sample's build setup to compile that sampleUffMNIST.cpp into a binary (right on the Jetson).
  5. I run that binary, and it waits on an AF_UNIX socket...
  6. I run feed.py, which grabs the images and feeds them to the sampleUffMNIST binary over that local socket, and
  7. feed.py fetches the output from the sampleUffMNIST binary (which is expected to be a matrix of per-pixel "probabilities" of belonging to an object).
  8. The matrix is then applied to the source image, and we get the image with everything masked out except the objects found.

I have a mockup in Python (just a rewrite of the original sampleUffMNIST.cpp) on a PC with a GPU, and it works fine! But there is no TensorRT Python binding on the Jetson, so I have to code in C++ (which is by no means my favourite) to run the engine.

This doesn't work: I get garbage on the output.

Here we are.

#!/usr/bin/python3
# -*- coding: utf-8 -*-
'''
INFO: daemon_io.input_socket
INFO: /tmp/licpltdtc.in
INFO: daemon_io.output_socket
INFO: /tmp/licpltdtc.out
INFO: IPC setup completed
srwxrwxr-x 1 jno jno 0 Oct 17 14:06 /tmp/licpltdtc.in
srwxrwxr-x 1 jno jno 0 Oct 17 14:06 /tmp/licpltdtc.out
The model has been parsed ok in 0.168182 seconds
The engine has been built ok in 32.4339 seconds
About to execute the model
model has 2 bindings
--------------------------- 0 -------
INFO: reading image ...
image = cv2.imread(fname)
assert image is not None, fname # cv2.imread doesn't fail
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
gray1k = imutils.resize(gray, height=1024)
inputs = transform.resize(gray1k, (256, 256)).reshape(1, 256, 256, 1).ravel().astype(np.float32)
'''
import sys
import os
import socket
import time
import threading
from queue import Queue
import numpy as np
from PIL import Image
import PIL.PpmImagePlugin
import PIL.JpegImagePlugin
from skimage import transform, img_as_float
import imutils
import cv2
import logging
log = logging.getLogger('FEED' if __name__ == '__main__' else __name__)

# Socket paths as seen from this client: requests are written into the
# daemon's ".in" socket and results are read from its ".out" socket.
OUTPUT_SOCKET = '/tmp/licpltdtc.in'
INPUT_SOCKET = '/tmp/licpltdtc.out'

IN_DIR = 'image'    # alternative data set: 'in2'
OUT_DIR = 'out'
IMG_TYPE = '.pgm'
MAX_FILE = 101      # alternative: 481
NN_SIZE = 512       # network input/output is NN_SIZE x NN_SIZE (was 256)
NN_TYPE = np.float32

# State shared by the feeder and fetcher threads.
queue = Queue()             # paths already sent, waiting for their result
done = threading.Event()    # set by the feeder when it stops
fail = threading.Event()    # set on any communication failure


def f2i(x):
    """Scale an intensity in [0, 1] to [0, 255] (algebraically 255 * x)."""
    return 255 - ((1.0 - x) * 255.0)


def i2f(x):
    """Map a pixel value in [0, 255] to [0, 1], inverted (0 -> 1.0, 255 -> 0.0)."""
    return 1.0 - float(x) / 255.0
def sockets_ok(output_socket=None, input_socket=None):
    """Return True when both daemon socket files are accessible.

    The request socket must be writable and the result socket readable
    (the same checks main() asserts).  Paths default to OUTPUT_SOCKET and
    INPUT_SOCKET.

    The original compared ``os.access(...) == 0``, which inverted the
    result: os.access() returns a bool, True when access is granted.
    """
    if output_socket is None:
        output_socket = OUTPUT_SOCKET
    if input_socket is None:
        input_socket = INPUT_SOCKET
    return bool(os.access(output_socket, os.W_OK)
                and os.access(input_socket, os.R_OK))
def sockets_status():
    """Describe both socket paths for diagnostics.

    Each entry reads "<path>, F|f, R|r, W|w" (upper case = access granted),
    or "<path>, no-file" when the path does not exist.  Entries are joined
    with "; ", INPUT_SOCKET first.
    """
    def describe(path):
        if not os.path.exists(path):
            return ', '.join([path, 'no-file'])
        flags = [
            'F' if os.access(path, os.F_OK) else 'f',
            'R' if os.access(path, os.R_OK) else 'r',
            'W' if os.access(path, os.W_OK) else 'w',
        ]
        return ', '.join([path] + flags)

    return '; '.join(describe(p) for p in (INPUT_SOCKET, OUTPUT_SOCKET))
def files(dir=IN_DIR, ext=IMG_TYPE, maxFileNumber=MAX_FILE):
    """Return a lazy iterator of absolute paths "<dir>/<n><ext>".

    n runs from 1 to maxFileNumber inclusive; the files are not checked
    for existence.
    """
    numbers = range(1, maxFileNumber + 1)
    return (os.path.abspath(os.path.join(dir, '%d%s' % (n, ext)))
            for n in numbers)
def load_image(file):
    """Load *file* and preprocess it into the network's flat float32 input.

    Steps: grayscale, resize to height 1024 (imutils), then skimage-resize
    to NN_SIZE x NN_SIZE.  The result is flattened from a (1, NN_SIZE,
    NN_SIZE, 1) view.  skimage returns values in [0.0, 1.0] for 8-bit input.
    """
    bgr = cv2.imread(file)
    assert bgr is not None, file  # cv2.imread returns None instead of raising
    gray = cv2.cvtColor(bgr, cv2.COLOR_BGR2GRAY)
    gray1k = imutils.resize(gray, height=1024)
    resized = transform.resize(gray1k,
                               (NN_SIZE, NN_SIZE),
                               mode='constant',
                               anti_aliasing=False)  # compatibility...
    nhwc = resized.reshape(1, NN_SIZE, NN_SIZE, 1)
    return nhwc.ravel().astype(np.float32)
def bytes2image(data):
    """Convert the daemon's raw float32 output into an NN_SIZE x NN_SIZE uint8 image.

    Values above 0.25 are scaled with f2i; all other values become 0.
    """
    probs = np.frombuffer(bytes(data), np.float32)
    log.info('%r: [%r..%r]', len(probs), probs.min(), probs.max())
    keep = probs > 0.25
    pixels = f2i(probs * keep).astype(np.uint8)
    log.info('%r: [%r..%r]', len(pixels), pixels.min(), pixels.max())
    pixels.shape = (NN_SIZE, NN_SIZE)
    return pixels
def save_image(file, data, type='.pgm'):
    """Save the image array *data* for the input image *file*.

    The output path is *file* with a trailing IN_DIR directory component
    swapped for OUT_DIR, and the extension replaced by *type*.

    Only that one component is rewritten (the old ``str.replace`` touched
    every "/image/" in the path).  If the computed path would be *file*
    itself, nothing is written, so an input image is never overwritten.
    """
    parent, name = os.path.split(file)
    in_tail = os.sep + os.path.normpath(IN_DIR)
    if parent.endswith(in_tail):
        parent = parent[:-len(in_tail)] + os.sep + os.path.normpath(OUT_DIR)
    fname = os.path.join(parent, os.path.splitext(name)[0] + type)
    if os.path.abspath(fname) == os.path.abspath(file):
        log.error('refusing to overwrite input image %r (not under %r)',
                  file, IN_DIR)
        return
    with Image.fromarray(data) as sav:
        # np.size counts all elements (len() would count rows only)
        log.info('saving %r items to %r as %r', np.size(data), file, fname)
        sav.save(fname)
    log.info('saved %r in %r', os.path.getsize(fname), fname)
def send_file(path):
    """Preprocess the image at *path* and send it to the inference daemon.

    Opens one connection to OUTPUT_SOCKET and sends the payload length as
    ASCII decimal plus '\\n', then the raw float32 pixels (NN_SIZE *
    NN_SIZE values).  Socket errors set the global ``fail`` event and are
    logged, not raised.  Preprocessing errors (e.g. an unreadable image)
    propagate to the caller.
    """
    log.info('about to send %r', path)
    sz2 = NN_SIZE * NN_SIZE
    data = load_image(path)  # numpy.array(NN_TYPE) [0.0 .. 1.0]
    assert len(data) == sz2, (len(data), sz2)
    sz3 = sz2 * data.itemsize
    data = data.tobytes()
    assert len(data) == sz3, (len(data), sz3)
    where = 'socket'
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
            where = 'connect'
            sock.connect(OUTPUT_SOCKET)
            where = 'header'
            # sendall(): a single send() may transmit only part of the buffer
            sock.sendall(b'%d\n' % (len(data),))
            where = 'data'
            sock.sendall(data)
    except (FileNotFoundError, ConnectionRefusedError) as e:
        fail.set()
        log.error('The peer (send) went down: %r for %r at %r',
                  e, OUTPUT_SOCKET, where)
    except OSError as e:  # covers BrokenPipeError and IOError
        fail.set()
        log.critical('Error sending %s of %r: %r', where, path, e)
        log.warning('Sockets: %s', sockets_status())
    else:
        log.info('sent %r: %r bytes', path, len(data))
def recv_file(path):
    """Receive one inference result (for the image *path*) from the daemon.

    Connects to INPUT_SOCKET and reads an ASCII decimal length terminated
    by '\\n', then exactly that many payload bytes.

    Returns the payload as ``bytes``.  Returns None when the peer is
    unreachable or closes the stream early (both also set ``fail``), or
    when ``fail`` is already set by another thread.
    """
    log.info('about to recv %r', path)
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as sock:
        log.debug('recv_file: socket=%r', sock)
        try:
            log.debug('recv_file: connecting socket to %r', INPUT_SOCKET)
            sock.connect(INPUT_SOCKET)
            log.debug('recv_file: socket connected to %r', INPUT_SOCKET)
        except (FileNotFoundError, ConnectionRefusedError) as e:
            log.error('The peer (recv) went down: %r for %r', e, INPUT_SOCKET)
            fail.set()
            return None
        # Header: decimal digits terminated by a newline.
        tmp = b''
        size = -1
        while not fail.is_set():
            c = sock.recv(1)
            if not c:
                fail.set()
                break
            if c == b'\n':
                size = int(tmp)
                break
            tmp += c
            assert len(tmp) < 10, tmp
        if fail.is_set():
            log.warning('Aborting recv...')
            return None
        log.debug('recv_file(%r): size=%r tmp=%r fail=%r done=%r',
                  path, size, tmp, fail.is_set(), done.is_set())
        # Payload: fill a preallocated buffer (linear, unlike repeated
        # bytes concatenation) and stop if the peer closes the stream.
        data = bytearray(size)
        view = memoryview(data)
        got = 0
        while got < size:
            if fail.is_set():
                log.warning('Aborting recv of %r after %d of %d bytes',
                            path, got, size)
                return None
            n = sock.recv_into(view[got:])
            if n == 0:
                log.error('The peer (recv) closed the stream after %d of %d bytes for %r',
                          got, size, path)
                fail.set()
                return None
            got += n
        log.debug('got %d as %d bytes for %r', size, got, path)
        return bytes(data)
def feeder(dir=IN_DIR, ext=IMG_TYPE, maxFiles=None):
    """Producer thread: send every image from files(dir, ext) to the daemon.

    Each path whose send did not set ``fail`` is put on ``queue`` for the
    fetcher.  Stops early when ``fail`` or ``done`` is set.  When
    *maxFiles* is given, stops after that many images (falsy means no
    limit beyond files() itself).

    Always sets ``done`` before returning -- even if an exception escapes
    -- so main() and the fetcher are not left waiting for it.
    """
    try:
        for count, file in enumerate(files(dir, ext), 1):
            if maxFiles and count > maxFiles:  # exactly maxFiles images are sent
                break
            send_file(file)
            if fail.is_set():
                break
            queue.put(file)
            if done.is_set():
                break
            time.sleep(0.001)
    except Exception:
        log.exception('feeder: crashed')
        fail.set()
    if fail.is_set():
        log.warning('feeder: aborted')
    else:
        log.info('feeder: done')
    done.set()
def fetcher():
    """Consumer thread: fetch and save the result for each queued path.

    Runs until the feeder is ``done`` and the queue has been drained, or
    until ``fail`` is set and the queue is empty.  Uses a timed get so it
    cannot block forever after the last item was processed.  On failure,
    leftover queue entries are discarded.
    """
    from queue import Empty  # module-level `queue` is the Queue instance

    while not (fail.is_set() and queue.empty()):
        log.debug('fetcher: qsize=%r empty=%r fail=%r done=%r',
                  queue.qsize(), queue.empty(), fail.is_set(), done.is_set())
        try:
            path = queue.get(timeout=0.1)
        except Empty:
            if done.is_set():
                # feeder stopped and everything it queued has been handled
                log.info('fetcher: done')
                break
            continue
        try:
            data = recv_file(path)
            if data is not None:
                save_image(path, bytes2image(data))
            else:
                fail.set()
        finally:
            queue.task_done()  # keep the queue's accounting right on errors
        if done.is_set() and queue.empty():
            log.info('fetcher: done')
            break
        time.sleep(0.001)
    if fail.is_set():
        log.error('fetcher: abort; queue size about %r', queue.qsize())
        while True:  # flush the queue without ever blocking
            try:
                queue.get_nowait()
            except Empty:
                break
            queue.task_done()
def main():
    """Run the feeder and fetcher threads until all images are processed.

    Returns os.EX_OK.  Failures are reported through the log (and the
    ``fail`` event), not through the exit status.
    """
    assert os.access(OUTPUT_SOCKET, os.W_OK), OUTPUT_SOCKET
    assert os.access(INPUT_SOCKET, os.R_OK), INPUT_SOCKET
    feeder_t = threading.Thread(name='feeder', target=feeder, daemon=True)
    fetcher_t = threading.Thread(name='fetcher', target=fetcher, daemon=True)
    log.info('Starting...')
    fetcher_t.start()
    feeder_t.start()
    log.info('Running...')
    # Wait for the feeder, but do not wait forever if it exited without
    # setting `done` (e.g. an uncaught exception).
    while not done.wait(0.5):
        if not feeder_t.is_alive() and not done.is_set():
            log.error('feeder exited without finishing')
            fail.set()
            done.set()
    if fail.is_set():
        log.warning('Aborted...')
    log.info('Terminating...')
    while feeder_t.is_alive() or fetcher_t.is_alive():
        log.debug('Threads (feed, fetch): %r',
                  [feeder_t.is_alive(), fetcher_t.is_alive()])
        for thread in (feeder_t, fetcher_t):
            if thread.is_alive():
                thread.join(.5)
                if thread.is_alive():  # only complain when the join timed out
                    log.warning('Cannot join %s', thread.name)
    log.info('Exiting...')
    return os.EX_OK
if __name__ == '__main__':
    logging.logThreads = True
    if 0:
        # Disabled debugging aid: print the value range load_image()
        # produces for each sample image, then the overall range, and quit.
        logging.basicConfig(level=logging.DEBUG)
        xmin, xmax = 9.9, -9.9
        for file in files('in2', '.jpg'):
            data = load_image(file)
            lo, hi = data.min(), data.max()
            print(os.path.basename(file), lo, hi)
            xmax = max(hi, xmax)
            xmin = min(lo, xmin)
        print(xmin, xmax)
        # save_image('xz.pgm', data.astype(np.uint8))
        sys.exit(0)
    log_format = '%(threadName)s:' + logging.BASIC_FORMAT
    logging.basicConfig(level=logging.INFO, format=log_format)
    sys.exit(main())
# vim: et ts=4 sts=4 sw=4 ai
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <signal.h>
#include <algorithm>
#include <chrono>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <string>
#include <thread>
#include <unordered_map>
#include <cassert>
#include <vector>
#include <cuda_runtime_api.h>
#include "NvInfer.h"
#include "NvUffParser.h"
#include "NvUtils.h"
using namespace nvuffparser;
using namespace nvinfer1;
#include "common.h"
static Logger gLogger(ILogger::Severity::kINFO);
static inline void log_info(const char* msg) { gLogger.log(ILogger::Severity::kINFO, msg); }
static inline void log_warn(const char* msg) { gLogger.log(ILogger::Severity::kWARNING, msg); }
static inline void log_error(const char* msg) { gLogger.log(ILogger::Severity::kERROR, msg); }
static inline void log_level(ILogger::Severity level) { gLogger.reportableSeverity = level; }
#define THE_CPP_TIME std::chrono::time_point<std::chrono::high_resolution_clock>
static inline THE_CPP_TIME stamp(void)
{
return std::chrono::high_resolution_clock::now();
}
static inline float stamp_delta(THE_CPP_TIME t_start, THE_CPP_TIME t_stop)
{
return std::chrono::duration<float, std::milli>(t_stop - t_start).count();
}
/* Log "sample_uff_mnist: <message>" at ILogger severity k<severity> and
 * return `ret` from the calling function. */
#define RETURN_AND_LOG(ret, severity, message) \
    do { \
        std::string error_message = "sample_uff_mnist: " + std::string(message); \
        gLogger.log(ILogger::Severity::k ## severity, error_message.c_str()); \
        return (ret); \
    } while(0)

/* Fatal check: if `predicate` is false, log "<msg>: <predicate source text>"
 * and the source location, then exit(EXIT_FAILURE).  `msg` must be a string
 * literal because it is concatenated with other literals at compile time. */
#define ASSERT(predicate, msg) do { \
        if (!(predicate)) { \
            log_error(msg ": " # predicate); \
            std::cerr << "File " << __FILE__ << ", line " << __LINE__ << std::endl; \
            exit(EXIT_FAILURE); \
        } \
    } while(0)

/* Recoverable check: if `predicate` is false, log `msg` and the source
 * location, then `goto lbl`. */
#define X_ASSERT(predicate, lbl, msg) do { \
        if (!(predicate)) { \
            log_error(msg); \
            std::cerr << "File " << __FILE__ << ", line " << __LINE__ << std::endl; \
            goto lbl; \
        } \
    } while(0)

/* Like X_ASSERT, but records stamp() into `t1` before evaluating `predicate`
 * and into `t2` after it succeeds (`t2` is left untouched on failure). */
#define T_ASSERT(predicate, lbl, msg, t1, t2) do { \
        (t1) = stamp(); \
        if (!(predicate)) { \
            log_error(msg); \
            std::cerr << "File " << __FILE__ << ", line " << __LINE__ << std::endl; \
            goto lbl; \
        } \
        (t2) = stamp(); \
    } while(0)

/* Syscall/libcall check: if `predicate` is false, log strerror(errno), then
 * "in <call>: <msg>", then the source location, and `goto lbl`.
 * errno is saved before any logging can clobber it.  `call` and `msg` may
 * be string literals or runtime C strings (e.g. a socket path); their
 * values are logged, not their source text. */
#define C_ASSERT(call, predicate, lbl, msg) do { \
        if (!(predicate)) { \
            const int c_assert_errno = errno; \
            log_error(strerror(c_assert_errno)); \
            log_error((std::string("in ") + (call) + ": " + (msg)).c_str()); \
            std::cerr << "File " << __FILE__ << ", line " << __LINE__ << std::endl; \
            goto lbl; \
        } \
    } while(0)
/* ************************************************************************** *
* IO things (via AF_UNIX sockets)
*/
/* Which part of a framed IPC message _ipc_read()/_ipc_write() handle. */
enum ipc_how {
    IPC_FULL = 0,   /* header + payload, then close the peer connection */
    IPC_HEAD = 1,   /* only the "<decimal size>\n" header */
    IPC_DATA = 2,   /* only the raw payload bytes */
};
typedef enum ipc_how ipc_how_t;
class daemon_io {
    /* Synchronous IPC endpoint made of two listening AF_UNIX stream sockets.
     * Requests arrive on /tmp/licpltdtc.in ("input"); replies are served on
     * /tmp/licpltdtc.out ("output").
     *
     * Setup, per direction (make_socket()):
     *  - remove a stale socket file (otherwise bind(2) fails with EADDRINUSE)
     *  - create the AF_UNIX socket(7) and mark it close-on-exec
     *  - bind(2) it to its path, which creates the socket file
     *  - adjust the file permissions via stat(2)/chmod(2)
     *  - listen(2) on it (`listen_socket`)
     *
     * IO: accept(2) a client (`peer_socket`), transfer one message on that
     * connection, then close it (IPC_FULL).  The sockets are left in blocking
     * mode, so accept/read/send simply wait for the peer.
     *
     * Wire protocol, one message per connection:
     *  1. <decimal-ascii-size> '\n'       (header, IPC_HEAD)
     *  2. <exactly size bytes of data>    (data, IPC_DATA)
     *
     * Single-threaded by design.  Not copyable: it owns file descriptors.
     */
private:
    static const int NONE = -1;
    static const int INPUT_BACKLOG = 50;
    static const int OUTPUT_BACKLOG = 50;
    /* A client that disconnects early must yield EPIPE, not a fatal SIGPIPE. */
    static const int SEND_FLAGS = MSG_NOSIGNAL;
    /* Size of the header buffer, including the terminating NUL. */
    static const size_t HEADER_MAX = 32;
    struct io_socket {
        struct sockaddr_un socket_name;
        int listen_socket,
            peer_socket,
            backlog;
    } input, output;

    static inline void relinquish(void) { usleep(10); } // allow some breath to scheduler

    /* Close the accepted connection of `s`, if any. */
    static inline void close_peer(struct io_socket* s)
    {
        if (s->peer_socket != daemon_io::NONE)
            close(s->peer_socket);
        s->peer_socket = daemon_io::NONE;
    }

    /* Close both descriptors of `s` and remove its socket file. */
    inline void drop_sock(struct io_socket* s)
    {
        this->close_peer(s);
        if (s->listen_socket != daemon_io::NONE)
            close(s->listen_socket);
        s->listen_socket = daemon_io::NONE;
        if (s->socket_name.sun_path[0] != '\0' &&
            access(s->socket_name.sun_path, F_OK) == 0)
            unlink(s->socket_name.sun_path);
    }

    /* One read(2) from the input peer, retried on EINTR/EAGAIN.
     * Returns the (positive) byte count, or -1 on error or end of stream. */
    ssize_t socket_read(void *buffer, size_t size)
    {
        for (;;) {
            const ssize_t rc = read(this->input.peer_socket, buffer, size);
            if (rc > 0)
                return rc;
            if (rc == 0) {
                log_error("socket_read: end of stream");
                return -1;
            }
            const int err = errno;  /* logging below may clobber errno */
            if (err == EINTR)
                continue;
            if (err == EAGAIN || err == EWOULDBLOCK) {
                this->relinquish();
                continue;
            }
            log_error("socket_read: error ///");
            log_error(strerror(err));
            return -1;
        }
    }

    /* One send(2) to the output peer, retried on EINTR/EAGAIN.
     * Returns the (positive) byte count, or -1 on error. */
    ssize_t socket_write(const void *buffer, size_t size)
    {
        for (;;) {
            const ssize_t rc = send(this->output.peer_socket, buffer, size,
                                    daemon_io::SEND_FLAGS);
            if (rc > 0)
                return rc;
            if (rc == 0) {
                log_error("socket_write: ooops!");
                return -1;
            }
            const int err = errno;  /* logging below may clobber errno */
            if (err == EINTR)
                continue;
            if (err == EAGAIN || err == EWOULDBLOCK) {
                this->relinquish();
                continue;
            }
            log_error("socket_write: error ///");
            log_error(strerror(err));
            return -1;
        }
    }

    /* accept(2) a client on `s` unless one is already connected
     * (accept is retried when interrupted by a signal). */
    static inline bool try_accept(struct io_socket *s)
    {
        while (s->peer_socket == daemon_io::NONE) {
            s->peer_socket = accept(s->listen_socket, nullptr, nullptr);
            if (s->peer_socket == daemon_io::NONE && errno != EINTR)
                break;
        }
        return s->peer_socket != daemon_io::NONE;
    }

    /* Read the "<digits>\n" header from the input peer.  Returns the
     * announced size, or IPC_ERROR on an IO error, a non-digit byte, an
     * empty header or one that does not fit HEADER_MAX. */
    size_t read_header(void)
    {
        char header[daemon_io::HEADER_MAX];
        size_t len = 0;
        for (;;) {
            char c;
            if (this->socket_read(&c, 1) != 1) {
                log_error("ipc_read: fail to read header");
                return IPC_ERROR;
            }
            if (c == '\n')
                break;
            if (c < '0' || c > '9') {
                log_error("ipc_read: non-digit in input stream header");
                return IPC_ERROR;
            }
            if (len + 1 >= sizeof(header)) {
                log_error("ipc_read: header is too long");
                return IPC_ERROR;
            }
            header[len++] = c;
        }
        if (len == 0) {
            log_error("ipc_read: empty header");
            return IPC_ERROR;
        }
        header[len] = '\0';
        return (size_t)strtoull(header, nullptr, 10);
    }

    /* Read exactly `size` bytes from the input peer into `buffer`.
     * Returns `size`, or IPC_ERROR. */
    size_t read_data(void *buffer, size_t size)
    {
        unsigned char *p = static_cast<unsigned char*>(buffer);
        size_t left = size;
        while (left > 0) {
            const ssize_t rc = this->socket_read(p, left);
            if (rc < 0) {
                log_error("ipc_read: fail to read data");
                return IPC_ERROR;
            }
            p += rc;
            left -= (size_t)rc;
        }
        return size;
    }

    /* Write exactly `size` bytes from `buffer` to the output peer.
     * Returns `size`, or IPC_ERROR. */
    size_t write_data(const void *buffer, size_t size)
    {
        const unsigned char *p = static_cast<const unsigned char*>(buffer);
        size_t left = size;
        while (left > 0) {
            const ssize_t rc = this->socket_write(p, left);
            if (rc < 0) {
                log_error("ipc_write: fail to write data");
                return IPC_ERROR;
            }
            p += rc;
            left -= (size_t)rc;
        }
        return size;
    }

    /* Receive (part of) a message, accepting a client first if needed.
     * IPC_FULL reads at most `size` payload bytes (a larger message is
     * truncated) and always closes the connection afterwards.
     * Returns the byte count (header value for IPC_HEAD) or IPC_ERROR;
     * exits the process if no client can be accepted. */
    size_t _ipc_read(void *buffer, size_t size, ipc_how_t how)
    {
        C_ASSERT("accept", this->try_accept(&this->input),
                 errout, "ipc_read");
        switch (how) {
        case IPC_HEAD:
            return this->read_header();
        case IPC_DATA:
            return this->read_data(buffer, size);
        case IPC_FULL: {
            const size_t proposed = this->_ipc_read(nullptr, 0, IPC_HEAD);
            if (proposed == IPC_ERROR) {
                log_error("ipc_read: Cannot fetch header");
                this->close_peer(&this->input);  /* never reuse a broken stream */
                return IPC_ERROR;
            }
            if (proposed > size)
                log_warn("ipc_read: message is larger than the buffer; excess dropped");
            const size_t wanted = (proposed < size) ? proposed : size;
            const size_t got = this->_ipc_read(buffer, wanted, IPC_DATA);
            this->close_peer(&this->input);
            if (got != wanted) {
                log_error("ipc_read: Cannot fetch data");
                return IPC_ERROR;
            }
            return wanted;
        }
        default:
            this->close_peer(&this->input);
            ASSERT(0, "Bad 'how' in ipc_read");
        }
    errout:
        exit(99);
    }

    /* Send (part of) a message, accepting a client first if needed.
     * IPC_FULL sends header + payload and always closes the connection.
     * Returns the byte count (header length for IPC_HEAD) or IPC_ERROR;
     * exits the process if no client can be accepted. */
    size_t _ipc_write(const void *buffer, size_t size, ipc_how_t how)
    {
        C_ASSERT("accept",
                 this->try_accept(&this->output),
                 errout, "ipc_write");
        switch (how) {
        case IPC_HEAD: {
            char header[daemon_io::HEADER_MAX];
            snprintf(header, sizeof(header), "%zu\n", size);
            const size_t len = strlen(header);
            std::cout
                << "ipc_write(HEAD," << size << ") h='" << header
                << "' len=" << len
                << std::endl;
            if (this->write_data(header, len) != len) {
                log_error("ipc_write: Cannot send header");
                return IPC_ERROR;
            }
            return len;
        }
        case IPC_DATA:
            return this->write_data(buffer, size);
        case IPC_FULL: {
            /* report failures to the caller instead of killing the daemon */
            if (this->_ipc_write(nullptr, size, IPC_HEAD) == IPC_ERROR) {
                log_error("ipc_write: Cannot send header");
                this->close_peer(&this->output);
                return IPC_ERROR;
            }
            if (this->_ipc_write(buffer, size, IPC_DATA) != size) {
                log_error("ipc_write: Cannot send data");
                this->close_peer(&this->output);
                return IPC_ERROR;
            }
            this->close_peer(&this->output);
            return size;
        }
        default:
            this->close_peer(&this->output);
            ASSERT(0, "Bad 'how' in ipc_write");
        }
    errout:
        exit(99);
    }

    /* Create, bind and listen on the AF_UNIX socket `path` (see the class
     * comment).  Returns false, after logging, if any step fails. */
    static bool make_socket(struct io_socket *s, const char *path, int backlog)
    {
        struct stat st;
        int rc;
        memset(&s->socket_name, 0, sizeof(s->socket_name));
        s->socket_name.sun_family = AF_UNIX;
        s->listen_socket = daemon_io::NONE;
        s->peer_socket = daemon_io::NONE;
        s->backlog = backlog;
        if (strlen(path) >= sizeof(s->socket_name.sun_path)) {
            log_error("socket path is too long");
            goto errout;
        }
        /* the length was checked above, so the copy stays NUL-terminated */
        strncpy(s->socket_name.sun_path, path, sizeof(s->socket_name.sun_path) - 1);
        if (access(s->socket_name.sun_path, F_OK) == 0)
            C_ASSERT("unlink",
                     unlink(s->socket_name.sun_path) == 0,
                     errout,
                     s->socket_name.sun_path);
        s->listen_socket = socket(AF_UNIX, SOCK_STREAM, 0);
        C_ASSERT("socket", s->listen_socket >= 0,
                 errout, "No daemon_io socket");
        /* F_SETFD takes FD_* flags; O_NONBLOCK would need F_SETFL and would
         * make the blocking accept(2) in try_accept() fail with EAGAIN.
         * Keep the socket blocking and only stop it leaking into child
         * processes such as system(3). */
        C_ASSERT("fcntl",
                 fcntl(s->listen_socket, F_SETFD, FD_CLOEXEC) == 0,
                 errout,
                 "Cannot fcntl on daemon_io socket");
        C_ASSERT("bind",
                 bind(s->listen_socket,
                      (struct sockaddr *)&s->socket_name,
                      sizeof(s->socket_name)) == 0,
                 errout,
                 s->socket_name.sun_path);
        C_ASSERT("stat",
                 stat(s->socket_name.sun_path, &st) == 0,
                 errout,
                 s->socket_name.sun_path);
        /* others: add write (connect(2) needs it), remove read */
        rc = chmod(s->socket_name.sun_path,
                   (st.st_mode | S_IWOTH) & ~S_IROTH);
        C_ASSERT("chmod", rc == 0, errout, s->socket_name.sun_path);
        rc = listen(s->listen_socket, s->backlog);
        C_ASSERT("listen", rc == 0, errout, "listen socket");
        return true;
    errout:
        log_error(path);
        log_error("IPC setup failed");
        return false;
    }

public:
    static const size_t IPC_ERROR = (size_t)(-1);
    bool ok;    /* true only if both sockets were set up successfully */

    daemon_io(void)
    {
        this->ok = false;
        log_info("daemon_io.input.socket");
        const bool input_ok = daemon_io::make_socket(
            &this->input, "/tmp/licpltdtc.in", daemon_io::INPUT_BACKLOG);
        log_info(this->input.socket_name.sun_path);
        log_info("daemon_io.output.socket");
        const bool output_ok = daemon_io::make_socket(
            &this->output, "/tmp/licpltdtc.out", daemon_io::OUTPUT_BACKLOG);
        log_info(this->output.socket_name.sun_path);
        this->ok = input_ok && output_ok;
        if (this->ok)
            log_info("IPC setup completed");
        else
            log_error("IPC setup incomplete");
        system("ls -la /tmp/licpltdtc.*"); // XXX
    }

    daemon_io(const daemon_io&) = delete;
    daemon_io& operator=(const daemon_io&) = delete;

    ~daemon_io(void)
    {
        this->drop_sock(&this->output);
        this->drop_sock(&this->input);
        log_warn("daemon_io destroyed");
    }

    /* Receive one full message into `buffer` (at most `size` bytes).
     * Returns the number of payload bytes stored, or IPC_ERROR. */
    inline size_t ipc_read(void *buffer, size_t size)
    {
        return this->_ipc_read(buffer, size, IPC_FULL);
    }

    /* Send `size` bytes from `buffer` as one full message.
     * Returns `size`, or IPC_ERROR. */
    inline size_t ipc_write(const void *buffer, size_t size)
    {
        return this->_ipc_write(buffer, size, IPC_FULL);
    }
};
/* ************************************************************************** */
#include "model-20181018/model_512x512.20181018.uff.inc"
// included file must define initialized
// size_t uff_size; // size of the model in bytes
// uint8_t uff_data[]; // the model itself
// char uff_input[]; // name of the INPUT node
// char uff_output[]; // name of the OUTPUT node
/* ************************************************************************** *
*
* class execution_context
*/
class execution_context {
private:
const size_t NN_SIZE = 512; // 256
const size_t INPUT_H = NN_SIZE;
const size_t INPUT_W = NN_SIZE;
const int MAX_BATCH_SIZE = 1;
const size_t MAX_WORKSPACE = 1 << 30;
nvinfer1::DataType NN_TYPE;
typedef float data_t; // for bindings' IO (== FP32)
IBuilder* builder;
IUffParser* parser;
INetworkDefinition* network;
daemon_io *ipc;
ICudaEngine* engine;
IExecutionContext* context;
int nbBindings;
int batchSize; // == 1; WTF?
int input_binding_index;
std::vector<std::pair<int64_t, DataType>> sizes;
std::vector<void*> buffers;
static inline size_t volume(const Dims& d)
{
size_t v = 1;
for (int i = 0; i < d.nbDims; i++)
v *= d.d[i];
return v;
}
inline size_t elementCount(int index)
{
return this->sizes[index].first;
}
inline DataType elementType(int index)
{
return this->sizes[index].second;
}
inline const char *elementTypeName(int index)
{
switch (this->elementType(index)) {
case DataType::kINT32: return "INT32";
case DataType::kFLOAT: return "FLOAT";
case DataType::kHALF: return "HALF";
case DataType::kINT8: return "INT8";
}
return "InVaLiD";
}
inline size_t elementSize(int index)
{
switch (this->elementType(index)) {
case DataType::kINT32: // Fallthrough, same as kFLOAT
case DataType::kFLOAT: return 4;
case DataType::kHALF: return 2;
case DataType::kINT8: return 1;
}
assert(0);
return 0;
}
inline size_t memSize(int index)
{
return this->elementCount(index) * this->elementSize(index);
}
static void* safeCudaMalloc(size_t size)
{
void* deviceMem = nullptr;
CHECK(cudaMalloc(&deviceMem, size));
ASSERT(deviceMem != nullptr, "Out of memory");
return deviceMem;
}
float run_once(int run) /* single turn (single image) of NN inference */
{
THE_CPP_TIME t_start, t_end;
t_start = stamp();
this->context->execute(this->batchSize, &this->buffers[0]);
t_end = stamp();
float ms = stamp_delta(t_start, t_end);
std::cout << "Run " << run << " took " << ms / 1.E3 << " seconds" << std::endl;
return ms;
}
data_t* readImageFile(int index)
{
log_info("reading image ...");
size_t mem_size = this->memSize(index);
size_t count = this->elementCount(index);
data_t *buffer = new data_t[count];
auto t1 = stamp();
size_t rc = this->ipc->ipc_read(buffer, mem_size);
auto t2 = stamp();
std::cout << "read " << (rc == daemon_io::IPC_ERROR ? -1 : rc)
<< " of " << mem_size << " bytes in " << stamp_delta(t1, t2) << " ms"
<< std::endl;
ASSERT(rc != daemon_io::IPC_ERROR, "Failed to read image - aborting");
log_info("image taken");
return buffer;
}
void createInputCudaBuffer(int run)
{
const int index = this->input_binding_index;
assert(index >= 0 && index < this->nbBindings);
size_t mem_size = this->memSize(index);
void* deviceMem = this->safeCudaMalloc(mem_size);
data_t* data_in = this->readImageFile(index);
this->saveBinary(run, index, "in", data_in);
CHECK(cudaMemcpy(deviceMem, data_in, mem_size,
cudaMemcpyHostToDevice));
delete[] data_in;
this->buffers[index] = deviceMem;
}
inline void dropCudaBuffer(int index)
{
assert(index >= 0 && index < this->nbBindings);
if (this->buffers[index] != nullptr) {
CHECK(cudaFree(this->buffers[index]));
this->buffers[index] = nullptr;
}
}
inline void dropInputCudaBuffer(void)
{
this->dropCudaBuffer(this->input_binding_index);
}
void printBinding(int index)
{
std::cout
<< "[" << index << "] "
<< (this->engine->bindingIsInput(index) ? "INPUT" : "OUTPUT")
<< " " << this->elementTypeName(index)
<< " " << this->engine->getBindingName(index)
<< "[" << this->elementCount(index) << "];"
<< " batchSize= " << this->batchSize
<< " expect= " << (INPUT_H * INPUT_W * this->batchSize)
<< " type='" << this->elementTypeName(index)
<< "':" << this->elementSize(index)
<< " sizeof(data_t)= " << sizeof(data_t)
<< " datasize= " << this->memSize(index)
<< std::endl;
}
void saveImageFile(const void* buffer, size_t size)
{
log_info("writing image ...");
auto t1 = stamp();
size_t rc = this->ipc->ipc_write(buffer, size);
auto t2 = stamp();
std::cout << "written " << (rc == daemon_io::IPC_ERROR ? -1 : rc)
<< " of " << size << " bytes"
<< " in " << stamp_delta(t1, t2) << " ms"
<< std::endl;
if (rc == daemon_io::IPC_ERROR)
log_warn("Failed to write image");
else
log_info("image saved");
}
inline data_t* fetch(int index)
{
data_t* data_out = new data_t[this->elementCount(index)];
CHECK(cudaMemcpy(data_out, this->buffers[index], this->memSize(index),
cudaMemcpyDeviceToHost));
return data_out;
}
void saveBinary(int run, int index, const char* direction, const void *data)
{
assert(index >= 0 && index < this->nbBindings);
size_t mem_size = this->memSize(index);
std::string filename = "/tmp/";
filename += std::to_string(run);
filename += "-";
filename += direction;
filename += "-";
filename += std::to_string(index);
filename += ".bin";
FILE *fp = fopen(filename.c_str(), "wb");
ASSERT(fp != NULL, "fopen");
ASSERT(fwrite(data, 1, mem_size, fp) == mem_size, "fwrite");
fclose(fp);
}
void printOneOutput(int index, int run)
{
std::cout << "--- OUTPUT[" << index << "] ---" << std::endl;
data_t* data_out = this->fetch(index);
this->saveBinary(run, index, "out", data_out);
this->saveImageFile(data_out, this->memSize(index));
delete[] data_out;
std::cout << std::endl;
}
void printOutput(int run)
{
for (int i = 0; i < this->nbBindings; ++i) {
if (!this->engine->bindingIsInput(i))
this->printOneOutput(i, run);
}
}
bool build_builder(void)
{
ASSERT(this->builder = nvinfer1::createInferBuilder(gLogger),
"Cannot create builder");
this->builder->setMaxBatchSize(this->MAX_BATCH_SIZE);
this->builder->setMaxWorkspaceSize(this->MAX_WORKSPACE);
switch(this->NN_TYPE) {
case nvinfer1::DataType::kINT8:
log_warn("Running INT8 mode");
this->builder->setInt8Mode(true);
break;
case nvinfer1::DataType::kINT32:
log_warn("Running INT32 mode");
break;
case nvinfer1::DataType::kFLOAT:
log_warn("Running FP32 mode");
break;
case nvinfer1::DataType::kHALF:
log_warn("Enabling FP16 mode");
this->builder->setFp16Mode(true);
break;
default:
log_warn("Running BAD mode");
return false;
}
std::string message = "Builder config: ";
message += "Half2:";
message += (this->builder->getHalf2Mode() ? "ON" : "off");
message += "; DebugSync:";
message += (this->builder->getDebugSync() ? "ON": "off");
message += "; platformHasFastFp16:";
message += (this->builder->platformHasFastFp16() ? "ON": "off");
message += "; platformHasFastInt8:";
message += (this->builder->platformHasFastInt8() ? "ON": "off");
message += "; Int8:";
message += (this->builder->getInt8Mode() ? "ON": "off");
message += "; Fp16:";
message += (this->builder->getFp16Mode() ? "ON": "off");
log_warn(message.c_str());
return true;
}
bool build_network(void)
{
this->network = this->builder ? this->builder->createNetwork() : nullptr;
return this->network != nullptr;
}
bool build_parser(void)
{
this->parser = nvuffparser::createUffParser();
if (this->parser) {
X_ASSERT(this->parser->registerInput(uff_input,
DimsCHW(1,
this->INPUT_H,
this->INPUT_W),
UffInputOrder::kNCHW),
errout, "Failed to register input");
X_ASSERT(this->parser->registerOutput(uff_output),
errout, "Failed to register output");
}
return true;
errout:
this->cleanup();
return false;
}
inline bool parseBuiltin(nvinfer1::INetworkDefinition& network, nvinfer1::DataType type)
{
return this->parser->parseBuffer((const char*)uff_data, uff_size, network, type);
}
inline void cleanup(void)
{
/* these are unneeded for normal operations */
if (this->parser) { this->parser->destroy(); this->parser = nullptr; }
if (this->network) { this->network->destroy(); this->network = nullptr; }
if (this->builder) { this->builder->destroy(); this->builder = nullptr; }
}
public:
bool engine_ok;
bool ipc_ok;
execution_context(nvinfer1::DataType type = nvinfer1::DataType::kHALF)
{
this->engine_ok = false;
this->ipc_ok = false;
switch (type) {
case DataType::kINT32:
case DataType::kFLOAT:
case DataType::kHALF:
case DataType::kINT8:
this->NN_TYPE = type;
break;
default:
log_error("Invalid data type");
exit(102);
}
this->builder = nullptr;
this->parser = nullptr;
this->network = nullptr;
this->engine = nullptr;
this->context = nullptr;
this->nbBindings = 0;
this->input_binding_index = -1;
this->batchSize = 1;
this->ipc = new daemon_io();
this->ipc_ok = this->ipc->ok;
log_warn("blank execution context created");
}
~execution_context(void)
{
log_warn("removing execution context...");
if (this->ipc) delete this->ipc;
this->cleanup();
for (int i = 0; i < this->nbBindings; i++) {
this->dropCudaBuffer(i);
}
if (this->context) this->context->destroy();
if (this->engine) this->engine->destroy();
log_warn("execution context removed");
}
bool build_engine(void)
{
THE_CPP_TIME t_start, t_stop;
float ms;
std::string message = "Engine looks ok";
X_ASSERT(this->build_builder(), errout, "Cannot create builder");
X_ASSERT(this->build_parser(), errout, "Cannot create parser");
X_ASSERT(this->build_network(), errout, "Cannot create network");
log_info("About to parse the model");
T_ASSERT(this->parseBuiltin(*this->network, this->NN_TYPE),
errout, "Fail to parse", t_start, t_stop);
ms = stamp_delta(t_start, t_stop);
std::cout << "The model has been parsed ok in " << ms / 1.E3 << " seconds" << std::endl;
log_info("Building engine...");
T_ASSERT(this->engine = builder->buildCudaEngine(*network),
errout, "Unable to create engine", t_start, t_stop);
ms = stamp_delta(t_start, t_stop);
std::cout << "The engine has been built ok in " << ms / 1.E3 << " seconds" << std::endl;
this->context = this->engine->createExecutionContext();
if (this->context == nullptr) {
message = "Failed to create context";
this->engine->destroy();
this->engine = nullptr;
}
this->engine_ok = this->engine != nullptr;
errout:
std::string error_message = "sample_uff_mnist: " + std::string(message);
(this->engine_ok ? log_info : log_error)(error_message.c_str());
this->cleanup();
return this->engine_ok;
}
void allocate_bindings(void)
{
log_info("allocating bindings...");
/* OUTPUTs are allocated once, while INPUTs are allocated on each turn */
std::cout << "DeviceMemorySize=" << this->engine->getDeviceMemorySize() << std::endl;
this->nbBindings = this->engine->getNbBindings();
ASSERT(this->nbBindings == 2, "Num of bindings != 2");
for (int i = 0; i < this->nbBindings; ++i)
{
std::cout << "binding #" << i << std::endl;
/* calculateBindingBufferSizes */
Dims dims = this->engine->getBindingDimensions(i);
DataType dtype = this->engine->getBindingDataType(i);
size_t count = volume(dims) * this->batchSize;
this->sizes.push_back(std::make_pair(count, dtype));
this->printBinding(i);
ASSERT(this->elementSize(i) == sizeof(data_t), "Wrong element type");
/* in that specific case, eltCount == INPUT_H * INPUT_W */
ASSERT(count == this->INPUT_H * this->INPUT_W, "Bad array size");
void *mem_ptr;
if (this->engine->bindingIsInput(i)) {
log_info("found input binding");
if (this->input_binding_index < 0) {
this->input_binding_index = i;
}
mem_ptr = nullptr;
} else {
log_info("found output binding");
mem_ptr = this->safeCudaMalloc(this->memSize(i));
}
this->buffers.push_back(mem_ptr);
}
log_info("bindings have been allocated");
}
void execute_engine(void) /* do the job */
{
log_info("About to execute the model");
float total = 0.0;
int run;
for (run = 0; ; run++) {
std::cout << "\n--------------------------- " << run << " -------" << std::endl;
this->createInputCudaBuffer(run);
total += this->run_once(run);
this->printOutput(run);
this->dropInputCudaBuffer();
if ((run + 1) % 100 == 0) {
std::cout
<< "Average over " << (run + 1) << " runs is "
<< (total / run) << " ms."
<< std::endl;
}
}
total /= run;
std::cout << "Average over " << run << " runs is " << total << " ms." << std::endl;
}
}; // end of class execution_context
/* ************************************************************************** */
static execution_context* xctx = new execution_context(nvinfer1::DataType::kFLOAT);
static void cleanup(void)
{
log_info("Cleanup...");
delete xctx;
shutdownProtobufLibrary();
log_info("Done.");
}
static int go_daemonic(void)
{
return 0;
/*
errout:
return -1;
*/
}
static void on_signal(int sig)
{
switch (sig) {
case SIGINT:
log_warn("<INTERRUPT>");
exit(3);
default:
log_error("unknown signal");
exit(100);
}
}
/* Program entry point.
 *  - Registers cleanup() with atexit(3) and on_signal() for SIGINT.
 *  - Requires the IPC sockets (created when xctx was constructed) to be ready.
 *  - Builds the TensorRT engine from the embedded UFF model, with TensorRT
 *    logging limited to warnings during the build.
 *  - Allocates the engine bindings and serves requests in execute_engine().
 * argc/argv are unused.  Returns EXIT_FAILURE if a handler cannot be
 * registered; other failures exit through ASSERT. */
int main(int argc, char** argv)
{
    // cleanup() deletes xctx on every exit() path (sockets, engine, buffers)
    C_ASSERT("atexit", atexit(cleanup) == 0,
             errout, "Cannot register cleanup");
    C_ASSERT("signal", signal(SIGINT, on_signal) != SIG_ERR,
             errout, "Cannot register signal handler");
    ASSERT(xctx->ipc_ok, "Cannot setup IPC");
    log_level(ILogger::Severity::kWARNING);
    // build_engine() is way too verbose at INFO level...
    ASSERT(xctx->build_engine(), "Failed to create engine");
    log_level(ILogger::Severity::kINFO);
    ASSERT(go_daemonic() == 0, "Cannot become a daemon");
    xctx->allocate_bindings();
    // execute_engine() loops forever; the process ends via exit()
    xctx->execute_engine();
    return EXIT_SUCCESS;
errout:
    return EXIT_FAILURE;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment