Python 高效流程編排引擎 bamboo-pipeline
Bamboo-pipeline 是藍(lán)鯨智云旗下SaaS標(biāo)準(zhǔn)運(yùn)維的流程編排引擎。其具備以下特點(diǎn):
1.多種流程模式:支持串行、并行,支持子流程,可以根據(jù)全局參數(shù)自動(dòng)選擇分支執(zhí)行,節(jié)點(diǎn)失敗處理機(jī)制可配置。
2.參數(shù)引擎:支持參數(shù)共享,支持參數(shù)替換。
3.可交互的任務(wù)執(zhí)行:任務(wù)執(zhí)行中可以隨時(shí)暫停、繼續(xù)、撤銷(xiāo),節(jié)點(diǎn)失敗后可以重試、跳過(guò)。

本文需要涉及到 Django, Celery 的前置知識(shí),如果你還不了解這兩者,建議谷歌或百度搜索了解一下,或者閱讀我之前的文章:
Django 往期推薦
celery:
Celery 往期推薦
1.準(zhǔn)備
開(kāi)始之前,你要確保Python和pip已經(jīng)成功安裝在電腦上,如果沒(méi)有,可以訪問(wèn)這篇文章:超詳細(xì)Python安裝指南 進(jìn)行安裝。
(可選1) 如果你用Python的目的是數(shù)據(jù)分析,可以直接安裝Anaconda:Python數(shù)據(jù)分析與挖掘好幫手—Anaconda,它內(nèi)置了Python和pip.
(可選2) 此外,推薦大家用VSCode編輯器,它有許多的優(yōu)點(diǎn):Python 編程的最好搭檔—VSCode 詳細(xì)指南。
請(qǐng)選擇以下任一種方式輸入命令安裝依賴(lài):
1. Windows 環(huán)境 打開(kāi) Cmd (開(kāi)始-運(yùn)行-CMD)。
2. MacOS 環(huán)境 打開(kāi) Terminal (command+空格輸入Terminal)。
3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.
pip install bamboo-engine
pip install bamboo-pipeline
pip install django
pip install celery2. 項(xiàng)目初始化
(選項(xiàng)一:無(wú)Django項(xiàng)目)如果你沒(méi)有任何的現(xiàn)成Django項(xiàng)目,請(qǐng)按下面的流程初始化
由于 bamboo-pipeline 運(yùn)行時(shí)基于 Django 實(shí)現(xiàn),所以需要新建一個(gè) Django 項(xiàng)目:
django-admin startproject easy_pipeline
cd easy_pipeline在 easy_pipeline.settings.py 下添加如下配置:
from pipeline.eri.celery.queues import *
from celery import Celery
app = Celery("proj")
app.config_from_object("django.conf:settings")
INSTALLED_APPS = [
...
"pipeline",
"pipeline.engine",
"pipeline.component_framework",
"pipeline.eri",
...
]在 easy_pipeline 目錄下初始化數(shù)據(jù)庫(kù):
python manage.py migrate(選項(xiàng)二:有Django項(xiàng)目需要使用流程引擎)如果你有現(xiàn)成的PipeLine項(xiàng)目需要使用此流程引擎,請(qǐng)?jiān)陧?xiàng)目的 settings.py 下添加如下配置:
from pipeline.eri.celery.queues import *
from celery import Celery
app = Celery("proj")
app.config_from_object("django.conf:settings")
INSTALLED_APPS = [
...
"pipeline",
"pipeline.engine",
"pipeline.component_framework",
"pipeline.eri",
...
]然后重新執(zhí)行migrate,生成pipeline相關(guān)的流程模型:
python manage.py migratemigrate 執(zhí)行完畢后會(huì)如下圖所示:

由于是在原有項(xiàng)目上使用流程引擎,可能會(huì)遇到一些版本不匹配的問(wèn)題,如果遇到報(bào)錯(cuò),請(qǐng)排查解決或到藍(lán)鯨官網(wǎng)上進(jìn)行詢(xún)問(wèn)。
3. 簡(jiǎn)單的流程例子
首先在項(xiàng)目目錄下啟動(dòng) celery worker:
python manage.py celery worker -Q er_execute,er_schedule --pool=solo -l info啟動(dòng)成功類(lèi)似下圖所示:

(注意)如果你是在你的原有Django項(xiàng)目上做改造,它并不一定能夠順利地啟動(dòng)成功,這是因?yàn)镻ipeline使用了 Django 2.2.24,會(huì)存在許多版本不兼容的情況。如果遇到報(bào)錯(cuò),請(qǐng)排查解決或到藍(lán)鯨官網(wǎng)上進(jìn)行詢(xún)問(wèn)。
在下面的例子中,我們將會(huì)創(chuàng)建并執(zhí)行一個(gè)簡(jiǎn)單的流程:

3.1 創(chuàng)建流程APP
在 bamboo_pipeline 中,一個(gè)流程由多個(gè)組件組成,官方推薦使用APP統(tǒng)一管控組件:
python manage.py create_plugins_app big_calculator該命令會(huì)在 Django 工程根目錄下生成擁有以下目錄結(jié)構(gòu)的 APP:
big_calculator
├── __init__.py
├── components
│ ├── __init__.py
│ └── collections
│ ├── __init__.py
│ └── plugins.py
├── migrations
│ └── __init__.py
└── static
└── big_calculator
└── plugins.js別忘了把新創(chuàng)建的這個(gè)插件添加到 Django 配置的 INSTALLED_APPS 中:
INSTALLED_APPS = (
...
'big_calculator',
...
)3.2 編寫(xiě)流程的Service原子
組件服務(wù) Service 是組件的核心,Service 定義了組件被調(diào)用時(shí)執(zhí)行的邏輯,下面讓我們實(shí)現(xiàn)一個(gè)計(jì)算傳入的參數(shù)n的階乘,并把結(jié)果寫(xiě)到輸出中的 Service ,在 big_calculator/components/collections/plugins.py 中輸入以下代碼:
import math
from pipeline.core.flow.activity import Service
class FactorialCalculateService(Service):
def execute(self, data, parent_data):
"""
組件被調(diào)用時(shí)的執(zhí)行邏輯
:param data: 當(dāng)前節(jié)點(diǎn)的數(shù)據(jù)對(duì)象
:param parent_data: 該節(jié)點(diǎn)所屬流程的數(shù)據(jù)對(duì)象
:return:
"""
n = data.get_one_of_inputs('n')
if not isinstance(n, int):
data.outputs.ex_data = 'n must be a integer!'
return False
data.outputs.factorial_of_n = math.factorial(n)
return True
def inputs_format(self):
"""
組件所需的輸入字段,每個(gè)字段都包含字段名、字段鍵、字段類(lèi)型及是否必填的說(shuō)明。
:return:必須返回一個(gè) InputItem 的數(shù)組,返回的這些信息能夠用于確認(rèn)該組件需要獲取什么樣的輸入數(shù)據(jù)。
"""
return [
Service.InputItem(name='integer n', key='n', type='int', required=True)
]
def outputs_format(self):
"""
組件執(zhí)行成功時(shí)輸出的字段,每個(gè)字段都包含字段名、字段鍵及字段類(lèi)型的說(shuō)明
:return: 必須返回一個(gè) OutputItem 的數(shù)組, 便于在流程上下文或后續(xù)節(jié)點(diǎn)中進(jìn)行引用
"""
return [
Service.OutputItem(name='factorial of n', key='factorial_of_n', type='int')
]首先我們繼承了 Service 基類(lèi),并實(shí)現(xiàn)了 execute() 和 outputs_format() 這兩個(gè)方法,他們的作用如下:
1. execute:組件被調(diào)用時(shí)執(zhí)行的邏輯。接收 data 和 parent_data 兩個(gè)參數(shù)。
其中,data 是當(dāng)前節(jié)點(diǎn)的數(shù)據(jù)對(duì)象,這個(gè)數(shù)據(jù)對(duì)象存儲(chǔ)了用戶(hù)傳遞給當(dāng)前節(jié)點(diǎn)的參數(shù)的值以及當(dāng)前節(jié)點(diǎn)輸出的值。parent_data 則是該節(jié)點(diǎn)所屬流程的數(shù)據(jù)對(duì)象,通常會(huì)將一些全局使用的常量存儲(chǔ)在該對(duì)象中,如當(dāng)前流程的執(zhí)行者、流程的開(kāi)始時(shí)間等。
2. outputs_format:組件執(zhí)行成功時(shí)輸出的字段,每個(gè)字段都包含字段名、字段鍵及字段類(lèi)型的說(shuō)明。這個(gè)方法必須返回一個(gè) OutputItem 的數(shù)組,返回的這些信息能夠用于確認(rèn)某個(gè)組件在執(zhí)行成功時(shí)輸出的數(shù)據(jù),便于在流程上下文或后續(xù)節(jié)點(diǎn)中進(jìn)行引用。
3. inputs_format:組件所需的輸入字段,每個(gè)字段都包含字段名、字段鍵、字段類(lèi)型及是否必填的說(shuō)明。這個(gè)方法必須返回一個(gè) InputItem 的數(shù)組,返回的這些信息能夠用于確認(rèn)某個(gè)組件需要獲取什么樣的輸入數(shù)據(jù)。
下面我們來(lái)看一下 execute() 方法內(nèi)部執(zhí)行的邏輯,首先我們嘗試從當(dāng)前節(jié)點(diǎn)數(shù)據(jù)對(duì)象的輸出中獲取輸入?yún)?shù)n,如果獲取到的參數(shù)不是一個(gè) int 實(shí)例,那么我們會(huì)將異常信息寫(xiě)入到當(dāng)前節(jié)點(diǎn)輸出的 ex_data 字段中,這個(gè)字段是引擎內(nèi)部的保留字段,節(jié)點(diǎn)執(zhí)行失敗時(shí)產(chǎn)生的異常信息都應(yīng)該寫(xiě)入到該字段中。隨后我們返回 False 代表組件本次執(zhí)行失敗,隨后節(jié)點(diǎn)會(huì)進(jìn)入失敗狀態(tài):
n = data.get_one_of_inputs('n')
if not isinstance(n, int):
data.outputs.ex_data = 'n must be a integer!'
return False若獲取到的 n 是一個(gè)正常的 int,我們就調(diào)用 math.factorial() 函數(shù)來(lái)計(jì)算 n 的階乘,計(jì)算完成后,我們會(huì)將結(jié)果寫(xiě)入到輸出的 factorial_of_n 字段中,以供流程中的其他節(jié)點(diǎn)使用:
data.outputs.factorial_of_n = math.factorial(n)
return True3.3 編寫(xiě)流程組件,綁定Service原子
完成 Service 的編寫(xiě)后,我們需要將其與一個(gè) Component 綁定起來(lái),才能夠注冊(cè)到組件庫(kù)中,在big_calculator\components\__init__.py 文件下添加如下的代碼:
import logging
from pipeline.component_framework.component import Component
from big_calculator.components.collections.plugins import FactorialCalculateService
logger = logging.getLogger('celery')
class FactorialCalculateComponent(Component):
name = 'FactorialCalculateComponent'
code = 'fac_cal_comp'
bound_service = FactorialCalculateService我們定義了一個(gè)繼承自基類(lèi) Component 的類(lèi) FactorialCalculateComponent,他擁有以下屬性:
1.name:組件名。
2.code:組件代碼,這個(gè)代碼必須是全局唯一的。
3.bound_service:與該組件綁定的 Service。
這樣一來(lái),我們就完成了一個(gè)流程原子的開(kāi)發(fā)。
3.4 生成流程,測(cè)試剛編寫(xiě)的組件
在 big_calculator\test.py 寫(xiě)入以下內(nèi)容,生成一個(gè)流程,測(cè)試剛剛編寫(xiě)的組件:
# Python 實(shí)用寶典
# 2021/06/20
import time
from bamboo_engine.builder import *
from big_calculator.components import FactorialCalculateComponent
from pipeline.eri.runtime import BambooDjangoRuntime
from bamboo_engine import api
from bamboo_engine import builder
def bamboo_playground():
"""
測(cè)試流程引擎
"""
# 使用 builder 構(gòu)造出流程描述結(jié)構(gòu)
start = EmptyStartEvent()
# 這里使用 我們剛創(chuàng)建好的n階乘組件
act = ServiceActivity(component_code=FactorialCalculateComponent.code)
# 傳入?yún)?shù)
act.component.inputs.n = Var(type=Var.PLAIN, value=4)
end = EmptyEndEvent()
start.extend(act).extend(end)
pipeline = builder.build_tree(start)
api.run_pipeline(runtime=BambooDjangoRuntime(), pipeline=pipeline)
# 等待 1s 后獲取流程執(zhí)行結(jié)果
time.sleep(1)
result = api.get_execution_data_outputs(BambooDjangoRuntime(), act.id).data
print(result)隨后,在命令行輸入:
python manage.py shell打開(kāi) django console, 輸入以下命令,執(zhí)行此流程:
from big_calculator.test import bamboo_playground
bamboo_playground()流程運(yùn)行完后,獲取節(jié)點(diǎn)的執(zhí)行結(jié)果,可以看到,該節(jié)點(diǎn)輸出了 factorial_of_n,并且值為 24(4 * 3 * 2 *1),這正是我們需要的效果:
{'_loop': 0, '_result': True, 'factorial_of_n': 24}恭喜你,你已經(jīng)成功的創(chuàng)建了一個(gè)流程并把它運(yùn)行起來(lái)了!在這期間你可能會(huì)遇到不少的坑,建議嘗試先自行解決,如果實(shí)在無(wú)法解決,可以前往 標(biāo)準(zhǔn)運(yùn)維 倉(cāng)庫(kù)提 issues,或者前往藍(lán)鯨智云官網(wǎng)提問(wèn)。
我們的文章到此就結(jié)束啦,如果你喜歡今天的Python 實(shí)戰(zhàn)教程,請(qǐng)持續(xù)關(guān)注Python實(shí)用寶典。
有任何問(wèn)題,可以在公眾號(hào)后臺(tái)回復(fù):加群,回答相應(yīng)紅字驗(yàn)證信息,進(jìn)入互助群詢(xún)問(wèn)。
原創(chuàng)不易,希望你能在下面點(diǎn)個(gè)贊和在看支持我繼續(xù)創(chuàng)作,謝謝!
點(diǎn)擊下方閱讀原文可獲得更好的閱讀體驗(yàn)
Python實(shí)用寶典 (pythondict.com)
不只是一個(gè)寶典
歡迎關(guān)注公眾號(hào):Python實(shí)用寶典
