# Untitled diff (105 lines)
import copy
import copy
import heapq
import heapq
from dotmap import Map
#from dotmap import Map
from itertools import count
from itertools import count
class Map(dict):
    """Dictionary whose keys can also be read and written as attributes.

    ``m.key`` is equivalent to ``m['key']`` and ``m.key = v`` is equivalent
    to ``m['key'] = v``.
    """

    def __getattr__(self, k):
        """Return ``self[k]``; raise ``AttributeError`` if the key is absent.

        ``__getattr__`` is only consulted after normal attribute lookup
        fails.  It must raise ``AttributeError`` (not ``KeyError``) so that
        ``hasattr``, ``getattr(obj, name, default)`` and ``copy.deepcopy``
        keep working when they probe for names that are not keys.
        """
        try:
            return self[k]
        except KeyError:
            raise AttributeError(k) from None

    def __setattr__(self, k, v):
        """Store ``v`` under key ``k`` instead of in the instance ``__dict__``."""
        self[k] = v
# Example directed graph as an adjacency list: each key is a node and its
# value lists the nodes reachable over one outgoing edge.
graph = {
    'A': ['B', 'E'],
    'B': ['C', 'D'],
    'C': ['G'],
    'D': ['C', 'F'],
    'E': ['D'],
    'F': [],
    'G': ['F'],
}
class PriorityQueue:
    """Minimum-priority queue backed by ``heapq``.

    Items with equal priority are returned in insertion order.
    """

    def __init__(self):
        # Heap of (priority, sequence, item) entries.
        self.elements = []
        # Monotonic sequence number used as a tie-breaker so that two entries
        # with the same priority never fall back to comparing the items
        # themselves (which fails for unorderable items such as dicts).
        self._counter = count()

    def empty(self):
        """Return ``True`` when the queue holds no items."""
        return not self.elements

    def put(self, item, priority):
        """Insert ``item`` with the given ``priority`` (lower pops first)."""
        heapq.heappush(self.elements, (priority, next(self._counter), item))

    def get(self):
        """Remove and return the item with the lowest priority.

        Raises ``IndexError`` if the queue is empty.
        """
        return heapq.heappop(self.elements)[-1]
class AStar:
    """Shortest-path search over an adjacency-list graph with unit edge costs.

    Despite the name, no heuristic is applied: the frontier is ordered by the
    accumulated cost only, and every edge adds a cost of 1.

    Parameters:
        graph: mapping of node -> list of successor nodes.
        start: node the search starts from.
        goals: container of acceptable goal nodes.  Membership is tested with
            ``in``, so pass a list/tuple/set of nodes; a plain string would be
            substring-matched.
    """

    def __init__(self, graph, start, goals=None):
        self.graph = graph
        self.start = start
        self.frontier = PriorityQueue()
        self.frontier.put(start, 0)
        # previous[node] is the node from which `node` was reached most cheaply.
        self.previous = {start: None}
        # costs[node] is the cheapest known cost from `start` to `node`.
        self.costs = {start: 0}
        self.final_path = None
        # A fresh list per instance avoids sharing one mutable default object
        # between every AStar created without explicit goals.
        self.goals = [] if goals is None else goals
        self.goal = None

    def search(self):
        """Run the search and return a ``Map`` with ``path`` and ``cost``.

        On success ``path`` is the node list from ``start`` to the reached
        goal and ``cost`` is its edge count.  When no goal is reachable,
        ``path`` is ``[]`` and ``cost`` is ``float('inf')``.
        """
        graph = self.graph
        frontier = self.frontier
        goals = self.goals
        costs = self.costs

        while not frontier.empty():
            state = frontier.get()

            if state in goals:
                cost = costs[state]
                self.goal = state
                self.final_path = self.trace_path()
                return Map({'path': self.final_path, 'cost': cost})

            for next_state in graph[state]:
                new_cost = costs[state] + 1
                if next_state not in costs or new_cost < costs[next_state]:
                    costs[next_state] = new_cost
                    frontier.put(next_state, new_cost)
                    self.previous[next_state] = state

        # No path found
        return Map({'path': [], 'cost': float('inf')})

    def trace_path(self):
        """Return the path from ``start`` to the reached goal.

        Returns ``[]`` when no goal has been reached yet.
        """
        if self.goal is None:
            return []

        current = self.goal
        path = []
        # Walk the predecessor links backwards from the goal to the start.
        while current != self.start:
            path.append(current)
            current = self.previous[current]
        path.append(self.start)
        path.reverse()
        return path
def YenKSP(graph, source, sink, k_paths):
    """Return up to ``k_paths`` loopless shortest paths from ``source`` to ``sink``.

    Implements Yen's k-shortest-paths scheme on top of ``AStar`` with unit
    edge costs.

    Parameters:
        graph: mapping of node -> list of successor nodes; it is not modified.
        source: start node.
        sink: target node.
        k_paths: maximum number of paths to return.

    Returns:
        A list of paths (each a list of nodes) in non-decreasing order of
        length.  The list is empty when ``k_paths < 1`` or when ``sink`` is
        unreachable, and shorter than ``k_paths`` when fewer paths exist.
    """
    if k_paths < 1:
        return []

    # The sink is wrapped in a list so goal membership is an equality test
    # rather than a substring test on a string node name.
    shortest = AStar(graph, source, [sink]).search()
    if not shortest.path:
        return []

    A = [shortest.path]  # accepted paths, best first
    B = []               # candidate paths not yet accepted

    for _ in range(1, k_paths):
        last_path = A[-1]

        for i in range(len(last_path) - 1):
            spur_node = last_path[i]
            root_path = last_path[:i + 1]

            # Nodes already on the root path must not be revisited, otherwise
            # the spur path could loop back through them.
            blocked_nodes = set(root_path[:-1])
            # Edges leaving the spur node that accepted paths with the same
            # root already use; blocking them forces a genuinely new path.
            blocked_successors = {
                path[i + 1]
                for path in A
                if len(path) > i + 1 and path[:i + 1] == root_path
            }

            graph_clone = {
                node: [
                    successor
                    for successor in successors
                    if successor not in blocked_nodes
                    and not (node == spur_node
                             and successor in blocked_successors)
                ]
                for node, successors in graph.items()
            }

            result = AStar(graph_clone, spur_node, [sink]).search()
            if not result.path:
                # The sink is unreachable from this spur node.
                continue

            total_path = root_path[:-1] + result.path
            # The root path contributes one unit of cost per edge, i.e. i.
            total_cost = i + result.cost

            if any(candidate.path == total_path for candidate in B):
                continue
            B.append(Map({'path': total_path, 'cost': total_cost}))

        if not B:
            break

        B.sort(key=lambda p: (p.cost, len(p.path)))
        A.append(B.pop(0).path)

    return A
# Demo: the four shortest paths from 'A' to 'F' in the example graph.
paths = YenKSP(graph, 'A', 'F', 4)
print(paths)