Skip to content

Instantly share code, notes, and snippets.

@ClosetGeek-Git
Created July 19, 2023 16:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ClosetGeek-Git/22fcc6572bbb6e7ecbea02b311404d1c to your computer and use it in GitHub Desktop.
Save ClosetGeek-Git/22fcc6572bbb6e7ecbea02b311404d1c to your computer and use it in GitHub Desktop.
Swoole reactor implementation using zmq_poll from libzmq for handling native ZMQSockets objects created via php-zmq along with Swoole network::Socket
#include "zmq.h"
#include "swoole.h"
#include "php_swoole_cxx.h"
#include "swoole_socket.h"
#include "swoole_reactor.h"
namespace swoole {
using network::Socket;
typedef struct _php_zmq_context {
void *z_ctx;
int io_threads;
zend_bool is_persistent;
zend_bool use_shared_ctx;
zend_long socket_count;
int pid;
} php_zmq_context;
typedef struct _php_zmq_socket {
void *z_socket;
int socket_type;
php_zmq_context *ctx;
HashTable connect;
HashTable bind;
zend_bool is_persistent;
int pid;
} php_zmq_socket;
typedef struct _php_zmq_socket_object {
php_zmq_socket *socket;
char *persistent_id;
zval context_obj;
zend_object zo;
} php_zmq_socket_object;
php_zmq_socket_object *php_zmq_socket_fetch_object(zend_object *obj) {
return (php_zmq_socket_object *)((char *)obj - XtOffsetOf(php_zmq_socket_object, zo));
}
struct EventObject {
zval zsocket;
zend_fcall_info_cache fci_cache_read;
zend_fcall_info_cache fci_cache_write;
};
class ReactorZMQpoll : public ReactorImpl {
private:
struct zmq_pollitem_t *events_ = nullptr;
Socket **fds_;
uint32_t max_fd_num;
bool exists(int fd);
public:
ReactorZMQpoll(Reactor *_reactor, int max_events);
~ReactorZMQpoll();
bool ready() override;
int add(Socket *socket, int events) override;
int set(Socket *socket, int events) override;
int del(Socket *socket) override;
int wait(struct timeval *) override;
};
ReactorImpl *make_reactor_zmqpoll(Reactor *_reactor, int max_events) {
return new ReactorZMQpoll(_reactor, max_events);
}
ReactorZMQpoll::ReactorZMQpoll(Reactor *_reactor, int max_events) : ReactorImpl(_reactor) {
if (!ready()) {
swoole_sys_warning("zmq_poll failed");
return;
}
fds_ = new Socket *[max_events];
events_ = new struct zmq_pollitem_t[max_events];
max_fd_num = max_events;
reactor_->max_event_num = max_events;
}
bool ReactorZMQpoll::ready() {
return true;
}
ReactorZMQpoll::~ReactorZMQpoll() {
delete[] fds_;
delete[] events_;
}
int ReactorZMQpoll::add(Socket *socket, int events) {
int fd = socket->fd;
if (exists(fd)) {
swoole_warning("fd#%d is already exists", fd);
return SW_ERR;
}
int cur = reactor_->get_event_num();
if (reactor_->get_event_num() == max_fd_num) {
swoole_warning("too many connection, more than %d", max_fd_num);
return SW_ERR;
}
reactor_->_add(socket, events);
swoole_trace("fd=%d, events=%d", fd, events);
if(socket->fd_type != SW_FD_ZMQ)
{
events_[cur].socket = NULL;
events_[cur].fd = fd;
}else
{
EventObject* tmp = (EventObject*)socket->object;
php_zmq_socket_object *intern;
intern = php_zmq_socket_fetch_object(Z_OBJ(tmp->zsocket));
events_[cur].socket = intern->socket->z_socket;
events_[cur].fd = 0;
}
fds_[cur] = socket;
events_[cur].events = 0;
if (Reactor::isset_read_event(events)) {
events_[cur].events |= ZMQ_POLLIN;
}
if (Reactor::isset_write_event(events)) {
events_[cur].events |= ZMQ_POLLOUT;
}
if (Reactor::isset_error_event(events)) {
events_[cur].events |= ZMQ_POLLERR;
}
events_[cur].revents = 0;
return SW_OK;
}
int ReactorZMQpoll::del(Socket *socket) {
if (socket->removed) {
swoole_error_log(SW_LOG_WARNING, SW_ERROR_EVENT_SOCKET_REMOVED, "failed to delete event[%d], it has already been removed", socket->fd);
return SW_ERR;
}
if(socket->fd_type != SW_FD_ZMQ)
{
for(uint32_t i = 0; i < reactor_->get_event_num(); i++)
{
if(events_[i].fd == socket->fd)
{
for(; i < reactor_->get_event_num(); i++)
{
if(i == reactor_->get_event_num())
{
fds_[i] = nullptr;
events_[i].fd = 0;
events_[i].events = 0;
events_[i].socket = nullptr;
events_[i].revents = 0;
}else
{
fds_[i] = fds_[i + 1];
events_[i] = events_[i + 1];
}
}
reactor_->_del(socket);
return SW_OK;
}
}
} else
{
EventObject* tmp = (EventObject*)socket->object;
php_zmq_socket_object *intern;
intern = php_zmq_socket_fetch_object(Z_OBJ(tmp->zsocket));
for (uint32_t i = 0; i < reactor_->get_event_num(); i++)
{
if(events_[i].socket == intern->socket->z_socket)
{
for(; i < reactor_->get_event_num(); i++)
{
if(i == reactor_->get_event_num())
{
fds_[i] = nullptr;
events_[i].fd = 0;
events_[i].events = 0;
events_[i].socket = nullptr;
events_[i].revents = 0;
}else
{
fds_[i] = fds_[i + 1];
events_[i] = events_[i + 1];
}
}
reactor_->_del(socket);
return SW_OK;
}
}
}
return SW_ERR;
}
int ReactorZMQpoll::set(Socket *socket, int events) {
uint32_t i;
swoole_trace("fd=%d, events=%d", socket->fd, events);
if(socket->fd_type != SW_FD_ZMQ)
{
for(i = 0; i < reactor_->get_event_num(); i++)
{
if(events_[i].fd == socket->fd)
{
events_[i].events = 0;
if (Reactor::isset_read_event(events))
{
events_[i].events |= ZMQ_POLLIN;
}
if (Reactor::isset_write_event(events))
{
events_[i].events |= ZMQ_POLLOUT;
}
reactor_->_set(socket, events);
return SW_OK;
}
}
}else
{
EventObject* tmp = (EventObject*)socket->object;
php_zmq_socket_object *intern;
intern = php_zmq_socket_fetch_object(Z_OBJ(tmp->zsocket));
for(i = 0; i < reactor_->get_event_num(); i++)
{
if(events_[i].socket == intern->socket->z_socket)
{
events_[i].events = 0;
if(Reactor::isset_read_event(events))
{
events_[i].events |= ZMQ_POLLIN;
}
if(Reactor::isset_write_event(events))
{
events_[i].events |= ZMQ_POLLOUT;
}
reactor_->_set(socket, events);
return SW_OK;
}
}
}
return SW_ERR;
}
int ReactorZMQpoll::wait(struct timeval *timeo) {
Event event;
ReactorHandler handler;
int ret;
if (reactor_->timeout_msec == 0) {
if (timeo == nullptr) {
reactor_->timeout_msec = -1;
} else {
reactor_->timeout_msec = timeo->tv_sec * 1000 + timeo->tv_usec / 1000;
}
}
reactor_->before_wait();
while (reactor_->running) {
if (reactor_->onBegin != nullptr) {
reactor_->onBegin(reactor_);
}
ret = zmq_poll(events_, reactor_->get_event_num(), reactor_->get_timeout_msec());
if (ret < 0) {
if (!reactor_->catch_error()) {
swoole_sys_warning("zmq_poll error");
break;
} else {
goto _continue;
}
} else if (ret == 0) {
reactor_->execute_end_callbacks(true);
SW_REACTOR_CONTINUE;
} else {
for (uint32_t i = 0; i < reactor_->get_event_num(); i++) {
event.socket = fds_[i];
event.fd = events_[i].fd;
event.reactor_id = reactor_->id;
event.type = event.socket->fd_type;
if (events_[i].revents & ZMQ_POLLERR) {
event.socket->event_hup = 1;
}
swoole_trace("Event: fd=%d|reactor_id=%d|type=%d", event.fd, reactor_->id, event.type);
if ((events_[i].revents & ZMQ_POLLIN) && !event.socket->removed) {
handler = reactor_->get_handler(SW_EVENT_READ, event.type);
ret = handler(reactor_, &event);
if (ret < 0) {
swoole_sys_warning("zmq_poll[POLLIN] handler failed. fd=%d", event.fd);
}
}
if ((events_[i].revents & ZMQ_POLLOUT) && !event.socket->removed) {
handler = reactor_->get_handler(SW_EVENT_WRITE, event.type);
ret = handler(reactor_, &event);
if (ret < 0) {
swoole_sys_warning("zmq_poll[POLLOUT] handler failed. fd=%d", event.fd);
}
}
if ((events_[i].revents & ZMQ_POLLERR) && !event.socket->removed) {
if ((events_[i].revents & ZMQ_POLLIN) || (events_[i].revents & ZMQ_POLLOUT)) {
continue;
}
handler = reactor_->get_error_handler(event.type);
ret = handler(reactor_, &event);
if (ret < 0) {
swoole_sys_warning("zmq_poll[POLLERR] handler failed. fd=%d", event.fd);
}
}
if (!event.socket->removed && (event.socket->events & SW_EVENT_ONCE)) {
del(event.socket);
}
events_[i].revents = 0;
}
}
_continue:
reactor_->execute_end_callbacks(false);
SW_REACTOR_CONTINUE;
}
return SW_OK;
}
bool ReactorZMQpoll::exists(int fd) {
for (uint32_t i = 0; i < reactor_->get_event_num(); i++) {
if (events_[i].fd == fd) {
return true;
}
}
return false;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment