>下载包含大量有关网页元数据的大型文件
>使用NPM的event-stream包,按正则表达式拆分文件中的元数据并由此创建流
>检查集合中的元数据是否匹配(我一直试图将每个网页的元数据流式传输到另一个函数来执行此操作)
该文件太大而无法缓冲,因此需要流式传输.如果你想试一试,这里有一个包含几条元数据示例的小文件.
作业集合(job-collection)包中的每个作业都已经在异步函数中运行:
- var request = Npm.require('request');
- var zlib = Npm.require('zlib');
- var EventStream = Meteor.npmRequire('event-stream');
- function (job,callback) {
- //This download is much too long to block
- request({url: job.fileURL,encoding: null},function (error,response,body) {
- if (error) console.error('Error downloading File');
- if (response.statusCode !== 200) console.error(downloadResponse.statusCode,'Status not 200');
- var responseEncoding = response.headers['content-type'];
- console.log('response encoding is %s',responseEncoding);
- if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {
- console.log('Received binary/octet-stream');
- var regexSplit = /WARC\/1\./;
- response.pipe(zlib.createGunzip()
- .pipe(EventStream.split(regexSplit))
- .pipe(EventStream.map(function (webpageMetaData) {
- /* Need parse the MetaData or pass each webpageMetaData to function
- * This next function could block if it had to */
- searchPageMetaData(webpageMetaData); // pass each Metadatum to this function to update a collection - this function can be synchronous
- }));
- } else {
- console.error('Wrong encoding');
- }
- });
- }
- function searchWebPageMetaData(MetaData) { // Placeholder for the per-record handler; the body holds no statements, so it returns undefined.
- // Parse JSON and search collection for match
- }
>有没有更好的方法来构建它?我是在正确的轨道上吗?
>在哪里放置Meteor.bindEnvironment? – 是否每次把数据传给searchWebPageMetaData()时都要绑定环境?我需要在这里显式使用Fibers吗?
>如果我把它输出到process.stdout,这个流在运行时会停止.我是否应该把流放进Meteor的某种包装(wrap)里?
>我知道Meteor.wrapAsync.我是否应该用Meteor.wrapAsync包装最内层的searchWebPageMetaData()函数? (写到这里,我想答案是肯定的)
>流是否会放慢速度以适应较慢的数据库调用?我猜不会,但我该如何处理这种情况?
我花了很长时间了解Meteor的wrapAsync和bindEnvironment,但是无法将它们整合在一起并了解在哪里使用它们.
补充1
只是为了澄清,步骤是:
>下载文件;
>创建流;
>解压缩;
>将其拆分为单独的webPages – EventStream处理此问题
>将其发送给函数 – 不需要返回值;这可能是阻塞,只是一些搜索和数据库调用
我试图做这样的事情,除了我需要帮助的核心代码是在一个不同文件的函数中.以下代码中包含@ electric-jesus的大部分答案.
- processJobs('parseWatFile',{
- concurrency: 1,cargo: 1,pollInterval: 1000,prefetch: 1
- },function (job,callback) {
- if (job.data.watZipFileLink) {
- queue.pause();
- console.log('queue should be paused now');
- var watFileUrl = 'https://s3.amazonaws.com/ja-common-crawl/exampleWatFile.wat.gz';
- function searchPageMetaData(webpageMetaData,callback) {
- console.log(webpageMetaData); // Would be nice to just get this function logging each webPageMetaData
- future.return(callback(webpageMetaData)); //I don't need this to return any value - do I have to return something?
- }
- if (!watFile)
- console.error('No watFile passed to downloadAndSearchWatFileForEntity ');
- var future = new Future(); // Doc Brown would be proud.
- if(typeof callback !== 'function') future.throw('callbacks are supposed to be functions.');
- request({url: watFile,body) {
- if (error) future.throw('Error Downloading File');
- if (response.statusCode !== 200) future.throw('Expected status 200,got ' + response.statusCode + '.');
- var responseEncoding = response.headers['content-type'];
- if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {
- var regexSplit = /WARC\/1\./;
- response.pipe(zlib.createGunzip()
- .pipe(EventStream.split(regexSplit))
- .pipe(EventStream.map(function (webpageMetaData) {
- searchPageMetaData(webpageMetaData,callback);
- })
- ));
- } else {
- future.throw('Wrong encoding');
- }
- });
- return future.wait();
- } else {
- console.log('No watZipFileLink for this job');
- job.log('ERROR: NO watZipFileLink from commonCrawlJob collection');
- }
- queue.resume();
- job.done;
- callback();
- }
解决方法
- var request = Npm.require('request');
- var zlib = Npm.require('zlib');
- var EventStream = Meteor.npmRequire('event-stream');
- var Future = Npm.require('fibers/future');
- var searchWebPageMetaData = function (MetaData) { // Example matcher: reports whether MetaData mentions "droids".
- // Parse JSON and search collection for match
- // make it return something
- var result = /droids/ig.test(MetaData); // placeholder check: true when MetaData contains "droids" in any letter case
- return result;
- }
- var processJob = function (job,callback) {
- var future = new Future(); // Doc Brown would be proud.
- if(typeof callback !== 'function') future.throw("Oops,you forgot that callbacks are supposed to be functions.. not undefined or whatever.");
- //This download is much too long to block
- request({url: job.fileURL,body) {
- if (error) future.throw("Error Downloading File");
- if (response.statusCode !== 200) future.throw("Expected status 200,got " + downloadResponse.statusCode + ".");
- var responseEncoding = response.headers['content-type'];
- if (responseEncoding === 'application/octet-stream' || 'binary/octet-stream') {
- var regexSplit = /WARC\/1\./;
- response.pipe(zlib.createGunzip()
- .pipe(EventStream.split(regexSplit))
- .pipe(EventStream.map(function (webpageMetaData) {
- /* Need parse the MetaData or pass each webpageMetaData to function
- * This next function could block if it had to */
- // pass each Metadatum to this function to update a collection - this function can be synchronous
- future.return(callback(webpageMetaData)); // this way,processJob returns whatever we find in the completed webpage,via callback.
- }));
- } else {
- future.throw('Wrong encoding');
- }
- });
- return future.wait();
- }
用法示例:
所以每当你在这里分配变量:
- var currentJob = processJob(myjob,searchWebPageMetaData); // processJob ends with `return future.wait()`, so currentJob is assigned only once that future has resolved
这样,即使采用同步风格的取值/变量赋值,异步工作也能完成,数据也会及时传输过来.
要回答你的问题,
>在哪里放置Meteor.bindEnvironment? – 是否每次把数据传给searchWebPageMetaData()时都要绑定环境?我需要在这里显式使用Fibers吗?
其实不需要,我相信显式使用Fibers/Future已经解决了这个问题.
>如果我把它输出到process.stdout,这个流在运行时会停止.我是否应该把流放进Meteor的某种包装(wrap)里?
你指的是什么?我依稀记得process.stdout是阻塞的,这可能是原因之一.同样,把结果包装在Future中应该可以解决这个问题.
>我知道Meteor.wrapAsync.我是否应该用Meteor.wrapAsync包装最内层的searchWebPageMetaData()函数? (写到这里,我想答案是肯定的)
可以看一下Meteor.wrapAsync辅助函数的代码.它本质上就是封装了Future的resolve过程,当然你可以自己再实现一遍,也可以直接显式使用Fibers/Future,没有问题.
>流是否会放慢速度以适应较慢的数据库调用?我猜不会,但我该如何处理这种情况?
不太确定你在这里的意思..但由于我们正在使用异步的Fibers,我的猜测也是不会.我还没有见过使用Fibers导致变慢的情况.可能只有在同时启动(并同时运行)多个作业的情况下,您才会在内存使用方面遇到性能问题.请把队列的并发数保持在较低水平,因为Fibers在运行任务时能力相当强.你只有一个核心来处理这些工作,这是一个令人遗憾的事实,因为Node不能利用多核:(