大数据采集中的异步处理问题

这段时间在学习nodejs,用jsdom采集了一些数据,也遇到了一些问题,贴出来征求一下你们的解决方案。node

首先说一下目的,有几十万条图片数据,把这些图片抓取到本地文件夹中,采集完成后把成功数据归档为done.json,失败数据归档为undone.json,以下:json

1 const IMGS = [
2     "http://xxx.com/1.jpg",
3     "http://xxx.com/2.jpg",
4     "http://xxx.com/3.jpg",
5     "http://xxx.com/4.jpg",
6     ...//此处省略无数的数据
7 ];

采集的心路历程:promise

方法1 forEach循环直接请求(失败网络

1 IMGS.forEach(v=>{
2      http.get(v, (res)=> {
3          //blablabla
4      });
5 }); 

这种方法一会儿就把全部数据都异步的请求出去了,超出了网络连接数,直接就把服务爆掉了。并发

方法2 采用递归处理完一条自动处理下一条。(成功,可是有问题dom

 1 const DONE = [],
 2       UNDONE = [],
 3       FILEDONE = './done.json',
 4       FILEUNDONE = './undone.json';
 5 let counter = 0;
 6 const writeResult() =>{
 7     fs.writeFile(FILEDONE, JSON.stringify(DONE), "utf8", ()=>console.log('done saved!'));
 8     fs.writeFile(FILEUNDONE , JSON.stringify(UNDONE), "utf8", ()=>console.log('undone saved!'));
 9 }
10 const saveImage = (data, fn)=> {
11     fs.writeFile(counter++ + '.jpg', data, fn);
12 }
13 const downloadImage = (i)=> {
14     if(i >= IMGS.length) {
15         writeResult();
16         return;
17     }
18     http.get(IMGS[i], (res)=> {
19         //此处略去了异常处理的内容
20         res.setEncoding="binary";
21         let dt = '';
22         res.on('data', (ck)=> dt+=ck;)
23         res.on('end',()=> {
24             saveImage(dt, ()=> {
25                 DONE.push(IMGS[i]);
26                 downloadImag(i++);
27             });
28         })
29         
30     }).on('error',()=>UNDONE.push(IMGS[i])) ;
31 }
32 
33 downloadImage(0);

这种方法的确是能成功抓取到了数据,可是有一个最大的弊端就是只能一条一条顺序抓取,没办法批量并发一块儿抓取,这简直就是浪费了nodejs非阻塞机制。异步

好,变通处理添加循环多处理机制。学习

方法3 采用添加间隔器的方式进行批量处理。spa

const DONE = [],
    UNDONE = [],
    FILEDONE = './done.json',
    FILEUNDONE = './undone.json',
    STEP = 20;
let counter = 0, stepi = 0;
const writeResult() =>{
    if(stepi < STEP) return; //只有所有都请求完后才真正执行归档功能
    fs.writeFile(FILEDONE, JSON.stringify(DONE), "utf8", ()=>console.log('done saved!'));
    fs.writeFile(FILEUNDONE , JSON.stringify(UNDONE), "utf8", ()=>console.log('undone saved!'));
}
const saveImage = (data, fn)=> {
    fs.writeFile(counter++ + '.jpg', data, fn);
}
const downloadImage = (i)=> {
    if(i >= IMGS.length) {
        stepi++;
        writeResult();
        return;
    }
    http.get(IMGS[i], (res)=> {
            //此处略去了异常处理的内容
         res.setEncoding="binary";
            let dt = '';
            res.on('data', (ck)=> dt+=ck;)
            res.on('end',()=> {
                saveImage(dt, ()=> {
                    DONE.push(IMGS[i]);
                    downloadImag(i + STEP);
                });
            })
        }).on('error',()=>{
            downloadImag(i + STEP);
            UNDONE.push(IMGS[i])
        });
}

for(let i = 0; i < STEP; i++) { downloadImage(i); }//循环step个请求

 

方法4. 利用promise对象(暂时还未成功,还在想办法,有想法的同窗能够贴出来你们借鉴一下)code

let promise_imgs = IMGS.map(v => {
    new Promise((resolve, reject)=> {
        setTimeout(()=>{
            http.get(v, (res)=> {
                //blablabla
                res.on('end', ()=>
                    saveImage(dt, ()=>resolve())
                })
            })
        }, 0);
    });
});

Promise.all(promise_imgs).then(writeResult);//这里仍是会发生和方法1同样的问题,多回调异步并发形成网络连接数的用尽,

你们有什么好的方法快快贴出来吧!

相关文章
相关标签/搜索