實(shí)現(xiàn)并發(fā)控制函數(shù)

neo-async 和 async 模塊都是為了解決嵌套金字塔,和異步流程控制而生,neo-async 是 async 的替代品,因?yàn)?neo-async 比 async 的性能更快。
為什么要使用這個(gè)模塊,舉個(gè)例子,比如我們需要讀取多個(gè)文件,將讀取文件的結(jié)果保存在數(shù)組中。
let list = []fs.readFile('file1', 'utf8', function (err, res) {list.push(res)fs.readFile('file2', 'utf8', function (err, res) {list.push(res)fs.readFile('file3', 'utf8', function (err, res) {list.push(res)??????console.log(list)})})})
使用 neo-async 我們可以這樣寫:
function getFile(file, callback) {fs.readFile(file, 'utf8', function (err, res) {if (err) {return callback(err);}callback(null, res);});}async.map(['file1','file2','file3'], getFile, function(err, results) {// 返回一個(gè)新數(shù)組console.log(results)});
each
我們要學(xué)習(xí)的第一個(gè)函數(shù)是 each,注意,由于此函數(shù)并行地將 iterator 應(yīng)用于每個(gè)項(xiàng),因此不能保證 iterator 函數(shù)將按順序完成。
// arrayvar order = [];var array = [1, 3, 2];var iterator = function(num, done) {setTimeout(function() {order.push(num);done();}, num * 10);};async.each(array, iterator, function(err, res) {console.log(res); // undefinedconsole.log(order); // [1, 2, 3]});
另外,eachLimit 與 each 相同,但一次最多運(yùn)行多少異步操作,可以自己設(shè)置限制。
eachLimit
// arrayvar order = [];var array = [1, 5, 3, 4, 2];var iterator = function(num, done) {setTimeout(function() {order.push(num);done();}, num * 10);};async.eachLimit(array, 2, iterator, function(err, res) {console.log(res); // undefinedconsole.log(order); // [1, 3, 5, 2, 4]});
控制并發(fā)數(shù)量的實(shí)現(xiàn)原理如下:
var noop = function noop() {};function timesSync(n, iterator) {var index = -1;while (++index < n) {iterator(index);}}function eachLimit(collection, limit, iterator, callback) {callback = callback || noop;var size = collection.length;var sync = false;var started = 0;var completed = 0;timesSync(limit > size ? size : limit, iterate);function iterate() {if (started < size) {iterator(collection[started++], done);}}function done(err, bool) {if (err) {callback(err);} else if (++completed === size) {callback(null);} else {iterate();}}}
promise
我們都知道異步編程的方式有很多種,包括回調(diào)和 promise 的形式,下面是通過 promise 控制并發(fā)數(shù)量。
/*** @params list {Array} - 要迭代的數(shù)組* @params limit {Number} - 并發(fā)數(shù)量控制數(shù)* @params asyncHandle {Function} - 對`list`的每一個(gè)項(xiàng)的處理函數(shù),參數(shù)為當(dāng)前處理項(xiàng),必須 return 一個(gè)Promise來確定是否繼續(xù)進(jìn)行迭代* @return {Promise} - 返回一個(gè) Promise 值來確認(rèn)所有數(shù)據(jù)是否迭代完成*/let eachLimit = (list, limit, asyncHandle) => {let recursion = (arr) => {????return?asyncHandle(arr.shift()).then((res)?=>?{if (arr.length !== 0) {return recursion(arr) // 數(shù)組還未迭代完,遞歸繼續(xù)進(jìn)行迭代}})};let listCopy = [].concat(list);let asyncList = []; // 正在進(jìn)行的所有并發(fā)異步操作while (limit--) {asyncList.push(recursion(listCopy));}return Promise.all(asyncList); // 所有并發(fā)異步操作都完成后,本次并發(fā)控制迭代完成}const order = []const timeout = i => new Promise(resolve => {setTimeout(() => {order.push(i)resolve(i)}, i * 10)});eachLimit([1, 5, 3, 4, 2], 2, timeout).then((res) => {console.log(order)})
評論
圖片
表情
