summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorShulhan <ms@kilabit.info>2023-06-29 21:20:39 +0700
committerShulhan <ms@kilabit.info>2023-07-01 17:24:30 +0700
commit90203c3603bfbf01b5517fa9da49e276a506b733 (patch)
treedf7399af33a48da65f364bd2f5c7cdc9c1134714
parent1570b3182b534c5a0a952e54531fff9769430930 (diff)
downloadpakakeh.go-90203c3603bfbf01b5517fa9da49e276a506b733.tar.xz
lib/net: changes the WaitRead/Event model on Poll
Previously, the Pool's WaitRead and WaitReadEVent methods return list of file descriptor (fd) and keeps the fd in the pool. In case we want to process the returned fd concurrently, by running it in different goroutine, the next call WaitRead may return the same fd if its goroutine not fast enought to read from fd. This changes fix this issue by removing list of fd from poll and set the fd flag to blocking mode again after returning it from WaitRead or WaitReadEvent. This changes also remove the ReregisterRead and ReregisterEvent methods since it is not applicable anymore.
-rw-r--r--lib/net/poll.go25
-rw-r--r--lib/net/poll_bsd.go79
-rw-r--r--lib/net/poll_linux.go102
3 files changed, 101 insertions, 105 deletions
diff --git a/lib/net/poll.go b/lib/net/poll.go
index 0f141885..cf7d1ba4 100644
--- a/lib/net/poll.go
+++ b/lib/net/poll.go
@@ -20,27 +20,18 @@ type Poll interface {
// RegisterRead add the file descriptor to read poll.
RegisterRead(fd int) (err error)
- // ReregisterEvent register the event back.
- // This method must be called back after calling WaitReadEvent when
- // PollEvent descriptor still being used.
- ReregisterEvent(pe PollEvent) error
-
- // ReregisterRead register the file descriptor back to events.
- // This method must be called back after calling WaitRead when fd
- // still being used.
- //
- // See https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/
- ReregisterRead(idx, fd int)
-
// UnregisterRead remove file descriptor from the poll.
UnregisterRead(fd int) (err error)
- // WaitRead wait for read event received and return list of file
- // descriptor that are ready for reading.
+ // WaitRead wait and return list of file descriptor (fd) that are
+ // ready for reading from the pool.
+ // The returned fd is detached from poll to allow concurrent
+ // processing of fd at the same time.
+ // Once the data has been read from the fd and its still need to be
+ // used, one need to put it back to poll using RegisterRead.
WaitRead() (fds []int, err error)
- // WaitReadEvents wait for read event received and return list of
- // PollEvent that contains the file descriptor and the underlying
- // OS specific event state.
+ // WaitReadEvents wait and return list of PollEvent that contains the
+ // file descriptor and the underlying OS specific event state.
WaitReadEvents() (events []PollEvent, err error)
}
diff --git a/lib/net/poll_bsd.go b/lib/net/poll_bsd.go
index f857e5d6..c2387c06 100644
--- a/lib/net/poll_bsd.go
+++ b/lib/net/poll_bsd.go
@@ -60,25 +60,6 @@ func (poll *kqueue) RegisterRead(fd int) (err error) {
return nil
}
-func (poll *kqueue) ReregisterEvent(event PollEvent) (err error) {
- var (
- logp = `ReregisterEvent`
- fd = int(event.Descriptor())
- )
- err = unix.SetNonblock(fd, true)
- if err != nil {
- return fmt.Errorf(`%s: %w`, logp, err)
- }
- return nil
-}
-
-func (poll *kqueue) ReregisterRead(idx, fd int) {
- var err = unix.SetNonblock(fd, true)
- if err != nil {
- log.Printf(`ReregisterRead: %s`, err)
- }
-}
-
func (poll *kqueue) UnregisterRead(fd int) (err error) {
kevent := unix.Kevent_t{}
@@ -98,7 +79,11 @@ func (poll *kqueue) UnregisterRead(fd int) (err error) {
func (poll *kqueue) WaitRead() (fds []int, err error) {
var (
- n int
+ logp = `WaitRead`
+
+ n int
+ x int
+ fd int
)
for n == 0 {
n, err = unix.Kevent(poll.read, nil, poll.events[:], nil)
@@ -110,10 +95,26 @@ func (poll *kqueue) WaitRead() (fds []int, err error) {
}
}
- for x := 0; x < n; x++ {
- if poll.events[x].Filter == unix.EVFILT_READ {
- fds = append(fds, int(poll.events[x].Ident))
+ for x = 0; x < n; x++ {
+ if poll.events[x].Filter != unix.EVFILT_READ {
+ continue
}
+
+ fd = int(poll.events[x].Ident)
+
+ err = poll.UnregisterRead(fd)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ err = unix.SetNonblock(fd, false)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ fds = append(fds, fd)
}
return fds, nil
@@ -123,8 +124,9 @@ func (poll *kqueue) WaitReadEvents() (events []PollEvent, err error) {
var (
logp = `WaitReadEvents`
- n int
- x int
+ n int
+ x int
+ fd int
)
for n == 0 {
@@ -138,12 +140,29 @@ func (poll *kqueue) WaitReadEvents() (events []PollEvent, err error) {
}
for x = 0; x < n; x++ {
- if poll.events[x].Filter == unix.EVFILT_READ {
- events = append(events, &pollEvent{
- fd: poll.events[x].Ident,
- event: poll.events[x],
- })
+ if poll.events[x].Filter != unix.EVFILT_READ {
+ continue
+ }
+
+ fd = int(poll.events[x].Ident)
+
+ err = poll.UnregisterRead(fd)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ err = unix.SetNonblock(fd, false)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ var event = &pollEvent{
+ fd: poll.events[x].Ident,
+ event: poll.events[x],
}
+ events = append(events, event)
}
return events, nil
diff --git a/lib/net/poll_linux.go b/lib/net/poll_linux.go
index 2185b0f3..9d85acdc 100644
--- a/lib/net/poll_linux.go
+++ b/lib/net/poll_linux.go
@@ -57,56 +57,6 @@ func (poll *epoll) RegisterRead(fd int) (err error) {
return nil
}
-func (poll *epoll) ReregisterEvent(event PollEvent) (err error) {
- var (
- logp = `ReregisterEvent`
- fd = int(event.Descriptor())
- obj = event.Event()
-
- epollEvent unix.EpollEvent
- ok bool
- )
-
- epollEvent, ok = obj.(unix.EpollEvent)
- if !ok {
- return fmt.Errorf(`%s: expecting unix.EpollEvent, got %T`, logp, obj)
- }
-
- epollEvent.Events = unix.EPOLLIN | unix.EPOLLONESHOT
-
- err = unix.SetNonblock(fd, true)
- if err != nil {
- return fmt.Errorf(`%s: %w`, logp, err)
- }
-
- err = unix.EpollCtl(poll.read, unix.EPOLL_CTL_MOD, fd, &epollEvent)
- if err != nil {
- return fmt.Errorf(`%s: %w`, logp, err)
- }
-
- return nil
-}
-
-func (poll *epoll) ReregisterRead(idx, fd int) {
- var err error
-
- poll.events[idx].Events = unix.EPOLLIN | unix.EPOLLONESHOT
-
- err = unix.SetNonblock(fd, true)
- if err != nil {
- log.Printf("epoll.ReregisterRead: %s", err.Error())
- }
-
- err = unix.EpollCtl(poll.read, unix.EPOLL_CTL_MOD, fd, &poll.events[idx])
- if err != nil {
- log.Println("epoll.RegisterRead: unix.EpollCtl: " + err.Error())
- err = poll.UnregisterRead(fd)
- if err != nil {
- log.Println("epoll.RegisterRead: " + err.Error())
- }
- }
-}
-
func (poll *epoll) UnregisterRead(fd int) (err error) {
err = unix.EpollCtl(poll.read, unix.EPOLL_CTL_DEL, fd, nil)
if err != nil {
@@ -117,20 +67,40 @@ func (poll *epoll) UnregisterRead(fd int) (err error) {
}
func (poll *epoll) WaitRead() (fds []int, err error) {
- var n int
+ var (
+ logp = `WaitRead`
+
+ n int
+ x int
+ fd int
+ )
for {
n, err = unix.EpollWait(poll.read, poll.events[:], -1)
if err != nil {
if err == unix.EINTR {
continue
}
- return nil, fmt.Errorf("epoll.WaitRead: %s", err.Error())
+ return nil, fmt.Errorf(`%s: %w`, logp, err)
}
break
}
- for x := 0; x < n; x++ {
- fds = append(fds, int(poll.events[x].Fd))
+ for x = 0; x < n; x++ {
+ fd = int(poll.events[x].Fd)
+
+ err = poll.UnregisterRead(fd)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ err = unix.SetNonblock(fd, false)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ fds = append(fds, fd)
}
return fds, nil
@@ -140,8 +110,9 @@ func (poll *epoll) WaitReadEvents() (events []PollEvent, err error) {
var (
logp = `WaitReadEvents`
- n int
- x int
+ n int
+ x int
+ fd int
)
for n == 0 {
@@ -156,10 +127,25 @@ func (poll *epoll) WaitReadEvents() (events []PollEvent, err error) {
}
for x = 0; x < n; x++ {
- events = append(events, &pollEvent{
+ fd = int(poll.events[x].Fd)
+
+ err = poll.UnregisterRead(fd)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ err = unix.SetNonblock(fd, false)
+ if err != nil {
+ log.Printf(`%s: %s`, logp, err)
+ continue
+ }
+
+ var event = &pollEvent{
fd: poll.events[x].Fd,
event: poll.events[x],
- })
+ }
+ events = append(events, event)
}
return events, nil