Created
March 17, 2013 13:49
-
-
Save kubo/5181580 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/ext/oci8/oci8.c b/ext/oci8/oci8.c | |
index 2de46e4..b10507e 100644 | |
--- a/ext/oci8/oci8.c | |
+++ b/ext/oci8/oci8.c | |
@@ -134,6 +134,11 @@ static void oci8_svcctx_init(oci8_base_t *base) | |
oci8_svcctx_t *svcctx = (oci8_svcctx_t *)base; | |
VALUE obj; | |
+#ifdef NATIVE_THREAD_WITH_GVL | |
+ svcctx->waiters.next = &svcctx->waiters; | |
+ svcctx->waiters.prev = &svcctx->waiters; | |
+ svcctx->wait_timeout = 10; /* 10 seconds */ | |
+#endif | |
svcctx->executing_thread = Qnil; | |
/* set session handle */ | |
obj = rb_obj_alloc(cSession); | |
diff --git a/ext/oci8/oci8.h b/ext/oci8/oci8.h | |
index 57e009d..86c90b0 100644 | |
--- a/ext/oci8/oci8.h | |
+++ b/ext/oci8/oci8.h | |
@@ -352,12 +352,22 @@ typedef struct oci8_temp_lob { | |
OCILobLocator *lob; | |
} oci8_temp_lob_t; | |
+typedef struct oci8_waiter { | |
+ struct oci8_waiter *next; | |
+ struct oci8_waiter *prev; | |
+ VALUE thread; | |
+} oci8_waiter_t; | |
+ | |
typedef struct oci8_svcctx { | |
oci8_base_t base; | |
volatile VALUE executing_thread; | |
const oci8_logoff_strategy_t *logoff_strategy; | |
OCISession *usrhp; | |
OCIServer *srvhp; | |
+#ifdef NATIVE_THREAD_WITH_GVL | |
+ oci8_waiter_t waiters; | |
+ int wait_timeout; | |
+#endif | |
rb_pid_t pid; | |
unsigned char state; | |
char is_autocommit; | |
diff --git a/ext/oci8/oci8lib.c b/ext/oci8/oci8lib.c | |
index 9075db1..012fb16 100644 | |
--- a/ext/oci8/oci8lib.c | |
+++ b/ext/oci8/oci8lib.c | |
@@ -212,6 +212,54 @@ void oci8_unlink_from_parent(oci8_base_t *base) | |
#ifdef NATIVE_THREAD_WITH_GVL | |
+static void check_concurrent_execution(oci8_svcctx_t *svcctx) | |
+{ | |
+ VALUE current_thread = rb_thread_current(); | |
+ | |
+ if (!NIL_P(svcctx->executing_thread)) { | |
+ if (svcctx->wait_timeout == 0) { | |
+ rb_raise(rb_eRuntimeError, "executing in another thread"); | |
+ } else { | |
+ oci8_waiter_t waiter; | |
+ struct timeval sleep_time; | |
+ struct timeval limit; | |
+ | |
+ sleep_time.tv_sec = svcctx->wait_timeout; | |
+ sleep_time.tv_usec = 0; | |
+ gettimeofday(&limit, NULL); | |
+ limit.tv_sec += svcctx->wait_timeout; | |
+ | |
+ waiter.next = &svcctx->waiters; | |
+ waiter.prev = svcctx->waiters.prev; | |
+ waiter.thread = current_thread; | |
+ svcctx->waiters.prev->next = &waiter; | |
+ svcctx->waiters.prev = &waiter; | |
+ do { | |
+ if (svcctx->wait_timeout > 0) { | |
+ rb_thread_wait_for(sleep_time); | |
+ gettimeofday(&sleep_time, NULL); | |
+ sleep_time.tv_sec = limit.tv_sec - sleep_time.tv_sec; | |
+ sleep_time.tv_usec = limit.tv_usec - sleep_time.tv_usec; | |
+ if (sleep_time.tv_usec < 0) { | |
+ sleep_time.tv_sec -= 1; | |
+ sleep_time.tv_usec += 1000000; | |
+ } | |
+ if (sleep_time.tv_sec < 0) { | |
+ waiter.next->prev = waiter.prev; | |
+ waiter.prev->next = waiter.next; | |
+ rb_raise(rb_eRuntimeError, "executing in another thread (wait %d seconds", svcctx->wait_timeout); | |
+ } | |
+ } else { | |
+ /* no timeout */ | |
+ rb_thread_sleep_forever(); | |
+ } | |
+ } while (!NIL_P(svcctx->executing_thread)); | |
+ waiter.next->prev = waiter.prev; | |
+ waiter.prev->next = waiter.next; | |
+ } | |
+ } | |
+} | |
+ | |
static void oci8_unblock_func(void *user_data) | |
{ | |
oci8_svcctx_t *svcctx = (oci8_svcctx_t *)user_data; | |
@@ -234,64 +282,88 @@ static void *free_temp_lob(void *user_data) | |
return (void*)(VALUE)rv; | |
} | |
+typedef struct protect_arg { | |
+ void *(*func)(void *); | |
+ void *data; | |
+ oci8_svcctx_t *svcctx; | |
+} protect_arg_t; | |
+ | |
+static VALUE protected_func(VALUE data) | |
+{ | |
+ struct protect_arg *arg = (struct protect_arg*)data; | |
+ VALUE rv; | |
+ | |
+ check_concurrent_execution(arg->svcctx); | |
+ arg->svcctx->executing_thread = rb_thread_current(); | |
+#ifdef HAVE_RB_THREAD_CALL_WITHOUT_GVL | |
+ rv = (VALUE)rb_thread_call_without_gvl(arg->func, arg->data, oci8_unblock_func, arg->svcctx); | |
+#else | |
+ rv = rb_thread_blocking_region((VALUE(*)(void*))arg->func, arg->data, oci8_unblock_func, arg->svcctx); | |
+#endif | |
+ if ((sword)rv == OCI_ERROR) { | |
+ if (oci8_get_error_code(oci8_errhp) == 1013) { | |
+ rb_raise(eOCIBreak, "Canceled by user request."); | |
+ } | |
+ } | |
+ return rv; | |
+} | |
+ | |
+static sword call_without_gvl(oci8_svcctx_t *svcctx, void *(*func)(void *), void *data, OCIError *errhp, int *state) | |
+{ | |
+ protect_arg_t arg; | |
+ sword rv; | |
+ | |
+ arg.svcctx = svcctx; | |
+ arg.func = func; | |
+ arg.data = data; | |
+ rv = (sword)rb_protect(protected_func, (VALUE)&arg, state); | |
+ if (svcctx->waiters.next != &svcctx->waiters) { | |
+ rb_thread_wakeup_alive(svcctx->waiters.next->thread); | |
+ } | |
+ return rv; | |
+} | |
+ | |
/* ruby 1.9 */ | |
sword oci8_call_without_gvl(oci8_svcctx_t *svcctx, void *(*func)(void *), void *data) | |
{ | |
OCIError *errhp = oci8_errhp; | |
+ int state; | |
+ sword rv; | |
- if (!NIL_P(svcctx->executing_thread)) { | |
- rb_raise(rb_eRuntimeError /* FIXME */, "executing in another thread"); | |
+ if (!svcctx->non_blocking) { | |
+ check_concurrent_execution(svcctx); | |
} | |
- | |
if (!svcctx->suppress_free_temp_lobs) { | |
- oci8_temp_lob_t *lob = svcctx->temp_lobs; | |
- while (lob != NULL) { | |
- oci8_temp_lob_t *lob_next = lob->next; | |
+ while (svcctx->temp_lobs != NULL) { | |
+ oci8_temp_lob_t *lob = svcctx->temp_lobs; | |
+ svcctx->temp_lobs = lob->next; | |
if (svcctx->non_blocking) { | |
free_temp_lob_arg_t arg; | |
- sword rv; | |
arg.svcctx = svcctx; | |
arg.svchp = svcctx->base.hp.svc; | |
arg.errhp = errhp; | |
arg.lob = lob->lob; | |
- svcctx->executing_thread = rb_thread_current(); | |
-#ifdef HAVE_RB_THREAD_CALL_WITHOUT_GVL | |
- rv = (sword)(VALUE)rb_thread_call_without_gvl(free_temp_lob, &arg, oci8_unblock_func, svcctx); | |
-#else | |
- rv = (sword)rb_thread_blocking_region((VALUE(*)(void*))free_temp_lob, &arg, oci8_unblock_func, svcctx); | |
-#endif | |
- if (rv == OCI_ERROR) { | |
- if (oci8_get_error_code(errhp) == 1013) { | |
- rb_raise(eOCIBreak, "Canceled by user request."); | |
- } | |
+ call_without_gvl(svcctx, free_temp_lob, &arg, errhp, &state); | |
+ if (state) { | |
+ lob->next = svcctx->temp_lobs; | |
+ svcctx->temp_lobs = lob; | |
+ rb_jump_tag(state); | |
} | |
} else { | |
OCILobFreeTemporary(svcctx->base.hp.svc, errhp, lob->lob); | |
} | |
OCIDescriptorFree(lob->lob, OCI_DTYPE_LOB); | |
- | |
xfree(lob); | |
- svcctx->temp_lobs = lob = lob_next; | |
} | |
} | |
if (svcctx->non_blocking) { | |
- sword rv; | |
- | |
- svcctx->executing_thread = rb_thread_current(); | |
- /* Note: executing_thread is cleard at the end of the blocking function. */ | |
-#ifdef HAVE_RB_THREAD_CALL_WITHOUT_GVL | |
- rv = (sword)(VALUE)rb_thread_call_without_gvl(func, data, oci8_unblock_func, svcctx); | |
-#else | |
- rv = (sword)rb_thread_blocking_region((VALUE(*)(void*))func, data, oci8_unblock_func, svcctx); | |
-#endif | |
- if (rv == OCI_ERROR) { | |
- if (oci8_get_error_code(errhp) == 1013) { | |
- rb_raise(eOCIBreak, "Canceled by user request."); | |
- } | |
+ rv = call_without_gvl(svcctx, func, data, errhp, &state); | |
+ if (state) { | |
+ rb_jump_tag(state); | |
} | |
return rv; | |
} else { |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment