Redis + NodeJS 實現(xiàn)一個能處理海量數(shù)據(jù)的異步任務(wù)隊列系統(tǒng)
一、引言
總耗時時間 = 數(shù)據(jù)量 × 單條數(shù)據(jù)處理時間 T = N * t (N = 100,000; t = 25s) 總耗時時間 = 2,500,000 秒 ≈ 695 小時 ≈ 29 天
二、異步任務(wù)隊列原理

三、使用 NodeJS 操作 Redis
docker pull redis:latest
docker run -itd --name redis-local -p 6379:6379 redis

import * as Redis from 'redis'const client = Redis.createClient({ host: '127.0.0.1', port: 6379})export default client
import client from './mqClient'// 獲取 Redis 中某個 key 的內(nèi)容export const getRedisValue = (key: string): Promise<string | null> => new Promise(resolve => client.get(key, (err, reply) => resolve(reply)))// 設(shè)置 Redis 中某個 key 的內(nèi)容export const setRedisValue = (key: string, value: string) => new Promise(resolve => client.set(key, value, resolve))// 刪除 Redis 中某個 key 及其內(nèi)容export const delRedisKey = (key: string) => new Promise(resolve => client.del(key, resolve))
import { TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey } from './utils'import client from './mqClient'client.on('ready', async () => {await delRedisKey(TASK_NAME)for (let i = TASK_AMOUNT; i > 0 ; i--) {client.lpush(TASK_NAME, `task-${i}`)}client.lrange(TASK_NAME, 0, TASK_AMOUNT, async (err, reply) => {if (err) {console.error(err)return}console.log(reply)process.exit()})})


四、異步任務(wù)處理
import taskHandler from './tasksHandler'import client from './mqClient'client.on('connect', () => {console.log('Redis is connected!')})client.on('ready', async () => {console.log('Redis is ready!')await taskHandler()})client.on('error', (e) => {console.log('Redis error! ' + e)})
function handleTask(task: string) {return new Promise((resolve) => {setTimeout(async () => {console.log(`Handling task: ${task}...`)resolve()}, 2000)})}
import { popTask } from './utils'import client from './mqClient'function handleTask(task: string) { /* ... */}export default async function tasksHandler() {// 從隊列中取出一個任務(wù)const task = await popTask()// 處理任務(wù)await handleTask(task)// 遞歸運行await tasksHandler()}
pm2 start ./dist/index.js -i 4 && pm2 logs


五、統(tǒng)計任務(wù)完成耗時
總耗時 = 最后一個任務(wù)的完成時間 - 首個任務(wù)被取得的時間

node-redlock 是 Redis 分布式鎖 Redlock 算法的 JavaScript 實現(xiàn),關(guān)于該算法的講解可參考:https://redis.io/topics/distlock 值得注意的是,在 node-redlock 在使用的過程中,如果要鎖一個已存在的 key,就必須為該 key 添加一個前綴 locks:,否則會報錯。
export const setBeginTime = async (redlock: Redlock) => {// 讀取標記值前先把它鎖住const lock = await redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000)const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`)// 當且僅當標記值不等于 true 時,才設(shè)置起始時間if (setFirst !== 'true') {console.log(`${pm2tips} Get the first task!`)await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true')await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`)}// 完成標記值的讀寫操作后,釋放鎖await lock.unlock().catch(e => e)}
export default async function tasksHandler() {+ // 獲取第一個任務(wù)被取得的時間+ await setBeginTime(redlock)// 從隊列中取出一個任務(wù)const task = await popTask()// 處理任務(wù)await handleTask(task)// 遞歸運行await tasksHandler()}

export default async function tasksHandler() {+ // 獲取標識值和隊列初始長度+ let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`))+ const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`))+ // 等待新任務(wù)+ if (taskAmount === 0) {+ console.log(`${pm2tips} Wating new tasks...`)+ await sleep(2000)+ await tasksHandler()+ return+ }+ // 判斷所有任務(wù)已經(jīng)完成+ if (curIndex === taskAmount) {+ const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`)+ // 獲取總耗時+ const cost = new Date().getTime() - Number(beginTime)+ console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`)+ // 初始化 Redis 的一些標識值+ await setRedisValue(`${TASK_NAME}_TOTAL`, '0')+ await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0')+ await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false')+ await delRedisKey(`${TASK_NAME}_BEGIN_TIME`)+ await sleep(2000)+ await tasksHandler()}// 獲取第一個任務(wù)被取得的時間await setBeginTime(redlock)// 從隊列中取出一個任務(wù)const task = await popTask()// 處理任務(wù)await handleTask(task)+ // 任務(wù)完成后需要為標識位加一+ try {+ const lock = await redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000)+ curIndex = await getCurIndex()+ await setCurIndex(curIndex + 1)+ await lock.unlock().catch((e) => e)+ } catch (e) {+ console.log(e)+ }+ // recursion+ await tasksHandler()+}// 遞歸運行await tasksHandler()}


六、結(jié)語
評論
圖片
表情
