最短路算法学习笔记
项目地址
- https://github.com/YuanshengShe/shortestpathTK
- 使用git clone git@github.com:YuanshengShe/shortestpathTK.git读取到本地仓库
使用邻接矩阵表示图
对于一个有向图
在使用networkx的from_numpy_array生成图时要求提供的就是对应图的邻接矩阵。
Floyd算法
- 算法时间复杂度:
。 - 适用场景:多源最短路、无负自环的图。
假设咱们考虑的图包含
注意:左侧
为允许以 为中间节点时 到 的最短路,右侧 为允许以 为中间节点时 到 的最短路。这实际上就是一个动态规划问题的基本方程(递推关系+边界条件)…接下来利用基本方程推就ok了。这里补充说明下为什么动态规划好于枚举——动态规划是有记忆功能的枚举,从而减少大量重复计算,并且又由于马尔科夫性,动态规划又不需要记忆太多东西,记住上一步就ok了。
用上面的基本方程就可以推导处各个点之间的最短路的长度了,但是我们没办法找到最短路,为此,需要引入一个新的矩阵next,记
可以看到,这实际上还是一个基本方程,并且应该和dist矩阵同步更新,当算法结束,利用next矩阵我们应该就能从头找到这条最短路依次途经了哪些点。
Floyd算法代码实现@record_time def floyd(data,source,target): """Floyd算法求解最短路 Args: data (list):权重矩阵,有弧就是正数,没有弧就是0。 source (int):起点 target (int):终点 Returns: shortest_path (list):最短路构成的列表 shortest_path_len (float):最短路长度 """ # 初始化数组dist和next inf = float('inf') n = len(data) dist_mat = [[inf if (i!=j) and (data[i][j]==0) else data[i][j] for j in range(n)] for i in range(n)] # floyd要维护的dist矩阵和邻接矩阵不同,所以这里要做这样一个转换,没有弧得用无穷大! next_mat = [[inf if dist_mat[i][j] == inf else j for j in range(n)] for i in range(n)] # 负环检测 for i in range(n): if dist_mat[i][i] < 0: print("这个图存在负环,无法使用floyd算法求解") return inf,inf # 开始插入点 for k in range(n): for i in range(n): for j in range(n): if dist_mat[i][j] > dist_mat[i][k]+dist_mat[k][j]: dist_mat[i][j] = dist_mat[i][k]+dist_mat[k][j] next_mat[i][j] = next_mat[i][k] # 获取最短路长和最短路 shortest_path_len = dist_mat[source][target] shortest_path = [source] flag = True while flag: shortest_path.append(next_mat[shortest_path[-1]][target]) if shortest_path[-1] == target: flag = False return shortest_path,shortest_path_len
Dijkstra 算法
- 算法时间复杂度:
。 - 适用场景:单源最短路、无负权的图。
Dijkstra算法基于一个有趣的观察——起点到当前点可能最短路中的最小者一定就是最短路(在网络中不包含负权重时成立。反证法,如果不是最短路,则存在经由另一点到该点使得路径更短,但起点到其它点的距离都更大了,所以矛盾)。算法的基本思想就是从刚刚被考察过的节点(用一个堆来维护)开始,检查它的相邻节点,并更新可能的最短路长和前序节点,当前可能最短路中的最小值一定已经找到最短路了,将该节点标注为已考察,重复该过程直到所有节点都被考察。
有关堆数据结构的补充
Definition
Heaps are binary trees for which every parent node has a value less than or equal to any of its children. We refer to this condition as the heap invariant.The interesting property of a heap is that its smallest element is always the root, heap[0]. Use heap in python
- heappush(heap, item) # pushes a new item on the heap.
- heappop(heap) # pop the smallest item from the heap. 堆和一个一维列表一一对于,所以咱们创建个列表就可以用上面的函数了。一句话讲下heap的作用——你随便往堆里面放元素,当你从堆里取元素出来时一定取的是你最小的元素。
Dijkstar算法代码实现@record_time def dijkstra(data,source,target): """dijkstra算法求解最短南路 Args: data (list):邻接矩阵,有弧就是正数,没有弧就是0。 source (int):起点 target (int):终点 Returns: shortest_path (list):最短路构成的列表 shortest_path_len (float):最短路长度 """ inf = float('inf') n = len(data) # 需要维护和更新的数据 dist_dict = {key:inf for key in range(n)} # 起点到i点的可能最短路长度 dist_dict[source] = 0 pre_dict = {key:None for key in range(n)} # 起点到i点可能最短路上i的前一个点 heap_lst = [(0,source)] # 元组的比较是逐个元素进行的。如果第一个元素不同,则直接比较第一个元素的大小;如果第一个元素相同,则比较第二个元素,以此类推 already = [] # 算法 while heap_lst: current_dist,current_point = heappop(heap_lst) # 弹出最小距离和对应的点 if current_point in already: # 这个点以及找到起点到它的最短路了 continue already.append(current_point) # 起点到这个点一定是最短路(在网络无负权时成立) # 找刚刚确定处起点到该点最短路的哪个点的邻居,对他们的可能最淡路进行更新 for neighbor,weight in [(neighbor,weight) for neighbor,weight in enumerate(data[current_point]) if weight>0]: if current_dist+weight < dist_dict[neighbor]: dist_dict[neighbor] = current_dist+weight pre_dict[neighbor] = current_point heappush(heap_lst,(dist_dict[neighbor],neighbor)) # 推进堆中 shortest_path_len = dist_dict[target] # 回溯,找出最短路 shortest_path = [target] prepoint = target while True: prepoint = pre_dict[prepoint] shortest_path.insert(0,prepoint) if prepoint == source: break return shortest_path,shortest_path_len
A star 算法
- 算法时间复杂度:选择恰当的
,A*算法的时间复杂度在 到 之间。 - 适用场景:单源最短路,无负权。
A star算法和dijkstra算法非常相似,它是将dijkstra和启发式算法结合而成的一种算法。下面谈一谈它和dijkstra的区别——(1)A* 算法中引入了评价函数
A*算法获得最短路的充分条件
在实际中对于点n的
Fig1. dijkstra的搜索过程
|
Fig2. A*的搜索过程
|
网格图下A*算法实现def astar(maparray,fig,ax,result_dir='result_pic'): """ astar算法在栅格图上求解最短路,同时对应的更新rastermap并保存图像 Args: maparray(ndarray) fig(Figure) ax(Axes) Returns: shortest_path_len(float) """ check_dir(result_dir) rsm = RasterMap() start_point,end_point,obstacle_point_group,all_point_group = rsm.buildMap(maparray) # 转换为Point和PointGroup对象 rsm.drawMap(ax) # 算法部分 # 先给每个Point对象新增gcost和hcost属性,再初始化 inf = float('inf') for point in all_point_group: if point is start_point: point.gcost = 0 point.hcost = inf elif point is end_point: point.gcost = inf point.hcost = 0 else: point.gcost = inf point.hcost = inf point.cost = point.gcost+point.hcost point.parent = None to_be_checked_group = PointGroupOrdered("将被检查的节点集合") to_be_checked_group.push(start_point) # a star中再already_checked_group里不一定保证以及找到了从起点到该点的最短路,当能找到更短时还得重新提到to_be_checked_group中去。 # already_checked_group再a star中只起到跳过to_be_checked_group点的作用 already_checked_group = PointGroup("已经被检查的节点的集合") # 因为咱们规定只能上下左右移动,所以曼哈顿距离一定满足 \hat{h} \le h.所以咱们的h就用曼哈顿距离估计了 hath = lambda point:abs(point.x-end_point.x)+abs(point.y-end_point.y) while to_be_checked_group: print(to_be_checked_group,"不能反应算法合适结束,因为算法可能通过if条件提前终止!") fig.savefig('./{}/{}.png'.format(result_dir,time())) current_point = to_be_checked_group.pop() # 取出f最小也就是cost最小的点 if current_point in already_checked_group: # current_point里面可能有冗余,在already_checked_group里面的一定是以及找到最短路的点,并且以及访问过它的邻居节点,直接跳过 continue already_checked_group.append(current_point) rsm.updateMap(ax,current_point,"yellow") # 已经找到起点到终点的最短路了,结束 if current_point is end_point: # 从end_point开始回溯parent,找到构成最短路的节点的集合 shortest_path = PointGroup("构成最短路的节点集合") shortest_path.insert(0,end_point) flag = False while True: shortest_path.insert(0,shortest_path[0].parent) if flag: # 已经把起点加进去了 break if shortest_path[0].parent is start_point: flag = True for point in shortest_path: rsm.updateMap(ax,point,"blue") # 将起点到终点最短路上的点标记未蓝色 fig.savefig('./{}/{}.png'.format(result_dir,time())) print("end_point.cost={}".format(end_point.cost)) return end_point.cost # 上下左右四个方向的邻居 neighbor_top = all_point_group.getPoint(current_point.x,current_point.y+1) neighbor_bottom = all_point_group.getPoint(current_point.x,current_point.y-1) neighbor_left = all_point_group.getPoint(current_point.x-1,current_point.y) neighbor_right = all_point_group.getPoint(current_point.x+1,current_point.y) # 更新邻居,注意看这块条件和dijkstar的区别 if neighbor_top and (neighbor_top not in obstacle_point_group): # 更新gcost和parent if current_point.gcost+1<neighbor_top.gcost: neighbor_top.gcost = current_point.gcost+1 neighbor_top.parent = current_point # 估计hcost neighbor_top.hcost = hath(neighbor_top) if (neighbor_top in already_checked_group) and (neighbor_top.gcost+neighbor_top.hcost < neighbor_top.cost): # 把neibor_top从already_checked_group中删除来 already_checked_group.delPoint(neighbor_top.x,neighbor_top.y) neighbor_top.cost = neighbor_top.gcost+neighbor_top.hcost if neighbor_top not in already_checked_group: to_be_checked_group.push(neighbor_top) if neighbor_bottom and (neighbor_bottom not in obstacle_point_group): if current_point.gcost+1<neighbor_bottom.gcost: neighbor_bottom.gcost = current_point.gcost+1 neighbor_bottom.parent = current_point neighbor_bottom.hcost = hath(neighbor_bottom) if (neighbor_bottom in already_checked_group) and (neighbor_bottom.gcost+neighbor_bottom.hcost)<neighbor_bottom.cost: already_checked_group.delPoint(neighbor_bottom.x,neighbor_bottom.y) neighbor_bottom.cost = neighbor_bottom.gcost+neighbor_bottom.hcost if neighbor_bottom not in already_checked_group: to_be_checked_group.push(neighbor_bottom) if neighbor_left and (neighbor_left not in obstacle_point_group): if current_point.gcost+1<neighbor_left.gcost: neighbor_left.gcost = current_point.gcost+1 neighbor_left.parent = current_point neighbor_left.hcost = hath(neighbor_left) if (neighbor_left in already_checked_group) and (neighbor_left.gcost+neighbor_left.hcost)<neighbor_left.cost: already_checked_group.delPoint(neighbor_left.x,neighbor_left.y) neighbor_left.cost = neighbor_left.gcost+neighbor_left.hcost if neighbor_left not in already_checked_group: to_be_checked_group.push(neighbor_left) if neighbor_right and (neighbor_right not in obstacle_point_group): if current_point.gcost+1<neighbor_right.gcost: neighbor_right.gcost = current_point.gcost+1 neighbor_right.parent = current_point neighbor_right.hcost = hath(neighbor_right) if (neighbor_right in already_checked_group) and (neighbor_right.gcost+neighbor_right.hcost)<neighbor_right.cost: already_checked_group.delPoint(neighbor_right.x,neighbor_right.y) neighbor_right.cost = neighbor_right.gcost+neighbor_right.hcost if neighbor_right not in already_checked_group: to_be_checked_group.push(neighbor_right)
网格图下dijkstra算法实现def dijkstra(maparray,fig,ax,result_dir='result_pic'): """ dijkstra算法在栅格图上求解最短路,同时对应的更新rastermap并保存图像 Args: maparray(ndarray) fig(Figure) ax(Axes) Returns: shortest_path_len(float) """ # 检查放帧文件的目录 check_dir(result_dir) # 创建raster map rsm = RasterMap() start_point,end_point,obstacle_point_group,all_point_group = rsm.buildMap(maparray) # 转换为Point和PointGroup对象 rsm.drawMap(ax) # 算法部分 inf = float('inf') # 初始化cost和parent for point in all_point_group: if point is start_point: point.cost = 0 else: point.cost = inf point.parent = None to_be_checked_group = PointGroupOrdered("将被检查的节点集合") to_be_checked_group.push(start_point) already_checked_group = PointGroup("已经被检查的节点的集合") while to_be_checked_group: print(to_be_checked_group) fig.savefig('./{}/{}.png'.format(result_dir,time())) current_point = to_be_checked_group.pop() # 取出来cost最小的节点,这个节点已经找到最短路了 if current_point in already_checked_group: # current_point里面可能有冗余,在already_checked_group里面的一定是以及找到最短路的点,并且以及访问过它的邻居节点,直接跳过 continue already_checked_group.append(current_point) rsm.updateMap(ax,current_point,"yellow") # 已经找到起点到该点最短路的点颜色改成黄色 # 上下左右四个方向的邻居 neighbor_top = all_point_group.getPoint(current_point.x,current_point.y+1) neighbor_bottom = all_point_group.getPoint(current_point.x,current_point.y-1) neighbor_left = all_point_group.getPoint(current_point.x-1,current_point.y) neighbor_right = all_point_group.getPoint(current_point.x+1,current_point.y) # 对邻居进行更新 if neighbor_top and (neighbor_top not in obstacle_point_group) and (neighbor_top not in already_checked_group): if current_point.cost+1 < neighbor_top.cost: neighbor_top.cost = current_point.cost+1 neighbor_top.parent = current_point to_be_checked_group.push(neighbor_top) if neighbor_bottom and (neighbor_bottom not in obstacle_point_group) and (neighbor_bottom not in already_checked_group): if current_point.cost+1 < neighbor_bottom.cost: neighbor_bottom.cost = current_point.cost+1 neighbor_bottom.parent = current_point to_be_checked_group.push(neighbor_bottom) if neighbor_left and (neighbor_left not in obstacle_point_group) and (neighbor_left not in already_checked_group): if current_point.cost+1 < neighbor_left.cost: neighbor_left.cost = current_point.cost+1 neighbor_left.parent = current_point to_be_checked_group.push(neighbor_left) if neighbor_right and (neighbor_right not in obstacle_point_group) and (neighbor_right not in already_checked_group): if current_point.cost+1 < neighbor_right.cost: neighbor_right.cost = current_point.cost+1 neighbor_right.parent = current_point to_be_checked_group.push(neighbor_right) # 从end_point开始回溯parent,找到构成最短路的节点的集合 shortest_path = PointGroup("构成最短路的节点集合") shortest_path.insert(0,end_point) flag = False while True: shortest_path.insert(0,shortest_path[0].parent) if flag: # 已经把起点加进去了 break if shortest_path[0].parent is start_point: flag = True for point in shortest_path: rsm.updateMap(ax,point,"blue") # 将起点到终点最短路上的点标记未蓝色 fig.savefig('./{}/{}.png'.format(result_dir,time())) print("end_point.cost={}".format(end_point.cost)) return end_point.cost
A*和dijkstra在代码实现上的主要差异就在下面那一大坨if的判断上,dijkstra中维护的already_checked_group有两个作用——(1)跳过to_be_checked_group中的冗余(2)无论如何不再访问被加入already_checked_group中的点。而A*中被加入already_checked_group中的点不一定就是找到了起点到它的最短路,在特定情形下还会从already_checked_group中取出来,它只起到(1)的作用。
Fig1. dijkstra的搜索过程
Fig2. A*的搜索过程





.png)