LOADING...
LOADING...
LOADING...
当前位置: 玩币族首页 > 新闻观点 > 深入理解以太坊P2P网络设计(上)

深入理解以太坊P2P网络设计(上)

2021-09-26 sky110 来源:区块链网络

文章前言

在设计公链时,节点与节点之间建立连接需要 P2P 协议,从而实现数据的同步,于此同时上层应用还需要封装一些通信逻辑,比如节点之间的区块同步、交易数据同步等。

本篇文章将对 P2P 网络发展进行简单概述,同时将从源码角度对以太坊中的节点发现机制、分布式哈希表、节点查找、节点新增、节点移除等进行简单介绍,并对其 P2P 网络安全性设计进行简要分析。

基础知识

P2P网络

P2P网络不同于传统的 CS 结构,在 P2P 网络中每个节点既可以是客户端也可以是服务端,节点之间的通信协议一般直接通过 Socket 实现,P2P技术发展至今经历了以下四个发展阶段:

集中式:是P2P网络模式中最简单的路由方式,即存在一个中心节点,它保存了其他所有节点的索引信息,索引信息一般包括节点IP、端口、节点资源等,集中式路由的优点是结构简单、实现容易,但缺点也很明显,由于中心节点需要存储所有节点的路由信息,当节点规模不断扩展时,就很容易出现性能瓶颈,而且也存在单节点故障问题

分布式:是指移除了中心节点,在 P2P 节点之间建立随机网络,在新加入节点与 P2P 网络中的某个节点之间随机建立连接通道,从而形成一个随机拓扑结构,新节点加入该网络时随机选择一个已经存在的节点并建立邻居关系,在节点与邻居节点建立连接后,还需要进行全网广播,让整个网络知道该节点的存在。

具体的广播步骤是:该节点首先向邻居节点广播,邻居节点收到广播消息后,在继续向自己的邻居节点广播,以此类推,这种广播方式也被称之为"泛洪机制",而泛洪机制的问题在于可控性差,其主要包括两个较大的问题:一个是容易形成泛洪循环,比如节点A给发出的消息结果节点 B 到节点C,节点C再广播到节点 A,这就形成了一个循环,另一个问题是消息响应分包,比如节点 A 想要请求的资源被很多节点所拥有,那么在短时间内,会出现大量节点同时向 A 节点发送响应消息,这就很可能让节点 A 瞬间奔溃。

而消除泛洪循环的方法可以借鉴IP网络路由协议中有关泛洪广播的控制,一种方法是对每一个查询消息设置 TTL 值,泛洪消息每被转发一次,TTL 值减1,当节点接受的TTL 为0时,不再转发消息,这样可以避免查询消息在网络中产生死循环,还可以为泛洪消息设置唯一的标志,对接收到的重复消息不再进行转发从而规避死循环,解决响应风暴的方法可以在数据链路层进行网络分段,减少消息跨段广播。

混合式:混合式其实就是混合集中式和分布式结构,网络中存在很多超级节点组成的分布式网络,而每个超级节点有多个普通节点与它组成局部集中网络,一个新的普通节点加入是可以先选择一个超级节点进行通信,该超级节点再推送其他超级节点列表给新加入节点,加入节点根据列表中的超级节点状态决定选择那个具体的超级节点作为父节点,这种结构的泛洪广播只是发生在超级节点之间,因此可以避免大规模泛洪问题,在实际应用中,混合式结构是相对灵活且比较有效的组网架构,实现难度也相对较小,因此目前较多系统基于混合式结构进行开发实现。

结构化:结构化 P2P 网络是一种分布式网络结构,与上面所讲的分布式结构不同,分布式网络就是一个随机网络,而结构化网络则将所有节点按照某种结构进行有序组织,比如形成一个环状网络或树状网络,结构化网络在具体实现上普遍基于分布式哈希表( Distributed Hash Table,DHI )算法,具体的实现方案有 Chord、Pasty、CAN、Kademlia 等算法

四种网络结构对比如下:

?

节点发现

节点发现是任何节点接入 P2P 网络的第一步,节点发现可以分为两种:

初始节点发现:指节点是一个全新的、从未运行的节点,该节点没有网络中的其他节点的任何数据,此时节点发现只能依靠节点中的硬编码的种子节点获得P2P 网络的信息

已知节点发现:节点之前运行过,节点数据库中保存着网络中的其他节点信息,此时节点发现可以依靠节点数据库汇总的节点获取 P2P 网络的信息,从而构建自己的网络拓扑

种子节点

在 P2P 网络中,初始节点在启动时会通过一些长期稳定运行的节点快速发现网络中的其他节点,这些节点被称为"种子节点"(一般代码中会硬编码种子节点信息),一般情况下种子节点可以分为两种:

DNS-Seed:也被称之为" DNS 种子节点",DNS是互联网提供的一种域名查询服务,它将域名和 IP 地址相互映射保存在一个分布式的数据库中,当我们访问 DNS 服务器时,给它提供一个域名,DNS服务器会将该域名对应的 IP 地址返回

IP-Seed:即将种子节点的 IP 地址硬编码到代码中去,硬编码的这些节点的地址被称为种子节点

KDA算法

Kademlia 是一种分布式哈希表( DHT )技术,与其他 DHT 技术相比,KDA 算法使用异或算法计算节点之间的距离,进而建立了全新的 DHT 拓扑结构,这种算法可以极大地提高路由的查询速度。

HashTable

哈希表是用于存储键值对的一种容器,键值对有被称为 Key/Value 对,哈希表数据结构中包含 N 个 bucket (桶),对于某个具体的哈希表,N (桶的数量)通常是固定不变的,于是可以对每个桶编号,0~N-1,桶是用来存储键值对的,可以简单的将其理解为一个动态数组,里面存放多个键值对,下图展示了哈希表的查找原理,我们可以方便快速地通过 Key 来获取 value,当使用某个 key 进行查找时,先用某个哈希函数计算这个key 的哈希值,得到的哈希值通常是一个整数,之后使用哈希值对 N (桶数)进行取模运算(除法求余数),就可以算出对应的桶编号。

HashCollision

说到哈希表不得不提一下哈希表碰撞,当两个不同的 Key 进行哈希计算得到相同的哈希值时,就是所谓的哈希函数碰撞,一旦出现这种情况,这两个 key 对应的两个键值对就会被存在在同一个桶中( bucket )中,另一中散列碰撞是虽然计算出来的哈希值不同,但经过取模运算之后得到相同的桶编号,这时候也会将两个键值对存储在一个桶中,哈希碰撞原理如下图所示:

如果某个哈希表在存储数据时完全没有碰撞,那么每个桶里都只有0个或1个键值对,这样查找起来就非常快,反之,如果某个哈希表在存储数据时出现严重碰撞,那么就会导致某些桶里存储了很多键值对,那么在查找key的时候需要在这个桶里面逐一对比key是否相同,查找效率会变得很低~

分布式哈希表

分布式哈希表在概念上类似于传统的哈希表,差异在于传统的哈希表主要用于单机上的某个软件中,分布式哈希表主要用于分布式系统(此时,分布式系统的节点可以通俗的理解为 hash 表中的 bucket ),分布式哈希表主要用于存储大量(甚至海量)的数据,分布式哈希表的原理如下图所示:

源码分析

以太坊底层的P2PServer大致可以分为以下三层:

顶层:以太坊中各个协议的具体实现

中层:以太坊中的p2p通信链路层,负责启动监听、处理新加入连接或维护连接

底层:以太坊中的数据通信网络IO层,主要负责路由表的管理以及数据库的读写操作

表的结构

表数据结构如下所示:

//?filedir:go-ethereum-1.10.2\p2p\discover\table.go?L40 const?( ???alpha???????????=?3??//?Kademlia?concurrency?factor ???bucketSize??????=?16?//?Kademlia?bucket?size ???maxReplacements?=?10?//?Size?of?per-bucket?replacement?list ???//?We?keep?buckets?for?the?upper?1/15?of?distances?because ???//?it's?very?unlikely?we'll?ever?encounter?a?node?that's?closer. ???hashBits??????????=?len(common.Hash{})?*?8 ???nBuckets??????????=?hashBits?/?15???????//?Number?of?buckets ???bucketMinDistance?=?hashBits?-?nBuckets?//?Log?distance?of?closest?bucket ???//?IP?address?limits. ???bucketIPLimit,?bucketSubnet?=?2,?24?//?at?most?2?addresses?from?the?same?/24 ???tableIPLimit,?tableSubnet???=?10,?24 ???refreshInterval????=?30?*?time.Minute ???revalidateInterval?=?10?*?time.Second ???copyNodesInterval??=?30?*?time.Second ???seedMinTableTime???=?5?*?time.Minute ???seedCount??????????=?30 ???seedMaxAge?????????=?5?*?24?*?time.Hour )

type?Table?struct?{ ???mutex???sync.Mutex????????//?protects?buckets,?bucket?content,?nursery,?rand ???buckets?[nBuckets]*bucket?//?index?of?known?nodes?by?distance ???nursery?[]*node???????????//?bootstrap?nodes ???rand????*mrand.Rand???????//?source?of?randomness,?periodically?reseeded ???ips?????netutil.DistinctNetSet ???log????????log.Logger ???db?????????*enode.DB?//?database?of?known?nodes ???net????????transport ???refreshReq?chan?chan?struct{} ???initDone???chan?struct{} ???closeReq???chan?struct{} ???closed?????chan?struct{} ???nodeAddedHook?func(*node)?//?for?testing }

type?bucket?struct?{ ???entries?????[]*node?//?live?entries,?sorted?by?time?of?last?contact ???replacements?[]*node?//?recently?seen?nodes?to?be?used?if?revalidation?fails ???ips??????????netutil.DistinctNetSet }

关键的几个变量:

buckets:K桶,每个K桶包含节点(依据最近活跃情况进行降序排列),用于按距离列出已知节点索引

nursery:种子节点,一个节点启动的时候最多能够链接35个种子节点,其中有五个是以太坊官方指定的,另外30个从数据库里面提取

db:用于存储P2P节点的数据库(以太坊中有两个,另一个用于存储链上数据)

refreshReq:刷新K-桶事件的管道

表的创建

newTable函数用于创建新的表:

//?filedir:go-ethereum-1.10.2\p2p\discover\table.go?L102 func?newTable(t?transport,?db?*enode.DB,?bootnodes?[]*enode.Node,?log?log.Logger)?(*Table,?error)?{ ???tab?:=?&Table{ ???????net:????????t, ???????db:?????????db, ???????refreshReq:?make(chan?chan?struct{}), ???????initDone:???make(chan?struct{}), ???????closeReq:???make(chan?struct{}), ???????closed:?????make(chan?struct{}), ???????rand:???????mrand.New(mrand.NewSource(0)), ???????ips:????????netutil.DistinctNetSet{Subnet:?tableSubnet,?Limit:?tableIPLimit}, ???????log:????????log, ??} ???if?err?:=?tab.setFallbackNodes(bootnodes);?err?!=?nil?{ ???????return?nil,?err ??} ???for?i?:=?range?tab.buckets?{ ???????tab.buckets[i]?=?&bucket{ ???????????ips:?netutil.DistinctNetSet{Subnet:?bucketSubnet,?Limit:?bucketIPLimit}, ??????} ??} ???tab.seedRand() ???tab.loadSeedNodes() ???return?tab,?nil }

在上述代码中首先使用传入的参数初始化了一个Table的对象tab,调用setFallbackNodes函数设置初始链接节点(即获得5个nursey节点,后面如果table为空或者数据库中没有节点信息时这些节点将被用于去链接网络),之后通过一个for循环结合函数ValidateComplete来验证节点是否有效

之后初始化K桶:

for?i?:=?range?tab.buckets?{ ???????tab.buckets[i]?=?&bucket{ ???????????ips:?netutil.DistinctNetSet{Subnet:?bucketSubnet,?Limit:?bucketIPLimit}, ??????} ??}

之后从table.buckets中随机取30个节点加载种子节点到相应的bucket:

tab.seedRand() ???tab.loadSeedNodes() ???return?tab,?nil

loadSeedNodes函数的具体实现如下所示(这里的seedCount为table.go中最上方定义的全局变量,值为30):

//?filedir:?go-ethereum-1.10.2\p2p\discover\table.go???L302 func?(tab?*Table)?loadSeedNodes()?{ ???seeds?:=?wrapNodes(tab.db.QuerySeeds(seedCount,?seedMaxAge)) ???seeds?=?append(seeds,?tab.nursery...) ???for?i?:=?range?seeds?{ ???????seed?:=?seeds[i] ???????age?:=?log.Lazy{Fn:?func()?interface{}?{?return?time.Since(tab.db.LastPongReceived(seed.ID(),?seed.IP()))?}} ???????tab.log.Trace("Found?seed?node?in?database",?"id",?seed.ID(),?"addr",?seed.addr(),?"age",?age) ???????tab.addSeenNode(seed) ??} }

这里的addSeenNode即用于添加节点到bucket,在这里会检查要添加的节点是否已经存在以及bucket是否已满,如果已满则调用tab.addReplacement(b, n)将节点添加到replacement列表中去,之后添加IP,之后更新bucket:

//?filedir:?go-ethereum-1.10.2\p2p\discover\table.go?L458 func?(tab?*Table)?addSeenNode(n?*node)?{ ???if?n.ID()?==?tab.self().ID()?{ ???????return ??} ???tab.mutex.Lock() ???defer?tab.mutex.Unlock() ???b?:=?tab.bucket(n.ID()) ???if?contains(b.entries,?n.ID())?{ ???????//?Already?in?bucket,?don't?add. ???????return ??} ???if?len(b.entries)?>=?bucketSize?{ ???????//?Bucket?full,?maybe?add?as?replacement. ???????tab.addReplacement(b,?n) ???????return ??} ???if?!tab.addIP(b,?n.IP())?{ ???????//?Can't?add:?IP?limit?reached. ???????return ??} ???//?Add?to?end?of?bucket: ???b.entries?=?append(b.entries,?n) ???b.replacements?=?deleteNode(b.replacements,?n) ???n.addedAt?=?time.Now() ???if?tab.nodeAddedHook?!=?nil?{ ???????tab.nodeAddedHook(n) ??} }

事件监听

loop函数是table.go中的主循环,在函数开头出定义了后续会使用到的局部变量,之后通过deRefresh进行刷新桶操作,在这里的loop循环会每隔30分钟自动刷新一次K桶,每隔10秒钟验证K桶节点是否可以ping通,每30秒将K桶中存在超过5分钟的节点存储本地数据库,视作稳定节点:

//?filedir:?go-ethereum-1.10.2\p2p\discover\table.go?L55 ???refreshInterval????=?30?*?time.Minute ???revalidateInterval?=?10?*?time.Second ???copyNodesInterval??=?30?*?time.Second //?filedir:?go-ethereum-1.10.2\p2p\discover\table.go?L218 //?loop?schedules?runs?of?doRefresh,?doRevalidate?and?copyLiveNodes. func?(tab?*Table)?loop()?{ ???var?( ???????revalidate?????=?time.NewTimer(tab.nextRevalidateTime()) ???????refresh????????=?time.NewTicker(refreshInterval) ???????copyNodes??????=?time.NewTicker(copyNodesInterval) ???????refreshDone????=?make(chan?struct{})???????????//?where?doRefresh?reports?completion ???????revalidateDone?chan?struct{}???????????????????//?where?doRevalidate?reports?completion ???????waiting????????=?[]chan?struct{}{tab.initDone}?//?holds?waiting?callers?while?doRefresh?runs ??) ???defer?refresh.Stop() ???defer?revalidate.Stop() ???defer?copyNodes.Stop() ???//?Start?initial?refresh. ???go?tab.doRefresh(refreshDone) loop: ???for?{ ???????select?{ ???????case?<-refresh.C:???????????//定时刷新k桶事件,refreshInterval=30?min ???????????tab.seedRand() ???????????if?refreshDone?==?nil?{ ???????????????refreshDone?=?make(chan?struct{}) ???????????????go?tab.doRefresh(refreshDone) ??????????} ???????case?req?:=?<-tab.refreshReq:???//刷新k桶的请求事件 ???????????waiting?=?append(waiting,?req) ???????????if?refreshDone?==?nil?{ ???????????????refreshDone?=?make(chan?struct{}) ???????????????go?tab.doRefresh(refreshDone) ??????????} ???????case?<-refreshDone:?????????//?doRefresh完成 ???????????for?_,?ch?:=?range?waiting?{ ???????????????close(ch) ??????????} ???????????waiting,?refreshDone?=?nil,?nil ???????case?<-revalidate.C:???????//?验证k桶节点有效性,10?second ???????????revalidateDone?=?make(chan?struct{}) ???????????go?tab.doRevalidate(revalidateDone) ???????case?<-revalidateDone:????//?验证K桶节点有效性完成 ???????????revalidate.Reset(tab.nextRevalidateTime()) ???????????revalidateDone?=?nil ???????????case?<-copyNodes.C:???//?定时(30秒)将节点存入数据库,如果某个节点在k桶中存在超过5分钟,则认为它是一个稳定的节点 ???????????go?tab.copyLiveNodes() ???????case?<-tab.closeReq: ???????????break?loop ??????} ??} ???if?refreshDone?!=?nil?{ ???????<-refreshDone ??} ???for?_,?ch?:=?range?waiting?{ ???????close(ch) ??} ???if?revalidateDone?!=?nil?{ ???????<-revalidateDone ??} ???close(tab.closed) }

节点查找

getNode函数用于根据ID来查找节点,如果不存在则返回nil:

//?getNode?returns?the?node?with?the?given?ID?or?nil?if?it?isn't?in?the?table. func?(tab?*Table)?getNode(id?enode.ID)?*enode.Node?{ ???tab.mutex.Lock() ???defer?tab.mutex.Unlock() ???b?:=?tab.bucket(id) ???for?_,?e?:=?range?b.entries?{ ???????if?e.ID()?==?id?{ ???????????return?unwrapNode(e) ??????} ??} ???return?nil }

节点发现

以太坊分布式网络采用了结构化网络模型,其实现方案使用Kademlia协议,下面我们对节点发现进行简单介绍,在以太坊中k值是16,也就是说每个k桶包含16个节点,一共256个k桶,K桶中记录节点的NodeId,Distance,Endpoint,IP等信息,并按照与Target节点的距离排序,节点查找由doRefresh()实现:

//?filedir:go-ethereum-1.10.2\p2p\discover\table.go?L278

func?(tab?*Table)?doRefresh(done?chan?struct{})?{ ???defer?close(done)

tab.loadSeedNodes() ???//?Run?self?lookup?to?discover?new?neighbor?nodes. ???tab.net.lookupSelf()

for?i?:=?0;?i?<?3;?i++?{ ???????tab.net.lookupRandom() ??} }

从上述代码中可以看到这里首先调用tab.loadSeedNodes()从数据库中加载节点并将其插入到表中去:

//?filedir:?go-ethereum-1.10.2\p2p\discover\table.go?L302 func?(tab?*Table)?loadSeedNodes()?{ ???seeds?:=?wrapNodes(tab.db.QuerySeeds(seedCount,?seedMaxAge)) ???seeds?=?append(seeds,?tab.nursery...) ???for?i?:=?range?seeds?{ ???????seed?:=?seeds[i] ???????age?:=?log.Lazy{Fn:?func()?interface{}?{?return?time.Since(tab.db.LastPongReceived(seed.ID(),?seed.IP()))?}} ???????tab.log.Trace("Found?seed?node?in?database",?"id",?seed.ID(),?"addr",?seed.addr(),?"age",?age) ???????tab.addSeenNode(seed) ??} } //?filedir:go-ethereum-1.10.2\p2p\enode\nodedb.go?L440 //?QuerySeeds?retrieves?random?nodes?to?be?used?as?potential?seed?nodes //?for?bootstrapping. func?(db?*DB)?QuerySeeds(n?int,?maxAge?time.Duration)?[]*Node?{ ???var?( ???????now???=?time.Now() ???????nodes?=?make([]*Node,?0,?n) ???????it????=?db.lvl.NewIterator(nil,?nil) ???????id????ID ??) ???defer?it.Release() seek: ???for?seeks?:=?0;?len(nodes)?<?n?&&?seeks?<?n*5;?seeks++?{

ctr?:=?id[0] ???????rand.Read(id[:]) ???????id[0]?=?ctr?+?id[0] ???????it.Seek(nodeKey(id)) ???????n?:=?nextNode(it) ???????if?n?==?nil?{ ???????????id[0]?=?0 ???????????continue?seek?//?iterator?exhausted ??????} ???????if?now.Sub(db.LastPongReceived(n.ID(),?n.IP()))?>?maxAge?{ ???????????continue?seek ??????} ???????for?i?:=?range?nodes?{ ???????????if?nodes[i].ID()?==?n.ID()?{ ???????????????continue?seek?//?duplicate ??????????} ??????} ???????nodes?=?append(nodes,?n) ??} ???return?nodes }

之后通过lookupSelf来发现新的节点,这里会优先使用当前节点的ID来运行newLookup发现邻居节点:

//?filedir:?go-ethereum-1.10.2\p2p\discover\v5_udp.go?L281 //?lookupSelf?looks?up?our?own?node?ID. //?This?is?needed?to?satisfy?the?transport?interface. func?(t?*UDPv5)?lookupSelf()?[]*enode.Node?{ ???return?t.newLookup(t.closeCtx,?t.Self().ID()).run() } //?filedir:?go-ethereum-1.10.2\p2p\discover\v5_udp.go?L292 func?(t?*UDPv5)?newLookup(ctx?context.Context,?target?enode.ID)?*lookup?{ ???return?newLookup(ctx,?t.tab,?target,?func(n?*node)?([]*node,?error)?{ ???????return?t.lookupWorker(n,?target) ??}) } func?newLookup(ctx?context.Context,?tab?*Table,?target?enode.ID,?q?queryFunc)?*lookup?{ ???it?:=?&lookup{ ???????tab:???????tab, ???????queryfunc:?q, ???????asked:?????make(map[enode.ID]bool), ???????seen:??????make(map[enode.ID]bool), ???????result:????nodesByDistance{target:?target}, ???????replyCh:???make(chan?[]*node,?alpha), ???????cancelCh:??ctx.Done(), ???????queries:???-1, ??} ???//?Don't?query?further?if?we?hit?ourself. ???//?Unlikely?to?happen?often?in?practice. ???it.asked[tab.self().ID()]?=?true ???return?it }

最后随机一个target,进行lookup:

//?filedir:?go-ethereum-1.10.2\p2p\discover\v5_udp.go?L275 func?(t?*UDPv5)?lookupRandom()?[]*enode.Node?{ ???return?t.newRandomLookup(t.closeCtx).run() } //?filedir:?go-ethereum-1.10.2\p2p\discover\v5_udp.go?L287 func?(t?*UDPv5)?newRandomLookup(ctx?context.Context)?*lookup?{ ???var?target?enode.ID ???crand.Read(target[:]) ???return?t.newLookup(ctx,?target) } func?(t?*UDPv5)?newLookup(ctx?context.Context,?target?enode.ID)?*lookup?{ ???return?newLookup(ctx,?t.tab,?target,?func(n?*node)?([]*node,?error)?{ ???????return?t.lookupWorker(n,?target) ??}) } //?lookupWorker?performs?FINDNODE?calls?against?a?single?node?during?lookup. func?(t?*UDPv5)?lookupWorker(destNode?*node,?target?enode.ID)?([]*node,?error)?{ ???var?( ???????dists?=?lookupDistances(target,?destNode.ID()) ???????nodes?=?nodesByDistance{target:?target} ???????err???error ??) ???var?r?[]*enode.Node ???r,?err?=?t.findnode(unwrapNode(destNode),?dists) ???if?err?==?errClosed?{ ???????return?nil,?err ??} ???for?_,?n?:=?range?r?{ ???????if?n.ID()?!=?t.Self().ID()?{ ???????????nodes.push(wrapNode(n),?findnodeResultLimit) ??????} ??} ???return?nodes.entries,?err } //?filedir:go-ethereum-1.10.2\p2p\discover\v5_udp.go?L362 //?findnode?calls?FINDNODE?on?a?node?and?waits?for?responses. func?(t?*UDPv5)?findnode(n?*enode.Node,?distances?[]uint)?([]*enode.Node,?error)?{ ???resp?:=?t.call(n,?v5wire.NodesMsg,?&v5wire.Findnode{Distances:?distances}) ???return?t.waitForNodes(resp,?distances) }

func?(t?*UDPv5)?call(node?*enode.Node,?responseType?byte,?packet?v5wire.Packet)?*callV5?{ ???c?:=?&callV5{ ???????node:?????????node, ???????packet:???????packet, ???????responseType:?responseType, ???????reqid:????????make([]byte,?8), ???????ch:???????????make(chan?v5wire.Packet,?1), ???????err:??????????make(chan?error,?1), ??} ???//?Assign?request?ID. ???crand.Read(c.reqid) ???packet.SetRequestID(c.reqid) ???//?Send?call?to?dispatch. ???select?{ ???case?t.callCh?<-?c: ???case?<-t.closeCtx.Done(): ???????c.err?<-?errClosed ??} ???return?c }

服务结构

server端的的数据结构如下所示:

//?filedir:go-ethereum-1.10.2\p2p\server.go?L160 //?Server?manages?all?peer?connections. type?Server?struct?{ ???//?Config?fields?may?not?be?modified?while?the?server?is?running. ???Config ???//?Hooks?for?testing.?These?are?useful?because?we?can?inhibit ???//?the?whole?protocol?stack. ???newTransport?func(net.Conn,?*ecdsa.PublicKey)?transport ???newPeerHook??func(*Peer) ???listenFunc???func(network,?addr?string)?(net.Listener,?error) ???lock????sync.Mutex?//?protects?running ???running?bool ???listener?????net.Listener ???ourHandshake?*protoHandshake ???loopWG???????sync.WaitGroup?//?loop,?listenLoop ???peerFeed?????event.Feed ???log??????????log.Logger ???nodedb????*enode.DB ???localnode?*enode.LocalNode ???ntab??????*discover.UDPv4 ???DiscV5????*discover.UDPv5 ???discmix???*enode.FairMix ???dialsched?*dialScheduler ???//?Channels?into?the?run?loop. ???quit????????????????????chan?struct{} ???addtrusted??????????????chan?*enode.Node ???removetrusted???????????chan?*enode.Node ???peerOp??????????????????chan?peerOpFunc ???peerOpDone??????????????chan?struct{} ???delpeer?????????????????chan?peerDrop ???checkpointPostHandshake?chan?*conn ???checkpointAddPeer???????chan?*conn ???//?State?of?run?loop?and?listenLoop. ???inboundHistory?expHeap }

Server配置(本地节点秘钥、拨号比率、节点最大链接数、拨号比率、事件记录等):

//?Config?holds?Server?options. type?Config?struct?{

PrivateKey?*ecdsa.PrivateKey?`toml:"-"`

MaxPeers?int

MaxPendingPeers?int?`toml:",omitempty"`

DialRatio?int?`toml:",omitempty"`

NoDiscovery?bool

DiscoveryV5?bool?`toml:",omitempty"`

Name?string?`toml:"-"`

BootstrapNodes?[]*enode.Node

BootstrapNodesV5?[]*enode.Node?`toml:",omitempty"`

StaticNodes?[]*enode.Node

TrustedNodes?[]*enode.Node

NetRestrict?*netutil.Netlist?`toml:",omitempty"`

NodeDatabase?string?`toml:",omitempty"`

Protocols?[]Protocol?`toml:"-"`

ListenAddr?string

NAT?nat.Interface?`toml:",omitempty"`

Dialer?NodeDialer?`toml:"-"`

NoDial?bool?`toml:",omitempty"`

EnableMsgEvents?bool

Logger?log.Logger?`toml:",omitempty"` ???clock?mclock.Clock }

新增节点

AddPeer 函数用于新增一个给定的节点,其实现代码如下所示:

//?filedir:go-ethereum-1.10.2\p2p\server.go?L318

func?(srv?*Server)?AddPeer(node?*enode.Node)?{ ???srv.dialsched.addStatic(node) } //?filedir:go-ethereum-1.10.2\p2p\dial.go?L190 //?addStatic?adds?a?static?dial?candidate. func?(d?*dialScheduler)?addStatic(n?*enode.Node)?{ ???select?{ ???case?d.addStaticCh?<-?n: ???case?<-d.ctx.Done(): ??} }

AddTrustedPeer 函数用于新增一个可信任节点:

//?AddTrustedPeer?adds?the?given?node?to?a?reserved?whitelist?which?allows?the //?node?to?always?connect,?even?if?the?slot?are?full. func?(srv?*Server)?AddTrustedPeer(node?*enode.Node)?{ ???select?{ ???case?srv.addtrusted?<-?node: ???case?<-srv.quit: ??} }

移除节点

RemovePeer函数用于移除节点并断开与节点之间的连接:

//?filedir:go-ethereum-1.10.2\p2p\server.go?L325 func?(srv?*Server)?RemovePeer(node?*enode.Node)?{ ???var?( ???????ch??chan?*PeerEvent ???????sub?event.Subscription ??) ???//?Disconnect?the?peer?on?the?main?loop. ???srv.doPeerOp(func(peers?map[enode.ID]*Peer)?{ ???????srv.dialsched.removeStatic(node) ???????if?peer?:=?peers[node.ID()];?peer?!=?nil?{ ???????????ch?=?make(chan?*PeerEvent,?1) ???????????sub?=?srv.peerFeed.Subscribe(ch) ???????????peer.Disconnect(DiscRequested) ??????} ??}) ???//?Wait?for?the?peer?connection?to?end. ???if?ch?!=?nil?{ ???????defer?sub.Unsubscribe() ???????for?ev?:=?range?ch?{ ???????????if?ev.Peer?==?node.ID()?&&?ev.Type?==?PeerEventTypeDrop?{ ???????????????return ??????????} ??????} ??} } //?filedir:go-ethereum-1.10.2\p2p\dial.go?L198 //?removeStatic?removes?a?static?dial?candidate. func?(d?*dialScheduler)?removeStatic(n?*enode.Node)?{ ???select?{ ???case?d.remStaticCh?<-?n: ???case?<-d.ctx.Done(): ??} }

RemoveTrustedPeer函数用于移除一个可信任节点:

//?RemoveTrustedPeer?removes?the?given?node?from?the?trusted?peer?set. func?(srv?*Server)?RemoveTrustedPeer(node?*enode.Node)?{ ???select?{ ???case?srv.removetrusted?<-?node: ???case?<-srv.quit: ??} }

(*本文为《深入理解以太坊P2P网络设计》的上篇,下篇请点击主页阅读。)

查看更多

—-

编译者/作者:sky110

玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。

LOADING...
LOADING...