Skip to content

Instantly share code, notes, and snippets.

@bionicbeagle
Created November 30, 2022 21:07
Show Gist options
  • Save bionicbeagle/4891eba6279ead5db5f501a60ff2b194 to your computer and use it in GitHub Desktop.
Save bionicbeagle/4891eba6279ead5db5f501a60ff2b194 to your computer and use it in GitHub Desktop.
nng 1.5.2 patch to enable runtime configuration of thread pools
diff --git a/include/nng/nng.h b/include/nng/nng.h
index cd75495a..1c6b7a61 100644
--- a/include/nng/nng.h
+++ b/include/nng/nng.h
@@ -1179,6 +1179,11 @@ NNG_DECL int nng_stream_listener_set_ptr(
NNG_DECL int nng_stream_listener_set_addr(
nng_stream_listener *, const char *, const nng_sockaddr *);
+#define NNG_CAN_LIMIT_THREADS // presumably lets callers detect this patch at compile time -- confirm
+NNG_DECL void nng_set_ncpu_max(int); // cap on the CPU count nni_plat_ncpu reports
+NNG_DECL void nng_set_pool_thread_limit_min(int); // floor for a pool's thread count
+NNG_DECL void nng_set_pool_thread_limit_max(int); // ceiling for a pool's thread count
+NNG_DECL void nng_set_resolve_thread_max(int); // number of resolver worker threads
#ifndef NNG_ELIDE_DEPRECATED
// These are legacy APIs that have been deprecated.
diff --git a/src/core/aio.c b/src/core/aio.c
index dfab8f60..edfc1012 100644
--- a/src/core/aio.c
+++ b/src/core/aio.c
@@ -796,8 +796,8 @@ nni_aio_sys_init(void)
#else
num_thr = NNG_EXPIRE_THREADS;
#endif
- if (num_thr > 256) {
- num_thr = 256;
+ if (num_thr > nni_thr_get_pool_thread_limit_max()) {
+ num_thr = nni_thr_get_pool_thread_limit_max();
}
nni_aio_expire_q_list =
diff --git a/src/core/platform.h b/src/core/platform.h
index f1127c5b..40bd8637 100644
--- a/src/core/platform.h
+++ b/src/core/platform.h
@@ -356,6 +356,9 @@ extern int nni_parse_ip(const char *, nng_sockaddr *);
// nni_parse_ip_port parses an IP address with an optional port appended.
extern int nni_parse_ip_port(const char *, nng_sockaddr *);
+// nni_set_resolve_thread_max is used to configure the resolve thread pool
+extern void nni_set_resolve_thread_max(int);
+
//
// IPC (UNIX Domain Sockets & Named Pipes) Support.
//
diff --git a/src/core/taskq.c b/src/core/taskq.c
index e06bc264..af5c0356 100644
--- a/src/core/taskq.c
+++ b/src/core/taskq.c
@@ -242,6 +242,9 @@ nni_taskq_sys_init(void)
#else
nthrs = NNG_NUM_TASKQ_THREADS;
#endif
+ if (nthrs > nni_thr_get_pool_thread_limit_max()) {
+ nthrs = nni_thr_get_pool_thread_limit_max();
+ }
#if NNG_MAX_TASKQ_THREADS > 0
if (nthrs > NNG_MAX_TASKQ_THREADS) {
nthrs = NNG_MAX_TASKQ_THREADS;
diff --git a/src/core/thread.c b/src/core/thread.c
index 6f50476a..0866bfd7 100644
--- a/src/core/thread.c
+++ b/src/core/thread.c
@@ -172,4 +172,46 @@ void
nni_thr_set_name(nni_thr *thr, const char *name)
{
nni_plat_thr_set_name(thr != NULL ? &thr->thr : NULL, name);
-}
\ No newline at end of file
+}
+
+static int nni_ncpu_max = 256; // cap nni_plat_ncpu applies to the CPU count
+static int nni_pool_thread_limit_min = 2; // floor for a pool's thread count
+static int nni_pool_thread_limit_max = 64; // ceiling for a pool's thread count
+
+void
+nni_thr_set_ncpu_max(int limit)
+{
+	nni_ncpu_max = (limit < 1) ? 1 : limit; // cap must allow one CPU
+}
+
+int
+nni_thr_get_ncpu_max(void)
+{
+	return (nni_ncpu_max); // cap consumed by nni_plat_ncpu
+}
+
+// nni_thr_set_pool_thread_limit_min stores the lower bound that pool
+// initialization applies to the thread count of a single thread pool.
+void
+nni_thr_set_pool_thread_limit_min(int limit)
+{
+ nni_pool_thread_limit_min = limit;
+}
+
+int
+nni_thr_get_pool_thread_limit_min(void)
+{
+	return (nni_pool_thread_limit_min); // floor for a pool's thread count
+}
+
+void
+nni_thr_set_pool_thread_limit_max(int limit)
+{
+	nni_pool_thread_limit_max = (limit < 1) ? 1 : limit; // never size a pool at 0
+}
+
+int
+nni_thr_get_pool_thread_limit_max(void)
+{
+	return (nni_pool_thread_limit_max); // ceiling for a pool's thread count
+}
diff --git a/src/core/thread.h b/src/core/thread.h
index 316acdc2..c6c4c6fb 100644
--- a/src/core/thread.h
+++ b/src/core/thread.h
@@ -82,4 +82,16 @@ extern bool nni_thr_is_self(nni_thr *thr);
// nni_thr_set_name is used to set a short name for the thread.
extern void nni_thr_set_name(nni_thr *thr, const char *);
+// nni_thr_set_ncpu_max caps the CPU count that nni_plat_ncpu reports.
+extern void nni_thr_set_ncpu_max(int);
+extern int nni_thr_get_ncpu_max(void);
+
+// The pool thread limits bound how many threads nng creates
+// for a single thread pool (min is a floor, max is a ceiling).
+extern void nni_thr_set_pool_thread_limit_min(int);
+extern int nni_thr_get_pool_thread_limit_min(void);
+
+extern void nni_thr_set_pool_thread_limit_max(int);
+extern int nni_thr_get_pool_thread_limit_max(void);
+
#endif // CORE_THREAD_H
diff --git a/src/nng.c b/src/nng.c
index 1ccc1386..3383eedb 100644
--- a/src/nng.c
+++ b/src/nng.c
@@ -1900,3 +1900,36 @@ nng_version(void)
return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr(
NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX);
}
+
+// nng_set_ncpu_max caps the CPU count that nni_plat_ncpu reports, which
+// in turn bounds thread pools sized from it.
+void
+nng_set_ncpu_max(int limit)
+{
+	nni_thr_set_ncpu_max(limit);
+}
+
+// nng_set_pool_thread_limit_min sets the lower bound applied to a pool's
+// thread count (in this patch only the Windows I/O pool reads it).
+void
+nng_set_pool_thread_limit_min(int limit)
+{
+	nni_thr_set_pool_thread_limit_min(limit);
+}
+
+// nng_set_pool_thread_limit_max sets the upper bound applied to the aio
+// expire, taskq and Windows I/O pool thread counts.
+void
+nng_set_pool_thread_limit_max(int limit)
+{
+	nni_thr_set_pool_thread_limit_max(limit);
+}
+
+// nng_set_resolve_thread_max sets the number of name resolver threads.
+// NOTE(review): the limits are read by the *_sysinit routines, so they
+// presumably must be set before the library initializes -- confirm.
+void
+nng_set_resolve_thread_max(int limit)
+{
+	nni_set_resolve_thread_max(limit);
+}
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c
index 83974b3d..8e1878b2 100644
--- a/src/platform/posix/posix_resolv_gai.c
+++ b/src/platform/posix/posix_resolv_gai.c
@@ -440,6 +440,27 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa)
return (parse_ip(addr, sa, true));
}
+static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY;
+
+// nni_set_resolve_thread_max records the requested number of resolver
+// worker threads, clamped to the range 1..NNG_RESOLV_CONCURRENCY.
+// NOTE(review): the thread array is presumably sized by
+// NNG_RESOLV_CONCURRENCY, hence the upper clamp -- confirm.
+// NOTE(review): this patch has no hunk that makes
+// nni_posix_resolv_sysinit read nng_resolv_concurrency, so on POSIX the
+// setting appears to have no effect yet -- confirm.
+void
+nni_set_resolve_thread_max(int limit)
+{
+	if (limit < 1) {
+		limit = 1;
+	}
+	if (limit > NNG_RESOLV_CONCURRENCY) {
+		limit = NNG_RESOLV_CONCURRENCY;
+	}
+	nng_resolv_concurrency = limit;
+}
+
int
nni_posix_resolv_sysinit(void)
{
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c
index c47bcec6..8d0788f4 100644
--- a/src/platform/posix/posix_thread.c
+++ b/src/platform/posix/posix_thread.c
@@ -444,14 +444,21 @@ nni_plat_fini(void)
int
nni_plat_ncpu(void)
{
+	int ncpu  = 1;
+	int limit = nni_thr_get_ncpu_max();
+
// POSIX specifies sysconf exists, but not the value
// _SC_NPROCESSORS_ONLN. Nonetheless, everybody implements it.
// If you don't we'll assume you only have a single logical CPU.
#ifdef _SC_NPROCESSORS_ONLN
- return (sysconf(_SC_NPROCESSORS_ONLN));
-#else
- return (1);
+	ncpu = (int) sysconf(_SC_NPROCESSORS_ONLN);
#endif
+
+	if (ncpu > limit) {
+		ncpu = limit;
+	}
+	// sysconf failure (-1) or a non-positive cap must not yield < 1 CPU.
+	return ((ncpu < 1) ? 1 : ncpu);
}
#endif // NNG_PLATFORM_POSIX
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c
index 489dc01a..17a047cc 100644
--- a/src/platform/windows/win_io.c
+++ b/src/platform/windows/win_io.c
@@ -92,11 +92,11 @@ nni_win_io_sysinit(void)
int nthr = nni_plat_ncpu() * 2;
// Limits on the thread count. This is fairly arbitrary.
- if (nthr < 4) {
- nthr = 4;
+ if (nthr < nni_thr_get_pool_thread_limit_min()) {
+ nthr = nni_thr_get_pool_thread_limit_min();
}
- if (nthr > 64) {
- nthr = 64;
+ if (nthr > nni_thr_get_pool_thread_limit_max()) {
+ nthr = nni_thr_get_pool_thread_limit_max();
}
if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) {
return (NNG_ENOMEM);
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c
index 8628719f..e41a999b 100644
--- a/src/platform/windows/win_resolv.c
+++ b/src/platform/windows/win_resolv.c
@@ -405,6 +405,25 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa)
return (parse_ip(addr, sa, true));
}
+static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY;
+
+// nni_set_resolve_thread_max sets how many resolver worker threads
+// nni_win_resolv_sysinit starts, clamped to 1..NNG_RESOLV_CONCURRENCY.
+// NOTE(review): resolv_thrs is presumably an array sized by
+// NNG_RESOLV_CONCURRENCY, so a larger count would index past its end
+// -- confirm.
+void
+nni_set_resolve_thread_max(int limit)
+{
+	if (limit < 1) {
+		limit = 1;
+	}
+	if (limit > NNG_RESOLV_CONCURRENCY) {
+		limit = NNG_RESOLV_CONCURRENCY;
+	}
+	nng_resolv_concurrency = limit;
+}
+
int
nni_win_resolv_sysinit(void)
{
@@ -413,7 +432,7 @@ nni_win_resolv_sysinit(void)
nni_aio_list_init(&resolv_aios);
resolv_fini = false;
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) {
+ for (int i = 0; i < nng_resolv_concurrency; i++) {
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
if (rv != 0) {
nni_win_resolv_sysfini();
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c
index dc9ed12a..73ae2ecf 100644
--- a/src/platform/windows/win_thread.c
+++ b/src/platform/windows/win_thread.c
@@ -393,9 +393,17 @@ int
nni_plat_ncpu(void)
{
SYSTEM_INFO info;
+	int limit = nni_thr_get_ncpu_max();
+	int ncpu;
GetSystemInfo(&info);
- return ((int) (info.dwNumberOfProcessors));
+	ncpu = (int) info.dwNumberOfProcessors;
+
+	if (ncpu > limit) {
+		ncpu = limit;
+	}
+	// A non-positive cap must not make us report fewer than one CPU.
+	return ((ncpu < 1) ? 1 : ncpu);
}
int
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment