<kbd id="afajh"><form id="afajh"></form></kbd>
<strong id="afajh"><dl id="afajh"></dl></strong>
    <del id="afajh"><form id="afajh"></form></del>
        1. <th id="afajh"><progress id="afajh"></progress></th>
          <b id="afajh"><abbr id="afajh"></abbr></b>
          <th id="afajh"><progress id="afajh"></progress></th>

          Python 高效流程編排引擎 bamboo-pipeline

          共 9532字,需瀏覽 20分鐘

           ·

          2021-06-25 20:47

          Bamboo-pipeline 是藍(lán)鯨智云旗下SaaS標(biāo)準(zhǔn)運(yùn)維的流程編排引擎。其具備以下特點(diǎn):

          1.多種流程模式:支持串行、并行,支持子流程,可以根據(jù)全局參數(shù)自動(dòng)選擇分支執(zhí)行,節(jié)點(diǎn)失敗處理機(jī)制可配置。

          2.參數(shù)引擎:支持參數(shù)共享,支持參數(shù)替換。

          3.可交互的任務(wù)執(zhí)行:任務(wù)執(zhí)行中可以隨時(shí)暫停、繼續(xù)、撤銷(xiāo),節(jié)點(diǎn)失敗后可以重試、跳過(guò)。

          本文需要涉及到 Django, Celery 的前置知識(shí),如果你還不了解這兩者,建議谷歌或百度搜索了解一下,或者閱讀我之前的文章:

          Django 往期推薦



          Python Django快速開(kāi)發(fā)音樂(lè)高潮提取網(wǎng)(1)

          Python Django快速開(kāi)發(fā)音樂(lè)高潮提取網(wǎng)(2)

          Python Django快速開(kāi)發(fā)音樂(lè)高潮提取網(wǎng)(3)

          生產(chǎn)利器!Pycharm+Django 安裝及配置指南

          手把手Django+Vue前后端分離入門(mén)教程


          celery:

          Celery 往期推薦



          Python Celery異步快速下載股票數(shù)據(jù)

          實(shí)戰(zhàn)教程!Django Celery 異步與定時(shí)任務(wù)

          網(wǎng)絡(luò)IO誰(shuí)更快?Python與Go請(qǐng)求速度對(duì)比

          什么?Python Celery 也能調(diào)度Go worker?!


          1.準(zhǔn)備



          開(kāi)始之前,你要確保Python和pip已經(jīng)成功安裝在電腦上,如果沒(méi)有,可以訪問(wèn)這篇文章:超詳細(xì)Python安裝指南 進(jìn)行安裝。

          (可選1) 如果你用Python的目的是數(shù)據(jù)分析,可以直接安裝Anaconda:Python數(shù)據(jù)分析與挖掘好幫手—Anaconda,它內(nèi)置了Python和pip.

          (可選2) 此外,推薦大家用VSCode編輯器,它有許多的優(yōu)點(diǎn):Python 編程的最好搭檔—VSCode 詳細(xì)指南。

          請(qǐng)選擇以下任一種方式輸入命令安裝依賴(lài)
          1. Windows 環(huán)境 打開(kāi) Cmd (開(kāi)始-運(yùn)行-CMD)。
          2. MacOS 環(huán)境 打開(kāi) Terminal (command+空格輸入Terminal)。
          3. 如果你用的是 VSCode編輯器 或 Pycharm,可以直接使用界面下方的Terminal.

          pip install bamboo-engine
          pip install bamboo-pipeline
          pip install django
          pip install celery


          2. 項(xiàng)目初始化



          (選項(xiàng)一:無(wú)Django項(xiàng)目)如果你沒(méi)有任何的現(xiàn)成Django項(xiàng)目,請(qǐng)按下面的流程初始化

          由于  bamboo-pipeline  運(yùn)行時(shí)基于 Django 實(shí)現(xiàn),所以需要新建一個(gè) Django 項(xiàng)目:

          django-admin startproject easy_pipeline
          cd easy_pipeline


          在  easy_pipeline.settings.py  下添加如下配置:

          from pipeline.eri.celery.queues import *
          from celery import Celery

          app = Celery("proj")

          app.config_from_object("django.conf:settings")

          INSTALLED_APPS = [
              ...
              "pipeline",
              "pipeline.engine",
              "pipeline.component_framework",
              "pipeline.eri",
              ...
          ]


          在  easy_pipeline 目錄下初始化數(shù)據(jù)庫(kù):

          python manage.py migrate


          (選項(xiàng)二:有Django項(xiàng)目需要使用流程引擎)如果你有現(xiàn)成的PipeLine項(xiàng)目需要使用此流程引擎,請(qǐng)?jiān)陧?xiàng)目的 settings.py 下添加如下配置:

          from pipeline.eri.celery.queues import *
          from celery import Celery

          app = Celery("proj")

          app.config_from_object("django.conf:settings")

          INSTALLED_APPS = [
              ...
              "pipeline",
              "pipeline.engine",
              "pipeline.component_framework",
              "pipeline.eri",
              ...
          ]


          然后重新執(zhí)行migrate,生成pipeline相關(guān)的流程模型:

          python manage.py migrate


          migrate 執(zhí)行完畢后會(huì)如下圖所示:


          由于是在原有項(xiàng)目上使用流程引擎,可能會(huì)遇到一些版本不匹配的問(wèn)題,如果遇到報(bào)錯(cuò),請(qǐng)排查解決或到藍(lán)鯨官網(wǎng)上進(jìn)行詢(xún)問(wèn)。

          3. 簡(jiǎn)單的流程例子



          首先在項(xiàng)目目錄下啟動(dòng) celery worker:

          python manage.py celery worker -Q er_execute,er_schedule --pool=solo -l info


          啟動(dòng)成功類(lèi)似下圖所示:


          (注意)如果你是在你的原有Django項(xiàng)目上做改造,它并不一定能夠順利地啟動(dòng)成功,這是因?yàn)镻ipeline使用了 Django 2.2.24,會(huì)存在許多版本不兼容的情況。如果遇到報(bào)錯(cuò),請(qǐng)排查解決或到藍(lán)鯨官網(wǎng)上進(jìn)行詢(xún)問(wèn)。

          在下面的例子中,我們將會(huì)創(chuàng)建并執(zhí)行一個(gè)簡(jiǎn)單的流程:

          3.1 創(chuàng)建流程APP

          在 bamboo_pipeline 中,一個(gè)流程由多個(gè)組件組成,官方推薦使用APP統(tǒng)一管控組件:

          python manage.py create_plugins_app big_calculator


          該命令會(huì)在 Django 工程根目錄下生成擁有以下目錄結(jié)構(gòu)的 APP:

          big_calculator
          ├── __init__.py
          ├── components
          │   ├── __init__.py
          │   └── collections
          │   ├── __init__.py
          │   └── plugins.py
          ├── migrations
          │   └── __init__.py
          └── static
              └── big_calculator
                  └── plugins.js


          別忘了把新創(chuàng)建的這個(gè)插件添加到 Django 配置的 INSTALLED_APPS 中:

          INSTALLED_APPS = (
              ...
              'big_calculator',
              ...
          )


          3.2 編寫(xiě)流程的Service原子

          組件服務(wù)  Service 是組件的核心,Service  定義了組件被調(diào)用時(shí)執(zhí)行的邏輯,下面讓我們實(shí)現(xiàn)一個(gè)計(jì)算傳入的參數(shù)n的階乘,并把結(jié)果寫(xiě)到輸出中的  Service ,在 big_calculator/components/collections/plugins.py  中輸入以下代碼:

          import math
          from pipeline.core.flow.activity import Service


          class FactorialCalculateService(Service):

              def execute(self, data, parent_data):
                  """
                  組件被調(diào)用時(shí)的執(zhí)行邏輯
                  :param data: 當(dāng)前節(jié)點(diǎn)的數(shù)據(jù)對(duì)象
                  :param parent_data: 該節(jié)點(diǎn)所屬流程的數(shù)據(jù)對(duì)象
                  :return:
                  """

                  n = data.get_one_of_inputs('n')
                  if not isinstance(n, int):
                      data.outputs.ex_data = 'n must be a integer!'
                      return False

                  data.outputs.factorial_of_n = math.factorial(n)
                  return True

              def inputs_format(self):
                  """
                  組件所需的輸入字段,每個(gè)字段都包含字段名、字段鍵、字段類(lèi)型及是否必填的說(shuō)明。
                  :return:必須返回一個(gè) InputItem 的數(shù)組,返回的這些信息能夠用于確認(rèn)該組件需要獲取什么樣的輸入數(shù)據(jù)。
                  """

                  return [
                      Service.InputItem(name='integer n', key='n', type='int', required=True)
                  ]

              def outputs_format(self):
                  """
                  組件執(zhí)行成功時(shí)輸出的字段,每個(gè)字段都包含字段名、字段鍵及字段類(lèi)型的說(shuō)明
                  :return: 必須返回一個(gè) OutputItem 的數(shù)組, 便于在流程上下文或后續(xù)節(jié)點(diǎn)中進(jìn)行引用
                  """

                  return [
                      Service.OutputItem(name='factorial of n', key='factorial_of_n', type='int')
                  ]


          首先我們繼承了  Service 基類(lèi),并實(shí)現(xiàn)了 execute()  和 outputs_format()  這兩個(gè)方法,他們的作用如下:

          1. execute:組件被調(diào)用時(shí)執(zhí)行的邏輯。接收 data 和 parent_data 兩個(gè)參數(shù)。
          其中,data 是當(dāng)前節(jié)點(diǎn)的數(shù)據(jù)對(duì)象,這個(gè)數(shù)據(jù)對(duì)象存儲(chǔ)了用戶(hù)傳遞給當(dāng)前節(jié)點(diǎn)的參數(shù)的值以及當(dāng)前節(jié)點(diǎn)輸出的值。parent_data 則是該節(jié)點(diǎn)所屬流程的數(shù)據(jù)對(duì)象,通常會(huì)將一些全局使用的常量存儲(chǔ)在該對(duì)象中,如當(dāng)前流程的執(zhí)行者、流程的開(kāi)始時(shí)間等。

          2. outputs_format:組件執(zhí)行成功時(shí)輸出的字段,每個(gè)字段都包含字段名、字段鍵及字段類(lèi)型的說(shuō)明。這個(gè)方法必須返回一個(gè) OutputItem 的數(shù)組,返回的這些信息能夠用于確認(rèn)某個(gè)組件在執(zhí)行成功時(shí)輸出的數(shù)據(jù),便于在流程上下文或后續(xù)節(jié)點(diǎn)中進(jìn)行引用。

          3. inputs_format:組件所需的輸入字段,每個(gè)字段都包含字段名、字段鍵、字段類(lèi)型及是否必填的說(shuō)明。這個(gè)方法必須返回一個(gè) InputItem 的數(shù)組,返回的這些信息能夠用于確認(rèn)某個(gè)組件需要獲取什么樣的輸入數(shù)據(jù)。

          下面我們來(lái)看一下 execute() 方法內(nèi)部執(zhí)行的邏輯,首先我們嘗試從當(dāng)前節(jié)點(diǎn)數(shù)據(jù)對(duì)象的輸出中獲取輸入?yún)?shù)n,如果獲取到的參數(shù)不是一個(gè) int 實(shí)例,那么我們會(huì)將異常信息寫(xiě)入到當(dāng)前節(jié)點(diǎn)輸出的 ex_data 字段中,這個(gè)字段是引擎內(nèi)部的保留字段,節(jié)點(diǎn)執(zhí)行失敗時(shí)產(chǎn)生的異常信息都應(yīng)該寫(xiě)入到該字段中。隨后我們返回  False  代表組件本次執(zhí)行失敗,隨后節(jié)點(diǎn)會(huì)進(jìn)入失敗狀態(tài):

          n = data.get_one_of_inputs('n')
          if not isinstance(n, int):
              data.outputs.ex_data = 'n must be a integer!'
              return False


          若獲取到的 n 是一個(gè)正常的 int,我們就調(diào)用 math.factorial() 函數(shù)來(lái)計(jì)算 n 的階乘,計(jì)算完成后,我們會(huì)將結(jié)果寫(xiě)入到輸出的 factorial_of_n 字段中,以供流程中的其他節(jié)點(diǎn)使用:

          data.outputs.factorial_of_n = math.factorial(n)
          return True


          3.3 編寫(xiě)流程組件,綁定Service原子

          完成 Service 的編寫(xiě)后,我們需要將其與一個(gè) Component 綁定起來(lái),才能夠注冊(cè)到組件庫(kù)中,在big_calculator\components\__init__.py 文件下添加如下的代碼:

          import logging
          from pipeline.component_framework.component import Component
          from big_calculator.components.collections.plugins import FactorialCalculateService

          logger = logging.getLogger('celery')


          class FactorialCalculateComponent(Component):
              name = 'FactorialCalculateComponent'
              code = 'fac_cal_comp'
              bound_service = FactorialCalculateService


          我們定義了一個(gè)繼承自基類(lèi) Component 的類(lèi) FactorialCalculateComponent,他擁有以下屬性:

          1.name:組件名。
          2.code:組件代碼,這個(gè)代碼必須是全局唯一的。
          3.bound_service:與該組件綁定的 Service。

          這樣一來(lái),我們就完成了一個(gè)流程原子的開(kāi)發(fā)。

          3.4 生成流程,測(cè)試剛編寫(xiě)的組件

          在 big_calculator\test.py 寫(xiě)入以下內(nèi)容,生成一個(gè)流程,測(cè)試剛剛編寫(xiě)的組件:

          # Python 實(shí)用寶典
          # 2021/06/20

          import time

          from bamboo_engine.builder import *
          from big_calculator.components import FactorialCalculateComponent
          from pipeline.eri.runtime import BambooDjangoRuntime
          from bamboo_engine import api
          from bamboo_engine import builder


          def bamboo_playground():
              """
              測(cè)試流程引擎
              """

              # 使用 builder 構(gòu)造出流程描述結(jié)構(gòu)
              start = EmptyStartEvent()
              # 這里使用 我們剛創(chuàng)建好的n階乘組件
              act = ServiceActivity(component_code=FactorialCalculateComponent.code)
              # 傳入?yún)?shù)
              act.component.inputs.n = Var(type=Var.PLAIN, value=4)
              end = EmptyEndEvent()

              start.extend(act).extend(end)

              pipeline = builder.build_tree(start)
              api.run_pipeline(runtime=BambooDjangoRuntime(), pipeline=pipeline)

              # 等待 1s 后獲取流程執(zhí)行結(jié)果
              time.sleep(1)

              result = api.get_execution_data_outputs(BambooDjangoRuntime(), act.id).data

              print(result)


          隨后,在命令行輸入


          python manage.py shell


          打開(kāi) django console, 輸入以下命令,執(zhí)行此流程:

          from big_calculator.test import bamboo_playground
          bamboo_playground()


          流程運(yùn)行完后,獲取節(jié)點(diǎn)的執(zhí)行結(jié)果,可以看到,該節(jié)點(diǎn)輸出了 factorial_of_n,并且值為 24(4 * 3 * 2 *1),這正是我們需要的效果:


          {'_loop': 0, '_result': True, 'factorial_of_n': 24}


          恭喜你,你已經(jīng)成功的創(chuàng)建了一個(gè)流程并把它運(yùn)行起來(lái)了!在這期間你可能會(huì)遇到不少的坑,建議嘗試先自行解決,如果實(shí)在無(wú)法解決,可以前往 標(biāo)準(zhǔn)運(yùn)維 倉(cāng)庫(kù)提 issues,或者前往藍(lán)鯨智云官網(wǎng)提問(wèn)。


          我們的文章到此就結(jié)束啦,如果你喜歡今天的Python 實(shí)戰(zhàn)教程,請(qǐng)持續(xù)關(guān)注Python實(shí)用寶典。

          有任何問(wèn)題,可以在公眾號(hào)后臺(tái)回復(fù):加群,回答相應(yīng)紅字驗(yàn)證信息,進(jìn)入互助群詢(xún)問(wèn)。

          原創(chuàng)不易,希望你能在下面點(diǎn)個(gè)贊和在看支持我繼續(xù)創(chuàng)作,謝謝!

          點(diǎn)擊下方閱讀原文可獲得更好的閱讀體驗(yàn)

          Python實(shí)用寶典 (pythondict.com)
          不只是一個(gè)寶典
          歡迎關(guān)注公眾號(hào):Python實(shí)用寶典

          瀏覽 124
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          評(píng)論
          圖片
          表情
          推薦
          點(diǎn)贊
          評(píng)論
          收藏
          分享

          手機(jī)掃一掃分享

          分享
          舉報(bào)
          <kbd id="afajh"><form id="afajh"></form></kbd>
          <strong id="afajh"><dl id="afajh"></dl></strong>
            <del id="afajh"><form id="afajh"></form></del>
                1. <th id="afajh"><progress id="afajh"></progress></th>
                  <b id="afajh"><abbr id="afajh"></abbr></b>
                  <th id="afajh"><progress id="afajh"></progress></th>
                  大香蕉视频色 | 性爱无码在线 | 欧美三级网站 | 91人人看 | 欧美极品少妇 |