Skip to content

Instantly share code, notes, and snippets.

@godzie44
Last active March 22, 2022 02:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save godzie44/190406f64d788ff901f8469aaca61bb6 to your computer and use it in GitHub Desktop.
Save godzie44/190406f64d788ff901f8469aaca61bb6 to your computer and use it in GitHub Desktop.
Example of io_uring reactor and echo-server (using go-uring lib)
package main
import (
"context"
"errors"
"flag"
"fmt"
"github.com/godzie44/go-uring/uring"
"io"
"log"
"net"
"runtime"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
)
const MaxConns = 4096
const MaxMsgLen = 2048
func initBuffs() [][]byte {
buffs := make([][]byte, MaxConns)
for i := range buffs {
buffs[i] = make([]byte, MaxMsgLen)
}
return buffs
}
var buffs = initBuffs()
func main() {
flag.Parse()
portStr := flag.Arg(0)
port, _ := strconv.Atoi(portStr)
var listener net.Listener
var err error
ring, err := uring.New(uring.MaxEntries >> 3)
rea := New(ring)
go rea.Run(context.Background())
listener, err = NewListener(fmt.Sprintf("0.0.0.0:%d", port), rea)
checkErr(err)
log.Printf("start naive echo-server, mode: defaut net/http, port: %d", port)
runServer(listener)
}
func runServer(listener net.Listener) {
for {
conn, err := listener.Accept()
checkErr(err)
go handleConn(conn)
}
}
func handleConn(conn net.Conn) {
fd := conn.(*Conn).Fd
buff := buffs[fd]
for {
n, err := conn.Read(buff)
if err == io.EOF || n == 0 {
checkErr(conn.Close())
return
}
checkErr(err)
_, err = conn.Write(buff[:n])
checkErr(err)
}
}
func checkErr(err error) {
if err != nil {
log.Fatal(err)
}
}
type Callback func(event uring.CQEvent)
type Reactor struct {
ring *uring.Ring
currentNonce uint64
callbacks map[uint64]Callback
callbacksLock sync.Mutex
queueSQELock sync.Mutex
submitSignal chan struct{}
}
//New создаем новый reactor.
func New(ring *uring.Ring) *Reactor {
return &Reactor{
ring: ring,
submitSignal: make(chan struct{}),
callbacks: map[uint64]Callback{},
}
}
func (r *Reactor) Run(ctx context.Context) {
wg := &sync.WaitGroup{}
//запускаем два компонента реактора
//consumer - обрабатывает новые CQE
//producer - отправляет новые SQE на обработку
wg.Add(2)
go func() {
defer wg.Done()
r.runConsumer(ctx)
}()
go func() {
defer wg.Done()
r.runPublisher(ctx)
}()
wg.Wait()
}
func (r *Reactor) runPublisher(ctx context.Context) {
defer close(r.submitSignal)
for {
select {
//при поступлении сигнала самбитим новые SQE
case <-r.submitSignal:
r.queueSQELock.Lock()
_, err := r.ring.Submit()
r.queueSQELock.Unlock()
if err != nil {
log.Println("io_uring submit", err)
}
case <-ctx.Done():
return
}
}
}
func (r *Reactor) runConsumer(ctx context.Context) {
cqeBuff := make([]*uring.CQEvent, 512)
// начинаем цикл обработки результатов операций (CQE)
for {
// командуем Publisher'у обработать новые SQE
r.submitSignal <- struct{}{}
// ждем хотябы одну завершенную операцию
_, err := r.ring.WaitCQEventsWithTimeout(1, time.Millisecond*1)
if errors.Is(err, syscall.EAGAIN) || errors.Is(err, syscall.EINTR) || errors.Is(err, syscall.ETIME) {
runtime.Gosched()
goto CheckCtxAndContinue
}
if err != nil {
log.Println("io_uring wait", err)
goto CheckCtxAndContinue
}
// складываем все CQE в буффер и обрабатываем их
for n := r.ring.PeekCQEventBatch(cqeBuff); n > 0; n = r.ring.PeekCQEventBatch(cqeBuff) {
for i := 0; i < n; i++ {
cqe := cqeBuff[i]
nonce := cqe.UserData
//находим соответствующий callback
r.callbacksLock.Lock()
cb := r.callbacks[nonce]
delete(r.callbacks, nonce)
r.callbacksLock.Unlock()
cb(uring.CQEvent{
UserData: cqe.UserData,
Res: cqe.Res,
Flags: cqe.Flags,
})
}
//сообщаем io_uring о том что мы просмотрели n CQE
r.ring.AdvanceCQ(uint32(n))
}
//проверяем не пора ли завершить цикл обработки CQE
CheckCtxAndContinue:
select {
case <-ctx.Done():
return
default:
continue
}
}
}
//Queue добавляет операцию в SQ.
func (r *Reactor) Queue(op uring.Operation, cb Callback) (uint64, error) {
//генерируем уникальное значение UserData
nonce := r.nextNonce()
r.queueSQELock.Lock()
defer r.queueSQELock.Unlock()
//помещаем операцию в SQ
err := r.ring.QueueSQE(op, 0, nonce)
if err == nil {
r.callbacksLock.Lock()
r.callbacks[nonce] = cb
r.callbacksLock.Unlock()
}
return nonce, err
}
func (r *Reactor) nextNonce() uint64 {
return atomic.AddUint64(&r.currentNonce, 1)
}
type Listener struct {
fd int
reactor *Reactor
acceptChan chan uring.CQEvent
addr net.Addr
}
func NewListener(addr string, reactor *Reactor) (*Listener, error) {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
return nil, err
}
sockFd, err := serverSocket(tcpAddr)
if err != nil {
return nil, err
}
l := &Listener{
fd: sockFd,
addr: tcpAddr,
reactor: reactor,
acceptChan: make(chan uring.CQEvent),
}
return l, nil
}
func serverSocket(tcpAddr *net.TCPAddr) (int, error) {
sockFd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM|syscall.SOCK_CLOEXEC, 0)
if err != nil {
return 0, err
}
//_ = setDefaultListenerSockopts(sockFd)
addr := syscall.SockaddrInet4{
Port: tcpAddr.Port,
}
copy(addr.Addr[:], tcpAddr.IP.To4())
_ = syscall.Bind(sockFd, &addr)
_ = syscall.Listen(sockFd, syscall.SOMAXCONN)
return sockFd, nil
}
func (l *Listener) Accept() (net.Conn, error) {
//помещаем accept операцию в реактор
op := uring.Accept(uintptr(l.fd), 0)
l.reactor.Queue(op, func(event uring.CQEvent) {
l.acceptChan <- event
})
// ждем появление подключения
cqe := <-l.acceptChan
if err := cqe.Error(); err != nil {
return nil, err
}
fd := int(cqe.Res)
rAddr, _ := op.Addr()
tc := newConn(fd, l.addr, rAddr, l.reactor)
return tc, nil
}
func (l *Listener) Close() (err error) {
err = syscall.Close(l.fd)
return err
}
func (l *Listener) Addr() net.Addr {
return l.addr
}
type Conn struct {
Fd int
lAddr, rAddr net.Addr
reactor *Reactor
readChan, writeChan chan uring.CQEvent
readLock, writeLock sync.Mutex
}
func newConn(fd int, lAddr, rAddr net.Addr, r *Reactor) *Conn {
return &Conn{
lAddr: lAddr,
rAddr: rAddr,
Fd: fd,
reactor: r,
readChan: make(chan uring.CQEvent),
writeChan: make(chan uring.CQEvent),
}
}
func (c *Conn) Read(b []byte) (n int, err error) {
c.readLock.Lock()
defer c.readLock.Unlock()
//помещаем Recv операцию в реактор
op := uring.Recv(uintptr(c.Fd), b, 0)
c.reactor.Queue(op, func(event uring.CQEvent) {
c.readChan <- event
})
//ждем завершения чтения из сокета
cqe := <-c.readChan
if err = cqe.Error(); err != nil {
return 0, &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err}
}
if cqe.Res == 0 {
err = &net.OpError{Op: "read", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.EOF}
}
runtime.KeepAlive(b)
return int(cqe.Res), err
}
func (c *Conn) Write(b []byte) (n int, err error) {
c.writeLock.Lock()
defer c.writeLock.Unlock()
//помещаем Send операцию в реактор
op := uring.Send(uintptr(c.Fd), b, 0)
c.reactor.Queue(op, func(event uring.CQEvent) {
c.writeChan <- event
})
//ждем завершения записи в сокет
cqe := <-c.writeChan
if err = cqe.Error(); err != nil {
return 0, &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: err}
}
if cqe.Res == 0 {
err = &net.OpError{Op: "write", Net: "tcp", Source: c.lAddr, Addr: c.rAddr, Err: io.ErrUnexpectedEOF}
}
runtime.KeepAlive(b)
return int(cqe.Res), err
}
func (c *Conn) Close() error {
return syscall.Close(c.Fd)
}
func (c *Conn) LocalAddr() net.Addr {
return c.lAddr
}
func (c *Conn) RemoteAddr() net.Addr {
return c.rAddr
}
func (c *Conn) SetDeadline(t time.Time) error {
return nil
}
func (c *Conn) SetReadDeadline(t time.Time) error {
return nil
}
func (c *Conn) SetWriteDeadline(t time.Time) error {
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment