Skip to content

Instantly share code, notes, and snippets.

@ii64
Created February 10, 2022 20:46
Show Gist options
  • Save ii64/d13eed1a02b1a5220cfda7f74e38f7bc to your computer and use it in GitHub Desktop.
Save ii64/d13eed1a02b1a5220cfda7f74e38f7bc to your computer and use it in GitHub Desktop.
gouring evloop implemetation
package main
import (
"fmt"
"io"
"log"
"net"
"net/http"
"runtime"
"sync/atomic"
"syscall"
"unsafe"
_ "unsafe"
"github.com/ii64/gouring"
"github.com/ii64/gouring/queue"
xsyscall "github.com/ii64/gouring/syscall"
)
type evloopAction int
const (
ev_continue evloopAction = 0
ev_readmore evloopAction = 1 << 0
ev_closeconn evloopAction = 1 << 1
)
type evloopHandler struct {
OnConnect func(c *conn, fd int) (evloopAction, error)
OnRead func(c *conn, bytesReaded int) (evloopAction, error)
OnWrite func(c *conn, bytesWritten int) (evloopAction, error)
}
func main() {
go func() {
err := http.ListenAndServe(":8000", nil)
if err != nil {
panic(err)
}
}()
ring, err := gouring.New(128, nil, gouring.SQThread(6, 60000))
if err != nil {
panic(err)
}
defer ring.Close()
q := queue.New(ring)
defer q.Close()
lis, err := net.Listen("tcp", ":7777")
if err != nil {
panic(err)
}
defer lis.Close()
l, ok := lis.(*net.TCPListener)
if !ok {
panic("not tcp")
}
f, err := l.File()
if err != nil {
panic(err)
}
defer f.Close()
sfd := f.Fd()
fmt.Printf("sfd: %d\n", sfd)
// ---
Accept := func() {
sqe := q.GetSQEntry()
ue := newUevent(sqe.Opcode)
xsyscall.Accept(sqe, int(sfd), ue.c.raw)
ue.op = sqe.Opcode
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue)))
}
Read := func(c *conn) {
sqe := q.GetSQEntry()
xsyscall.Read(sqe, c.fd, c.b)
ue := newUeventConn(sqe.Opcode, c)
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue)))
}
Write := func(c *conn, b []byte) {
sqe := q.GetSQEntry()
xsyscall.Write(sqe, c.fd, b)
ue := newUeventConn(sqe.Opcode, c)
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue)))
}
Close := func(c *conn) {
sqe := q.GetSQEntry()
xsyscall.Close(sqe, c.fd)
ue := newUeventConn(sqe.Opcode, c)
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue)))
}
// ---
evh := evloopHandler{
OnConnect: func(c *conn, fd int) (evloopAction, error) {
// fmt.Printf("onconnect: %+#v\n", c.sa)
return ev_continue, nil
},
OnRead: func(c *conn, bytesReaded int) (evloopAction, error) {
fmt.Printf("onread: %+#v\n", c.b[:bytesReaded])
Write(c, []byte("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nyea"))
return ev_continue, nil
},
OnWrite: func(c *conn, bytesWritten int) (evloopAction, error) {
// fmt.Printf("onwrite: %+#v\n", c)
return ev_continue, nil
},
}
// ---
Accept()
q.Submit()
q.RunPoll(true, 1, func(cqe *gouring.CQEntry) (err error) {
// fmt.Printf("cqe: %+#v\n", cqe)
ue := ud2uevent(cqe.UserData) // unpack uring event
if ue == nil {
return nil
}
conn := ue.c // conn
if conn == nil {
return nil
}
if conn.state&C_CLOSED == C_CLOSED { // closed conn, ignore
return nil
}
err = func() (err error) {
conn.inuse()
defer conn.done()
// detached conn
if conn.state&C_DETACH == C_DETACH {
conn.state |= C_CLOSED // add closed
return nil
}
if ue.op == gouring.IORING_OP_ACCEPT { // conn.state&C_RESV == C_RESV { // Conn RESV got ESTAB
Accept()
conn.fd = int(cqe.Res)
conn.state = C_ESTAB
conn.sa, _ = anyToSockaddr(conn.raw)
act, err := evh.OnConnect(conn, conn.fd)
if err != nil {
conn.state |= C_DETACH
Close(conn)
log.Printf("onconnect: err: %+#v\n", err)
return nil
}
switch act {
case ev_continue:
Read(conn)
case ev_closeconn:
conn.state |= C_DETACH
Close(conn)
}
} else if ue.op == gouring.IORING_OP_READ { //conn.state&C_ESTAB == C_ESTAB { // Conn ESTAB
var act evloopAction
var err error
if cqe.Res > 0 {
act, err = evh.OnRead(conn, int(cqe.Res))
} else if cqe.Res == 0 {
err = io.EOF
} else {
err = syscall.Errno(cqe.Res)
}
if err != nil {
conn.state |= C_DETACH
Close(conn)
log.Printf("onread: err: %+#v\n", err)
return nil
}
switch act {
case ev_closeconn:
conn.state |= C_DETACH
Close(conn)
case ev_readmore, ev_continue:
Read(conn)
}
} else if ue.op == gouring.IORING_OP_WRITE {
act, err := evh.OnWrite(conn, int(cqe.Res))
if err != nil {
conn.state |= C_DETACH
Close(conn)
log.Printf("onwrite: err: %+#v\n", err)
return nil
}
switch act {
case ev_closeconn:
conn.state |= C_DETACH
Close(conn)
}
}
return nil
}()
q.Submit()
return nil
})
}
//go:linkname anyToSockaddr syscall.anyToSockaddr
func anyToSockaddr(rsa *syscall.RawSockaddrAny) (syscall.Sockaddr, error)
type ConnState int
const (
C_RESV ConnState = 1 << 0
C_ESTAB ConnState = 1 << 1
C_DETACH ConnState = 1 << 2
C_CLOSED ConnState = 1 << 3
)
type conn struct {
fd int
raw *syscall.RawSockaddrAny
b []byte
state ConnState
sa syscall.Sockaddr
b_inuse int32
}
type uevent struct {
op gouring.UringOpcode
c *conn
}
func newUevent(op gouring.UringOpcode) *uevent {
return &uevent{
op: op,
c: newConn(),
}
}
func newUeventConn(op gouring.UringOpcode, c *conn) *uevent {
return &uevent{
op: op,
c: c,
}
}
func ud2uevent(ud uint64) *uevent {
if ud < 0 {
return nil
}
return (*uevent)(unsafe.Pointer(uintptr(ud)))
}
func newConn() *conn {
return &conn{
fd: -1,
raw: &syscall.RawSockaddrAny{},
b: make([]byte, 2<<11),
state: C_RESV,
b_inuse: 0,
}
}
func ud2conn(ud uint64) *conn {
if ud < 0 {
return nil
}
return (*conn)(unsafe.Pointer(uintptr(ud)))
}
func (c *conn) inuse() {
for !atomic.CompareAndSwapInt32(&c.b_inuse, 0, 1) {
if atomic.LoadInt32(&c.b_inuse) == 1 {
return
}
runtime.Gosched()
}
}
func (c *conn) done() {
atomic.StoreInt32(&c.b_inuse, 1)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment