剥开比原看代码03:比原是如何监听p2p端口的

做者:freewindnode

比原项目仓库:git

Github地址:https://github.com/Bytom/bytomgithub

Gitee地址:https://gitee.com/BytomBlockchain/bytomapi

咱们知道,在使用bytomd init --chain_id mainnet/testnet/solonet初始化比原的时候,它会根据给定的chain_id的不一样,使用不一样的端口(参看config/toml.go#L29):tcp

  1. mainnet(链接到主网): 46657
  2. testnet(链接到测试网): 46656
  3. solonet(本地单独节点): 46658

对于我来讲,因为只须要对本地运行的一个比原节点进行分析,因此能够采用第3个chain_id,即solonet。这样它启动以后,不会与其它的节点主动链接,能够减小其它节点对于咱们的干扰。函数

因此在启动的时候,个人命令是这样的:测试

cd cmd/bytomd
./bytomd init --chain_id solonet
./bytomd node

它就会监听46658端口,等待其它节点的链接。fetch

连上看看

若是这时咱们使用telnet来链接其46658端口,成功链接上以后,能够看到它会发给咱们一些乱码,大概以下:spa

$ telnet localhost 46658
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
ט�S��%�z?��_�端��݂���U[e

咱们也许会好奇,它发给咱们的究竟是什么?设计

可是这个问题留待下次回答,由于首先,比原节点必须可以监听这个端口,咱们才能连上。因此此次咱们的问题是:

比原在代码中是如何监听这个端口的?

端口已经写在config.toml

在前面,当咱们使用./bytomd init --chain_id solonet初始化比原之后,比原会在本地的数据目录中生成一个config.toml的配置文件,内容大约以下:

# This is a TOML config file.
# For more information, see https://github.com/toml-lang/toml
fast_sync = true
db_backend = "leveldb"
api_addr = "0.0.0.0:9888"
chain_id = "solonet"
[p2p]
laddr = "tcp://0.0.0.0:46658"
seeds = ""

其中[p2p]下面的laddr,就是该节点监听的地址和端口。

对于laddr = "tcp://0.0.0.0:46658",它是意思是:

  1. 使用的是tcp协议
  2. 监听的ip是0.0.0.0,是指监听本机全部ip地址。这样该节点既容许本地访问,也容许外部主机访问。若是你只想让它监听某一个ip,手动修改该配置文件便可
  3. 46658,就是咱们在这个问题中关注的端口了,它与该节点与其它节点交互数据使用的端口

比原在监听这个端口的时候,并非如我最开始预期的直接调用net.Listen监听它。实际的过程要比这个复杂,由于比原设计了一个叫Switch的对象,用来统一管理与外界相关的事件,包括监听、链接、发送消息等。而Switch这个对象,又是在SyncManager中建立的。

启动直到进入Switch

因此咱们首先须要知道,比原在源代码中是如何启动,而且一步步走进了Switch的世界。

首先仍是当咱们bytomd node启动比原时,对应的入口函数以下:

cmd/bytomd/main.go#L54

func main() {
    cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir()))
    cmd.Execute()
}

它又会根据传入的node参数,运行下面的函数:

cmd/bytomd/commands/run_node.go#L41

func runNode(cmd *cobra.Command, args []string) error {
    // Create & start node
    n := node.NewNode(config)
    // ...
}

咱们须要关注的是node.NewNode(config)函数,由于是在它里面建立了SyncManager

node/node.go#L59

func NewNode(config *cfg.Config) *Node {
    // ...
    syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
    // ...
}

在建立SyncManager的时候,又建立了Switch:

netsync/handle.go#L42

func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
    // ...
    manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)

    // ...
    protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
    manager.sw.AddReactor("PROTOCOL", protocolReactor)

    // Create & add listener
    p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
    l := p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
    manager.sw.AddListener(l)

    // ...
}

这里须要注意一下,上面建立的protocolReactor对象,是用来处理当有节点链接上端口后,双方如何交互的事情。跟此次问题“监听端口”没有直接关系,可是这里也能够注意一下。

而后又建立了一个DefaultListener对象,而监听端口的动做,就是在它里面发生的。Listener建立以后,将会添加到manager.sw(即Switch)中,用于在那边进行外界数据与事件的交互。

监听端口

NewDefaultListener中作的事情比较多,因此咱们把它分红几块说:

p2p/listener.go#L52

func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) Listener {
    // Local listen IP & port
    lAddrIP, lAddrPort := splitHostPort(lAddr)

    // Create listener
    var listener net.Listener
    var err error
    for i := 0; i < tryListenSeconds; i++ {
        listener, err = net.Listen(protocol, lAddr)
        if err == nil {
            break
        } else if i < tryListenSeconds-1 {
            time.Sleep(time.Second * 1)
        }
    }
    if err != nil {
        cmn.PanicCrisis(err)
    }

    // ...

上面这部分就是真正监听的代码了。经过Go语言提供的net.Listen函数,监听了指定的地址。另外,在监听的时候,进行了屡次尝试,由于当一个刚刚被使用的端口被放开后,还须要一小段时间才能真正释放,因此这里须要多尝试几回。

其中tryListenSeconds是一个常量,值为5,也就是说,大约会尝试5秒钟,要是都绑定不上,才会真正失败,抛出错误。

后面省略了一些代码,主要是用来获取当前监听的实际ip以及外网ip,并记录在日志中。本想在这里简单讲讲,可是发现还有点麻烦,因此打算放在后面专开一个问题。

其实本次问题到这里就已经结束了,由于已经完成了“监听”。可是后面还有一些初始化操做,是为了让比原能够跟链接上该端口的节点进行交互,也值得在这里讲讲。

接着刚才的方法,最后的部分是:

dl := &DefaultListener{
        listener:    listener,
        intAddr:     intAddr,
        extAddr:     extAddr,
        connections: make(chan net.Conn, numBufferedConnections),
    }
    dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
    dl.Start() // Started upon construction
    return dl
}

须要注意的是connections,它是一个带有缓冲的channel(numBufferedConnections值为10),用来存放链接上该端口的链接对象。这些操做将在后面的dl.Start()中执行。

dl.Start()将调用DefaultListener对应的OnStart方法,以下:

p2p/listener.go#L114

func (l *DefaultListener) OnStart() error {
    l.BaseService.OnStart()
    go l.listenRoutine()
    return nil
}

其中的l.listenRoutine,就是执行前面所说的向connections channel里放入链接的函数:

p2p/listener.go#L126

func (l *DefaultListener) listenRoutine() {
    for {
        conn, err := l.listener.Accept()
        // ...
        l.connections <- conn
    }

    // Cleanup
    close(l.connections)

    // ...
}

SwitchSyncManager启动的时候会被启动,在它的OnStart方法中,会拿到全部Listener(即监听端口的对象)中connectionschannel中的链接,与它们交互。

https://github.com/freewind/bytom-v1.0.1/blob/master/p2p/switch.go#L498

func (sw *Switch) listenerRoutine(l Listener) {
    for {
        inConn, ok := <-l.Connections()
        if !ok {
            break
        }
        // ...

        err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
        // ...
    }

其中sw.addPeerWithConnectionAndConfig就是与对应节点进行交互的逻辑所在,可是这已经超出了本次问题的范畴,下次再讲。

到此为止,本次的问题,应该已经讲清楚了。

相关文章
相关标签/搜索