【圖網(wǎng)絡(luò)】如何用Python實現(xiàn)算法:規(guī)劃圖技術(shù)(GraphPlanner)
作者 | Debby Nirwan
作者 | Debby Nirwan
編譯 | VK
來源 | Towards Data Science

介紹
規(guī)劃圖是為了解決經(jīng)典人工智能規(guī)劃方法(又稱STRIPS規(guī)劃器)的復(fù)雜性問題而發(fā)展起來的。我們需要實施兩個主要部分:
規(guī)劃圖:數(shù)據(jù)結(jié)構(gòu)
圖規(guī)劃:搜索算法,為我們找到解決方案
如果你不熟悉規(guī)劃圖,想了解更多,請查看我的以下帖子:
https://towardsdatascience.com/improving-classical-ai-planning-complexity-with-planning-graph-c63d47f87018
領(lǐng)域和問題表示
在我們開始實現(xiàn)之前,我們需要知道我們將如何表示規(guī)劃域和規(guī)劃問題。
規(guī)劃圖及其規(guī)劃器與許多STRIPS規(guī)劃器有相同的表示,因此我們將使用PDDL(規(guī)劃域定義語言)來表示它們。下面是PDDL域文件的一個示例。
;; Specification in PDDL1 of the DWR domain
(define (domain dock-worker-robot-simple)
(:requirements :strips :typing )
(:types
location ; there are several connected locations in the harbor
robot ; holds at most 1 container, only 1 robot per location
container)
(:predicates
(adjacent ?l1 ?l2 - location) ; location ?l1 is adjacent ot ?l2
(atl ?r - robot ?l - location) ; robot ?r is at location ?l
(loaded ?r - robot ?c - container ) ; robot ?r is loaded with container ?c
(unloaded ?r - robot) ; robot ?r is empty
(in ?c - container ?l - location) ; container ?c is within location ?l
)
;; there are 3 operators in this domain:
;; moves a robot between two adjacent locations
(:action move
:parameters (?r - robot ?from ?to - location)
:precondition (and (adjacent ?from ?to) (atl ?r ?from) )
:effect (and (atl ?r ?to)
(not (atl ?r ?from)) ))
;; loads an empty robot with a container held by a nearby crane
(:action load
:parameters (?l - location ?c - container ?r - robot)
:precondition (and (atl ?r ?l) (in ?c ?l) (unloaded ?r))
:effect (and (loaded ?r ?c)
(not (in ?c ?l)) (not (unloaded ?r)) ))
;; unloads a robot holding a container with a nearby crane
(:action unload
:parameters (?l - location ?c - container ?r - robot)
:precondition (and (atl ?r ?l) (loaded ?r ?c) )
:effect (and (unloaded ?r) (in ?c ?l)
(not (loaded ?r ?c)) )) )
我們可以將PDDL看作JSON或XML之類的東西,這意味著我們需要一個解析器來反序列化用它編寫的表示。當(dāng)我在Github上搜索時,其中有幾個出現(xiàn)了,但是有一個似乎很適合我們的項目pddlpy。
https://github.com/hfoffani/pddl-lib
但是,它在開發(fā)中不再活躍,我發(fā)現(xiàn)了一個bug和一些問題。因此,我決定使用它并編寫一個適配器/包裝器,這是一個薄層,我們添加它來修復(fù)錯誤并解決其他問題。
PDDL適配器
我們需要的代表如下:
世界的初始狀態(tài):數(shù)據(jù)類型為set
目標狀態(tài):數(shù)據(jù)類型為set
運算符(也稱為操作)的列表,這些運算符已用實變量實例化:數(shù)據(jù)類型為List[Operator]
我們將只使用pddlpy庫中的一個接口,DomainProblem()類構(gòu)造函數(shù)。
我們需要提供上面列出的三個接口:初始狀態(tài)、目標狀態(tài)和運算符列表。
我們創(chuàng)建了一個名為PlanningProblem的類:
class PlanningProblem(object):
def __init__(self, dom_file: str, problem_file: str):
self._domain_file = dom_file
self._problem_file = problem_file
self._domain_problem = DomainProblem(self._domain_file,
self._problem_file)
self._initial_state = self._to_set_of_tuples(self._domain_problem.
initialstate())
self._goal_state = self._to_set_of_tuples(self._domain_problem.goals())
self._actions = self._get_ground_operators()
庫提供的狀態(tài)不是我們想要的正確數(shù)據(jù)類型,因此需要將它們轉(zhuǎn)換為一組元組。
我們使用set()數(shù)據(jù)類型,以便以后實現(xiàn)數(shù)據(jù)結(jié)構(gòu)和算法。因為在經(jīng)典的人工智能規(guī)劃中,我們大量使用集合論,我們應(yīng)該使用set()數(shù)據(jù)類型來利用內(nèi)置函數(shù)來加速我們的實現(xiàn)。我們將在下一節(jié)看到更多內(nèi)容。
我們還必須創(chuàng)建一個運算符列表,我們稱之為操作。下面是適配器的最終代碼。
class PlanningProblem(object):
def __init__(self, dom_file: str, problem_file: str):
self._domain_file = dom_file
self._problem_file = problem_file
self._domain_problem = DomainProblem(self._domain_file,
self._problem_file)
self._initial_state = self._to_set_of_tuples(self._domain_problem.
initialstate())
self._goal_state = self._to_set_of_tuples(self._domain_problem.goals())
self._actions = self._get_ground_operators()
@staticmethod
def _type_symbols(variable_type, world_objects: dict):
# 如果變量類型是在world對象中找到的,
# 返回對象名稱列表,如robr, robq
return (k for k, v in world_objects.items() if v == variable_type)
def _instantiate(self, variables, world_objects: dict):
variable_ground_space = []
for variable_name, variable_type in variables:
c = []
for symbol in self._type_symbols(variable_type, world_objects):
c.append((variable_name, symbol))
variable_ground_space.append(c)
return itertools.product(*variable_ground_space)
def _get_ground_operators(self) -> List[Operator]:
ground_operators = []
for operator in self._domain_problem.operators():
op = self._domain_problem.domain.operators[operator]
for ground in self._instantiate(op.variable_list.items(),
self._domain_problem.
worldobjects()):
st = dict(ground)
gop = Operator(operator)
gop.variable_list = st
gop.precondition_pos = set(
[a.ground(st) for a in op.precondition_pos])
gop.precondition_neg = set(
[a.ground(st) for a in op.precondition_neg])
gop.effect_pos = set([a.ground(st) for a in op.effect_pos])
gop.effect_neg = set([a.ground(st) for a in op.effect_neg])
ground_operators.append(gop)
return ground_operators
@staticmethod
def _to_set_of_tuples(state: Set[Atom]) -> Set[Tuple]:
set_of_tuples = set()
for atom in state:
tup = tuple(atom.predicate)
set_of_tuples.add(tup)
return set_of_tuples
@property
def initial_state(self):
return self._initial_state
@property
def goal_state(self):
return self._goal_state
@property
def actions(self):
return self._actions
我們現(xiàn)在可以使用這個類來解決規(guī)劃領(lǐng)域和規(guī)劃問題,并將重點放在數(shù)據(jù)結(jié)構(gòu)和算法實現(xiàn)上。
規(guī)劃圖:數(shù)據(jù)結(jié)構(gòu)
在這里我們將只看偽代碼和方程,然后接下來的重點是將它們翻譯成代碼
以下是我們?nèi)绾螛?gòu)建規(guī)劃圖的偽代碼:

有四個步驟,我們一個接一個地講。
計算動作
這是指以下步驟:

其中包括兩部分:
對于PDDL適配器提供的所有操作,我們在當(dāng)前狀態(tài)下搜索可用的操作
我們確保那些可應(yīng)用的操作的前提條件不在前提條件的互斥體中
class PlanningGraph(object):
def __init__(self, dom_file: str, problem_file: str):
self._planning_problem = PlanningProblem(dom_file, problem_file)
self._graph: Graph = Graph(visualize)
def compute_actions(self, gr: Graph):
graph_result = gr
level = gr.num_of_levels
# 計算 Ai
action_list = []
for action in self._planning_problem.actions:
if self._applicable(action, graph_result.prop[level - 1],
graph_result.prop_mutexes[level - 1]):
action_list.append(action)
graph_result.act[level] = action_list
@staticmethod
def _applicable(action, state, preconditions_mutex) -> bool:
if action.precondition_pos.issubset(state) and \
action.precondition_neg.isdisjoint(state):
applicable = True
if preconditions_mutex is not None:
for precondition in list(permutations(action.precondition_pos, 2)):
if precondition in preconditions_mutex:
applicable = False
break
else:
applicable = False
return applicable)
計算前提條件
下一步是計算前提條件,也就是這一步:

這一步非常簡單:
class PlanningGraph(object):
def __init__(self, dom_file: str, problem_file: str):
self._planning_problem = PlanningProblem(dom_file, problem_file)
self._graph: Graph = Graph(visualize)
def compute_preconditions(self, gr: Graph):
graph_result = gr
level = gr.num_of_levels
proposition_list = set()
for action in action_list:
for effect in action.effect_pos:
proposition_list.add(effect)
graph_result.prop[level] = proposition_list
我們只存儲計算出的動作效果。
計算操作互斥
該算法的下一步是計算Actions Mutex(操作互斥),這是一組相互抵消效果的操作。

在這個等式中有三個部分,對于動作中所有可能的排列,我們希望在我們的列表中包括以下內(nèi)容:
行動的消極影響會干擾另一方的積極影響或先決條件
第二部分是相同的,只是在另一個方向(b到a)
第三部分是它們的前提條件是互斥
class PlanningGraph(object):
def __init__(self, dom_file: str, problem_file: str):
self._planning_problem = PlanningProblem(dom_file, problem_file)
self._graph: Graph = Graph(visualize)
def compute_actions_mutex(self, gr: Graph):
graph_result = gr
level = gr.num_of_levels
action_mutex_list = []
for action_pair in list(permutations(action_list, 2)):
if self.compute_action_mutex(action_pair,
graph_result.prop_mutexes[level - 1]):
action_mutex_list.append(action_pair)
graph_result.act_mutexes[level] = action_mutex_list
@staticmethod
def compute_action_mutex(pair, preconds_mutex) -> bool:
a = pair[0]
b = pair[1]
# 兩個動作是相互依存的
if a.effect_neg.intersection(b.precondition_pos.union(b.effect_pos)) \
!= set():
return True
if b.effect_neg.intersection(a.precondition_pos.union(a.effect_pos)) \
!= set():
return True
# 它們的先決條件互斥
if preconds_mutex is not None:
for mutex in preconds_mutex:
# (p, q)
p = mutex[0]
q = mutex[1]
if p in a.precondition_pos and q in b.precondition_pos:
return True
return False
計算互斥的前提條件
建立規(guī)劃圖的算法的最后一步是計算預(yù)條件互斥

這意味著我們要尋找一對互斥的前提條件。它們是互斥的當(dāng)且僅當(dāng):
對于同時產(chǎn)生p和q的所有操作對,它們都在actions Mutex列表中
沒有同時產(chǎn)生p和q的單一操作
class PlanningGraph(object):
def __init__(self, dom_file: str, problem_file: str):
self._planning_problem = PlanningProblem(dom_file, problem_file)
self._graph: Graph = Graph()
def compute_preconditions_mutex(self, gr: Graph):
proposition_mutex_list = []
for proposition_pair in list(permutations(proposition_list, 2)):
if self.compute_precondition_mutex(proposition_pair, action_list, action_mutex_list):
if proposition_pair not in proposition_mutex_list:
swapped = (proposition_pair[1], proposition_pair[0])
if swapped not in proposition_mutex_list:
proposition_mutex_list.append(proposition_pair)
graph_result.prop_mutexes[level] = proposition_mutex_list
@staticmethod
def compute_precondition_mutex(proposition_pair, action_list, action_mutex):
p = proposition_pair[0]
q = proposition_pair[1]
for action in action_list:
if p in action.effect_pos and q in action.effect_pos:
# (p, q)不是互斥對象,如果它們都是由同一個動作產(chǎn)生的
return False
# 每一個產(chǎn)生p的動作
actions_with_p = set()
for action in action_list:
if p in action.effect_pos:
actions_with_p.add(action)
# 每一個產(chǎn)生q的動作
actions_with_q = set()
for action in action_list:
if q in action.effect_pos:
actions_with_q.add(action)
all_mutex = True
for p_action in actions_with_p:
for q_action in actions_with_q:
if p_action == q_action:
return False
if (p_action, q_action) not in action_mutex:
all_mutex = False
break
if not all_mutex:
break
return all_mutex
我們現(xiàn)在已經(jīng)完成了構(gòu)建數(shù)據(jù)結(jié)構(gòu)的代碼,規(guī)劃圖。為了幫助調(diào)試,可以使用pydot擴充代碼以生成圖形可視化。下面是一個例子。

搜索算法:GraphPlanner
我們現(xiàn)在已經(jīng)準備好了數(shù)據(jù)結(jié)構(gòu),我們可以開始實現(xiàn)搜索算法,為我們的計劃問題找到解決方案。
該算法是遞歸的,分為三個部分:
計劃
提取
搜索
提取和搜索
這兩個步驟是遞歸的,算法如下。第一部分是Extract:

下一部分是Search:

這是這兩個函數(shù)如何遞歸工作的示例:

它遞歸地調(diào)用Search(),直到所有的命題都被解析,并調(diào)用Extract()進入規(guī)劃圖的下一級。
Python編寫如下:
class GraphPlanner(object):
def __init__(self):
self._layered_plan: LayeredPlan = LayeredPlan()
self._mutex = {}
def _extract(self, gr: Graph, g: set, index: int):
if index == 0:
return Plan()
return self._search(gr, g, Plan(), index)
def _search(self, gr: Graph, g: set, plan: Plan, index: int):
if g == set():
new_goals = set()
for action in plan.plan:
for proposition in action.precondition_pos:
if 'adjacent' not in proposition:
new_goals.add(proposition)
extracted_plan = self._extract(gr, new_goals, index-1)
if extracted_plan is None:
return None
else:
self._layered_plan[index-1] = extracted_plan
self._layered_plan[index] = plan
return plan
else:
# 選擇g中的任意p
proposition = g.pop()
# 計算解析器
resolvers = []
for action in gr.act[index]:
if proposition in action.effect_pos:
if plan.plan:
mutex = False
for action2 in plan.plan:
if (action, action2) in \
gr.act_mutexes[index]:
mutex = True
break
if not mutex:
resolvers.append(action)
else:
resolvers.append(action)
# 沒有解析器
if not resolvers:
return None
# 選擇非確定性,如果失敗則回溯
while resolvers:
resolver = resolvers.pop()
plan.append(resolver)
plan_result = self._search(gr, g - resolver.effect_pos,
plan, index)
if plan_result is not None:
return plan_result
else:
plan.remove(resolver)
g.add(proposition)
return None
主程序
最后,我們到達了算法的最后一步、以下是主要步驟和入口點:

在某些情況下,我們需要計劃更多的步驟來創(chuàng)建解決方案計劃,我們需要展開規(guī)劃圖并重試搜索。
我們還需要添加一個額外的步驟,以確保算法在沒有可能的解決方案時終止。下面是我們的最后一段代碼:
class GraphPlanner(object):
def __init__(self):
self._layered_plan: LayeredPlan = LayeredPlan()
self._mutex = {}
def plan(self, gr: Graph, g: set):
index = gr.num_of_levels - 1
if not g.issubset(gr.prop[index]):
return None
plan = self._extract(gr, g, index)
if plan:
return self._layered_plan
if gr.fixed_point:
n = 0
try:
props_mutex = self._mutex[gr.num_of_levels - 1]
except KeyError:
props_mutex = None
if props_mutex:
n = len(props_mutex)
else:
n = 0
while True:
index += 1
gr = PlanningGraph.expand(gr)
plan = self._extract(gr, g, index)
if plan:
return self._layered_plan
elif gr.fixed_point:
try:
props_mutex = self._mutex[gr.num_of_levels-1]
except KeyError:
props_mutex = None
if props_mutex:
if n == len(props_mutex):
# 這意味著它已經(jīng)穩(wěn)定下來
return None
else:
n = len(props_mutex)
結(jié)尾
我意識到要描述實現(xiàn)這個算法的思想過程并不容易。但我希望至少你能對如何實現(xiàn)從等式、偽代碼到Python代碼的算法有一些基本的理解。
完整代碼可在我的Github上獲得,如下所示:
https://github.com/debbynirwan/planning_graph
往期精彩回顧
本站qq群851320808,加入微信群請掃碼:
