LOADING...
LOADING...
LOADING...
当前位置: 玩币族首页 > 行情分析 > 精通IPFS:IPFS 保存内容之下篇

精通IPFS:IPFS 保存内容之下篇

2020-06-10 星鉴网 来源:区块链网络

在上一篇文章中,我们指出在 builder/builder.js 文件中调用调用 pull 函数进行保存文件,这篇文章我们就来详细研究下这个过程。

1,设置源流为 file.content。

2,调用 chunker 流,对保存的内容进行分块。通过前面的文章,我们知道 chunker 流的默认实现为 chunker/fixed-size.js,它是一个 pull-through 流。

这个流提供了两个函数,分别称为 onData 和 onEnd,前者在每次数据到来时调用,后者当数据发送完成时调用。fixed-size.js 在初始化时,根据选项中指定的maxChunkSize 属性设置每一个区块的大小。

下面,我们来看下它的在它的 onData 和 onEnd 两个方法。

onData 函数处理如下:

以上就 IPFS 中固定分块的逻辑,其实也很简单。

从缓冲列表中取得规定的区块大小数据到队列中。

this.queue(bl.slice(0,maxSize))

如果缓冲列表的长度刚好等于规定的区块大小,那么重新一个新的缓冲区列表,并将当前数据长度设置为 0;

否则,生成一个新的缓冲区列表,并从老缓冲区中区块大小处把数据读取到新的缓冲区列表中(0 到区块大小处的数据已经在上面读取过)

同时设置其为老的缓冲区列表,并更新当前数据长度设减去前一步读取到区块大小长度,从而更缓冲区列表及其长度。

if(maxSize===bl.length){

bl=newBufferList()

currentLength=0

}else{

onstnewBl=newBufferList()

newBl.append(bl.shallowSlice(maxSize))

bl=newBl

currentLength-=maxSize}

每次收到数据之后就保存在 BufferList 中,同时把当前数据长度也加上读取到数据的长度。

bl.append(buffer)

currentLength += buffer.lengt

如果当前数据长度大于等于规定的区块大小时,那么就进行下面的循环处理,直到当前数据长度小于规定的区块大小。

看完了 onData 方法,接下来我们再看 onEnd 函数,这个函数首先检查缓冲列表中是否有数据(少于区块大小),如果有则同样保存到队列中。

if(currentLength)

{this.queue(bl.slice(0,currentLength))

emitted=true}

if(!emitted) {

this.queue(Buffer.alloc(0))}

this.queue(null)

调用 paraMap 流(类型为 pull-paramap),对每一个分块进行处理。

当前面的流对文件进行分块之后,每一个分区都会下一个流进行拉取,在这里就是这个函数,我们看下这个函数是如何处理每一个分块的。

它的主体是一个 waterfall 函数,这个函数正如其名字所示,每一个函数都进行各自的处理,并把结果传递给下一个函数,我们看下它的几个处理函数。

首先,我们来看第一个函数,它主要用来创建 DAGNode,并把相关信息传递给第二个函数,它的执行逻辑如下:

接下来,我们看第二个函数,它的主要作用是把生成的 DAGNode 保存到系统中,并把保存的结果传递给下一个函数,它的执行逻辑如下:

从选项中获取 CID 版本号、哈希算法、编码方式等。

letcidVersion=options.cidVersion||defaultOptions.cidVersion

lethashAlg=options.hashAlg||defaultOptions.hashAlg

letcodec=options.codec||defaultOptions.codec

if(Buffer.isBuffer(node)){

cidVersion=1

codec='raw'}

if(hashAlg!=='sha2-256'){

cidVersion=1}

默认情况下,版本号为0,哈希算法为 SHA256,编码方式为 dag-pb,这是一种基于 Protocol 规定的 JS 实现。

如果选项中指定不保存而仅仅是计算哈希值,那么调用 ipld-dag-pb 库中的 util.js 中的 cid 函数,获取 DAG 节点的 CID,然后直接返回。

if(options.onlyHash){

returncid(node,{

version:cidVersion,

hashAlg:hashAlg},

(err,cid)=>{

callback(err,{cid,

node})})}

如果不是只计算哈希,那么调用 IPLD 对象的 put 来保存 DAG 节点。

ipld.put(node,{

version:cidVersion,

hashAlg:hashAlg,

format:codec},

(error,cid)=>{

callback(error,

{cid,

node

})

})

IPLD 对象定义于 ipld 库中。

IPLD 在 IPFS 中具有非常重要的作用,它是 InterPlanetary Linked-Data 的缩写,代表了 IPFS 的野心与希望,把一切东西连结起来的愿望.

目前可以边结比特币、以太坊、Zcash、git 等。它持有 ipfs-block-service,后者又持有 ipfs 仓库对象和 bitswap 对象,这几个对象构成了 ipfs 的核心。

下面我们来看 put 方法,看它是怎么来保存 DAG 对象的。

它的主体是调用内部方法获取当前 DAG 对象编码用的格式,然后使用与这种格式相匹配的 cid 方法来取得对象的 CID 对象,然后调用内部的_put 来保存数据。

this._getFormat(options.format,(err,format)=>{

if(err)returncallback(err)

format.util.cid(node,options,(err,cid)=>{

if(err){

returncallback(err)}

if(options.onlyHash){

returncallback(null,cid)}

this._put(cid,node,callback)

})

})

接下来,我们来看这个内部_put 方法,这个方法主体是一个 waterfall 函数,它内部的几个函数分别根据 CID 对象获得对应的编码格式,然后使用编码格式对应的方法序列化 DAG 节点对象,最后生成区块 Block 对象,并调用区块服务对象的 put 方法来保存区块。

区块服务对象定义于 ipfs-block-service 库,它的 put 方法,根据是否有 bitswap 对象(初始化是这个对象为空)来决定是调用仓库对象来保存区块,还是调用 bitswap 来保存区块。对于我们的例子来说,它会调用 bitswap 来保存区块。

bitswap 对象的 put 方法,不仅会把区块保存在底层的 blockstore 中,还会把它发送给那些需要它的节点。它的主体是一个 waterfall 函数,其中第一个函数检查本地区块存储是否有这个区块,第二个根据本地是否有这个区块来确定是否忽略调用,还是真正来保存区块。

waterfall([

(cb)=>this.blockstore.has(block.cid,cb),

(has,cb)=>{

if(has){

returnnextTick(cb)

}

this._putBlock(block,cb)

}

],callback)

bitswap 对象的_putBlock 方法调用区块存储对象的 put 方法在本地仓库中保存区块对象,并在成功之后触发一个收到区块的事件.

同时通过网络对象的 provide 方法,从而把 CID 保存在最近的节点中,然后调用引擎对象的 receivedBlocks 方法,把接收到的区块对象发送到所有想要这个区块的所有节点中。

this.blockstore.put(block,(err)=>{

if(err){

returncallback(err)

}

this.notifications.hasBlock(block)

this.network.provide(block.cid,(err)=>{

if(err){

this._log.error('Failedtoprovide:%s',err.message)

}})

this.engine.receivedBlocks([block.cid])

callback()

})

bitswap 对象中有两个重要的对象,一个是网络对象,一个是引擎对象。

网络对象的 provide 方法直接调用 libp2p 对象的内容路由的同名方法来处理区块的 CID。

libp2p 对象的内容路由中保存所有具体的路由方法,默认情况下,是空的,即没有任何路由方法。

而我们通过在配置文件中,指定 libp2p.config.dht.enabled 为真,为内容路由指定了 DHT 路由。

所以最终区块的 CID 会被保存在最合适的节点中。

网络对象在初始方法中,指定了自身的两个方法作为 libp2p 对象的节点连接与断开事件的处理器,从而在连接与断开时获得相应的通知。

并且还调用了 libp2p 对象的 handle 方法,从而使自己成为 libp2p 对象/ipfs/bitswap/1.0.0和/ipfs/bitswap/1.1.0这两种协义的处理对象,从而当 libp2p 收到这两种消息时,会调用网络对象对象的相应方法进行处理。

网络对象处理 bitswap 协义是通过 pull 函数处理的,大致流程如下:从连接对象中获取消息,反序列化成为消息对象,然后通过连接对象获取它的节点信息对象。

调用 bitswap 对象的内部方法_receiveMessage 处理传递进来的消息,而这个方法又会调用引擎对象的 messageReceived 方法来处理接收到的消息。

引擎对象的 messageReceived 方法的大致流程如下:

1)调用内部方法_findOrCreate,找到或创建远程对等节点的总账本对象 Ledger,如果是新创建的总账本对象,还要放入内部映射集合中,key 为远程对等节点的 Base58 字符串;

2)如果这个消息是完全的消息,则生成一个新的想要请求列表。

3)调用内部方法_processBlocks,处理消息中的区块对象。

4)如果消息中的想要列表为空的,则退出方法。

5)遍历消息中的想要列表,如果当前想要的实体被取消,则从对应的节点的总账本中去掉对应项,同时保存在取消项列表中;否则,把当前项保存在对应节点的总账本中,同时保存在想要列表中。

6)调用内部方法_cancelWants ,把任务中已经取消的过滤掉,即删除任务中已经取消的任务。

7)调用内部方法_addWants,处理远程对等节点所有想要的列表。调用区块存储对象判断想要的项本地仓库中是否已经有,如果已经有,则生成相应的任务。

引擎对象的 receivedBlocks 方法在收到具体区块时,检查所有已连接的远程节点(总账本对象),看它们是否想要这个区块,如果是则生成一个任务,在后台进行处理。

调用 persist 方法,保存 DAG 节点。这是非常重要的一步,它不仅把区块对象保存在本地仓库,也涉及与是否把区块 CID 保存在与它最近的节点上,还涉及到把区块通过 bitswap 协义发送到那些想要它的节点中。它的执行如下:

生成一个 UnixFS 对象。

constfile=newUnixFS(options.leafType,buffer)

UnixFS 是一种基于协议缓冲区的格式,用于描述IPFS中的文件,目录和符号链接。目前它支持:原始数据、目录、文件、原数据、符号连接、hamt-sharded-directory 等几种类型。

leafType 默认为文件,在文件初始化时通过默认选项 defaultOptions 指定的。

调用 DAGNode.create 静态方法,创建 DAGNode 节点,成功之后,把相信信息传递下一个函数。

DAGNode.create(file.marshal(),[],(err,node)=>{

if(err){

returncb(err)

}

cb(null,{size:node.size,

leafSize:file.fileSize(),

data:node})})

UnixFS 的 marshal 方法主要内容是对文件内容(字节缓冲区)进行编码。

这里 DAGNode 引用的是 ipld-dag-pb 库中的 dag-node/index.js 中定义的 DAGNode 函数对象.

它的 create 方法,定义于同一个目录下的 create.js 中,我们来看下这个方法。

它的主要内容是对文件的分区数据和对其他区块的连接 link 进行检查,并把两者序列后之后再创建 DAGNode 对象。

而后者的构造函数比较简单,仅把区块的数据及与其他区块的连接(代表与其他区块的关系)保存起来。

调用 pullThrough 流(类型为 pull-through 流),对收到的每个数据进行处理。这个过程比较简单,这里不细讲。

调用 reducer 流,把所有生成的分块进行归一处理。在默认情况下,reducer 流是在 balanced/index.js中通过调用 balanced/balanced-reducer.js 中的 balancedReduceToRoot 的函数生成的。

我们看下这个函数的执行过程:

第一个函数是前面建立的 source 流。

第二个函数是一个 pull-batch 类库定义的流,这是一个 pull-through 流,它实现了自己的 writer、ender 两个函数,它把每次获取到的数据保存在内部数组中,达到一定程序之后才会保存到 pull-through 流的队列中。

第三个函数是 pull-stream 类库的 async-map 流,这是一个 through 流,与 map 流相似,但有更好的性能。

它的归一处理函数 reduce 默认情况下为 builder/reduce.js 中返回的 reducefile 函数。

它的流程如下:1)如果当前叶子节点数量是1,并且其 single 标志为真,并且选项中有配置把单独叶子归一到自身,那么直接调用回调对象;

否则,执行下面的流。

if(leaves.length===1&&leaves[0].single&&options.reduceSingleLeafToSelf){

constleaf=leaves[0]

returncallback(null,{

size:leaf.size,

leafSize:leaf.leafSize,

multihash:leaf.multihash,

path:file.path,

name:leaf.name})}

创建父节点,并添加它的所有叶子节点。当文件比较大的时候,IPFS 会进行分块,每一个分块就构成了这里的叶子节点.

最终这些叶子按照它们分块的顺序,生成对应的 DAGLink ,然后依次添加到父 DAGNode 中.

这时候父 DAGNode 保存的不是文件内容,而是这些叶子节点的 DAGLink,从而构成文件的完整内容。

constf=newUnixFS('file')

constlinks=leaves.map((leaf)=>{

f.addBlockSize(leaf.leafSize)

returnnewDAGLink(leaf.name,leaf.size,leaf.multihash)})

调用 waterfall 函数,顺序处理父节点。这个地方和处理单个分块类似,就是创建 DAGNode 对象、调用 persist 函数进行持久化处理。

注意:这里的区别是父节点有叶子节点,即 links 不空。

waterfall([

(cb)=>DAGNode.create(f.marshal(),links,cb),

(node,cb)=>persist(node,ipld,options,cb)

],(error,result)=>{

if(error){

returncallback(error)}

callback(null,{

size:result.node.size,

leafSize:f.fileSize(),

multihash:result.cid.buffer,

path:file.path,

name:''})})

上面 waterfall 函数处理完成后,调用回调函数进行继续处理。

归一处理函数 reduce 中的回调函数是下面 collect 流即 sink 流中的读取回调函数,当归一函数读取到数据之后,调用这个回调函数,从而数据 pull 到 collect 流,进而进入 reduced 函数中进行处理。

第四个函数是 pull-stream 类库的 collect 流,这是一个 sink 流。它的处理函数 reduced 流程如下:1)如果前面的流有错误,则直接调用 reduceToParents 函数的回调函数进行处理;

否则,如果当前收到的数据长度大于1,即前面归一处理之后,还是有多个根 DAGNode,则调用 reduceToParents 函数继续进行归一处理;

否则,调用 reduceToParents 函数的回调函数进行处理。

reduceToParents 函数的回调函数,这是一个很关键的函数,在这个函数内部把读取到的数据写入 result 表示的 pull-pushable 流,以便在它后面的外部流流获取数据。

生成 pull-pair 对象和 pull-pushable 对象。

constpair=pullPair()

constsource=pair.source

constresult=pushable()

调用 reduceToParents 函数,建立内部 pull 流。函数的主体就是一个 pull 函数建立起来的流,它的几个函数如下:

返回双向流对象。这里返回的双向流对象为

{sink:pair.sink,

source:result}

其中 sink 是 pull-pair 类库中定义的 sink 流,它被外部的 pull 函数调用用来从前面一个流中读取数据;source 是 pull-pushable 类库中的流。

在 reduceToParents 函数的回调函数中被 push 数据,从而外部的 pull 函数中相关的流可以从它中读取函数。

调用 collect 流,在这个流的处理函数中,把保存文件的结果传递到外部函数中。

collect((err,roots)=>{

if(err){

callback(err)

}else{

callback(null,roots[0])}

})

这里的 callback 是调用 createAndStoreFile 函数时传递进来的,而它的调用是在 builer/builder.js文件中,简单回顾一下调用代码:

createAndStoreFile(item,(err,node)=>{

if(err){returncb(err)}

if(node){

source.push(node)}

cb()})

这里的匿名回调函数即是上面的 callback,在回调函数中,通过保存文件的结果写入 source 流中,从而把数据传递到更外层的 pull 流中。


到这里,我们已经把保存文件/内容这一核心流程完整分析了一遍,从头看到尾的你是不是收获很大。接下来,敬请期待下篇获取内容。

—-

编译者/作者:星鉴网

玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。

LOADING...
LOADING...