Created
February 10, 2022 20:46
-
-
Save ii64/d13eed1a02b1a5220cfda7f74e38f7bc to your computer and use it in GitHub Desktop.
gouring evloop implemetation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"io" | |
"log" | |
"net" | |
"net/http" | |
"runtime" | |
"sync/atomic" | |
"syscall" | |
"unsafe" | |
_ "unsafe" | |
"github.com/ii64/gouring" | |
"github.com/ii64/gouring/queue" | |
xsyscall "github.com/ii64/gouring/syscall" | |
) | |
type evloopAction int | |
const ( | |
ev_continue evloopAction = 0 | |
ev_readmore evloopAction = 1 << 0 | |
ev_closeconn evloopAction = 1 << 1 | |
) | |
type evloopHandler struct { | |
OnConnect func(c *conn, fd int) (evloopAction, error) | |
OnRead func(c *conn, bytesReaded int) (evloopAction, error) | |
OnWrite func(c *conn, bytesWritten int) (evloopAction, error) | |
} | |
func main() { | |
go func() { | |
err := http.ListenAndServe(":8000", nil) | |
if err != nil { | |
panic(err) | |
} | |
}() | |
ring, err := gouring.New(128, nil, gouring.SQThread(6, 60000)) | |
if err != nil { | |
panic(err) | |
} | |
defer ring.Close() | |
q := queue.New(ring) | |
defer q.Close() | |
lis, err := net.Listen("tcp", ":7777") | |
if err != nil { | |
panic(err) | |
} | |
defer lis.Close() | |
l, ok := lis.(*net.TCPListener) | |
if !ok { | |
panic("not tcp") | |
} | |
f, err := l.File() | |
if err != nil { | |
panic(err) | |
} | |
defer f.Close() | |
sfd := f.Fd() | |
fmt.Printf("sfd: %d\n", sfd) | |
// --- | |
Accept := func() { | |
sqe := q.GetSQEntry() | |
ue := newUevent(sqe.Opcode) | |
xsyscall.Accept(sqe, int(sfd), ue.c.raw) | |
ue.op = sqe.Opcode | |
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue))) | |
} | |
Read := func(c *conn) { | |
sqe := q.GetSQEntry() | |
xsyscall.Read(sqe, c.fd, c.b) | |
ue := newUeventConn(sqe.Opcode, c) | |
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue))) | |
} | |
Write := func(c *conn, b []byte) { | |
sqe := q.GetSQEntry() | |
xsyscall.Write(sqe, c.fd, b) | |
ue := newUeventConn(sqe.Opcode, c) | |
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue))) | |
} | |
Close := func(c *conn) { | |
sqe := q.GetSQEntry() | |
xsyscall.Close(sqe, c.fd) | |
ue := newUeventConn(sqe.Opcode, c) | |
sqe.UserData = uint64(uintptr(unsafe.Pointer(ue))) | |
} | |
// --- | |
evh := evloopHandler{ | |
OnConnect: func(c *conn, fd int) (evloopAction, error) { | |
// fmt.Printf("onconnect: %+#v\n", c.sa) | |
return ev_continue, nil | |
}, | |
OnRead: func(c *conn, bytesReaded int) (evloopAction, error) { | |
fmt.Printf("onread: %+#v\n", c.b[:bytesReaded]) | |
Write(c, []byte("HTTP/1.1 200 OK\r\nContent-Length: 3\r\n\r\nyea")) | |
return ev_continue, nil | |
}, | |
OnWrite: func(c *conn, bytesWritten int) (evloopAction, error) { | |
// fmt.Printf("onwrite: %+#v\n", c) | |
return ev_continue, nil | |
}, | |
} | |
// --- | |
Accept() | |
q.Submit() | |
q.RunPoll(true, 1, func(cqe *gouring.CQEntry) (err error) { | |
// fmt.Printf("cqe: %+#v\n", cqe) | |
ue := ud2uevent(cqe.UserData) // unpack uring event | |
if ue == nil { | |
return nil | |
} | |
conn := ue.c // conn | |
if conn == nil { | |
return nil | |
} | |
if conn.state&C_CLOSED == C_CLOSED { // closed conn, ignore | |
return nil | |
} | |
err = func() (err error) { | |
conn.inuse() | |
defer conn.done() | |
// detached conn | |
if conn.state&C_DETACH == C_DETACH { | |
conn.state |= C_CLOSED // add closed | |
return nil | |
} | |
if ue.op == gouring.IORING_OP_ACCEPT { // conn.state&C_RESV == C_RESV { // Conn RESV got ESTAB | |
Accept() | |
conn.fd = int(cqe.Res) | |
conn.state = C_ESTAB | |
conn.sa, _ = anyToSockaddr(conn.raw) | |
act, err := evh.OnConnect(conn, conn.fd) | |
if err != nil { | |
conn.state |= C_DETACH | |
Close(conn) | |
log.Printf("onconnect: err: %+#v\n", err) | |
return nil | |
} | |
switch act { | |
case ev_continue: | |
Read(conn) | |
case ev_closeconn: | |
conn.state |= C_DETACH | |
Close(conn) | |
} | |
} else if ue.op == gouring.IORING_OP_READ { //conn.state&C_ESTAB == C_ESTAB { // Conn ESTAB | |
var act evloopAction | |
var err error | |
if cqe.Res > 0 { | |
act, err = evh.OnRead(conn, int(cqe.Res)) | |
} else if cqe.Res == 0 { | |
err = io.EOF | |
} else { | |
err = syscall.Errno(cqe.Res) | |
} | |
if err != nil { | |
conn.state |= C_DETACH | |
Close(conn) | |
log.Printf("onread: err: %+#v\n", err) | |
return nil | |
} | |
switch act { | |
case ev_closeconn: | |
conn.state |= C_DETACH | |
Close(conn) | |
case ev_readmore, ev_continue: | |
Read(conn) | |
} | |
} else if ue.op == gouring.IORING_OP_WRITE { | |
act, err := evh.OnWrite(conn, int(cqe.Res)) | |
if err != nil { | |
conn.state |= C_DETACH | |
Close(conn) | |
log.Printf("onwrite: err: %+#v\n", err) | |
return nil | |
} | |
switch act { | |
case ev_closeconn: | |
conn.state |= C_DETACH | |
Close(conn) | |
} | |
} | |
return nil | |
}() | |
q.Submit() | |
return nil | |
}) | |
} | |
//go:linkname anyToSockaddr syscall.anyToSockaddr | |
func anyToSockaddr(rsa *syscall.RawSockaddrAny) (syscall.Sockaddr, error) | |
type ConnState int | |
const ( | |
C_RESV ConnState = 1 << 0 | |
C_ESTAB ConnState = 1 << 1 | |
C_DETACH ConnState = 1 << 2 | |
C_CLOSED ConnState = 1 << 3 | |
) | |
type conn struct { | |
fd int | |
raw *syscall.RawSockaddrAny | |
b []byte | |
state ConnState | |
sa syscall.Sockaddr | |
b_inuse int32 | |
} | |
type uevent struct { | |
op gouring.UringOpcode | |
c *conn | |
} | |
func newUevent(op gouring.UringOpcode) *uevent { | |
return &uevent{ | |
op: op, | |
c: newConn(), | |
} | |
} | |
func newUeventConn(op gouring.UringOpcode, c *conn) *uevent { | |
return &uevent{ | |
op: op, | |
c: c, | |
} | |
} | |
func ud2uevent(ud uint64) *uevent { | |
if ud < 0 { | |
return nil | |
} | |
return (*uevent)(unsafe.Pointer(uintptr(ud))) | |
} | |
func newConn() *conn { | |
return &conn{ | |
fd: -1, | |
raw: &syscall.RawSockaddrAny{}, | |
b: make([]byte, 2<<11), | |
state: C_RESV, | |
b_inuse: 0, | |
} | |
} | |
func ud2conn(ud uint64) *conn { | |
if ud < 0 { | |
return nil | |
} | |
return (*conn)(unsafe.Pointer(uintptr(ud))) | |
} | |
func (c *conn) inuse() { | |
for !atomic.CompareAndSwapInt32(&c.b_inuse, 0, 1) { | |
if atomic.LoadInt32(&c.b_inuse) == 1 { | |
return | |
} | |
runtime.Gosched() | |
} | |
} | |
func (c *conn) done() { | |
atomic.StoreInt32(&c.b_inuse, 1) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment