Linux環(huán)境下批量執(zhí)行隊(duì)列任務(wù)的優(yōu)雅方案
設(shè)想我們有一個(gè)程序,需要在不同的參數(shù)下執(zhí)行很多次,我們希望能夠批量進(jìn)行提交。
但是程序?qū)ο到y(tǒng)的資源消耗比較大,而系統(tǒng)的資源是有限。
我們希望這些任務(wù)是按照隊(duì)列排隊(duì)提交的,每次只執(zhí)行3個(gè)。
只有當(dāng)隊(duì)列中有的程序執(zhí)行完了,后面的配備了其他參數(shù)程序才會(huì)繼續(xù)執(zhí)行。
在Linux環(huán)境下,我們可以用FIFO管道控制多進(jìn)程任務(wù)來(lái)實(shí)現(xiàn)這個(gè)功能。
這種使用場(chǎng)景在數(shù)據(jù)挖掘相關(guān)的業(yè)務(wù)中是非常普遍的。
例如需要批量提交spark任務(wù)來(lái)對(duì)不同城市的業(yè)務(wù)數(shù)據(jù)進(jìn)行挖掘,但由于計(jì)算資源有限,最好控制每次只執(zhí)行幾個(gè)任務(wù)。
效果如下:

公眾號(hào)后臺(tái)回復(fù)關(guān)鍵字:源碼,獲取本文所在github項(xiàng)目源碼。
一,任務(wù)腳本
下面是一個(gè)玩具Python代碼,從參數(shù)中讀取城市和日期信息,在該城市該日期參數(shù)下進(jìn)行數(shù)據(jù)挖掘!??!?
import?sys
arg?=?sys.argv[1]
city,date?=?arg.split("?")
print(f"data?mining?task@{city},?date={date}")
python?task.py?"北京?20200101"
二,提交腳本
下面使用bash腳本批量提交數(shù)據(jù)挖掘任務(wù)。
注意使用THREAD_NUM參數(shù)來(lái)控制并行執(zhí)行的任務(wù)數(shù)量。
這個(gè)腳本的關(guān)鍵有三處:
一是使用字符串分割轉(zhuǎn)換成數(shù)組來(lái)獲取參數(shù)列表。
二是使用Linux中的&符號(hào)開(kāi)啟多進(jìn)程任務(wù)并行執(zhí)行不同參數(shù)的任務(wù)。
三是使用FIFO管道在進(jìn)程間通信來(lái)控制并行的任務(wù)數(shù)量。
#!/bin/bash
#允許同時(shí)跑的任務(wù)數(shù)為T(mén)HREAD_NUM
THREAD_NUM=3?#todo:?revise?me?
args="""
北京?20200101
上海?20200202
深圳?20200303
廣州?20200404
南京?20201001
天津?20200901
武漢?20201101
南昌?20200809
成都?20200901
"""?#todo:?revise?me
#指定分隔符
IFS='
'
array=(${args})
#定義描述符為9的FIFO管道
mkfifo?tmp
exec?9<>tmp
rm?-f?tmp
#預(yù)先寫(xiě)入指定數(shù)量的空格符,一個(gè)空格符代表一個(gè)進(jìn)程
for?((i=0;i<$THREAD_NUM;i++))
do
????echo?>&9
done
for?arg?in?${array[@]};?do
??#控制進(jìn)程數(shù):讀出一個(gè)空格字符,如果管道為空,此處將阻塞
??read?-u9
??{
?????#打印參數(shù)?
?????#echo?${arg}?
?????#此行代碼指定任務(wù)提交方法
?????python?task.py?${arg}?#todo?:?revise?me!
?????#每執(zhí)行完一個(gè)程序,睡眠3s
?????sleep?3
?????#控制進(jìn)程數(shù):一個(gè)任務(wù)完成后,寫(xiě)入一個(gè)空格字符到管道,新的任務(wù)將可以執(zhí)行
?????echo?>&9
??}&
done
wait
echo?"\n全部任務(wù)執(zhí)行結(jié)束"
sh?a_lot_jobs.sh

