diff options
| author | Shulhan <m.shulhan@gmail.com> | 2019-11-01 17:42:29 +0700 |
|---|---|---|
| committer | Shulhan <m.shulhan@gmail.com> | 2019-11-01 17:42:29 +0700 |
| commit | 090518aff64907a8df0d09e4e909cbd6f4f1f985 (patch) | |
| tree | 8f87ed42e99aa01b3ed96a5199885de7a5ce84ae | |
| parent | a2157aeacc23fcc2581681ffaed2609ae0365c0b (diff) | |
| download | pakakeh.go-090518aff64907a8df0d09e4e909cbd6f4f1f985.tar.xz | |
net: implement network polling using epoll and kqueue
This is the first implementation of (almost) generic polling.
The Poll currently only support the Read events from now, because the
most common use case is for handling multiple socket without using
goroutines.
| -rw-r--r-- | lib/net/poll.go | 39 | ||||
| -rw-r--r-- | lib/net/poll_bsd.go | 94 | ||||
| -rw-r--r-- | lib/net/poll_linux.go | 83 |
3 files changed, 216 insertions, 0 deletions
diff --git a/lib/net/poll.go b/lib/net/poll.go new file mode 100644 index 00000000..ec5369b2 --- /dev/null +++ b/lib/net/poll.go @@ -0,0 +1,39 @@ +package net + +const ( + maxQueue = 128 +) + +// +// Poll represent an interface to network polling. +// +type Poll interface { + // + // Close the poll. + // + Close() + + // + // RegisterRead add the file descriptor to read poll. + // + RegisterRead(fd int) (err error) + + // + // ReregisterRead register the file descriptor back to events. + // This method must be used on Linux after calling WaitRead. + // + // See https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/ + // + ReregisterRead(idx, fd int) + + // + // UnregisterRead remove file descriptor from the poll. + // + UnregisterRead(fd int) (err error) + + // + // WaitRead wait for read event received and return list of file + // descriptor that are ready for reading. + // + WaitRead() (fds []int, err error) +} diff --git a/lib/net/poll_bsd.go b/lib/net/poll_bsd.go new file mode 100644 index 00000000..7e4ac1e7 --- /dev/null +++ b/lib/net/poll_bsd.go @@ -0,0 +1,94 @@ +// +build darwin dragonfly freebsd netbsd openbsd + +package net + +import ( + "fmt" + + "golang.org/x/sys/unix" +) + +type kqueue struct { + events [maxQueue]unix.Kevent_t + read int +} + +// +// NewPoll create and initialize new poll using epoll for Linux system or +// kqueue for BSD or Darwin (macOS). +// +func NewPoll() (Poll, error) { + var err error + + kq := &kqueue{} + + kq.read, err = unix.Kqueue() + if err != nil { + return nil, fmt.Errorf("kqueue.NewPoll: %s", err.Error()) + } + + return kq, nil +} + +func (poll *kqueue) Close() { + // no-op +} + +func (poll *kqueue) RegisterRead(fd int) (err error) { + kevent := unix.Kevent_t{} + + unix.SetKevent(&kevent, fd, unix.EVFILT_READ, unix.EV_ADD) + + err = unix.SetNonblock(fd, true) + if err != nil { + return fmt.Errorf("kqueue.RegisterRead: %s", err.Error()) + } + + changes := []unix.Kevent_t{ + kevent, + } + + _, err = unix.Kevent(poll.read, changes, nil, nil) + if err != nil { + return fmt.Errorf("kqueue.RegisterRead: %s", err.Error()) + } + + return nil +} + +func (poll *kqueue) ReregisterRead(idx, fd int) { + // no-op +} + +func (poll *kqueue) UnregisterRead(fd int) (err error) { + kevent := unix.Kevent_t{} + + unix.SetKevent(&kevent, fd, unix.EVFILT_READ, unix.EV_DELETE) + + changes := []unix.Kevent_t{ + kevent, + } + + _, err = unix.Kevent(poll.read, changes, nil, nil) + if err != nil { + return fmt.Errorf("kqueue.UnregisterRead: %s", err.Error()) + } + + return nil +} + +func (poll *kqueue) WaitRead() (fds []int, err error) { + n, err := unix.Kevent(poll.read, nil, poll.events[:], nil) + if err != nil { + return nil, fmt.Errorf("kqueue.WaitRead: %s", err.Error()) + } + + for x := 0; x < n; x++ { + switch poll.events[x].Filter { + case unix.EVFILT_READ: + fds = append(fds, int(poll.events[x].Ident)) + } + } + + return fds, nil +} diff --git a/lib/net/poll_linux.go b/lib/net/poll_linux.go new file mode 100644 index 00000000..cabfde39 --- /dev/null +++ b/lib/net/poll_linux.go @@ -0,0 +1,83 @@ +// +build linux + +import ( + "fmt" + "log" + + "golang.org/x/sys/unix" +) + +type epoll struct { + events [maxQueue]unix.EpollEvent + read int +} + +// +// NewPoll create and initialize new poll using epoll for Linux system or +// kqueue for BSD or Darwin (macOS). +// +func NewPoll() (poll Poll, err error) { + poll = &epoll{} + + poll.read, err = unix.EpollCreate1(0) + if err != nil { + return fmt.Errorf("epoll.NewPoll: %s", err.Error()) + } + + return poll, nil +} + +func (poll *epoll) Close() { + unix.Close(read) +} + +func (poll *epoll) RegisterRead(fd int) (err error) { + event := unix.EpollEvent{ + Events: unix.EPOLLIN | unix.EPOLLONESHOT, + Fd: int32(fd), + } + + err = unix.SetNonblock(fd, true) + if err != nil { + return fmt.Errorf("epoll.RegisterRead: %s", err.Error()) + } + + err = unix.EpollCtl(poll.read, unix.EPOLL_CTL_ADD, fd, &event) + if err != nil { + return fmt.Errorf("epoll.RegisterRead: %s", err.Error()) + } + + return nil +} + +func (poll *epoll) ReregisterRead(idx, fd int) { + poll.events[idx].Events = unix.EPOLLIN | unix.EPOLLONESHOT + + err := unix.EpollCtl(serv.read, unix.EPOLL_CTL_MOD, fd, &poll.events[idx]) + if err != nil { + log.Println("epoll.RegisterRead: unix.EpollCtl: " + err.Error()) + poll.UnregisterRead(fd) + } +} + +func (poll *epoll) UnregisterRead(fd int) (err error) { + err := unix.EpollCtl(serv.epollRead, unix.EPOLL_CTL_DEL, fd, nil) + if err != nil { + return fmt.Errorf("epoll.UnregisterRead: %s", err.Error()) + } + + return nil +} + +func (poll *epoll) WaitRead() (fds []int, err error) { + readEvents, err := unix.EpollWait(poll.read, poll.events[:], -1) + if err != nil { + return nil, fmt.Errorf("epoll.WaitRead: %s", err.Error()) + } + + for x := 0; x < len(readEvents); x++ { + fds = append(fds, int(readEvents[x].Fd)) + } + + return fds, nil +} |
