-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathplanpath.py
More file actions
480 lines (372 loc) · 17.1 KB
/
planpath.py
File metadata and controls
480 lines (372 loc) · 17.1 KB
1
import argparse as ap
import re
import platform
import heapq

######## RUNNING THE CODE ####################################################
#   You can run this code from terminal by executing the following command
#   python planpath.py <INPUT/input#.txt> <OUTPUT/output#.txt> <flag>
#   for example: python planpath.py INPUT/input2.txt OUTPUT/output2.txt 0
#   NOTE: THIS IS JUST ONE EXAMPLE INPUT DATA
###############################################################################

################## YOUR CODE GOES HERE ########################################

# Moves in the order they are tried during expansion:
# (label, row delta, column delta, step cost).
# Straight moves cost 2, diagonal moves cost 1.
_MOVES = (
    ("U", -1, 0, 2),
    ("L", 0, -1, 2),
    ("D", 1, 0, 2),
    ("R", 0, 1, 2),
    ("LU", -1, -1, 1),
    ("LD", 1, -1, 1),
    ("RD", 1, 1, 1),
    ("RU", -1, 1, 1),
)

# Direction label -> (row delta, column delta).
_STEPS = {label: (d_row, d_col) for label, d_row, d_col, _cost in _MOVES}


class Node:
    """
    Search node: one map cell together with the path used to reach it.

    Attributes:
        idx: creation index of the node (shown as "N<idx>").
        expend_id: order in which the node was expanded (set by the search).
        direction: label of the last move into the cell ("" for the start).
        position: (row, column) of the cell.
        h_value / g_value / f_value: heuristic, path cost, and their sum.
        children: nodes generated or re-routed while expanding this node.
        parent: predecessor node on the path, None for the start.
        solution: path so far as a "-"-joined string beginning with "S".
    """

    def __init__(self, idx, position, h_value, g_value, parent, solution, direction):
        self.idx = idx
        self.expend_id = 1
        self.direction = direction
        self.position = position
        self.h_value = h_value
        self.g_value = g_value
        self.f_value = h_value + g_value
        self.children = []
        self.parent = parent
        self.solution = solution

    def __lt__(self, other):
        """
        Order nodes by f value so heapq pops the cheapest node first.

        heapq only looks at the truthiness of the result, so this must be a
        real boolean: a cmp-style -1/0/1 would make both "smaller" and
        "larger" compare as true and silently break the heap ordering.
        """
        return self.f_value < other.f_value

    def reroute(self, parent, g_value, solution, direction):
        """
        Re-attach the node to a cheaper path and refresh every derived field
        (f value and incoming direction included).
        """
        self.parent = parent
        self.g_value = g_value
        self.f_value = self.h_value + g_value
        self.solution = solution
        self.direction = direction

    def __str__(self):
        """
        Open-list output format: N<idx>:<solution> <direction> <order> <g> <h> <f>
        """
        return "N{}:{} {} {} {} {} {}".format(
            self.idx, self.solution, self.direction, self.expend_id,
            self.g_value, self.h_value, self.f_value)

    def get_close_info(self):
        """
        Closed-list output format; the start node reports "S" as direction.
        """
        direct = self.direction if self.direction != "" else "S"
        return "N{}:{} {} {} {} {}".format(
            self.idx, direct, self.expend_id,
            self.g_value, self.h_value, self.f_value)

    def get_children_info(self):
        """
        Children output format: N<idx>:<solution> <direction>
        """
        return "N{}:{} {}".format(self.idx, self.solution, self.direction)


def get_h_value(x, y, goal):
    """
    Estimate the remaining cost from cell (x, y) to goal.

    Diagonal steps cost 1 and straight steps cost 2, so on an empty map the
    cheapest route uses diagonals wherever possible (zig-zagging if needed):
    at least max(gap_x, gap_y) steps are required, and when gap_x + gap_y is
    odd at least one of them must be a straight step, adding 1.  This never
    overestimates.  The previous formula (2 * max - min) did: for gaps
    (3, 5) it returned 7 although a cost-5 route exists, which allowed the
    search to return non-optimal paths.

    :param x: row of the cell
    :param y: column of the cell
    :param goal: (row, column) of the goal
    :return: non-negative integer estimate
    """
    gap_x = abs(x - goal[0])
    gap_y = abs(y - goal[1])
    return max(gap_x, gap_y) + (gap_x + gap_y) % 2


def _is_free(maze, row, col):
    """True when (row, col) lies inside the map and is not an 'X' obstacle."""
    return 0 <= row < len(maze) and 0 <= col < len(maze[row]) and maze[row][col] != 'X'


def can_arrive(x, y, direction, maze):
    """
    Determine whether one move in the given direction from (x, y) is allowed.

    The target cell must be inside the map and not an obstacle.  A diagonal
    move additionally requires both orthogonally adjacent cells to be free
    (no cutting corners).  Bounds are taken from each row's own length, so
    non-square maps work.

    :param x: current row
    :param y: current column
    :param direction: one of U, D, L, R, LU, RU, LD, RD
    :param maze: list of rows, each a sequence of single-character cells
    :return: True if the move is allowed; False otherwise (including for an
             unknown direction label)
    """
    step = _STEPS.get(direction)
    if step is None:
        return False
    d_row, d_col = step
    new_x, new_y = x + d_row, y + d_col
    if not _is_free(maze, new_x, new_y):
        return False
    if d_row and d_col:
        return _is_free(maze, x, new_y) and _is_free(maze, new_x, y)
    return True


def _parse_grid(map):
    """Turn the raw input lines (first line skipped) into rows of cells, dropping blank lines."""
    rows = (list(line.replace('\n', '')) for line in map[1:])
    return [row for row in rows if row]


def _print_state(node, open_list, closed_list):
    """Print the diagnostic record for one expanded node."""
    console_output = str(node) + "\n"
    console_output += "Children: " + str([_.get_children_info() for _ in node.children]) + "\n"
    console_output += "OPEN: " + str([str(_) for _ in open_list]) + "\n"
    console_output += "CLOSE: " + str([_.get_close_info() for _ in closed_list]) + "\n"
    print(console_output)


def _render_path(goal_node, grid, goal):
    """Build the file output: one map snapshot plus path/cost line per node, start first."""
    snapshots = []
    node = goal_node
    while node is not None:
        x, y = node.position
        text = ""
        for i, row in enumerate(grid):
            # The goal cell always stays "G", even when the node sits on it.
            text += "".join(
                "*" if (i == x and j == y and cell != "G") else cell
                for j, cell in enumerate(row)
            ) + "\n"
        suffix = "-G " if node.position == goal else " "
        text += "\n" + node.solution + suffix + str(node.g_value) + "\n\n"
        snapshots.append(text)
        node = node.parent
    snapshots.reverse()
    return "".join(snapshots)


def graphsearch(map, flag):
    """
    A-Star search from 'S' to 'G' on a character map.

    :param map: input lines; the first line is skipped and every following
                non-blank line is one map row ('X' marks an obstacle).  If
                'S' or 'G' occurs more than once, the last occurrence is used.
    :param flag: number of expansions whose diagnostics are printed to the
                 console (0 prints nothing)
    :return: for each node on the found path, start first, a snapshot of the
             map with '*' on the node's cell followed by the path string and
             its cost; the final entry ends in "-G".  An empty string is
             returned when there is no path or the map lacks 'S' or 'G'.
    """
    grid = _parse_grid(map)

    start = None
    goal = None
    for i, row in enumerate(grid):
        for j, cell in enumerate(row):
            if cell == 'S':
                start = (i, j)
            elif cell == 'G':
                goal = (i, j)
    if start is None or goal is None:
        return ""

    open_list = []           # heap of frontier nodes ordered by f value
    open_by_pos = {}         # position -> frontier node, for O(1) lookup
    closed_list = []         # expanded nodes in expansion order (for output)
    closed_positions = set()
    next_idx = 0
    expend_id = 1

    root = Node(next_idx, start, get_h_value(start[0], start[1], goal), 0, None, "S", "")
    next_idx += 1
    heapq.heappush(open_list, root)
    open_by_pos[start] = root

    goal_node = None
    while open_list:
        # Get the node with the lowest cost
        cur_node = heapq.heappop(open_list)
        del open_by_pos[cur_node.position]
        cur_node.expend_id = expend_id
        expend_id += 1
        closed_list.append(cur_node)
        # Expanded cells are tracked separately from obstacles so that they
        # never block the corner checks of later diagonal moves.
        closed_positions.add(cur_node.position)

        if cur_node.position == goal:
            goal_node = cur_node
            break

        x, y = cur_node.position
        for direction, d_row, d_col, cost in _MOVES:
            if not can_arrive(x, y, direction, grid):
                continue
            position = (x + d_row, y + d_col)
            if position in closed_positions:
                continue
            new_g = cur_node.g_value + cost
            new_solution = cur_node.solution + "-" + direction
            known = open_by_pos.get(position)
            if known is None:
                child = Node(next_idx, position,
                             get_h_value(position[0], position[1], goal),
                             new_g, cur_node, new_solution, direction)
                next_idx += 1
                cur_node.children.append(child)
                heapq.heappush(open_list, child)
                open_by_pos[position] = child
            elif known.g_value > new_g:
                # Cheaper route to a frontier node: its priority changed, so
                # the heap invariant has to be restored.
                known.reroute(cur_node, new_g, new_solution, direction)
                heapq.heapify(open_list)
                cur_node.children.append(known)

        if cur_node.expend_id <= flag:
            _print_state(cur_node, open_list, closed_list)

    if goal_node is None:
        return ""

    # The loop exits before printing when the goal is popped, so report the
    # goal node here (only here, to avoid printing any state twice).
    if goal_node.expend_id <= flag:
        _print_state(goal_node, open_list, closed_list)

    return _render_path(goal_node, grid, goal)


def read_from_file(file_name):
    """Return the lines of the input file (line endings kept)."""
    # You can change the file reading function to suit the way
    # you want to parse the file
    with open(file_name) as file_handle:
        map = file_handle.readlines()
    return map


###############################################################################
########### DO NOT CHANGE ANYTHING BELOW ######################################
###############################################################################

def write_to_file(file_name, solution):
    file_handle = open(file_name, 'w')
    file_handle.write(solution)


def main():
    # create a parser object
    parser = ap.ArgumentParser()

    # specify what arguments will be coming from the terminal/commandline
    parser.add_argument("input_file_name", help="specifies the name of the input file", type=str)
    parser.add_argument("output_file_name", help="specifies the name of the output file", type=str)
    parser.add_argument("flag", help="specifies the number of steps that should be printed", type=int)
    # parser.add_argument("procedure_name", help="specifies the type of algorithm to be applied, can be D, A", type=str)

    # get all the arguments
    arguments = parser.parse_args()

    ##############################################################################
    # these print statements are here to check if the arguments are correct.
    #    print("The input_file_name is " + arguments.input_file_name)
    #    print("The output_file_name is " + arguments.output_file_name)
    #    print("The flag is " + str(arguments.flag))
    #    print("The procedure_name is " + arguments.procedure_name)
    ##############################################################################

    # Extract the required arguments

    operating_system = platform.system()

    if operating_system == "Windows":
        input_file_name = arguments.input_file_name
        input_tokens = input_file_name.split("\\")
        if not re.match(r"(INPUT\\input)(\d)(.txt)", input_file_name):
            # Backslash escaped explicitly; the printed text is unchanged.
            print("Error: input path should be of the format INPUT\\input#.txt")
            return -1

        output_file_name = arguments.output_file_name
        output_tokens = output_file_name.split("\\")
        if not re.match(r"(OUTPUT\\output)(\d)(.txt)", output_file_name):
            print("Error: output path should be of the format OUTPUT\\output#.txt")
            return -1
    else:
        input_file_name = arguments.input_file_name
        input_tokens = input_file_name.split("/")
        if not re.match(r"(INPUT/input)(\d)(.txt)", input_file_name):
            print("Error: input path should be of the format INPUT/input#.txt")
            return -1

        output_file_name = arguments.output_file_name
        output_tokens = output_file_name.split("/")
        if not re.match(r"(OUTPUT/output)(\d)(.txt)", output_file_name):
            print("Error: output path should be of the format OUTPUT/output#.txt")
            return -1

    flag = arguments.flag
    # procedure_name = arguments.procedure_name

    try:
        map = read_from_file(input_file_name)  # get the map
    except FileNotFoundError:
        print("input file is not present")
        return -1
    # print(map)

    solution_string = ""  # contains solution

    solution_string = graphsearch(map, flag)
    write_flag = 1

    # call function write to file only in case we have a solution
    if write_flag == 1:
        write_to_file(output_file_name, solution_string)


if __name__ == "__main__":
    main()