Skip to content

Commit

Permalink
p2p, p2p/discover, p2p/discv5: implement UDP port sharing (ethereum#1…
Browse files Browse the repository at this point in the history
…5200)

This commit affects p2p/discv5 "topic discovery" by running it on
the same UDP port where the old discovery works. This is realized
by giving an "unhandled" packet channel to the old v4 discovery
packet handler where all invalid packets are sent. These packets
are then processed by v5. v5 packets are always invalid when
interpreted by v4 and vice versa. This is ensured by adding one
to the first byte of the packet hash in v5 packets.

DiscoveryV5Bootnodes is also changed to point to new bootnodes
that are implementing the changed packet format with modified
hash. Existing and new v5 bootnodes are both running on different
ports ATM.
  • Loading branch information
zsfelfoldi authored and fjl committed Jan 22, 2018
1 parent 02aeb3d commit 92580d6
Show file tree
Hide file tree
Showing 17 changed files with 150 additions and 91 deletions.
25 changes: 23 additions & 2 deletions cmd/bootnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"crypto/ecdsa"
"flag"
"fmt"
"net"
"os"

"github.com/ethereum/go-ethereum/cmd/utils"
Expand Down Expand Up @@ -96,12 +97,32 @@ func main() {
}
}

addr, err := net.ResolveUDPAddr("udp", *listenAddr)
if err != nil {
utils.Fatalf("-ResolveUDPAddr: %v", err)
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
utils.Fatalf("-ListenUDP: %v", err)
}

realaddr := conn.LocalAddr().(*net.UDPAddr)
if natm != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(natm, nil, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
}
// TODO: react to external IP changes over time.
if ext, err := natm.ExternalIP(); err == nil {
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}

if *runv5 {
if _, err := discv5.ListenUDP(nodeKey, *listenAddr, natm, "", restrictList); err != nil {
if _, err := discv5.ListenUDP(nodeKey, conn, realaddr, "", restrictList); err != nil {
utils.Fatalf("%v", err)
}
} else {
if _, err := discover.ListenUDP(nodeKey, *listenAddr, natm, "", restrictList); err != nil {
if _, err := discover.ListenUDP(nodeKey, conn, realaddr, nil, "", restrictList); err != nil {
utils.Fatalf("%v", err)
}
}
Expand Down
1 change: 0 additions & 1 deletion cmd/faucet/faucet.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,6 @@ func newFaucet(genesis *core.Genesis, port int, enodes []*discv5.Node, network u
NoDiscovery: true,
DiscoveryV5: true,
ListenAddr: fmt.Sprintf(":%d", port),
DiscoveryV5Addr: fmt.Sprintf(":%d", port+1),
MaxPeers: 25,
BootstrapNodesV5: enodes,
},
Expand Down
10 changes: 0 additions & 10 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,14 +636,6 @@ func setListenAddress(ctx *cli.Context, cfg *p2p.Config) {
}
}

// setDiscoveryV5Address creates a UDP listening address string from set command
// line flags for the V5 discovery protocol.
func setDiscoveryV5Address(ctx *cli.Context, cfg *p2p.Config) {
if ctx.GlobalIsSet(ListenPortFlag.Name) {
cfg.DiscoveryV5Addr = fmt.Sprintf(":%d", ctx.GlobalInt(ListenPortFlag.Name)+1)
}
}

// setNAT creates a port mapper from command line flags.
func setNAT(ctx *cli.Context, cfg *p2p.Config) {
if ctx.GlobalIsSet(NATFlag.Name) {
Expand Down Expand Up @@ -794,7 +786,6 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
setNodeKey(ctx, cfg)
setNAT(ctx, cfg)
setListenAddress(ctx, cfg)
setDiscoveryV5Address(ctx, cfg)
setBootstrapNodes(ctx, cfg)
setBootstrapNodesV5(ctx, cfg)

Expand Down Expand Up @@ -830,7 +821,6 @@ func SetP2PConfig(ctx *cli.Context, cfg *p2p.Config) {
// --dev mode can't use p2p networking.
cfg.MaxPeers = 0
cfg.ListenAddr = ":0"
cfg.DiscoveryV5Addr = ":0"
cfg.NoDiscovery = true
cfg.DiscoveryV5 = false
}
Expand Down
5 changes: 2 additions & 3 deletions les/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,8 @@ func (s *LightEthereum) Start(srvr *p2p.Server) error {
s.startBloomHandlers()
log.Warn("Light client mode is an experimental feature")
s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId)
// search the topic belonging to the oldest supported protocol because
// servers always advertise all supported protocols
protocolVersion := ClientProtocolVersions[len(ClientProtocolVersions)-1]
// clients are searching for the first advertised protocol in the list
protocolVersion := AdvertiseProtocolVersions[0]
s.serverPool.start(srvr, lesTopic(s.blockchain.Genesis().Hash(), protocolVersion))
s.protocolManager.Start()
return nil
Expand Down
5 changes: 3 additions & 2 deletions les/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ const (

// Supported versions of the les protocol (first is primary)
var (
ClientProtocolVersions = []uint{lpv2, lpv1}
ServerProtocolVersions = []uint{lpv2, lpv1}
ClientProtocolVersions = []uint{lpv2, lpv1}
ServerProtocolVersions = []uint{lpv2, lpv1}
AdvertiseProtocolVersions = []uint{lpv2} // clients are searching for the first advertised protocol in the list
)

// Number of implemented message corresponding to different protocol versions.
Expand Down
4 changes: 2 additions & 2 deletions les/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) {
return nil, err
}

lesTopics := make([]discv5.Topic, len(ServerProtocolVersions))
for i, pv := range ServerProtocolVersions {
lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions))
for i, pv := range AdvertiseProtocolVersions {
lesTopics[i] = lesTopic(eth.BlockChain().Genesis().Hash(), pv)
}

Expand Down
1 change: 0 additions & 1 deletion mobile/geth.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ func NewNode(datadir string, config *NodeConfig) (stack *Node, _ error) {
P2P: p2p.Config{
NoDiscovery: true,
DiscoveryV5: true,
DiscoveryV5Addr: ":0",
BootstrapNodesV5: config.BootstrapNodes.nodes,
ListenAddr: ":0",
NAT: nat.Any(),
Expand Down
7 changes: 3 additions & 4 deletions node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ var DefaultConfig = Config{
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
P2P: p2p.Config{
ListenAddr: ":30303",
DiscoveryV5Addr: ":30304",
MaxPeers: 25,
NAT: nat.Any(),
ListenAddr: ":30303",
MaxPeers: 25,
NAT: nat.Any(),
},
}

Expand Down
44 changes: 20 additions & 24 deletions p2p/discover/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,25 +210,23 @@ type reply struct {
matched chan<- bool
}

// ReadPacket is sent to the unhandled channel when it could not be processed
type ReadPacket struct {
Data []byte
Addr *net.UDPAddr
}

// ListenUDP returns a new table that listens for UDP packets on laddr.
func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, error) {
addr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
tab, _, err := newUDP(priv, conn, natm, nodeDBPath, netrestrict)
func ListenUDP(priv *ecdsa.PrivateKey, conn conn, realaddr *net.UDPAddr, unhandled chan ReadPacket, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, error) {
tab, _, err := newUDP(priv, conn, realaddr, unhandled, nodeDBPath, netrestrict)
if err != nil {
return nil, err
}
log.Info("UDP listener up", "self", tab.self)
return tab, nil
}

func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, *udp, error) {
func newUDP(priv *ecdsa.PrivateKey, c conn, realaddr *net.UDPAddr, unhandled chan ReadPacket, nodeDBPath string, netrestrict *netutil.Netlist) (*Table, *udp, error) {
udp := &udp{
conn: c,
priv: priv,
Expand All @@ -237,16 +235,6 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath strin
gotreply: make(chan reply),
addpending: make(chan *pending),
}
realaddr := c.LocalAddr().(*net.UDPAddr)
if natm != nil {
if !realaddr.IP.IsLoopback() {
go nat.Map(natm, udp.closing, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
}
// TODO: react to external IP changes over time.
if ext, err := natm.ExternalIP(); err == nil {
realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
}
}
// TODO: separate TCP port
udp.ourEndpoint = makeEndpoint(realaddr, uint16(realaddr.Port))
tab, err := newTable(udp, PubkeyID(&priv.PublicKey), realaddr, nodeDBPath)
Expand All @@ -256,7 +244,7 @@ func newUDP(priv *ecdsa.PrivateKey, c conn, natm nat.Interface, nodeDBPath strin
udp.Table = tab

go udp.loop()
go udp.readLoop()
go udp.readLoop(unhandled)
return udp.Table, udp, nil
}

Expand Down Expand Up @@ -492,8 +480,11 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) ([]byte,
}

// readLoop runs in its own goroutine. it handles incoming UDP packets.
func (t *udp) readLoop() {
func (t *udp) readLoop(unhandled chan ReadPacket) {
defer t.conn.Close()
if unhandled != nil {
defer close(unhandled)
}
// Discovery packets are defined to be no larger than 1280 bytes.
// Packets larger than this size will be cut at the end and treated
// as invalid because their hash won't match.
Expand All @@ -509,7 +500,12 @@ func (t *udp) readLoop() {
log.Debug("UDP read error", "err", err)
return
}
t.handlePacket(from, buf[:nbytes])
if t.handlePacket(from, buf[:nbytes]) != nil && unhandled != nil {
select {
case unhandled <- ReadPacket{buf[:nbytes], from}:
default:
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion p2p/discover/udp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func newUDPTest(t *testing.T) *udpTest {
remotekey: newkey(),
remoteaddr: &net.UDPAddr{IP: net.IP{10, 0, 1, 99}, Port: 30303},
}
test.table, test.udp, _ = newUDP(test.localkey, test.pipe, nil, "", nil)
realaddr := test.pipe.LocalAddr().(*net.UDPAddr)
test.table, test.udp, _ = newUDP(test.localkey, test.pipe, realaddr, nil, "", nil)
return test
}

Expand Down
3 changes: 1 addition & 2 deletions p2p/discv5/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/crypto/sha3"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/p2p/netutil"
"github.com/ethereum/go-ethereum/rlp"
)
Expand Down Expand Up @@ -134,7 +133,7 @@ type timeoutEvent struct {
node *Node
}

func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, natm nat.Interface, dbPath string, netrestrict *netutil.Netlist) (*Network, error) {
func newNetwork(conn transport, ourPubkey ecdsa.PublicKey, dbPath string, netrestrict *netutil.Netlist) (*Network, error) {
ourID := PubkeyID(&ourPubkey)

var db *nodeDB
Expand Down
2 changes: 1 addition & 1 deletion p2p/discv5/net_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

func TestNetwork_Lookup(t *testing.T) {
key, _ := crypto.GenerateKey()
network, err := newNetwork(lookupTestnet, key.PublicKey, nil, "", nil)
network, err := newNetwork(lookupTestnet, key.PublicKey, "", nil)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/discv5/sim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func (s *simulation) launchNode(log bool) *Network {
addr := &net.UDPAddr{IP: ip, Port: 30303}

transport := &simTransport{joinTime: time.Now(), sender: id, senderAddr: addr, sim: s, priv: key}
net, err := newNetwork(transport, key.PublicKey, nil, "<no database>", nil)
net, err := newNetwork(transport, key.PublicKey, "<no database>", nil)
if err != nil {
panic("cannot launch new node: " + err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion p2p/discv5/ticket.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func (s *ticketStore) gotTopicNodes(from *Node, hash common.Hash, nodes []rpcNod
if ip.IsUnspecified() || ip.IsLoopback() {
ip = from.IP
}
n := NewNode(node.ID, ip, node.UDP-1, node.TCP-1) // subtract one from port while discv5 is running in test mode on UDPport+1
n := NewNode(node.ID, ip, node.UDP, node.TCP)
select {
case chn <- n:
default:
Expand Down
46 changes: 18 additions & 28 deletions p2p/discv5/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const Version = 4
// Errors
var (
errPacketTooSmall = errors.New("too small")
errBadHash = errors.New("bad hash")
errBadPrefix = errors.New("bad prefix")
errExpired = errors.New("expired")
errUnsolicitedReply = errors.New("unsolicited reply")
errUnknownNode = errors.New("unknown node")
Expand Down Expand Up @@ -145,10 +145,11 @@ type (
}
)

const (
macSize = 256 / 8
sigSize = 520 / 8
headSize = macSize + sigSize // space of packet frame data
var (
versionPrefix = []byte("temporary discovery v5")
versionPrefixSize = len(versionPrefix)
sigSize = 520 / 8
headSize = versionPrefixSize + sigSize // space of packet frame data
)

// Neighbors replies are sent across multiple packets to
Expand Down Expand Up @@ -237,12 +238,12 @@ type udp struct {
}

// ListenUDP returns a new table that listens for UDP packets on laddr.
func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBPath string, netrestrict *netutil.Netlist) (*Network, error) {
transport, err := listenUDP(priv, laddr)
func ListenUDP(priv *ecdsa.PrivateKey, conn conn, realaddr *net.UDPAddr, nodeDBPath string, netrestrict *netutil.Netlist) (*Network, error) {
transport, err := listenUDP(priv, conn, realaddr)
if err != nil {
return nil, err
}
net, err := newNetwork(transport, priv.PublicKey, natm, nodeDBPath, netrestrict)
net, err := newNetwork(transport, priv.PublicKey, nodeDBPath, netrestrict)
if err != nil {
return nil, err
}
Expand All @@ -251,16 +252,8 @@ func ListenUDP(priv *ecdsa.PrivateKey, laddr string, natm nat.Interface, nodeDBP
return net, nil
}

func listenUDP(priv *ecdsa.PrivateKey, laddr string) (*udp, error) {
addr, err := net.ResolveUDPAddr("udp", laddr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
}
return &udp{conn: conn, priv: priv, ourEndpoint: makeEndpoint(addr, uint16(addr.Port))}, nil
func listenUDP(priv *ecdsa.PrivateKey, conn conn, realaddr *net.UDPAddr) (*udp, error) {
return &udp{conn: conn, priv: priv, ourEndpoint: makeEndpoint(realaddr, uint16(realaddr.Port))}, nil
}

func (t *udp) localAddr() *net.UDPAddr {
Expand Down Expand Up @@ -372,11 +365,9 @@ func encodePacket(priv *ecdsa.PrivateKey, ptype byte, req interface{}) (p, hash
log.Error(fmt.Sprint("could not sign packet:", err))
return nil, nil, err
}
copy(packet[macSize:], sig)
// add the hash to the front. Note: this doesn't protect the
// packet in any way.
hash = crypto.Keccak256(packet[macSize:])
copy(packet, hash)
copy(packet, versionPrefix)
copy(packet[versionPrefixSize:], sig)
hash = crypto.Keccak256(packet[versionPrefixSize:])
return packet, hash, nil
}

Expand Down Expand Up @@ -420,17 +411,16 @@ func decodePacket(buffer []byte, pkt *ingressPacket) error {
}
buf := make([]byte, len(buffer))
copy(buf, buffer)
hash, sig, sigdata := buf[:macSize], buf[macSize:headSize], buf[headSize:]
shouldhash := crypto.Keccak256(buf[macSize:])
if !bytes.Equal(hash, shouldhash) {
return errBadHash
prefix, sig, sigdata := buf[:versionPrefixSize], buf[versionPrefixSize:headSize], buf[headSize:]
if !bytes.Equal(prefix, versionPrefix) {
return errBadPrefix
}
fromID, err := recoverNodeID(crypto.Keccak256(buf[headSize:]), sig)
if err != nil {
return err
}
pkt.rawData = buf
pkt.hash = hash
pkt.hash = crypto.Keccak256(buf[versionPrefixSize:])
pkt.remoteID = fromID
switch pkt.ev = nodeEvent(sigdata[0]); pkt.ev {
case pingPacket:
Expand Down
Loading

0 comments on commit 92580d6

Please sign in to comment.