Skip to content

Instantly share code, notes, and snippets.

@kubo
Created March 17, 2013 13:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kubo/5181580 to your computer and use it in GitHub Desktop.
Save kubo/5181580 to your computer and use it in GitHub Desktop.
diff --git a/ext/oci8/oci8.c b/ext/oci8/oci8.c
index 2de46e4..b10507e 100644
--- a/ext/oci8/oci8.c
+++ b/ext/oci8/oci8.c
@@ -134,6 +134,11 @@ static void oci8_svcctx_init(oci8_base_t *base)
oci8_svcctx_t *svcctx = (oci8_svcctx_t *)base;
VALUE obj;
+#ifdef NATIVE_THREAD_WITH_GVL
+ svcctx->waiters.next = &svcctx->waiters;
+ svcctx->waiters.prev = &svcctx->waiters;
+ svcctx->wait_timeout = 10; /* 10 seconds */
+#endif
svcctx->executing_thread = Qnil;
/* set session handle */
obj = rb_obj_alloc(cSession);
diff --git a/ext/oci8/oci8.h b/ext/oci8/oci8.h
index 57e009d..86c90b0 100644
--- a/ext/oci8/oci8.h
+++ b/ext/oci8/oci8.h
@@ -352,12 +352,22 @@ typedef struct oci8_temp_lob {
OCILobLocator *lob;
} oci8_temp_lob_t;
+/*
+ * Node of the circular doubly linked list of threads that are waiting for a
+ * service context which is currently used by another thread.
+ * The list head is the `waiters' member of oci8_svcctx_t; it points to
+ * itself when nobody is waiting.
+ */
+typedef struct oci8_waiter {
+ struct oci8_waiter *next; /* following node, or the list head at the tail */
+ struct oci8_waiter *prev; /* preceding node, or the list head at the front */
+ VALUE thread; /* thread that enqueued this node; the one to be woken up */
+} oci8_waiter_t;
+
typedef struct oci8_svcctx {
oci8_base_t base;
volatile VALUE executing_thread;
const oci8_logoff_strategy_t *logoff_strategy;
OCISession *usrhp;
OCIServer *srvhp;
+#ifdef NATIVE_THREAD_WITH_GVL
+ oci8_waiter_t waiters;
+ int wait_timeout;
+#endif
rb_pid_t pid;
unsigned char state;
char is_autocommit;
diff --git a/ext/oci8/oci8lib.c b/ext/oci8/oci8lib.c
index 9075db1..012fb16 100644
--- a/ext/oci8/oci8lib.c
+++ b/ext/oci8/oci8lib.c
@@ -212,6 +212,54 @@ void oci8_unlink_from_parent(oci8_base_t *base)
#ifdef NATIVE_THREAD_WITH_GVL
+/*
+ * rb_ensure body of check_concurrent_execution().
+ * Sleeps until svcctx->executing_thread becomes nil. When wait_timeout is
+ * positive, raises RuntimeError once that many seconds have elapsed.
+ * Otherwise it sleeps until it is woken up.
+ */
+static VALUE wait_until_svcctx_is_free(VALUE data)
+{
+    oci8_svcctx_t *svcctx = (oci8_svcctx_t *)data;
+    struct timeval sleep_time;
+    struct timeval limit;
+
+    sleep_time.tv_sec = svcctx->wait_timeout;
+    sleep_time.tv_usec = 0;
+    gettimeofday(&limit, NULL);
+    limit.tv_sec += svcctx->wait_timeout;
+
+    do {
+        if (svcctx->wait_timeout > 0) {
+            rb_thread_wait_for(sleep_time);
+            /* The sleep may end early; recompute the time left until the limit. */
+            gettimeofday(&sleep_time, NULL);
+            sleep_time.tv_sec = limit.tv_sec - sleep_time.tv_sec;
+            sleep_time.tv_usec = limit.tv_usec - sleep_time.tv_usec;
+            if (sleep_time.tv_usec < 0) {
+                sleep_time.tv_sec -= 1;
+                sleep_time.tv_usec += 1000000;
+            }
+            if (sleep_time.tv_sec < 0) {
+                rb_raise(rb_eRuntimeError, "executing in another thread (wait %d seconds)", svcctx->wait_timeout);
+            }
+        } else {
+            /* no timeout */
+            rb_thread_sleep_forever();
+        }
+    } while (!NIL_P(svcctx->executing_thread));
+    return Qnil;
+}
+
+/*
+ * rb_ensure handler of check_concurrent_execution().
+ * Removes the waiter node from the list it is linked into.
+ */
+static VALUE unlink_waiter(VALUE data)
+{
+    oci8_waiter_t *waiter = (oci8_waiter_t *)data;
+
+    waiter->next->prev = waiter->prev;
+    waiter->prev->next = waiter->next;
+    return Qnil;
+}
+
+/*
+ * Makes sure that no other thread is using svcctx before the caller uses it.
+ *
+ * Returns immediately when svcctx->executing_thread is nil. Otherwise:
+ *   wait_timeout == 0  raises RuntimeError at once.
+ *   wait_timeout >  0  waits up to wait_timeout seconds, then raises
+ *                      RuntimeError.
+ *   wait_timeout <  0  waits without a time limit.
+ *
+ * While waiting, the current thread is appended to svcctx->waiters.
+ */
+static void check_concurrent_execution(oci8_svcctx_t *svcctx)
+{
+    VALUE current_thread = rb_thread_current();
+
+    if (!NIL_P(svcctx->executing_thread)) {
+        if (svcctx->wait_timeout == 0) {
+            rb_raise(rb_eRuntimeError, "executing in another thread");
+        } else {
+            oci8_waiter_t waiter;
+
+            /* Append to the tail of the circular list. */
+            waiter.next = &svcctx->waiters;
+            waiter.prev = svcctx->waiters.prev;
+            waiter.thread = current_thread;
+            svcctx->waiters.prev->next = &waiter;
+            svcctx->waiters.prev = &waiter;
+            /*
+             * The node lives on this stack frame, so it must be unlinked on
+             * every exit path, including exceptions and thread kills that
+             * interrupt the sleep. Otherwise the list would keep a dangling
+             * pointer.
+             */
+            rb_ensure(wait_until_svcctx_is_free, (VALUE)svcctx, unlink_waiter, (VALUE)&waiter);
+        }
+    }
+}
+
static void oci8_unblock_func(void *user_data)
{
oci8_svcctx_t *svcctx = (oci8_svcctx_t *)user_data;
@@ -234,64 +282,88 @@ static void *free_temp_lob(void *user_data)
return (void*)(VALUE)rv;
}
+/*
+ * Arguments bundled for protected_func(), which receives them as a single
+ * VALUE through rb_protect().
+ */
+typedef struct protect_arg {
+ void *(*func)(void *); /* function to run with the GVL released */
+ void *data; /* argument passed to func */
+ oci8_svcctx_t *svcctx; /* service context checked for concurrent use; also passed to oci8_unblock_func */
+} protect_arg_t;
+
+/*
+ * Callback for rb_protect(); `data' is a pointer to protect_arg_t.
+ *
+ * Waits until the service context is free (this may raise), marks the
+ * current thread as the executing one, and runs the requested function
+ * with the GVL released. Raises OCIBreak when the function returns
+ * OCI_ERROR and the error code is 1013. Otherwise returns the function's
+ * result.
+ */
+static VALUE protected_func(VALUE data)
+{
+    protect_arg_t *call = (protect_arg_t *)data;
+    oci8_svcctx_t *svcctx = call->svcctx;
+    VALUE result;
+
+    check_concurrent_execution(svcctx);
+    svcctx->executing_thread = rb_thread_current();
+#ifdef HAVE_RB_THREAD_CALL_WITHOUT_GVL
+    result = (VALUE)rb_thread_call_without_gvl(call->func, call->data, oci8_unblock_func, svcctx);
+#else
+    result = rb_thread_blocking_region((VALUE(*)(void*))call->func, call->data, oci8_unblock_func, svcctx);
+#endif
+    if ((sword)result != OCI_ERROR) {
+        return result;
+    }
+    if (oci8_get_error_code(oci8_errhp) == 1013) {
+        rb_raise(eOCIBreak, "Canceled by user request.");
+    }
+    return result;
+}
+
+/*
+ * Runs func(data) via protected_func() under rb_protect(), so that control
+ * always comes back here, and then wakes up the thread at the front of
+ * svcctx->waiters if there is one.
+ *
+ * *state receives the rb_protect() status; when it is non-zero the caller
+ * decides whether to re-raise. errhp is not referenced in this function.
+ */
+static sword call_without_gvl(oci8_svcctx_t *svcctx, void *(*func)(void *), void *data, OCIError *errhp, int *state)
+{
+    protect_arg_t call;
+    oci8_waiter_t *first;
+    sword status;
+
+    call.func = func;
+    call.data = data;
+    call.svcctx = svcctx;
+    status = (sword)rb_protect(protected_func, (VALUE)&call, state);
+
+    /* New waiters are appended at the tail, so the front one has waited longest. */
+    first = svcctx->waiters.next;
+    if (first != &svcctx->waiters) {
+        rb_thread_wakeup_alive(first->thread);
+    }
+    return status;
+}
+
/* ruby 1.9 */
sword oci8_call_without_gvl(oci8_svcctx_t *svcctx, void *(*func)(void *), void *data)
{
OCIError *errhp = oci8_errhp;
+ int state;
+ sword rv;
- if (!NIL_P(svcctx->executing_thread)) {
- rb_raise(rb_eRuntimeError /* FIXME */, "executing in another thread");
+ if (!svcctx->non_blocking) {
+ check_concurrent_execution(svcctx);
}
-
if (!svcctx->suppress_free_temp_lobs) {
- oci8_temp_lob_t *lob = svcctx->temp_lobs;
- while (lob != NULL) {
- oci8_temp_lob_t *lob_next = lob->next;
+ while (svcctx->temp_lobs != NULL) {
+ oci8_temp_lob_t *lob = svcctx->temp_lobs;
+ svcctx->temp_lobs = lob->next;
if (svcctx->non_blocking) {
free_temp_lob_arg_t arg;
- sword rv;
arg.svcctx = svcctx;
arg.svchp = svcctx->base.hp.svc;
arg.errhp = errhp;
arg.lob = lob->lob;
- svcctx->executing_thread = rb_thread_current();
-#ifdef HAVE_RB_THREAD_CALL_WITHOUT_GVL
- rv = (sword)(VALUE)rb_thread_call_without_gvl(free_temp_lob, &arg, oci8_unblock_func, svcctx);
-#else
- rv = (sword)rb_thread_blocking_region((VALUE(*)(void*))free_temp_lob, &arg, oci8_unblock_func, svcctx);
-#endif
- if (rv == OCI_ERROR) {
- if (oci8_get_error_code(errhp) == 1013) {
- rb_raise(eOCIBreak, "Canceled by user request.");
- }
+ call_without_gvl(svcctx, free_temp_lob, &arg, errhp, &state);
+ if (state) {
+ lob->next = svcctx->temp_lobs;
+ svcctx->temp_lobs = lob;
+ rb_jump_tag(state);
}
} else {
OCILobFreeTemporary(svcctx->base.hp.svc, errhp, lob->lob);
}
OCIDescriptorFree(lob->lob, OCI_DTYPE_LOB);
-
xfree(lob);
- svcctx->temp_lobs = lob = lob_next;
}
}
if (svcctx->non_blocking) {
- sword rv;
-
- svcctx->executing_thread = rb_thread_current();
- /* Note: executing_thread is cleard at the end of the blocking function. */
-#ifdef HAVE_RB_THREAD_CALL_WITHOUT_GVL
- rv = (sword)(VALUE)rb_thread_call_without_gvl(func, data, oci8_unblock_func, svcctx);
-#else
- rv = (sword)rb_thread_blocking_region((VALUE(*)(void*))func, data, oci8_unblock_func, svcctx);
-#endif
- if (rv == OCI_ERROR) {
- if (oci8_get_error_code(errhp) == 1013) {
- rb_raise(eOCIBreak, "Canceled by user request.");
- }
+ rv = call_without_gvl(svcctx, func, data, errhp, &state);
+ if (state) {
+ rb_jump_tag(state);
}
return rv;
} else {
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment