-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclassical_solvers.py
More file actions
209 lines (167 loc) · 6.61 KB
/
classical_solvers.py
File metadata and controls
209 lines (167 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
from ortools.sat.python import cp_model
import numpy as np
from sage.all import Matrix, GF, codes, vector, codes
import itertools
from simanneal import Annealer
import random
class AbstractSolver:
def __init__(self, instance):
self.instance = instance.to_max_linsat()
self.solution = None
def compute_solution(self, instance):
raise NotImplementedError()
def get_solution(self):
if self.solution is None:
self.solution = self.compute_solution(self.instance)
return self.solution
def get_solution_quality(self):
solution = self.get_solution()
return self.instance.evaluate_solution(solution)
class OrToolsSolver(AbstractSolver):
def __init__(self, instance, time_limit=None):
super().__init__(instance)
self._is_optimal = None
self.time_limit = time_limit
def compute_solution(self, instance):
if not instance.field.is_prime_field():
raise ValueError("This method only supports prime fields")
p = int(instance.field.order())
B = instance.get_B()
F = instance.get_F()
weights = instance.get_weights()
n = B.ncols()
m = B.nrows()
B = np.array(B)
model = cp_model.CpModel()
x = [model.NewIntVar(0, p - 1, f"x_{i}") for i in range(n)]
satisfied_vars = []
for i in range(m):
for rhs in F[i]:
rhs = int(rhs) % p
linear_sum = sum(B[i, j] * x[j] for j in range(n) if B[i, j] != 0)
mod_var = model.NewIntVar(0, p - 1, f"mod_var_{i}")
satisfied = model.NewBoolVar(f"satisfied_{i}_{rhs}")
model.AddModuloEquality(mod_var, linear_sum, p)
model.Add(mod_var == rhs).OnlyEnforceIf(satisfied)
model.Add(mod_var != rhs).OnlyEnforceIf(satisfied.Not())
satisfied_vars.append(satisfied)
if weights is None:
model.Maximize(sum(satisfied_vars))
else:
model.Maximize(sum([v * w for v, w in zip(satisfied_vars, weights)]))
solver = cp_model.CpSolver()
if self.time_limit is not None:
solver.parameters.max_time_in_seconds = self.time_limit
status = solver.Solve(model)
if status in [cp_model.OPTIMAL, cp_model.FEASIBLE]:
self._is_optimal = status == cp_model.OPTIMAL
return vector(
instance.field, np.array([solver.Value(x[i]) for i in range(n)])
)
else:
raise ValueError("Could not find a solution")
def is_optimal(self):
if self._is_optimal is None:
self.compute_solution()
return self._is_optimal
class BruteForceSolver(AbstractSolver):
def __init__(self, instance):
super().__init__(instance)
self.solution = None
def compute_solution(self, instance):
solutions = itertools.product(list(instance.field), repeat=instance.get_n())
best_solution = None
best_solution_value = None
for solution in solutions:
value = instance.evaluate_solution(solution)
if best_solution_value is None or value > best_solution_value:
best_solution_value = value
best_solution = solution
return best_solution
class MaxLinSatAnneal(Annealer):
def __init__(self, instance, initial_state=None):
self.instance = instance
self.els = list(self.instance.field)
if initial_state is None:
initial_state = vector(
self.instance.field,
[random.choice(self.els) for _ in range(self.instance.get_n())],
)
self.state = initial_state
def move(self):
n = self.instance.get_n()
move = vector(self.instance.field, [0] * n)
index = random.choice(range(n))
el = random.choice(self.els)
move[index] = el
self.state += move
def energy(self):
return self.instance.get_m() - self.instance.evaluate_solution(self.state)
def update(self, *args):
pass
class SimAnnealSolver(AbstractSolver):
def __init__(
self,
instance,
initial_state=None,
Tmax=25000.0,
Tmin=2.5,
steps=10000,
):
super().__init__(instance)
self.initial_state = initial_state
self.Tmax = Tmax
self.Tmin = Tmin
self.steps = steps
def compute_solution(self, instance):
annealer = MaxLinSatAnneal(instance, self.initial_state)
annealer.Tmax = self.Tmax
annealer.Tmin = self.Tmin
annealer.steps = self.steps
return annealer.anneal()[0]
class PrangeSolver(AbstractSolver):
def __init__(self, instance):
super().__init__(instance)
n = instance.get_n()
m = instance.get_m()
B = instance.get_B()
if not (m >= n and B.rank() == n):
raise ValueError(
"To apply Prange's algorithm, there must exist at least as many constraints as variables and the constraint matrix must have full rank"
)
def get_expected_solution_quality(self):
# We assume independence:
# for each constraint, there is a n / m chance to be in the pivot elements
# if it is in the pivot elements it is satisfied with probability 1
# otherwise it is satisfied with probability F_i / q
n = self.instance.get_n()
m = self.instance.get_m()
q = self.instance.field.order()
return sum(
n / m * 1 + (1 - n / m) * len(F_i) / q for F_i in self.instance.get_F()
)
def compute_solution(self, instance):
expected_solution_quality = self.get_expected_solution_quality()
for _ in range(1000):
solution = self.compute_arbitrary_solution(instance)
if instance.evaluate_solution(solution) >= expected_solution_quality:
return solution
raise ValueError("Could not find a solution of the expected quality")
def compute_arbitrary_solution(self, instance):
n = instance.get_n()
m = instance.get_m()
B = instance.get_B()
F = instance.get_F()
v = [random.choice(list(F_i)) for F_i in F]
perm = list(range(m))
random.shuffle(perm)
shuffled_B = B[perm, :]
pivots = shuffled_B.pivots()
selected_rows = [perm[i] for i in pivots]
selected_rows_set = set(selected_rows)
B_subset = B[selected_rows, :]
v_subset = vector(
instance.field, [v_i for i, v_i in enumerate(v) if i in selected_rows]
)
x = B_subset.solve_right(v_subset)
return x