Python 調(diào)用 Kubernetes API 自動(dòng)化管理資源
來源:https://note.qidong.name/2020/08/python-k8s-job

本文給出Python SDK操作Kubernetes Job的更多示例代碼,以及相關(guān)解釋。
pip?install?kubernetes
初始化
from?kubernetes.client?import?BatchV1Api
from?kubernetes.config?import?load_kube_config
load_kube_config()
batch?=?BatchV1Api()
load_kube_config是從默認(rèn)位置,也就是~/.kube/config加載配置。如果在其它位置,可以通過第一個(gè)參數(shù)傳入其路徑。
BatchV1Api()可以當(dāng)做Job的客戶端來用。命名上,Batch和Job是類似的概念,前者強(qiáng)調(diào)批量。
創(chuàng)建Job
以下來自官方樣例job_crud.py。
def?create_job_object():
????
????container?=?client.V1Container(
????????name="pi",
????????image="perl",
????????command=["perl",?"-Mbignum=bpi",?"-wle",?"print?bpi(2000)"])
????
????template?=?client.V1PodTemplateSpec(
????????metadata=client.V1ObjectMeta(labels={"app":?"pi"}),
????????spec=client.V1PodSpec(restart_policy="Never",?containers=[container]))
????
????spec?=?client.V1JobSpec(
????????template=template,
????????backoff_limit=4)
????
????job?=?client.V1Job(
????????api_version="batch/v1",
????????kind="Job",
????????metadata=client.V1ObjectMeta(name=JOB_NAME),
????????spec=spec)
????return?job
def?create_job(api_instance,?job):
????api_response?=?api_instance.create_namespaced_job(
????????body=job,
????????namespace="default")
????print("Job?created.?status='%s'"?%?str(api_response.status))
雖然,根據(jù)官方教程這樣的寫法,也能得到可用的V1Job,拿去執(zhí)行創(chuàng)建操作。但還是過于陌生和偏門,不如主流、常見的YAML方便、易讀寫。
這里該出兩種更方便的做法。
直接使用YAML
---
apiVersion:?batch/v1
kind:?Job
metadata:
??name:?hello
spec:
??template:
????spec:
??????containers:
????????-?name:?echo
??????????image:?alpine:3.11
??????????args:
????????????-?'echo'
????????????-?'Hello?world!'
以上是一個(gè)最精簡的Job配置樣例,
通過讀取文件為dict,可以直接拿去使用。
from?kubernetes.client?import?V1Job
import?yaml
with?open('job.yaml')?as?file:
????cfg?=?yaml.safe_load(file)
job?=?batch.create_namespaced_job(namespace='default',?body=cfg)
assert?isinstance(job,?V1Job)
create_namespaced_job同樣接受字典作為body輸入,因此YAML配置可以讀出后直接傳入。
這里返回的V1Job只是創(chuàng)建時(shí)的狀態(tài),但是會(huì)包含更多集群中的信息。
使用dict
由于create_namespaced_job接受字典作為body輸入,因此直接使用dict也是可行的。
cfg?=?{
????'apiVersion':?'batch/v1',
????'kind':?'Job',
????'metadata':?{
????????'name':?'hello'
????},
????'spec':?{
????????'template':?{
????????????'spec':?{
????????????????'restartPolicy':
????????????????'Never',
????????????????'containers':?[{
????????????????????'name':?'upload',
????????????????????'image':?'alpine:3.11',
????????????????????'args':?['echo',?'Hello?world!']
????????????????}]
????????????}
????????}
????}
}
batch.create_namespaced_job(namespace='default',?body=cfg)
由于dict結(jié)構(gòu)與YAML相同,而又沒有類的束縛,所以也很靈活方便。
此外,從YAML讀出為dict后,也可以通過修改部分字段,達(dá)到動(dòng)態(tài)變化的效果。這種結(jié)合YAML和dict的使用方式,是對(duì)官方用法的最佳替代。
監(jiān)控Job運(yùn)行
在創(chuàng)建Job后,通常需要監(jiān)控Job的運(yùn)行,做一些外圍處理。輪詢當(dāng)然是下下策,而Kubernetes提供了一個(gè)Watch機(jī)制,通過接收Event,實(shí)現(xiàn)對(duì)狀態(tài)變化的掌控。Event只有在狀態(tài)變化時(shí)才會(huì)有,所以是非常理想的回調(diào)。
from?kubernetes.client?import?V1Job
from?kubernetes.watch?import?Watch
job_name?=?'hello'
watcher?=?Watch()
for?event?in?watcher.stream(
????????batch.list_namespaced_job,
????????namespace='default',
????????label_selector=f'job-name={job_name}',
):??
????assert?isinstance(event,?dict)
????job?=?event['object']
????assert?isinstance(job,?V1Job)
Watch().stream就是前面說的理想回調(diào),它第一個(gè)參數(shù)是列出類的函數(shù),這里選擇list_namespaced_job。后面的參數(shù),都是list_namespaced_job的參數(shù)。除了必備的namespace以外,label_selector也是一個(gè)常用參數(shù),可以避免關(guān)注無關(guān)的Job。每個(gè)Job在創(chuàng)建后,都會(huì)自動(dòng)帶一個(gè)f'job-name={job_name}'的Label,可以借此篩選。job_name就是metadata里設(shè)置的name,如這里job-name=hello。
event是一個(gè)dict,只有三個(gè)值。其中event['raw_object']只是event['object']的dict形式,沒有太大意義。event['type']常見三個(gè)值,對(duì)應(yīng)增刪改。
ADDED,創(chuàng)建時(shí)的信息,和create_namespaced_job的返回值通常沒有區(qū)別。MODIFIED,Job狀態(tài)變化時(shí)的信息。DELETED,Job刪除時(shí)的信息。
以上三個(gè)狀態(tài)值,對(duì)其它類型的資源也是通用的,比如Pod、Deployment等。
V1Job的使用
對(duì)于具體的V1Job實(shí)例,其它字段都是和創(chuàng)建時(shí)的配置差不多的,只是多一些集群中的具體信息。所以,常用的還是.status字段。
>>>?from?kubernetes.client?import?V1JobStatus
>>>?isinstance(job.status,?V1JobStatus)
True
>>>?print(job.status)
{'active':?None,
?'completion_time':?datetime.datetime(2020,?8,?10,?9,?49,?38,?tzinfo=tzutc()),
?'conditions':?[{'last_probe_time':?datetime.datetime(2020,?8,?10,?9,?49,?38,?tzinfo=tzutc()),
???'last_transition_time':?datetime.datetime(2020,?8,?10,?9,?49,?38,?tzinfo=tzutc()),
???'message':?None,
???'reason':?None,
???'status':?'True',
???'type':?'Complete'}],
?'failed':?None,
?'start_time':?datetime.datetime(2020,?8,?10,?9,?49,?32,?tzinfo=tzutc()),
?'succeeded':?1}
直接使用job.status.succeeded,可以得到成功的Container數(shù)量。下面幾乎每一級(jí)都有特定的類,可以連續(xù)使用.操作符。如果有特殊需要,也可以用job.to_dict()轉(zhuǎn)換成字典來用。
其它字段,作用基本上也和名稱相關(guān),不難推測。
列出Job
列出所有Job的list_job_for_all_namespaces不常用,一般只列出指定Namespace的Job。
from?kubernetes.client?import?V1JobList,?V1Job
job_list?=?batch.list_namespaced_job(namespace='default')
assert?isinstance(job_list,?V1JobList)
assert?isinstance(job_list.items,?list)
for?job?in?job_list.items:
????assert?isinstance(job,?V1Job)
與監(jiān)控的示例相比,這里去掉了label_selector,可以獲取Namespace中所有的Job。如果有需要,可以通過自定義Label把所有Job分類,并使用label_selector獲取指定類型的Job。
讀取Job
如果知道Job的name,可以直接通過read_*系列接口,獲得指定的V1Job。
from?kubernetes.client?import?V1Job
job?=?batch.read_namespaced_job(name='hello',?namespace='default')
assert?isinstance(job,?V1Job)
如果更看重狀態(tài),可以改用read_namespaced_job_status。雖然訪問的API不同,但在Python的V1Job這個(gè)結(jié)果層面,沒有本質(zhì)差異。
列出一個(gè)Job的Pod
Pod是Kubernetes調(diào)度的最小單元,也是最常用的一種資源。
from?typing?import?List
from?kubernetes.client?import?CoreV1Api,?V1Pod
def?get_pods_by(job_name:?str)?->?List[V1Pod]:
????core?=?CoreV1Api()
????pods?=?core.list_namespaced_pod(
????????namespace='default',
????????label_selector=f'job-name={job_name}',
????????limit=1,
????)
????return?pods.items
這里的get_pods_by,可以用job_name獲取對(duì)應(yīng)的Pod。limit=1是在已知Pod只有一個(gè)的情況下做出的優(yōu)化,可按需調(diào)整或去掉。
刪除Job
刪除一個(gè)Job:
from?kubernetes.client?import?V1Status
status?=?batch.delete_namespaced_job(
????namespace='default',
????name=job_name,
????propagation_policy='Background',
)
assert?isinstance(status,?V1Status)
其中,propagation_policy='Background'是不可省略的關(guān)鍵,否則默認(rèn)是Orphan,其Pod不會(huì)被刪除。這屬于API設(shè)計(jì)的一個(gè)失誤,與kubectl的默認(rèn)行為不符合。而且,應(yīng)該沒有人在刪除了Job之后,還要保留Pod的吧。這里也可以選擇'Foreground',阻塞等待相關(guān)資源的刪除完畢。
刪除多個(gè)、或所有Job:
status?=?batch.delete_collection_namespaced_job(
????namespace='default',
????propagation_policy='Background',
????label_selector='some-label=your-value',
)
assert?isinstance(status,?V1Status)
如果沒有label_selector,那就是刪除一個(gè)Namespace中的所有Job。
更新Job
這個(gè)比較少用,因?yàn)橐话愣际墙ㄐ碌?。用法其?shí)和create_namespaced_job差不多,參考官方樣例即可。
def?update_job(api_instance,?job):
????
????job.spec.template.spec.containers[0].image?=?"perl"
????api_response?=?api_instance.patch_namespaced_job(
????????name=JOB_NAME,
????????namespace="default",
????????body=job)
????print("Job?updated.?status='%s'"?%?str(api_response.status))
總結(jié)
用Python操作Kubernetes的Job,總體上還是比較方便的,雖然有一些坑。
- END -
?推薦閱讀? 30個(gè)Python極簡代碼,10分鐘get常用技巧!
記一次線上商城系統(tǒng)高并發(fā)的優(yōu)化
點(diǎn)亮,服務(wù)器三年不宕機(jī)

