Last active
March 22, 2022 02:24
-
-
Save godzie44/190406f64d788ff901f8469aaca61bb6 to your computer and use it in GitHub Desktop.
Example of io_uring reactor and echo-server (using go-uring lib)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"errors" | |
"flag" | |
"fmt" | |
"github.com/godzie44/go-uring/uring" | |
"io" | |
"log" | |
"net" | |
"runtime" | |
"strconv" | |
"sync" | |
"sync/atomic" | |
"syscall" | |
"time" | |
) | |
// Sizing of the preallocated buffer pool built by initBuffs.
const (
	// MaxConns is the number of buffers allocated by initBuffs.
	MaxConns = 4096
	// MaxMsgLen is the length in bytes of every buffer allocated by initBuffs.
	MaxMsgLen = 2048
)
func initBuffs() [][]byte { | |
buffs := make([][]byte, MaxConns) | |
for i := range buffs { | |
buffs[i] = make([]byte, MaxMsgLen) | |
} | |
return buffs | |
} | |
var buffs = initBuffs() | |
func main() { | |
flag.Parse() | |
portStr := flag.Arg(0) | |
port, _ := strconv.Atoi(portStr) | |
var listener net.Listener | |
var err error | |
ring, err := uring.New(uring.MaxEntries >> 3) | |
rea := New(ring) | |
go rea.Run(context.Background()) | |
listener, err = NewListener(fmt.Sprintf("0.0.0.0:%d", port), rea) | |
checkErr(err) | |
log.Printf("start naive echo-server, mode: defaut net/http, port: %d", port) | |
runServer(listener) | |
} | |
func runServer(listener net.Listener) { | |
for { | |
conn, err := listener.Accept() | |
checkErr(err) | |
go handleConn(conn) | |
} | |
} | |
func handleConn(conn net.Conn) { | |
fd := conn.(*Conn).Fd | |
buff := buffs[fd] | |
for { | |
n, err := conn.Read(buff) | |
if err == io.EOF || n == 0 { | |
checkErr(conn.Close()) | |
return | |
} | |
checkErr(err) | |
_, err = conn.Write(buff[:n]) | |
checkErr(err) | |
} | |
} | |
// checkErr does nothing for a nil error; for any other error it logs the error
// and terminates the process via log.Fatal.
func checkErr(err error) {
	if err == nil {
		return
	}
	log.Fatal(err)
}
type Callback func(event uring.CQEvent) | |
type Reactor struct { | |
ring *uring.Ring | |
currentNonce uint64 | |
callbacks map[uint64]Callback | |
callbacksLock sync.Mutex | |
queueSQELock sync.Mutex | |
submitSignal chan struct{} | |
} | |
//New создаем новый reactor. | |
func New(ring *uring.Ring) *Reactor { | |
return &Reactor{ | |
ring: ring, | |
submitSignal: make(chan struct{}), | |
callbacks: map[uint64]Callback{}, | |
} | |
} | |
func (r *Reactor) Run(ctx context.Context) { | |
wg := &sync.WaitGroup{} | |
//запускаем два компонента реактора | |
//consumer - обрабатывает новые CQE | |
//producer - отправляет новые SQE на обработку | |
wg.Add(2) | |
go func() { | |
defer wg.Done() | |
r.runConsumer(ctx) | |
}() | |
go func() { | |
defer wg.Done() | |
r.runPublisher(ctx) | |
}() | |
wg.Wait() | |
} | |
func (r *Reactor) runPublisher(ctx context.Context) { | |
defer close(r.submitSignal) | |
for { | |
select { | |
//при поступлении сигнала самбитим новые SQE | |
case <-r.submitSignal: | |
r.queueSQELock.Lock() | |
_, err := r.ring.Submit() | |
r.queueSQELock.Unlock() | |
if err != nil { | |
log.Println("io_uring submit", err) | |
} | |
case <-ctx.Done(): | |
return | |
} | |
} | |
} | |
func (r *Reactor) runConsumer(ctx context.Context) { | |
cqeBuff := make([]*uring.CQEvent, 512) | |
// начинаем цикл обработки результатов операций (CQE) | |
for { | |
// командуем Publisher'у обработать новые SQE | |
r.submitSignal <- struct{}{} | |
// ждем хотябы одну завершенную операцию | |
_, err := r.ring.WaitCQEventsWithTimeout(1, time.Millisecond*1) | |
if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.ETIME) { | |
runtime.Gosched() | |
goto CheckCtxAndContinue | |
} | |
if err != nil { | |
log.Println("io_uring wait", err) | |
goto CheckCtxAndContinue | |
} | |
// складываем все CQE в буффер и обрабатываем их | |
for n := r.ring.PeekCQEventBatch(cqeBuff); n > 0; n = r.ring.PeekCQEventBatch(cqeBuff) { | |
for i := 0; i < n; i++ { | |
cqe := cqeBuff[i] | |
nonce := cqe.UserData | |
//находим соответствующий callback | |
r.callbacksLock.Lock() | |
cb := r.callbacks[nonce] | |
delete(r.callbacks, nonce) | |
r.callbacksLock.Unlock() | |
cb(uring.CQEvent{ | |
UserData: cqe.UserData, | |
Res: cqe.Res, | |
Flags: cqe.Flags, | |
}) | |
} | |
//сообщаем io_uring о том что мы просмотрели n CQE | |
r.ring.AdvanceCQ(uint32(n)) | |
} | |
//проверяем не пора ли завершить цикл обработки CQE | |
CheckCtxAndContinue: | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
continue | |
} | |
} | |
} | |
//Queue добавляет операцию в SQ. | |
func (r *Reactor) Queue(op uring.Operation, cb Callback) (uint64, error) { | |
//генерируем уникальное значение UserData | |
nonce := r.nextNonce() | |
r.queueSQELock.Lock() | |
defer r.queueSQELock.Unlock() | |
//помещаем операцию в SQ | |
err := r.ring.QueueSQE(op, 0, nonce) | |
if err == nil { | |
r.callbacksLock.Lock() | |
r.callbacks[nonce] = cb | |
r.callbacksLock.Unlock() | |
} | |
return nonce, err | |
} | |
func (r *Reactor) nextNonce() uint64 { | |
return atomic.AddUint64(&r.currentNonce, 1) | |
} | |
type Listener struct { | |
fd int | |
reactor *Reactor | |
acceptChan chan uring.CQEvent | |
addr net.Addr | |
} | |
func NewListener(addr string, reactor *Reactor) (*Listener, error) { | |
tcpAddr, err := net.ResolveTCPAddr("tcp", addr) | |
if err != nil { | |
return nil, err | |
} | |
sockFd, err := serverSocket(tcpAddr) | |
if err != nil { | |
return nil, err | |
} | |
l := &Listener{ | |
fd: sockFd, | |
addr: tcpAddr, | |
reactor: reactor, | |
acceptChan: make(chan uring.CQEvent), | |
} | |
return l, nil | |
} | |
// serverSocket creates an IPv4 TCP socket, binds it to tcpAddr and puts it into
// the listening state with a backlog of syscall.SOMAXCONN.
//
// An empty tcpAddr.IP binds to the wildcard address. An IP that has no IPv4
// form is rejected with a *net.AddrError instead of being silently replaced by
// the wildcard address.
//
// It returns the listening descriptor, or -1 and the error of the failing
// step. The socket is closed again when bind or listen fails, so no descriptor
// leaks on the error path.
func serverSocket(tcpAddr *net.TCPAddr) (int, error) {
	addr := syscall.SockaddrInet4{
		Port: tcpAddr.Port,
	}
	if len(tcpAddr.IP) != 0 {
		ip4 := tcpAddr.IP.To4()
		if ip4 == nil {
			return -1, &net.AddrError{Err: "non-IPv4 address", Addr: tcpAddr.IP.String()}
		}
		copy(addr.Addr[:], ip4)
	}

	sockFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
	if err != nil {
		return -1, err
	}

	//_ = setDefaultListenerSockopts(sockFd)

	if err := syscall.Bind(sockFd, &addr); err != nil {
		_ = syscall.Close(sockFd) // best effort; the bind error is the one to report
		return -1, err
	}
	if err := syscall.Listen(sockFd, syscall.SOMAXCONN); err != nil {
		_ = syscall.Close(sockFd) // best effort; the listen error is the one to report
		return -1, err
	}

	return sockFd, nil
}
func (l *Listener) Accept() (net.Conn, error) { | |
//помещаем accept операцию в реактор | |
op := uring.Accept(uintptr(l.fd), 0) | |
l.reactor.Queue(op, func(event uring.CQEvent) { | |
l.acceptChan <- event | |
}) | |
// ждем появление подключения | |
cqe := <-l.acceptChan | |
if err := cqe.Error(); err != nil { | |
return nil, err | |
} | |
fd := int(cqe.Res) | |
rAddr, _ := op.Addr() | |
tc := newConn(fd, l.addr, rAddr, l.reactor) | |
return tc, nil | |
} | |
func (l *Listener) Close() (err error) { | |
err = syscall.Close(l.fd) | |
return err | |
} | |
func (l *Listener) Addr() net.Addr { | |
return l.addr | |
} | |
type Conn struct { | |
Fd int | |
lAddr, rAddr net.Addr | |
reactor *Reactor | |
readChan, writeChan chan uring.CQEvent | |
readLock, writeLock sync.Mutex | |
} | |
func newConn(fd int, lAddr, rAddr net.Addr, r *Reactor) *Conn { | |
return &Conn{ | |
lAddr: lAddr, | |
rAddr: rAddr, | |
Fd: fd, | |
reactor: r, | |
readChan: make(chan uring.CQEvent), | |
writeChan: make(chan uring.CQEvent), | |
} | |
} | |
func (c *Conn) Read(b []byte) (n int, err error) { | |
c.readLock.Lock() | |
defer c.readLock.Unlock() | |
//помещаем Recv операцию в реактор | |
op := uring.Recv(uintptr(c.Fd), b, 0) | |
c.reactor.Queue(op, func(event uring.CQEvent) { | |
c.readChan <- event | |
}) | |
//ждем завершения чтения из сокета | |
cqe := <-c.readChan | |
if err = cqe.Error(); err != nil { | |
return 0, &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err} | |
} | |
if cqe.Res == 0 { | |
err = &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.EOF} | |
} | |
runtime.KeepAlive(b) | |
return int(cqe.Res), err | |
} | |
func (c *Conn) Write(b []byte) (n int, err error) { | |
c.writeLock.Lock() | |
defer c.writeLock.Unlock() | |
//помещаем Send операцию в реактор | |
op := uring.Send(uintptr(c.Fd), b, 0) | |
c.reactor.Queue(op, func(event uring.CQEvent) { | |
c.writeChan <- event | |
}) | |
//ждем завершения записи в сокет | |
cqe := <-c.writeChan | |
if err = cqe.Error(); err != nil { | |
return 0, &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err} | |
} | |
if cqe.Res == 0 { | |
err = &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.ErrUnexpectedEOF} | |
} | |
runtime.KeepAlive(b) | |
return int(cqe.Res), err | |
} | |
func (c *Conn) Close() error { | |
return syscall.Close(c.Fd) | |
} | |
func (c *Conn) LocalAddr() net.Addr { | |
return c.lAddr | |
} | |
func (c *Conn) RemoteAddr() net.Addr { | |
return c.rAddr | |
} | |
func (c *Conn) SetDeadline(t time.Time) error { | |
return nil | |
} | |
func (c *Conn) SetReadDeadline(t time.Time) error { | |
return nil | |
} | |
func (c *Conn) SetWriteDeadline(t time.Time) error { | |
return nil | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment