Skip to content

Commit

Permalink
Fix windows server
Browse files Browse the repository at this point in the history
  • Loading branch information
Allenxuxu committed Nov 7, 2022
1 parent a4933e4 commit 4b68cfe
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 47 deletions.
6 changes: 3 additions & 3 deletions example/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 18,17 @@ type example struct {

func (s *example) OnConnect(c *gev.Connection) {
s.Count.Add(1)
//log.Println(" OnConnect : ", c.PeerAddr())
log.Info(" OnConnect : ", c.PeerAddr())
}
func (s *example) OnMessage(c *gev.Connection, ctx interface{}, data []byte) (out interface{}) {
//log.Println("OnMessage")
log.Info("OnMessage")
out = data
return
}

func (s *example) OnClose(c *gev.Connection) {
s.Count.Add(-1)
//log.Println("OnClose")
log.Info("OnClose")
}

func main() {
Expand Down
42 changes: 24 additions & 18 deletions example/websocket/wsserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 9,19 @@ import (
"time"

"github.com/Allenxuxu/gev/log"
"github.com/Allenxuxu/toolkit/sync"

"github.com/Allenxuxu/gev"
"github.com/Allenxuxu/gev/plugins/websocket/ws"
"github.com/Allenxuxu/gev/plugins/websocket/ws/util"
"github.com/Allenxuxu/toolkit/sync"
"github.com/Allenxuxu/toolkit/sync/atomic"
"github.com/stretchr/testify/assert"
"golang.org/x/net/websocket"
)

type wsExample struct {
ClientNum atomic.Int64
StartTime time.Time
}

func (s *wsExample) OnConnect(c *gev.Connection) {
Expand All @@ -30,7 31,19 @@ func (s *wsExample) OnConnect(c *gev.Connection) {
func (s *wsExample) OnMessage(c *gev.Connection, data []byte) (messageType ws.MessageType, out []byte) {
messageType = ws.MessageText

switch rand.Int() % 3 {
log.Info("on Message ", c.PeerAddr())

if time.Now().Sub(s.StartTime) > 10*time.Second {
msg, err := util.PackCloseData("close")
if err != nil {
panic(err)
}
if e := c.Send(msg); e != nil && e != gev.ErrConnectionClosed {
panic(e)
}
}

switch rand.Int() % 2 {
case 0:
out = data
case 1:
Expand All @@ -43,18 56,10 @@ func (s *wsExample) OnMessage(c *gev.Connection, data []byte) (messageType ws.Me
if err != nil {
panic(err)
}
if e := c.Send(msg); e != nil {
if e := c.Send(msg); e != nil && e != gev.ErrConnectionClosed {
panic(e)
}
}
case 2:
msg, err := util.PackCloseData("close")
if err != nil {
panic(err)
}
if e := c.Send(msg); e != nil {
panic(e)
}
}
return
}
Expand All @@ -67,9 72,10 @@ func (s *wsExample) OnClose(c *gev.Connection) {
func TestWebSocketServer_Start(t *testing.T) {
rand.Seed(time.Now().UnixNano())
handler := new(wsExample)
handler.StartTime = time.Now()

s, err := NewWebSocketServer(handler, &ws.Upgrader{},
gev.Address(":1834"),
gev.Address("localhost:1834"),
gev.NumLoops(8))
if err != nil {
t.Fatal(err)
Expand All @@ -93,13 99,13 @@ func TestWebSocketServer_Start(t *testing.T) {

func startWebSocketClient(addr string) {
rand.Seed(time.Now().UnixNano())
addr = "ws://localhost" addr
addr = "ws://" addr
c, err := websocket.Dial(addr, "", addr)
if err != nil {
panic(err)
}
defer c.Close()
duration := time.Duration((rand.Float64()*2 1)*float64(time.Second)) / 8
duration := 2 * time.Second
start := time.Now()
for time.Since(start) < duration {
sz := rand.Int()%(1024*3) 1
Expand All @@ -113,7 119,7 @@ func startWebSocketClient(addr string) {

data2 := make([]byte, len(data))
if n, err := io.ReadFull(c, data2); err != nil || n != len(data) {
if err != io.EOF {
if err != io.EOF && err != io.ErrUnexpectedEOF {
panic(err)
} else {
return
Expand All @@ -130,7 136,7 @@ func TestWebSocketServer_CloseConnection(t *testing.T) {
handler := new(wsExample)

s, err := NewWebSocketServer(handler, &ws.Upgrader{},
gev.Address(":2021"),
gev.Address("localhost:2021"),
gev.NumLoops(8))
if err != nil {
t.Fatal(err)
Expand All @@ -144,7 150,7 @@ func TestWebSocketServer_CloseConnection(t *testing.T) {
n = 100
toClose = 50
conn = make([]*websocket.Conn, n)
addr = "ws://localhost" s.Options().Address
addr = "ws://" s.Options().Address
)

log.SetLevel(log.LevelDebug)
Expand All @@ -162,7 168,7 @@ func TestWebSocketServer_CloseConnection(t *testing.T) {
panic(err)
}
}
time.Sleep(time.Second)
time.Sleep(time.Second * 3)
assert.Equal(t, n-toClose, int(handler.ClientNum.Get()))

s.Stop()
Expand Down
83 changes: 57 additions & 26 deletions server_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 4,7 @@ package gev

import (
"errors"
"io"
"net"
stdsync "sync"
"time"
Expand Down Expand Up @@ -31,6 32,7 @@ type Server struct {
timingWheel *timingwheel.TimingWheel
opts *Options
running atomic.Bool
dying chan struct{}
}

// NewServer 创建 Server
Expand All @@ -40,6 42,7 @@ func NewServer(handler Handler, opts ...Option) (server *Server, err error) {
}
options := newOptions(opts...)
server = new(Server)
server.dying = make(chan struct{})
server.callback = handler
server.opts = options
server.timingWheel = timingwheel.NewTimingWheel(server.opts.tick, server.opts.wheelSize)
Expand Down Expand Up @@ -69,31 72,42 @@ func (s *Server) Start() {

sw.AddAndRun(func() {
for {
conn, err := s.listener.Accept()
if err != nil {
log.Error("accept error: %v", err)
continue
}
select {
case <-s.dying:
return

default:
conn, err := s.listener.Accept()
if err != nil {
log.Errorf("accept error: %v", err)
continue
}

connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
s.connections = append(s.connections, connection)

connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback)
s.connections = append(s.connections, connection)
s.callback.OnConnect(connection)

sw.AddAndRun(func() {
connection.readLoop()
})
sw.AddAndRun(func() {
connection.writeLoop()
})
sw.AddAndRun(func() {
connection.readLoop()
})
sw.AddAndRun(func() {
connection.writeLoop()
})
}
}
})

s.running.Set(true)

log.Infof("server run in windows")
sw.Wait()
}

// Stop 关闭 Server
func (s *Server) Stop() {
if s.running.Get() {
close(s.dying)
s.running.Set(false)

s.timingWheel.Stop()
Expand Down Expand Up @@ -236,17 250,25 @@ func (c *Connection) Send(data interface{}, opts ...ConnectionOption) error {

// Close 关闭连接
func (c *Connection) Close() error {
if !c.connected.Get() {
return ErrConnectionClosed
log.Info("Close", c.PeerAddr())

if c.connected.Get() {
close(c.dying)
c.connected.Set(false)
c.callBack.OnClose(c)

return c.conn.Close()
}

close(c.dying)
return c.conn.Close()
return nil
}

// ShutdownWrite 关闭可写端,等待读取完接收缓冲区所有数据
func (c *Connection) ShutdownWrite() error {
return c.conn.Close()
log.Info("ShutdownWrite ", c.PeerAddr())

//return nil
return c.Close()
}

// ReadBufferLength read buffer 当前积压的数据长度
Expand All @@ -265,7 287,7 @@ func (c *Connection) HandleEvent(fd int, events poller.Event) {
}

func (c *Connection) readLoop() {
buf := make([]byte, 65535)
buf := make([]byte, 0, 66635)

for {
select {
Expand All @@ -275,8 297,11 @@ func (c *Connection) readLoop() {
default:
n, err := c.conn.Read(buf)
if err != nil {
log.Errorf("read error: %v", err)
c.callBack.OnClose(c)
if err != io.EOF {
log.Info("read error: %v", err)
}

c.Close()
return
}

Expand All @@ -285,11 310,11 @@ func (c *Connection) readLoop() {
c.handlerProtocol(&buf, c.inBuffer)

if len(buf) != 0 {
tmp := make([]byte, 0, len(buf))
tmp := make([]byte, len(buf))
copy(tmp, buf)
_ = c.Send(tmp)
}
buf = buf[:0]
buf = buf[:cap(buf)]

c.inBufferLen.Swap(int64(c.inBuffer.Length()))
}
Expand All @@ -312,15 337,19 @@ func (c *Connection) writeLoop() {
first, end := c.outBuffer.PeekAll()
n, err := c.conn.Write(first)
if err != nil {
c.callBack.OnClose(c)
log.Error("Write error: ", err)

c.Close()
return
}
c.outBuffer.Retrieve(n)

if n == len(first) && len(end) > 0 {
n, err = c.conn.Write(end)
if err != nil {
c.callBack.OnClose(c)
log.Error("Write error: ", err)

c.Close()
return
}
c.outBuffer.Retrieve(n)
Expand Down Expand Up @@ -354,7 383,9 @@ func (c *Connection) sendInLoop(data []byte) (closed bool) {
} else {
n, err := c.conn.Write(data)
if err != nil {
c.callBack.OnClose(c)
log.Error("Write error: ", err)

c.Close()

return true
}
Expand Down

0 comments on commit 4b68cfe

Please sign in to comment.