Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save pengdu/98eee5d2bca0df7fdc0e90cc5f47e626 to your computer and use it in GitHub Desktop.
Save pengdu/98eee5d2bca0df7fdc0e90cc5f47e626 to your computer and use it in GitHub Desktop.
chatgpt 如何在主进程中增加一个反应器

好的,以下是一段示例代码,展示了如何在主进程中增加一个反应器(reactor),用来处理子进程的状态变化和其他事件:

#include <algorithm>
#include <cerrno>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <queue>
#include <string>
#include <vector>

#include <signal.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>

/**
 * Minimal epoll-based reactor.
 *
 * Owns one epoll instance, lets callers register and unregister file
 * descriptors, and dispatches every ready event to handleEvent(), which
 * derived classes must implement.
 *
 * Any failing epoll call is reported on std::cerr and terminates the
 * process with exit(1).
 */
class Reactor {
public:
    /// Creates the epoll instance; exits the process if that fails.
    Reactor() {
        epollFd = epoll_create1(0);
        if (epollFd == -1) {
            std::cerr << "Failed to create epoll" << std::endl;
            exit(1);
        }
    }

    /// Virtual because this class is meant to be derived from.
    virtual ~Reactor() {
        close(epollFd);
    }

    // The epoll descriptor is closed in the destructor, so a copy would
    // close the same descriptor twice.
    Reactor(const Reactor&) = delete;
    Reactor& operator=(const Reactor&) = delete;

    /**
     * Registers @p fd with the epoll instance.
     *
     * @param fd     descriptor to watch; it is also stored as the event's
     *               user data so handleEvent() can read it from data.fd.
     * @param events epoll event mask (for example EPOLLIN).
     */
    void addEvent(int fd, uint32_t events) {
        struct epoll_event event {};  // zero the whole data union first
        event.events = events;
        event.data.fd = fd;
        if (epoll_ctl(epollFd, EPOLL_CTL_ADD, fd, &event) == -1) {
            std::cerr << "Failed to add event to epoll" << std::endl;
            exit(1);
        }
    }

    /**
     * Unregisters @p fd from the epoll instance.
     *
     * @param fd descriptor previously passed to addEvent().
     */
    void removeEvent(int fd) {
        if (epoll_ctl(epollFd, EPOLL_CTL_DEL, fd, nullptr) == -1) {
            std::cerr << "Failed to remove event from epoll" << std::endl;
            exit(1);
        }
    }

    /**
     * Waits for events and passes each one to handleEvent().
     *
     * At most MAX_EVENTS events are dispatched per call.
     *
     * @param timeoutMs value handed to epoll_wait(): -1 (the default)
     *                  blocks until an event arrives, 0 polls without
     *                  blocking, a positive value is a limit in
     *                  milliseconds.
     */
    void waitEvents(int timeoutMs = -1) {
        struct epoll_event events[MAX_EVENTS];
        int numEvents;
        // A signal delivered while blocked makes epoll_wait() fail with
        // EINTR. That is not an error, so wait again (the timeout starts
        // over on each retry).
        do {
            numEvents = epoll_wait(epollFd, events, MAX_EVENTS, timeoutMs);
        } while (numEvents == -1 && errno == EINTR);

        if (numEvents == -1) {
            std::cerr << "Failed to wait for events" << std::endl;
            exit(1);
        }
        for (int i = 0; i < numEvents; ++i) {
            handleEvent(events[i]);
        }
    }

    /**
     * Called by waitEvents() once per ready event.
     *
     * @param event the event reported by epoll; data.fd holds the
     *              descriptor that was passed to addEvent().
     */
    virtual void handleEvent(struct epoll_event event) = 0;

private:
    /// Upper bound on the number of events fetched per waitEvents() call.
    static constexpr int MAX_EVENTS = 10;
    /// Descriptor of the epoll instance owned by this object.
    int epollFd = -1;
};

/**
 * Pool of forked worker processes supervised through the reactor.
 *
 * Each worker is connected to the parent by a UNIX-domain stream socket
 * pair. The parent registers its end of every pair with the reactor; when
 * a worker's connection closes or fails, the worker is replaced by a
 * freshly forked one.
 */
class ProcessPool : public Reactor {
public:
    /**
     * Forks @p numProcesses workers and registers their sockets.
     *
     * Exits the process if a socket pair or a fork cannot be created.
     *
     * @param numProcesses number of worker processes to start.
     */
    ProcessPool(size_t numProcesses) {
        processIds.reserve(numProcesses);
        workerSockets.reserve(numProcesses);
        for (size_t i = 0; i < numProcesses; ++i) {
            const Worker worker = spawnWorker();
            processIds.push_back(worker.pid);
            workerSockets.push_back(worker.socket);
            addEvent(worker.socket, EPOLLIN);
        }
    }

    /// Sends SIGTERM to every worker, reaps them and closes their sockets.
    ~ProcessPool() {
        // Signal everyone first so the workers shut down concurrently.
        for (pid_t pid : processIds) {
            kill(pid, SIGTERM);
        }
        for (pid_t pid : processIds) {
            reap(pid);
        }
        for (int fd : workerSockets) {
            close(fd);
        }
    }

    // The destructor kills the workers and closes their sockets, so a copy
    // would do both twice.
    ProcessPool(const ProcessPool&) = delete;
    ProcessPool& operator=(const ProcessPool&) = delete;

    /**
     * Sends @p request to the next worker in round-robin order.
     *
     * A failure to send is reported on std::cerr; the request is not
     * retried on another worker.
     *
     * @param request payload written to the worker's socket as-is.
     */
    void sendRequest(const std::string& request) {
        if (workerSockets.empty()) {
            std::cerr << "No available worker" << std::endl;
            return;
        }
        // Wrap around so the pool keeps accepting requests after every
        // worker has been used once.
        currentWorker %= workerSockets.size();
        const int socket = workerSockets[currentWorker];
        currentWorker = (currentWorker + 1) % workerSockets.size();

        // MSG_NOSIGNAL: a worker that has already died must produce an
        // error return here, not a SIGPIPE that kills the parent.
        if (send(socket, request.data(), request.size(), MSG_NOSIGNAL) == -1) {
            std::cerr << "Failed to send request to worker" << std::endl;
        }
    }

    /**
     * Reacts to activity on a worker socket.
     *
     * Readable data is received (and currently discarded); end-of-file, a
     * hard receive error, EPOLLHUP or EPOLLERR cause the worker to be
     * restarted.
     *
     * @param event event reported by the reactor; data.fd is the parent's
     *              end of the worker's socket pair.
     */
    void handleEvent(struct epoll_event event) override {
        const int socket = event.data.fd;
        if (event.events & EPOLLIN) {
            char buffer[1024];
            const ssize_t bytesRead = recv(socket, buffer, sizeof(buffer) - 1, 0);
            if (bytesRead > 0) {
                buffer[bytesRead] = '\0';
                // Received data from worker, handle it if needed
                // ...
            } else if (bytesRead == 0 ||
                       (errno != EINTR && errno != EAGAIN && errno != EWOULDBLOCK)) {
                // Worker has closed the connection (or it broke), restart it.
                // Transient errors are skipped: the event fires again later.
                restartWorker(socket);
            }
        } else if (event.events & (EPOLLHUP | EPOLLERR)) {
            // Worker has closed the connection, restart it
            restartWorker(socket);
        }
    }

private:
    /// A freshly forked worker as seen from the parent.
    struct Worker {
        pid_t pid;   ///< process id of the child
        int socket;  ///< parent's end of the socket pair
    };

    /**
     * Worker main loop, run in the child process: prints every chunk
     * received on @p socket until the peer closes it or receiving fails,
     * then closes @p socket.
     */
    void handleClientConnection(int socket) {
        while (true) {
            char buffer[1024];
            const ssize_t bytesRead = recv(socket, buffer, sizeof(buffer) - 1, 0);
            if (bytesRead < 0 && errno == EINTR) {
                continue;  // interrupted by a signal, not a real failure
            }
            if (bytesRead <= 0) {
                break;
            }
            buffer[bytesRead] = '\0';
            std::cout << "Received request: " << buffer << std::endl;
            // Handle client request
            // ...
        }
        close(socket);
    }

    /**
     * Creates a socket pair and forks a worker attached to one end.
     *
     * The pair is created before fork() so that both processes hold a
     * connected end. Never returns in the child. Exits the process if the
     * socket pair or the fork fails.
     *
     * @return the child's pid and the parent's end of the pair.
     */
    Worker spawnWorker() {
        int sockets[2];
        if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) == -1) {
            std::cerr << "Failed to create socket pair" << std::endl;
            exit(1);
        }

        const pid_t pid = fork();
        if (pid == -1) {
            std::cerr << "Failed to fork process" << std::endl;
            exit(1);
        }
        if (pid == 0) {
            // Child process: drop the parent's end of this pair and the
            // inherited parent ends of all other workers, otherwise those
            // connections would stay open after the parent closes them.
            close(sockets[0]);
            for (int inherited : workerSockets) {
                close(inherited);
            }
            handleClientConnection(sockets[1]);
            // _exit: leave without running the parent's exit handlers or
            // flushing stdio buffers copied from the parent.
            _exit(0);
        }

        // Parent process
        close(sockets[1]);
        return Worker{pid, sockets[0]};
    }

    /// Waits for @p pid to terminate, retrying when interrupted by a signal.
    static void reap(pid_t pid) {
        while (waitpid(pid, nullptr, 0) == -1 && errno == EINTR) {
        }
    }

    /**
     * Replaces the worker whose parent-side socket is @p socket.
     *
     * The replacement is forked first, while the old descriptor is still
     * open, so its number cannot be reused by the new socket pair. The old
     * socket is then unregistered and closed, and the old process is
     * terminated and reaped so it does not linger as a zombie.
     */
    void restartWorker(int socket) {
        const auto it = std::find(workerSockets.begin(), workerSockets.end(), socket);
        if (it == workerSockets.end()) {
            std::cerr << "Failed to find worker socket" << std::endl;
            return;
        }
        const size_t index = static_cast<size_t>(it - workerSockets.begin());

        const Worker replacement = spawnWorker();

        removeEvent(socket);
        close(socket);

        // The old worker has normally exited already; SIGTERM guarantees
        // that the blocking wait below cannot hang on one that has not.
        kill(processIds[index], SIGTERM);
        reap(processIds[index]);

        processIds[index] = replacement.pid;
        workerSockets[index] = replacement.socket;
        addEvent(replacement.socket, EPOLLIN);
    }

    /// Worker pids; processIds[i] belongs to workerSockets[i].
    std::vector<pid_t> processIds;
    /// Parent-side ends of the workers' socket pairs.
    std::vector<int> workerSockets;
    /// Index of the worker that receives the next request.
    size_t currentWorker = 0;
};

/**
 * Demo entry point: builds a pool of four workers, hands it three sample
 * requests, then services reactor events in an endless loop.
 */
int main() {
    ProcessPool pool(4);

    const char* const sampleRequests[] = {"Request 1", "Request 2", "Request 3"};
    for (const char* text : sampleRequests) {
        pool.sendRequest(text);
    }

    for (;;) {
        pool.waitEvents();
        // Placeholder for the parent's own work between event batches.
        // ...
        sleep(1);
    }

    return 0;
}

在这个示例中,我们定义了一个 Reactor 类,它封装了 epoll 的相关操作,并提供了一个纯虚函数 handleEvent 用于处理事件。ProcessPool 类继承自 Reactor,并在其构造函数中把每个子进程对应的 socket 注册到反应器中。在 handleEvent 函数中,我们根据事件的类型来处理子进程的状态和其他事件:当子进程关闭连接时,调用 restartWorker 函数重新启动子进程,并更新反应器中注册的事件。

在 main 函数中,我们创建了一个包含 4 个子进程的进程池 pool,然后使用 sendRequest 函数向进程池中的子进程发送请求。在这个示例中,我们发送了 3 个请求,每个请求都会被发送给一个子进程。随后,我们在一个无限循环中调用 waitEvents 函数来等待并处理事件。需要注意的是,这里的 waitEvents 会一直阻塞到有事件发生为止,因此循环中的“其他工作”只有在处理完一批事件之后才会执行。你可以根据需要在主进程中进行其他工作。

希望对你有帮助!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment