Uniform cost search in Python
I have implemented a simple graph data structure in Python, shown below. The code is here only to clarify what the functions and variables mean; they are fairly self-explanatory, so you can skip reading it.
# Node data structure
class Node:

    def __init__(self, label):
        self.out_edges = []
        self.label = label
        self.is_goal = False

    def add_edge(self, node, weight = 0):
        self.out_edges.append(Edge(node, weight))

# Edge data structure
class Edge:

    def __init__(self, node, weight = 0):
        self.node = node
        self.weight = weight

    def to(self):
        return self.node

# Graph data structure, utilises classes Node and Edge
class Graph:

    def __init__(self):
        self.nodes = []

    # some other functions here populate the graph, and randomly select three goal nodes.
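To make the conventions concrete, here is a rough sketch of how a tiny graph could be wired up with these classes. The labels and weights are made up purely for illustration and are not taken from my actual graphs; the class definitions are repeated only so the snippet runs on its own.

```python
# Hypothetical usage of the Node/Edge classes; labels and weights are invented.
class Node:
    def __init__(self, label):
        self.out_edges = []
        self.label = label
        self.is_goal = False

    def add_edge(self, node, weight=0):
        self.out_edges.append(Edge(node, weight))


class Edge:
    def __init__(self, node, weight=0):
        self.node = node
        self.weight = weight

    def to(self):
        return self.node


a, b, c = Node("A"), Node("B"), Node("C")
a.add_edge(b, 0.5)   # directed edge A -> B with weight 0.5
a.add_edge(c, 0.9)   # directed edge A -> C with weight 0.9
b.add_edge(c, 0.2)   # directed edge B -> C with weight 0.2
c.is_goal = True     # mark C as a goal node

# Each entry in out_edges is an Edge; .to() gives the node it points at.
neighbours = [(e.to().label, e.weight) for e in a.out_edges]
```

Note that add_edge only adds an edge in one direction, so an undirected edge would need a second call from the other node.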
Now I'm trying to implement a uniform-cost search (i.e. a BFS with a priority queue, guaranteeing the shortest path) that starts at a given node v and returns the shortest path (as a list) to one of the three goal nodes. By goal node, I mean a node with the attribute is_goal set to True.
This is my implementation:
def ucs(G, v):
    visited = set()                  # set of visited nodes
    visited.add(v)                   # mark the starting vertex as visited
    q = queue.PriorityQueue()        # we store vertices in the (priority) queue as tuples with cumulative cost
    q.put((0, v))                    # add the starting node, this has zero *cumulative* cost
    goal_node = None                 # this will be set as the goal node if one is found
    parents = {v:None}               # this dictionary contains the parent of each node, necessary for path construction

    while not q.empty():             # while the queue is nonempty
        dequeued_item = q.get()
        current_node = dequeued_item[1]             # get node at top of queue
        current_node_priority = dequeued_item[0]    # get the cumulative priority for later

        if current_node.is_goal:                    # if the current node is the goal
            path_to_goal = [current_node]           # the path to the goal ends with the current node (obviously)
            prev_node = current_node                # set the previous node to be the current node (this will change with each iteration)

            while prev_node != v:                   # go back up the path using parents, and add to path
                parent = parents[prev_node]
                path_to_goal.append(parent)
                prev_node = parent

            path_to_goal.reverse()                  # reverse the path
            return path_to_goal                     # return it

        else:
            for edge in current_node.out_edges:     # otherwise, for each adjacent node
                child = edge.to()                   # (avoid calling .to() in future)

                if child not in visited:            # if it is not visited
                    visited.add(child)              # mark it as visited
                    parents[child] = current_node   # set the current node as the parent of child
                    q.put((current_node_priority + edge.weight, child)) # and enqueue it with *cumulative* priority
Now, after a lot of testing and comparison with other algorithms, this implementation seemed to work very well, until I tried it with this graph:
For whatever reason, ucs(G,v) returned the path H -> I, which costs 0.87, as opposed to the path H -> F -> I, which costs 0.71 (this path was obtained by running a DFS). The following graph also gave the wrong path:
The algorithm gave G -> F instead of G -> E -> F, which was again obtained by the DFS. The only pattern I can observe among these rare cases is that the chosen goal node always has a loop. I can't figure out what is going wrong, though. Any tips would be much appreciated.
For searches, I usually keep the path to a node as part of its queue entry. That is not very memory-efficient, but it is simpler to implement.
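If you want a parent map, remember that it is only safe to treat a node's parent as final once that node is at the top of the queue. Only then has the algorithm determined the shortest path to it. Your version marks a child as visited and fixes its parent as soon as the child is first enqueued, so a cheaper route discovered later (such as H -> F -> I) is ignored.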
import itertools
import queue

def ucs(G, v):
    visited = set()                  # set of visited nodes
    tie = itertools.count()          # insertion counter, so equal costs never compare Node objects
    q = queue.PriorityQueue()        # we store vertices in the (priority) queue as tuples
                                     # (f, t, n, path), with
                                     # f: the cumulative cost,
                                     # t: a counter used only to break ties,
                                     # n: the current node,
                                     # path: the path that led to the expansion of the current node
    q.put((0, next(tie), v, [v]))    # add the starting node, this has zero *cumulative* cost
                                     # and its path contains only itself.

    while not q.empty():             # while the queue is nonempty
        f, _, current_node, path = q.get()
        if current_node in visited:  # skip stale entries for nodes that were already expanded
            continue
        visited.add(current_node)    # mark node visited on expansion,
                                     # only now we know we are on the cheapest path to
                                     # the current node.

        if current_node.is_goal:     # if the current node is a goal
            return path              # return its path
        else:
            for edge in current_node.out_edges:
                child = edge.to()
                if child not in visited:
                    q.put((f + edge.weight, next(tie), child, path + [child]))
Note. I haven't really tested this, so feel free to comment if it doesn't work right away.
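If you would rather keep a parent map than store paths in the queue, a minimal sketch could look like the one below. It is only an illustration under a few assumptions: the name ucs_with_parents and the best_cost dictionary are my own additions, it uses heapq instead of queue.PriorityQueue, and it repeats your Node and Edge classes so that it runs on its own. The idea is to overwrite a child's parent only when a strictly cheaper route to it is found, and to treat the parent as final once the child has been popped.

```python
import heapq
import itertools


class Edge:
    def __init__(self, node, weight=0):
        self.node = node
        self.weight = weight

    def to(self):
        return self.node


class Node:
    def __init__(self, label):
        self.out_edges = []
        self.label = label
        self.is_goal = False

    def add_edge(self, node, weight=0):
        self.out_edges.append(Edge(node, weight))


def ucs_with_parents(start):
    """Return the cheapest path from start to any goal node, or None."""
    best_cost = {start: 0}      # cheapest known cost to reach each node
    parents = {start: None}     # parent on that cheapest known path
    expanded = set()            # nodes whose cheapest path is final
    tie = itertools.count()     # tie-breaker so Node objects are never compared
    heap = [(0, next(tie), start)]
    while heap:
        cost, _, node = heapq.heappop(heap)
        if node in expanded:
            continue            # stale entry; this node was already expanded
        expanded.add(node)
        if node.is_goal:
            path = []
            while node is not None:     # walk back up to the start
                path.append(node)
                node = parents[node]
            path.reverse()
            return path
        for edge in node.out_edges:
            child = edge.to()
            new_cost = cost + edge.weight
            known = best_cost.get(child, float("inf"))
            # Record the parent only when this route is strictly cheaper.
            if child not in expanded and new_cost < known:
                best_cost[child] = new_cost
                parents[child] = node
                heapq.heappush(heap, (new_cost, next(tie), child))
    return None


# Hypothetical graph: only the totals 0.87 and 0.71 come from the question;
# the split of 0.71 into 0.40 + 0.31 is made up.
h, f, i = Node("H"), Node("F"), Node("I")
i.is_goal = True
h.add_edge(i, 0.87)
h.add_edge(f, 0.40)
f.add_edge(i, 0.31)
labels = [n.label for n in ucs_with_parents(h)]
```

On this small example the direct edge H -> I is enqueued first, but its parent entry is replaced once the cheaper route through F is found, so the returned path is H -> F -> I.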