Skip to content

Instantly share code, notes, and snippets.

@evax
Created April 8, 2011 21:50
Show Gist options
  • Save evax/910812 to your computer and use it in GitHub Desktop.
Save evax/910812 to your computer and use it in GitHub Desktop.
diff --git a/c_src/erlzmq_nif.c b/c_src/erlzmq_nif.c
index 06ac022..0f4bfa7 100644
--- a/c_src/erlzmq_nif.c
+++ b/c_src/erlzmq_nif.c
@@ -506,6 +506,10 @@ NIF(erlzmq_nif_send)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(socket->context->mutex);
+ if (socket->context->thread_socket_name == NULL) {
+ enif_mutex_unlock(socket->context->mutex);
+ return return_zmq_errno(env, ETERM);
+ }
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);
@@ -580,6 +584,10 @@ NIF(erlzmq_nif_recv)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(socket->context->mutex);
+ if (socket->context->thread_socket_name == NULL) {
+ enif_mutex_unlock(socket->context->mutex);
+ return return_zmq_errno(env, ETERM);
+ }
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);
@@ -636,6 +644,17 @@ NIF(erlzmq_nif_close)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(socket->context->mutex);
+ if (socket->context->thread_socket_name == NULL) {
+ // context already terminated: close the socket here instead of asking the polling thread
+ enif_mutex_lock(socket->mutex);
+ zmq_msg_close(&msg);
+ zmq_close(socket->socket_zmq);
+ enif_mutex_unlock(socket->mutex);
+ enif_mutex_destroy(socket->mutex);
+ enif_release_resource(socket);
+ enif_mutex_unlock(socket->context->mutex);
+ return enif_make_atom(env, "ok");
+ }
if (zmq_send(socket->context->thread_socket, &msg, 0)) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -901,6 +920,11 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
}
else if (r->type == ERLZMQ_THREAD_REQUEST_TERM) {
+ enif_mutex_lock(context->mutex);
+ free(context->thread_socket_name);
+ // a NULL thread_socket_name flags that the context has been terminated
+ context->thread_socket_name = NULL;
+ enif_mutex_unlock(context->mutex);
// cleanup pending requests
for (i = 1; i < vector_count(&requests); ++i) {
erlzmq_thread_request_t * r_old = vector_get(erlzmq_thread_request_t,
@@ -922,9 +946,10 @@ static void * polling_thread(void * handle)
zmq_close(thread_socket);
zmq_close(context->thread_socket);
enif_mutex_unlock(context->mutex);
- enif_mutex_destroy(context->mutex);
- free(context->thread_socket_name);
zmq_term(context->context_zmq);
+ enif_mutex_lock(context->mutex);
+ enif_mutex_unlock(context->mutex);
+ enif_mutex_destroy(context->mutex);
enif_release_resource(context);
// notify the waiting request
enif_send(NULL, &r->data.term.pid, r->data.term.env,
diff --git a/test/erlzmq_test.erl b/test/erlzmq_test.erl
index 171767a..03d4dbb 100644
--- a/test/erlzmq_test.erl
+++ b/test/erlzmq_test.erl
@@ -83,10 +83,10 @@ shutdown_blocking_test() ->
shutdown_blocking_unblocking_test() ->
{ok, C} = erlzmq:context(),
{ok, S} = erlzmq:socket(C, [pub, {active, false}]),
- erlzmq:close(S),
- V = erlzmq:term(C, 0),
+ V = erlzmq:term(C, 500),
?assertMatch({error, {timeout, _}}, V),
{error, {timeout, Ref}} = V,
+ erlzmq:close(S),
receive
{Ref, ok} ->
ok
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment