LOADING...
LOADING...
LOADING...
当前位置: 玩币族首页 > 新闻观点 > 精通IPFS:IPFS 保存内容之中篇

精通IPFS:IPFS 保存内容之中篇

2020-06-10 星鉴网 来源:区块链网络

在上一篇文章中,我们分析了保存文件/内容的整体流程,基本上知道在这个过程中文件/内容是怎么处理的。

但是,还留下了一个疑问,就是文件是怎么分片的,又是怎么保存到本地系统。

这篇文章我们就来解决这几个问题。

通过上一篇文章,我们知道ipfs-unixfs-importer这个类库,它实现了 IPFS 用于处理文件的布局和分块机制。

它的index.js文件内容只有一行代码require('./importer'),接下来我们直接来看这个importer/index.js是怎么处理的。

把参数传递的选项和默认选项进行合并,生成新的选项,然后检查选项的相关配置。

constoptions=Object.assign({},defaultOptions,_options)
options.cidVersion=options.cidVersion||0
if(options.cidVersion>0&&_options.rawLeaves===undefined){
options.rawLeaves=true
}
if(_options&&_options.hash!==undefined&&_options.rawLeaves===undefined){
options.rawLeaves=true
}

默认选项即defaultOptions内容如下:

constdefaultOptions={
chunker:'fixed',
rawLeaves:false,
hashOnly:false,
cidVersion:0,
hash:null,
leafType:'file',
hashAlg:'sha2-256'
}

根据选项中指定的分割方式,从 IPFS 中提供的所有分割方法找到对应的分割对象。

constChunker=chunkers[options.chunker]

chunkers表示系统提供的所有分割方法对象。

在父目录下chunker/index.js文件中定义的,默认有fixed、rabin两种方法,默认使用的是的前者,即固定大小。

生成一个 pull-through 的双向流,双向流的意思就是即可以从它读取数据,又可以提供数据让其它流读取。

constentry={
sink:writable(
(nodes,callback)=>{
pending+=nodes.length
nodes.forEach((node)=>entry.source.push(node))
setImmediate(callback)
},
null,
1,
(err)=>entry.source.end(err)
),
source:pushable()
}

source流是 pull-pushable 类库提供的一个可以其它流主动 push 的 pull-stream 源流,它提供了一个push方法。

当调用这个方法时,它开始调用回调函数,从而把数据传递给后续的 through 或 sink。

当时,它还提供了一个end方法,当数据读取完成后,调用这个方法。

sink流是 pull-write 类库提供的一个创建通用 pull-streamsinks流的基础类。

它的签名如下:(write, reduce, max, cb), 因为它是一个sinks流,所以它会读取前一个流的数据,在读取到数据之后就调用它的write方法保存读取到的数据,如果数据读取完成就调用它的cb方法。

在这里sink函数从前一个流中读取数据,然后放入source中。同时,source成为下一个流的读取函数。

生成一个dagStream对象,这个对象也是一个{source,sink}对象。

constdagStream=DAGBuilder(Chunker,ipld,options)

DAGBuilder函数定义于父目录下的builder/index.js中,接下来我们看下这个执行过程:

第 1 个参数Chunker/createChunker,它表示具体分割内容的策略,默认情况下为fixed,详见第一步中的defaultOptions变量内容;

第 2 个参数ipld/ipld,这个是 IPFS 对象的_ipld属性,在 IPFS 对象创建时生成的,表示星际接续的数据,目前它可以连接比特币、以太坊、git、zcash 等,在 IPFS 体系中具有非常重要的位置;

第 3 个参数reducer/createReducer是具体的 reduce 策略,默认情况为balanced,详见第四步中生成reducer变量的过程。

第 4 个参数options/_options为选项。

合并指定的选项和自身默认的选项。

constoptions=extend({},defaultOptions,_options)

默认选项如下:

constdefaultOptions={
chunkerOptions:{
maxChunkSize:262144,
avgChunkSize:262144
},
rawLeaves:false,
hashAlg:'sha2-256',
leafType:'file',
cidVersion:0,
progress:()=>{}
}

返回一个函数对象。

returnfunction(source){
returnfunction(items,cb){
parallel(items.map((item)=>(cb)=>{
if(!item.content){
returncreateAndStoreDir(item,(err,node)=>{
if(err){
returncb(err)
}
if(node){
source.push(node)
}
cb()
})
}
createAndStoreFile(item,(err,node)=>{
if(err){
returncb(err)
}
if(node){
source.push(node)
}
cb()
})
}),cb)
}
}

合并选项参数和默认选项

constoptions=Object.assign({},defaultOptions,_options)

默认选项如下:

constdefaultOptions={
strategy:'balanced',
highWaterMark:100,
reduceSingleLeafToSelf:true
}

根据选项指定的 reduce 策略,从系统提供的多个策略中选择指定的策略。

conststrategyName=options.strategy
constreducer=reducers[strategyName]

系统定义的的策略如下:

constreducers={
flat:require('./flat'),
balanced:require('./balanced'),
trickle:require('./trickle')
}

在用户不指定具体策略的默认情况下,根据前面执行过程,最终选定的策略为balanced。

调用Builder方法创建最终的策略对象。

constcreateStrategy=Builder(Chunker,ipld,reducer,options)

Builder方法位于builder.js文件中,它会创建一个 pull-stream 的 through 流对象。

在看它的内部之前,我们首先看下的 4个参数。看完参数,接下来,我们看下它的执行逻辑。

返回的这个函数,最终成为了一个sink流的write方法。

调用createBuildStream方法,生成一个双向流对象。

createBuildStream(createStrategy,ipld,options)

createBuildStream方法位于create-build-stream.js文件中,代码如下:

constsource=pullPushable()
constsink=pullWrite(
createStrategy(source),
null,
options.highWaterMark,
(err)=>source.end(err)
)
return{
source:source,
sink:sink
}

在这段代码中,source流是 pull-pushable 类库提供的一个可以主动 push 到其它流的 pull-stream 源流,这个类库在前面我们已经分析过,这里就直接略过。

sink流是 pull-write 类库提供的一个创建通用 pull-streamsinks流的基础类,这个类库也在前面分析过,这里也不细讲,我们只看下它的write方法。

这里的createStrategy函数正是调用Builder方法返回的createStrategy函数,用source作为参数,调用它,用返回的第二层匿名函数作为write方法。

生成一个树构建器流对象,并返回其双向流对象。

consttreeBuilder=createTreeBuilder(ipld,options)
consttreeBuilderStream=treeBuilder.stream()

createTreeBuilder函数位于tree-builder.js文件中,我们来看它的执行逻辑。

首先,合并默认选项对象和指定的选项对象。

constoptions=Object.assign({},defaultOptions,_options)

默认选择对象如下:

constdefaultOptions={
wrap:false,
shardSplitThreshold:1000,
onlyHash:false
}

onlyHash表示是否不保存文件/内容,只计算其哈希。

创建一个队列对象。

constqueue=createQueue(consumeQueue,1)

创建一个双向流对象

letstream=createStream()

其中sink对象是一个pull-write类库提供的流,这个已经见过多次了。

它的write方法后面遇到时再来看,source是一个pull-pushable类库提供的流,这个也见过多次。

创建一个DirFlat对象。

lettree=DirFlat({
path:'',
root:true,
dir:true,
dirty:false,
flat:true
},options)

返回特权函数构成的对象。

return{
flush:flushRoot,
stream:getStream
}

创建一个暂停流。这里什么也不做。

调用pull方法,创建一个完整的流来保存文件内容。

pull(
entry,
pausable,
dagStream,
map((node)=>{
pending--
if(!pending){
process.nextTick(()=>{
while(waitingPending.length){
waitingPending.shift()()
}
})
}
returnnode
}),
treeBuilderStream
)

pull 函数是 pull-stream 是类库中的核心函数。

在它的执行过程中,最后的 sink 流通过依次调用前面的 through 流,最终从最前面的 source 流中拉取数据。

除了最前面的 Source 流和最后面的 Sink 流,中间的都是 through 流,它们即可以被后面的流调用以提供数据,也可以调用前面的流来读取数据。

当pull函数在调用某个参数从前面读取数据时,如果当前参数是一个对象(即双向流)时,那么就会调用它的sink方法来读取。

同时用它的source方法作为后面参数的读取方法。

下面我们分析这段代码中的几个流,它们太重要了。

首先是entry流,它是一个双向流,它的sink函数(类型为pull-write流)调用前一个流的read方法来读取数据,并把读取到的数据放在source中(类型为 pull-pushable )。

然后是dagStream流,它也是一个双向流,它的sink函数(类型为pull-write流)调用entry流的source方法来读取数据。

sink函数的异步写函数参数为builder.js中返回的第二层函数,当读取到数据之后,调用builder.js中返回的第二层函数进行处理,在第二层函数中,大致流程是把数据保存自身的source中(类型为 pull-pushable )。

dagStream 在create-build-stream.js中生成。为了方便理解,这里我们再次看下它的代码。

const source = pullPushable()

const sink = pullWrite( createStrategy(source), null, options.highWaterMark, (err) => source.end(err) )

return { source: source, sink: sink }

最后是treeBuilderStream流,它也是一个双向流,它的sink函数(类型为pull-write流)调用dagStream流的source方法来读取数据,并把读取到的数据放在source中(类型为 pull-pushable )。

其他两个流对流程没有任何影响,读者可以自行分析,这里略过不提。

在这一步中,通过pull函数把最重要的几个流连接到一起,并通过下面最后一步,把它们与外部的流联系到一起。

最后,返回双向流对象。

{
sink:entry.sink,
source:treeBuilderStream.source,
flush:flush
}

到这里,文件已经保存完成了。

啥?文件已经保存完成了?

什么都没看到就把文件保存完了,不会骗我们的吧?

哈,因为保存文件这个动作太复杂了,所以上面只是静态的从代码层面进行梳理。

下面我们从头到尾从动态处理的过程来看下文件到底是怎么保存在本地的。

一切要从我们在上篇写的这个示例说起

const{createNode}=require('ipfs')

constnode=createNode({
libp2p:{
config:{
dht:{
enabled:true
}
}
}
})

node.on('ready',async()=>{

constcontent=`我爱黑萤`;

constfilesAdded=awaitnode.add({
content:Buffer.from(content)
},{
chunkerOptions:{
maxChunkSize:1000,
avgChunkSize:1000
}
})

console.log('Addedfile:',filesAdded[0].path,filesAdded[0].hash)
})

上面这段代码,最终执行的是core/components/files-regular/add-pull-stream.js文件中的函数,它的主体就是下面的这段代码:

pull(
pull.map(content=>normalizeContent(content,opts)),
pull.flatten(),
importer(self._ipld,opts),
pull.asyncMap((file,cb)=>prepareFile(file,self,opts,cb)),
pull.map(file=>preloadFile(file,self,opts)),
pull.asyncMap((file,cb)=>pinFile(file,self,opts,cb))
)

为了便于分析理解,我们在分析过程中仍然使用推的方式,从源流推到目的流中,注意这个仅是为了理解方便,真实的过程是目的流从源流中拉取数据。

下面代码简单解释如下:

首先,调用第一个pull.map流,对收到的文件或内容并进行一些必要的转换,

调用pull.flatten流,把前一步生成的数组进行扁平化处理。

调用importer流来保存内容。

调用pull.asyncMap方法,对已经保存的文件/内容进行预处理,生成用户看到的内容。

调用pull.map方法,把已经保存到本地的文件预加载到指定节点。

调用pull.asyncMap方法,把已经保存到本地的文件长期保存在本地,确保不被垃圾回收。

下面我们重点看下文件内容在importer流中的处理逻辑。

1,调用entry.sink函数从前面的pull.flatten流中读取保存的每一个文件/内容。

2,调用dagStream.sink函数从前面的流中读取数据,并在读取到数据之后,调用builder.js中定义的第二层匿名函数进行处理。

在这个函数中,调用异步流程库async的parallel方法对收到的每个要处理的文件内容进行处理。

具体处理如下:如果保存的是目录,那么调用createAndStoreDir方法,创建并保存目录。

如果保存的是文件,那么调用createAndStoreFile方法,创建并保存主文件。

因为我们保存的是文件,所以在这里详细看下createAndStoreFile方法,它的过程如下:

设置源流为file.content。

调用chunker流,对保存的内容进行分块。

调用paraMap流(类型为 pull-paramap),对每一个分块进行处理。

调用pullThrough流(类型为 pull-through 流),对收到的每个数据进行处理。

调用reducer流,把所有生成的分块进行 reduce 处理。如果文件进行了多次分块,这里就会根据生成的分块生成一个父块。

调用collect流,调用回调函数即createAndStoreFile,把保存文件的结果传递到外部函数中。

如果保存的内容是 Buffer,那么调用 pull-stream 的values方法,生成内容源流。

if(Buffer.isBuffer(file.content)){
file.content=values([file.content])
}

调用createReducer方法,创建 reducer 对象,默认为balanced,所以这里创建的 reducer 对象类型为balanced/balanced-reducer.js文件中定义的函数。

constreducer=createReducer(reduce(file,ipld,options),options)

调用createChunker方法,创建 chunker 对象,默认为fixed,所以这里创建的 chunker 对象类型为chunker/fixed-size.js主文件中定义的函数。

chunker=createChunker(options.chunkerOptions)

调用pull函数进行保存文件。这个pull函数会进行 IPFS 特有业务,涉及到 IPFS 保存文件核心逻辑,这块我们留在下一篇文章中进行分析。

调用treeBuilderStream.sink函数从前面的流中读取数据,在这里即为保存文件的结果,并在读取到保存文件结果之后,把结果保存在source中。

当把保存文件的结果保存到source中之后,core/components/files-regular/add-pull-stream.js文件中定义的pull.asyncMap就可以得到这个结果了。

作者介绍:


乔疯,区块链狂热爱好者,熟悉比特币、EOS、以太坊源码及合约的开发,有着数年区块链开发经验,坚信技术是第一生产力,区块链改变整个人类,开设巴比特专栏以来已经获得100多万次的阅读量。


参与湖南天河国云 Ulord 公链的开发和面向区块链行业的风险监控平台,后者在近期成功入选由工信部评选的 101 个网络安全技术应用试点示范项目


在爱健康金融金融有限公司参与组建彗星信息科技有限公司,并担任第一任技术部负责人,开发出了彗星播报等深受大家喜爱的区块链产品。


具有良好的协调沟通能力和团队协作精神!熟悉Scrum、XP、看板等敏捷项目管理,拥有PMP证书!


熟悉JAVA、Python、NodeJS、C/C++、Linux下的开发,熟悉分布式架构设计!熟悉互联网金融行业,具有丰富的互联网金融产品开发经验,对互联金融有着深入的了解。

精品文推荐

曝光:IPFS星际银河矿机,圈走了几个亿的传销盘

何晓阳:POC硬盘挖矿加入不透明抵押机制就是庞氏骗局

硬盘挖矿=存力挖矿?看董天一、冷波、李万胜技术大牛们怎么说

火币中国CEO袁煜明:5G和IPFS将加速web3.0到来

莱比特矿池CEO江卓尔:一切自由皆正义

—-

编译者/作者:星鉴网

玩币族申明:玩币族作为开放的资讯翻译/分享平台,所提供的所有资讯仅代表作者个人观点,与玩币族平台立场无关,且不构成任何投资理财建议。文章版权归原作者所有。

LOADING...
LOADING...