forked from xtaci/kcp-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kcp2k.go
46 lines (40 loc) · 992 Bytes
/
kcp2k.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package kcp
import (
"io"
"sync/atomic"
"time"
"github.com/pkg/errors"
)
// 读取一个kcp的完整包
func (s *UDPSession) ReadPacket() (packet []byte, err error) {
var timeout *time.Timer
// deadline for current reading operation
var c <-chan time.Time
if !s.rd.IsZero() {
delay := time.Until(s.rd)
timeout = time.NewTimer(delay)
c = timeout.C
defer timeout.Stop()
}
for {
s.mu.Lock()
if size := s.kcp.PeekSize(); size > 0 { // peek data size from kcp
packet = make([]byte, size)
s.kcp.Recv(packet)
s.mu.Unlock()
atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(size))
return packet, nil
}
s.mu.Unlock()
// wait for read event or timeout or error
select {
case <-s.chReadEvent:
case <-c:
return nil, errors.WithStack(errTimeout)
case <-s.chSocketReadError:
return nil, s.socketReadError.Load().(error)
case <-s.die:
return nil, errors.WithStack(io.ErrClosedPipe)
}
}
}