-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
181 lines (160 loc) · 5.3 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
from stn import STN
from stn import loadSTNfromJSONfile
from dc_stn import DC_STN, edgeType
import random
import heapq
import math
import json
import glob
import os
import sys
##
# \file util.py
# \brief helpful functions
##
# \fn STNtoDCSTN(S)
# \brief Convert an STN object to a DC_STN object
#
# @param S An input STN object to convert
#
# @return A DC_STN object that has the same vertices and edges as the input
def STNtoDCSTN(S):
    """Build a DC_STN carrying the edges of the STN ``S``.

    Each edge of ``S`` contributes two plain edges: i -> j weighted ``Cij``
    and j -> i weighted ``Cji``. An edge whose ``isContingent()`` is true
    additionally contributes a LOWER-typed edge i -> j weighted ``-Cji`` and
    an UPPER-typed edge j -> i weighted ``-Cij``, both with ``parent`` set
    to the edge's ``j`` endpoint.

    Vertices are not added explicitly here; only ``addEdge`` is called.

    @param S  The STN whose ``edges`` mapping is read.
    @return   The freshly created DC_STN.
    """
    dc_network = DC_STN()
    for link in S.edges.values():
        src, dst = link.i, link.j
        dc_network.addEdge(src, dst, link.Cij)
        dc_network.addEdge(dst, src, link.Cji)
        if not link.isContingent():
            continue
        # Contingent links also get their labelled lower/upper edges.
        dc_network.addEdge(
            src, dst, -link.Cji, edge_type=edgeType.LOWER, parent=dst)
        dc_network.addEdge(
            dst, src, -link.Cij, edge_type=edgeType.UPPER, parent=dst)
    return dc_network
##
# \fn dc_checking(STN)
# \brief Check if an input STN is dynamically controllable
#
# @param STN An input STN object to check
#
# @return Return True if the input STN is dynamically controllable. Return
# False otherwise.
def dc_checking(STN):
    """Convert ``STN`` with STNtoDCSTN and return that graph's is_DC() result.

    @param STN  The STN object to check.
    @return     Whatever ``is_DC()`` reports for the converted DC_STN.
    """
    labeled_graph = STNtoDCSTN(STN)
    return labeled_graph.is_DC()
##
# \fn dc_checking_file(filename)
# \brief Load an STN from a JSON file and check if it is dynamically
#        controllable
#
# @param filename Path of the JSON file describing the STN to check
#
# @return Return True if the input STN is dynamically controllable. Return
# False otherwise.
def dc_checking_file(filename):
    """Load an STN from ``filename`` and run dc_checking on it.

    @param filename  Passed unchanged to ``loadSTNfromJSONfile``.
    @return          The value returned by ``dc_checking`` for the loaded STN.
    """
    return dc_checking(loadSTNfromJSONfile(filename))
##
# \fn normal(STN)
# \brief convert a given STNU to its normal form labeled graph using DC_STN
#
# \details a normal form labeled graph is needed for Williams algorithm.
# The robotbrunch DC_STN class keeps track of the upper, lower case
# edges, so we need to use it for labeled graph.
#
# @param STN an STN to convert
#
# @return Return the DC_STN that is the normal form labeled graph for input STN
# and a dictionary storing vertices added to create normal form
def normal(STN):
    """Build the normal-form labeled graph of ``STN`` as a DC_STN.

    All vertices of ``STN`` are copied first. Edges are then translated:

    * Non-contingent edge: plain edges i -> j (``Cij``) and j -> i (``Cji``).
    * Contingent edge with ``Cji == 0``: the same two plain edges, plus a
      LOWER edge i -> j weighted ``-Cji`` and an UPPER edge j -> i weighted
      ``-Cij`` (parent ``j``).
    * Contingent edge with ``Cji != 0``: a new intermediate vertex is
      inserted. i and the new vertex are tied together by plain edges
      weighted ``-Cji`` / ``Cji``; the contingent part then runs from the
      new vertex to j with bounds 0 and ``Cij + Cji``.

    NOTE(review): the id of an inserted vertex is ``len(graph.verts)`` at
    the time of insertion; this presumably relies on existing vertex ids
    never being equal to that count -- confirm against the STN id scheme.

    @param STN  The STN to convert; its ``verts`` and ``edges`` are read.
    @return     A tuple ``(graph, added)`` where ``graph`` is the DC_STN and
                ``added`` maps each inserted vertex id to the original
                contingent edge it was created for.
    """
    graph = DC_STN()
    for vertex in STN.verts.keys():
        graph.addVertex(vertex)

    added = {}
    for edge in STN.edges.values():
        contingent = edge.isContingent()

        if contingent and edge.Cji != 0:
            # Non-zero lower bound: split the link at a fresh vertex.
            mid = len(graph.verts)
            added[mid] = edge
            graph.addVertex(mid)
            # Fixed-length hop from the start vertex to the new vertex.
            graph.addEdge(edge.i, mid, -edge.Cji)
            graph.addEdge(mid, edge.i, edge.Cji)
            # Remaining contingent stretch from the new vertex to j.
            span = edge.Cij + edge.Cji
            graph.addEdge(mid, edge.j, span)
            graph.addEdge(edge.j, mid, 0)
            graph.addEdge(
                mid, edge.j, 0, edge_type=edgeType.LOWER, parent=edge.j)
            graph.addEdge(
                edge.j, mid, -span, edge_type=edgeType.UPPER, parent=edge.j)
            continue

        # Both remaining cases start with the ordinary pair of edges.
        graph.addEdge(edge.i, edge.j, edge.Cij)
        graph.addEdge(edge.j, edge.i, edge.Cji)
        if contingent:
            graph.addEdge(
                edge.i,
                edge.j,
                -edge.Cji,
                edge_type=edgeType.LOWER,
                parent=edge.j)
            graph.addEdge(
                edge.j,
                edge.i,
                -edge.Cij,
                edge_type=edgeType.UPPER,
                parent=edge.j)
    return graph, added
##
# \class PriorityQueue
# \brief A simple Priority Queue implementation that pops the item with the
# lowest priority
# \note This code is adapted from UC Berkeley AI course project util.py file
class PriorityQueue:
    # Entries are stored as (priority, data) tuples in self.queue, a plain
    # list maintained as a binary heap by the heapq module, so the entry
    # with the smallest priority is always self.queue[0].
    # NOTE: because whole tuples are compared, two entries with equal
    # priorities are ordered by comparing their data values.

    ##
    # \brief PriorityQueue Constructor
    #
    # @post An empty Priority Queue
    def __init__(self):
        self.queue = []

    ##
    # \brief Push an element with given priority into the Priority queue
    #
    # @param data      An element to be added
    # @param priority  The priority associated with input data
    #
    # @post A Priority Queue object with input data added
    def push(self, data, priority):
        heapq.heappush(self.queue, (priority, data))

    ##
    # \brief Pop the element with the lowest priority in Priority Queue
    #
    # @return A tuple in the form of (priority, data)
    #
    # \note heapq.heappop raises IndexError when the queue is empty
    def pop(self):
        return heapq.heappop(self.queue)

    ##
    # \brief Check if the Priority Queue has element inside
    #
    # @return Return True if the queue is empty and return False otherwise
    def isEmpty(self):
        return not self.queue

    ##
    # \brief Add or decrease the priority of an input data
    #
    # \details If input data is already in the queue, modify its priority if
    #          the input priority is strictly lower than its original
    #          priority, do nothing otherwise. If input data is not in the
    #          queue, push it to the queue with given priority.
    #          Only the first entry whose data equals the input is examined.
    #
    # @param data      An element to be added/modified
    # @param priority  The priority associated with input data
    #
    # @post A Priority Queue object with input data added/modified
    def addOrDecKey(self, data, priority):
        for i, (p, d) in enumerate(self.queue):
            if d == data:
                if priority < p:
                    # Changing an arbitrary slot can break the heap
                    # invariant, so rebuild the heap afterwards; pushing
                    # onto a list that is no longer a heap would make later
                    # pops come out in the wrong order.
                    self.queue[i] = (priority, data)
                    heapq.heapify(self.queue)
                break
        else:
            # Loop finished without a match: data is new to the queue.
            self.push(data, priority)