Last active
August 29, 2015 14:03
-
-
Save clicube/bdc800ee7b1a767489b6 to your computer and use it in GitHub Desktop.
POSIX Semaphore with Ruby...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Build configuration for the `semaphore` C extension.
require "mkmf"

# The C source relies on C99 features (e.g. `for (int i = 0; ...)` and
# declarations after statements), so request that standard explicitly.
$CFLAGS.concat(" -std=c99")

# Generate the Makefile that builds the `semaphore` extension.
create_makefile("semaphore")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <ruby.h>
#include <ruby/thread.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <errno.h>
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
/*
 * Bookkeeping for one named POSIX semaphore owned by a Ruby object.
 */
struct sem_data_t {
    char* name;  /* name passed to sem_open(); needed later for sem_unlink() */
    sem_t* sem;  /* handle returned by sem_open() */
};
typedef struct sem_data_t sem_data_t;
/*
 * Free function for a wrapped sem_data_t (registered through
 * Data_Wrap_Struct in semaphore_initialize).
 *
 * Closes the semaphore handle, unlinks the semaphore's name and frees the
 * bookkeeping memory. Failures of sem_close()/sem_unlink() are deliberately
 * ignored: this is best-effort cleanup and there is nobody to report to.
 * The errno value observed right after sem_unlink() is restored before
 * returning so the frees below cannot disturb it.
 */
void release(sem_data_t* sem_data)
{
    int unlink_errno;

    (void)sem_close(sem_data->sem);
    (void)sem_unlink(sem_data->name);
    unlink_errno = errno;

    /* `name` is produced by asprintf(), i.e. malloc(), so it must be
       released with free() rather than ruby_xfree(). */
    free(sem_data->name);
    /* The struct itself comes from ruby_xmalloc(). */
    ruby_xfree(sem_data);

    errno = unlink_errno;
}
long sem_index = 0; | |
VALUE semaphore_initialize(VALUE self, VALUE val) | |
{ | |
int ret; | |
sem_t* sem; | |
sem_data_t* sem_data; | |
char* name; | |
asprintf(&name, "/%d-%ld", getpid(), sem_index); | |
sem_index++; | |
sem = sem_open(name, O_CREAT|O_EXCL, 600, FIX2INT(val)); | |
if(sem == SEM_FAILED){ rb_sys_fail("sem_open"); } | |
sem_data = ruby_xmalloc(sizeof(sem_data_t)); | |
sem_data->name = name; | |
sem_data->sem = sem; | |
VALUE rb_sem_data; | |
rb_sem_data = Data_Wrap_Struct(rb_cObject, 0, release, sem_data); | |
rb_ivar_set(self, rb_intern("rb_sem_data"), rb_sem_data); | |
return self; | |
} | |
VALUE semaphore_wait(VALUE self) | |
{ | |
sem_data_t* sem_data; | |
VALUE rb_sem_data = rb_ivar_get(self, rb_intern("rb_sem_data")); | |
Data_Get_Struct(rb_sem_data, sem_data_t, sem_data); | |
int ret; | |
ret = (int)rb_thread_call_without_gvl((void *(*)(void *))sem_wait, sem_data->sem, RUBY_UBF_IO, NULL); | |
if(ret < 0){ rb_sys_fail("sem_wait"); } | |
return self; | |
} | |
/*
 * MultiProcessing::Semaphore#try_wait
 *
 * Attempts to decrement the semaphore without blocking.
 *
 * Returns true when the semaphore was decremented and false when
 * sem_trywait() reports EAGAIN. Any other failure raises SystemCallError.
 */
VALUE semaphore_try_wait(VALUE self)
{
    VALUE wrapped = rb_ivar_get(self, rb_intern("rb_sem_data"));
    sem_data_t* data;
    Data_Get_Struct(wrapped, sem_data_t, data);

    if (sem_trywait(data->sem) >= 0) {
        return Qtrue;
    }
    if (errno != EAGAIN) {
        rb_sys_fail("sem_trywait");
    }
    return Qfalse;
}
/*
 * MultiProcessing::Semaphore#post
 *
 * Increments the semaphore via sem_post().
 *
 * Returns self. Raises SystemCallError if sem_post() fails.
 */
VALUE semaphore_post(VALUE self)
{
    VALUE wrapped = rb_ivar_get(self, rb_intern("rb_sem_data"));
    sem_data_t* data;
    Data_Get_Struct(wrapped, sem_data_t, data);

    if (sem_post(data->sem) < 0) {
        rb_sys_fail("sem_post");
    }
    return self;
}
/*
 * MultiProcessing::Semaphore#value
 *
 * Returns the current value of the semaphore as an Integer.
 *
 * Uses sem_getvalue() when available. On systems where sem_getvalue() is
 * not implemented (it fails with ENOSYS, e.g. Mac OS X) the value is
 * measured by draining the semaphore with sem_trywait() and posting the
 * same number of times afterwards. That fallback is not atomic: other
 * users of the semaphore can observe or change the value while it runs.
 *
 * Raises SystemCallError if sem_getvalue() fails with anything other than
 * ENOSYS, or if the fallback's sem_trywait()/sem_post() calls fail.
 */
VALUE semaphore_value(VALUE self)
{
    sem_data_t* sem_data;
    VALUE rb_sem_data = rb_ivar_get(self, rb_intern("rb_sem_data"));
    Data_Get_Struct(rb_sem_data, sem_data_t, sem_data);

    int val;
    if (sem_getvalue(sem_data->sem, &val) >= 0) {
        /* INT2NUM handles the full int range; INT2FIX assumes it fits. */
        return INT2NUM(val);
    }
    if (errno != ENOSYS) {
        rb_sys_fail("sem_getvalue");
    }

    /* For systems where sem_getvalue() is not implemented, like Mac OS X. */
    int drained = 0;
    int drain_errno;
    for (;;) {
        if (sem_trywait(sem_data->sem) >= 0) {
            drained++;
            continue;
        }
        if (errno == EINTR) {
            /* Interrupted, not empty: keep counting. */
            continue;
        }
        drain_errno = errno;  /* EAGAIN means the semaphore is now empty */
        break;
    }

    /* Always put back everything that was taken, even if an error is
       about to be reported; remember only the first sem_post() failure. */
    int post_errno = 0;
    for (int i = 0; i < drained; i++) {
        if (sem_post(sem_data->sem) < 0 && post_errno == 0) {
            post_errno = errno;
        }
    }

    if (drain_errno != EAGAIN) {
        errno = drain_errno;
        rb_sys_fail("sem_trywait");
    }
    if (post_errno != 0) {
        errno = post_errno;
        rb_sys_fail("sem_post");
    }
    return INT2NUM(drained);
}
// TODO: sem_timedwait() | |
void Init_semaphore(void) | |
{ | |
VALUE mMultiProcessing = rb_define_module("MultiProcessing"); | |
VALUE cSemaphore = rb_define_class_under(mMultiProcessing, | |
"Semaphore", rb_cObject); | |
rb_define_method(cSemaphore, "initialize", semaphore_initialize, 1); | |
rb_define_method(cSemaphore, "wait", semaphore_wait, 0); | |
rb_define_method(cSemaphore, "try_wait", semaphore_try_wait, 0); | |
rb_define_method(cSemaphore, "post", semaphore_post, 0); | |
rb_define_method(cSemaphore, "value", semaphore_value, 0); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative 'semaphore.o' | |
module MultiProcessing
  # Lock-style conveniences on top of the natively implemented Semaphore.
  class Semaphore
    # Mutex-like names for the semaphore primitives.
    alias_method :lock, :wait
    alias_method :try_lock, :try_wait
    alias_method :unlock, :post

    # Acquires the semaphore (+lock+), yields to the block, and always
    # releases it again (+unlock+), even if the block raises.
    #
    # @return [Object] the value returned by the block
    def synchronize
      lock
      begin
        yield
      ensure
        unlock
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment