Skip to content

Commit

Permalink
Merge pull request #625 from panjf2000/dev
Browse files Browse the repository at this point in the history
patch: v2.5.7
  • Loading branch information
panjf2000 authored Jul 3, 2024
2 parents f28ea30 c7fa145 commit 0feaabb
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 62 deletions.
2 changes: 1 addition & 1 deletion client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 205,7 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err
c := newUDPConn(cli.el, nil, nc.LocalAddr(), nc.RemoteAddr())
c.SetContext(ctx)
c.rawConn = nc
cli.el.ch <- &openConn{c: c, isDatagram: true, cb: func() { close(connOpened) }}
cli.el.ch <- &openConn{c: c, cb: func() { close(connOpened) }}
go func(uc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
Expand Down
1 change: 0 additions & 1 deletion connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 45,6 @@ type udpConn struct {
type openConn struct {
c *conn
cb func()
isDatagram bool
}

type conn struct {
Expand Down
61 changes: 21 additions & 40 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,28 226,13 @@ loop:
return nil
}

func (el *eventloop) close(c *conn, err error) (rerr error) {
if addr := c.localAddr; addr != nil && strings.HasPrefix(c.localAddr.Network(), "udp") {
rerr = el.poller.Delete(c.fd)
if _, ok := el.listeners[c.fd]; !ok {
rerr = unix.Close(c.fd)
el.connections.delConn(c)
}
if el.eventHandler.OnClose(c, err) == Shutdown {
return errorx.ErrEngineShutdown
}
c.release()
return
}

func (el *eventloop) close(c *conn, err error) error {
if !c.opened || el.connections.getConn(c.fd) == nil {
return // ignore stale connections
return nil // ignore stale connections
}

el.connections.delConn(c)
if el.eventHandler.OnClose(c, err) == Shutdown {
rerr = errorx.ErrEngineShutdown
}
action := el.eventHandler.OnClose(c, err)

// Send residual data in buffer back to the remote before actually closing the connection.
for !c.outboundBuffer.IsEmpty() {
Expand All @@ -262,8 247,10 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
}
}

err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd)
c.release()

var errStr strings.Builder
err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd)
if err0 != nil {
err0 = fmt.Errorf("failed to delete fd=%d from poller in event-loop(%d): %v",
c.fd, el.idx, os.NewSyscallError("delete", err0))
Expand All @@ -276,16 263,10 @@ func (el *eventloop) close(c *conn, err error) (rerr error) {
errStr.WriteString(err1.Error())
}
if errStr.Len() > 0 {
if rerr != nil {
el.getLogger().Errorf(strings.TrimSuffix(errStr.String(), " | "))
} else {
rerr = errors.New(strings.TrimSuffix(errStr.String(), " | "))
}
return errors.New(strings.TrimSuffix(errStr.String(), " | "))
}

c.release()

return
return el.handleAction(c, action)
}

func (el *eventloop) wake(c *conn) error {
Expand Down Expand Up @@ -333,19 314,6 @@ func (el *eventloop) ticker(ctx context.Context) {
}
}

func (el *eventloop) handleAction(c *conn, action Action) error {
switch action {
case None:
return nil
case Close:
return el.close(c, nil)
case Shutdown:
return errorx.ErrEngineShutdown
default:
return nil
}
}

func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
n, sa, err := unix.Recvfrom(fd, el.buffer, 0)
if err != nil {
Expand All @@ -372,6 340,19 @@ func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
return nil
}

func (el *eventloop) handleAction(c *conn, action Action) error {
switch action {
case None:
return nil
case Close:
return el.close(c, nil)
case Shutdown:
return errorx.ErrEngineShutdown
default:
return nil
}
}

/*
func (el *eventloop) execCmd(itf interface{}) (err error) {
cmd := itf.(*asyncCmd)
Expand Down
28 changes: 8 additions & 20 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 18,8 @@ import (
"bytes"
"context"
"errors"
"fmt"
"runtime"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -97,10 97,8 @@ func (el *eventloop) open(oc *openConn) error {
}

c := oc.c
if !oc.isDatagram {
el.connections[c] = struct{}{}
el.incConn(1)
}
el.connections[c] = struct{}{}
el.incConn(1)

out, action := el.eventHandler.OnOpen(c)
if out != nil {
Expand Down Expand Up @@ -188,28 186,18 @@ func (el *eventloop) wake(c *conn) error {
}

func (el *eventloop) close(c *conn, err error) error {
if addr := c.localAddr; addr != nil && strings.HasPrefix(addr.Network(), "udp") {
action := el.eventHandler.OnClose(c, err)
if c.rawConn != nil {
if err := c.rawConn.Close(); err != nil {
el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err)
}
}
c.release()
return el.handleAction(c, action)
}

if _, ok := el.connections[c]; !ok {
if _, ok := el.connections[c]; c.rawConn == nil || !ok {
return nil // ignore stale wakes.
}

delete(el.connections, c)
el.incConn(-1)
action := el.eventHandler.OnClose(c, err)
if err := c.rawConn.Close(); err != nil {
el.getLogger().Errorf("failed to close connection(%s), error:%v", c.remoteAddr.String(), err)
}
err = c.rawConn.Close()
c.release()
if err != nil {
return fmt.Errorf("failed to close connection=%s in event-loop(%d): %v", c.remoteAddr, el.idx, err)
}

return el.handleAction(c, action)
}
Expand Down

0 comments on commit 0feaabb

Please sign in to comment.