Created
November 30, 2022 21:07
-
-
Save bionicbeagle/4891eba6279ead5db5f501a60ff2b194 to your computer and use it in GitHub Desktop.
nng 1.5.2 patch to enable runtime configuration of thread pools
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/include/nng/nng.h b/include/nng/nng.h | |
index cd75495a..1c6b7a61 100644 | |
--- a/include/nng/nng.h | |
+++ b/include/nng/nng.h | |
@@ -1179,6 +1179,11 @@ NNG_DECL int nng_stream_listener_set_ptr( | |
NNG_DECL int nng_stream_listener_set_addr( | |
nng_stream_listener *, const char *, const nng_sockaddr *); | |
+#define NNG_CAN_LIMIT_THREADS | |
+NNG_DECL void nng_set_ncpu_max(int); | |
+NNG_DECL void nng_set_pool_thread_limit_min(int); | |
+NNG_DECL void nng_set_pool_thread_limit_max(int); | |
+NNG_DECL void nng_set_resolve_thread_max(int); | |
#ifndef NNG_ELIDE_DEPRECATED | |
// These are legacy APIs that have been deprecated. | |
diff --git a/src/core/aio.c b/src/core/aio.c | |
index dfab8f60..edfc1012 100644 | |
--- a/src/core/aio.c | |
+++ b/src/core/aio.c | |
@@ -796,8 +796,8 @@ nni_aio_sys_init(void) | |
#else | |
num_thr = NNG_EXPIRE_THREADS; | |
#endif | |
- if (num_thr > 256) { | |
- num_thr = 256; | |
+ if (num_thr > nni_thr_get_pool_thread_limit_max()) { | |
+ num_thr = nni_thr_get_pool_thread_limit_max(); | |
} | |
nni_aio_expire_q_list = | |
diff --git a/src/core/platform.h b/src/core/platform.h | |
index f1127c5b..40bd8637 100644 | |
--- a/src/core/platform.h | |
+++ b/src/core/platform.h | |
@@ -356,6 +356,9 @@ extern int nni_parse_ip(const char *, nng_sockaddr *); | |
// nni_parse_ip_port parses an IP address with an optional port appended. | |
extern int nni_parse_ip_port(const char *, nng_sockaddr *); | |
+// nni_set_resolve_thread_max is used to configure the resolve thread pool | |
+extern void nni_set_resolve_thread_max(int); | |
+ | |
// | |
// IPC (UNIX Domain Sockets & Named Pipes) Support. | |
// | |
diff --git a/src/core/taskq.c b/src/core/taskq.c | |
index e06bc264..af5c0356 100644 | |
--- a/src/core/taskq.c | |
+++ b/src/core/taskq.c | |
@@ -242,6 +242,9 @@ nni_taskq_sys_init(void) | |
#else | |
nthrs = NNG_NUM_TASKQ_THREADS; | |
#endif | |
+ if (nthrs > nni_thr_get_pool_thread_limit_max()) { | |
+ nthrs = nni_thr_get_pool_thread_limit_max(); | |
+ } | |
#if NNG_MAX_TASKQ_THREADS > 0 | |
if (nthrs > NNG_MAX_TASKQ_THREADS) { | |
nthrs = NNG_MAX_TASKQ_THREADS; | |
diff --git a/src/core/thread.c b/src/core/thread.c | |
index 6f50476a..0866bfd7 100644 | |
--- a/src/core/thread.c | |
+++ b/src/core/thread.c | |
@@ -172,4 +172,46 @@ void | |
nni_thr_set_name(nni_thr *thr, const char *name) | |
{ | |
nni_plat_thr_set_name(thr != NULL ? &thr->thr : NULL, name); | |
-} | |
\ No newline at end of file | |
+} | |
+ | |
+static int nni_ncpu_max = 256; | |
+static int nni_pool_thread_limit_min = 2; | |
+static int nni_pool_thread_limit_max = 64; | |
+ | |
+void | |
+nni_thr_set_ncpu_max(int limit) | |
+{ | |
+ nni_ncpu_max = limit; | |
+} | |
+ | |
+int | |
+nni_thr_get_ncpu_max() | |
+{ | |
+ return nni_ncpu_max; | |
+} | |
+ | |
+// nni_thr_set_pool_thread_limit_min sets the lower bound on the thread | |
+// count used for a single thread pool | |
+void | |
+nni_thr_set_pool_thread_limit_min(int limit) | |
+{ | |
+ nni_pool_thread_limit_min = limit; | |
+} | |
+ | |
+int | |
+nni_thr_get_pool_thread_limit_min() | |
+{ | |
+ return nni_pool_thread_limit_min; | |
+} | |
+ | |
+void | |
+nni_thr_set_pool_thread_limit_max(int limit) | |
+{ | |
+ nni_pool_thread_limit_max = limit; | |
+} | |
+ | |
+int | |
+nni_thr_get_pool_thread_limit_max() | |
+{ | |
+ return nni_pool_thread_limit_max; | |
+} | |
diff --git a/src/core/thread.h b/src/core/thread.h | |
index 316acdc2..c6c4c6fb 100644 | |
--- a/src/core/thread.h | |
+++ b/src/core/thread.h | |
@@ -82,4 +82,16 @@ extern bool nni_thr_is_self(nni_thr *thr); | |
// nni_thr_set_name is used to set a short name for the thread. | |
extern void nni_thr_set_name(nni_thr *thr, const char *); | |
+// nni_thr_set_ncpu_max caps the CPU count reported by nni_plat_ncpu | |
+extern void nni_thr_set_ncpu_max(int); | |
+extern int nni_thr_get_ncpu_max(); | |
+ | |
+// nni_thr_set_pool_thread_limit_min/max set the lower and upper bounds | |
+// on the thread count used for a single thread pool | |
+extern void nni_thr_set_pool_thread_limit_min(int); | |
+extern int nni_thr_get_pool_thread_limit_min(); | |
+ | |
+extern void nni_thr_set_pool_thread_limit_max(int); | |
+extern int nni_thr_get_pool_thread_limit_max(); | |
+ | |
#endif // CORE_THREAD_H | |
diff --git a/src/nng.c b/src/nng.c | |
index 1ccc1386..3383eedb 100644 | |
--- a/src/nng.c | |
+++ b/src/nng.c | |
@@ -1900,3 +1900,27 @@ nng_version(void) | |
return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr( | |
NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX); | |
} | |
+ | |
+void | |
+nng_set_ncpu_max(int limit) | |
+{ | |
+ nni_thr_set_ncpu_max(limit); | |
+} | |
+ | |
+void | |
+nng_set_pool_thread_limit_min(int limit) | |
+{ | |
+ nni_thr_set_pool_thread_limit_min(limit); | |
+} | |
+ | |
+void | |
+nng_set_pool_thread_limit_max(int limit) | |
+{ | |
+ nni_thr_set_pool_thread_limit_max(limit); | |
+} | |
+ | |
+void | |
+nng_set_resolve_thread_max(int limit) | |
+{ | |
+ nni_set_resolve_thread_max(limit); | |
+} | |
\ No newline at end of file | |
diff --git a/src/platform/posix/posix_resolv_gai.c b/src/platform/posix/posix_resolv_gai.c | |
index 83974b3d..8e1878b2 100644 | |
--- a/src/platform/posix/posix_resolv_gai.c | |
+++ b/src/platform/posix/posix_resolv_gai.c | |
@@ -440,6 +440,14 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa) | |
return (parse_ip(addr, sa, true)); | |
} | |
+static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY; | |
+ | |
+void | |
+nni_set_resolve_thread_max(int limit) | |
+{ | |
+ nng_resolv_concurrency = limit; | |
+} | |
+ | |
int | |
nni_posix_resolv_sysinit(void) | |
{ | |
diff --git a/src/platform/posix/posix_thread.c b/src/platform/posix/posix_thread.c | |
index c47bcec6..8d0788f4 100644 | |
--- a/src/platform/posix/posix_thread.c | |
+++ b/src/platform/posix/posix_thread.c | |
@@ -444,14 +444,21 @@ nni_plat_fini(void) | |
int | |
nni_plat_ncpu(void) | |
{ | |
+ int system_proc_count = 1; | |
+ int proc_limit = nni_thr_get_ncpu_max(); | |
+ | |
// POSIX specifies sysconf exists, but not the value | |
// _SC_NPROCESSORS_ONLN. Nonetheless, everybody implements it. | |
// If you don't we'll assume you only have a single logical CPU. | |
#ifdef _SC_NPROCESSORS_ONLN | |
- return (sysconf(_SC_NPROCESSORS_ONLN)); | |
-#else | |
- return (1); | |
+ system_proc_count = (sysconf(_SC_NPROCESSORS_ONLN)); | |
#endif | |
+ | |
+ if (system_proc_count > proc_limit) { | |
+ return proc_limit; | |
+ } else { | |
+ return system_proc_count; | |
+ } | |
} | |
#endif // NNG_PLATFORM_POSIX | |
diff --git a/src/platform/windows/win_io.c b/src/platform/windows/win_io.c | |
index 489dc01a..17a047cc 100644 | |
--- a/src/platform/windows/win_io.c | |
+++ b/src/platform/windows/win_io.c | |
@@ -92,11 +92,11 @@ nni_win_io_sysinit(void) | |
int nthr = nni_plat_ncpu() * 2; | |
// Limits on the thread count. This is fairly arbitrary. | |
- if (nthr < 4) { | |
- nthr = 4; | |
+ if (nthr < nni_thr_get_pool_thread_limit_min()) { | |
+ nthr = nni_thr_get_pool_thread_limit_min(); | |
} | |
- if (nthr > 64) { | |
- nthr = 64; | |
+ if (nthr > nni_thr_get_pool_thread_limit_max()) { | |
+ nthr = nni_thr_get_pool_thread_limit_max(); | |
} | |
if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { | |
return (NNG_ENOMEM); | |
diff --git a/src/platform/windows/win_resolv.c b/src/platform/windows/win_resolv.c | |
index 8628719f..e41a999b 100644 | |
--- a/src/platform/windows/win_resolv.c | |
+++ b/src/platform/windows/win_resolv.c | |
@@ -405,6 +405,14 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa) | |
return (parse_ip(addr, sa, true)); | |
} | |
+static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY; | |
+ | |
+void | |
+nni_set_resolve_thread_max(int limit) | |
+{ | |
+ nng_resolv_concurrency = limit; | |
+} | |
+ | |
int | |
nni_win_resolv_sysinit(void) | |
{ | |
@@ -413,7 +421,7 @@ nni_win_resolv_sysinit(void) | |
nni_aio_list_init(&resolv_aios); | |
resolv_fini = false; | |
- for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { | |
+ for (int i = 0; i < nng_resolv_concurrency; i++) { | |
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); | |
if (rv != 0) { | |
nni_win_resolv_sysfini(); | |
diff --git a/src/platform/windows/win_thread.c b/src/platform/windows/win_thread.c | |
index dc9ed12a..73ae2ecf 100644 | |
--- a/src/platform/windows/win_thread.c | |
+++ b/src/platform/windows/win_thread.c | |
@@ -393,9 +393,17 @@ int | |
nni_plat_ncpu(void) | |
{ | |
SYSTEM_INFO info; | |
+ int ncpu_max = nni_thr_get_ncpu_max(); | |
+ int n = 0; | |
GetSystemInfo(&info); | |
- return ((int) (info.dwNumberOfProcessors)); | |
+ n = ((int) (info.dwNumberOfProcessors)); | |
+ | |
+ if (n > ncpu_max) { | |
+ return ncpu_max; | |
+ } else { | |
+ return n; | |
+ } | |
} | |
int |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment