Commit 66b23988 authored by Fabrizio Detassis's avatar Fabrizio Detassis
Browse files

Added PySMT implementation for inequality constraints.

parent b798e55b
......@@ -20,15 +20,25 @@ class InequalityGlobalConstraint(AbstractConstraint):
self._checker = self.leq
elif sense == '>=':
self._checker = self.geq
elif sense == '>':
self._checker = self.gt
elif sense == '<':
self._checker = self.lt
else:
raise ValueError("Sense " + str(sense) + " not understood!")
def leq(self, value):
return value <= self.rhs
def lt(self, value):
return value < self.rhs
def geq(self, value):
return value >= self.rhs
def gt(self, value):
return value > self.rhs
def is_satisfied(self, x, y):
c = np.apply_along_axis(self._checker, axis=0, arr=y)
return np.all(c)
......
from _typeshed import NoneType
# from _typeshed import NoneType
from abc import ABC, abstractmethod
from re import A
from aiddl_core.representation.list import List
......
......@@ -47,6 +47,10 @@ class MovingTargetRegCplex(MovingTarget):
M.add_constraints([_x >= cval for _x in x])
elif ctype == '<=':
M.add_constraints([_x <= cval for _x in x])
elif ctype == '<':
M.add_constraints([_x < cval for _x in x])
elif ctype == '>':
M.add_constraints([_x > cval for _x in x])
else:
raise NotImplementedError("Constraint type not recognized " + str(ctype))
......
import numpy as np
from moving_target_abc import MovingTarget
from pysmt.shortcuts import Symbol, LE, GE, LT, GT, Int, And, Equals, Plus, Minus, Div, Times, Max, is_sat, get_model, \
Real, Solver, Ite
from pysmt.typing import REAL
from constraint import InequalityGlobalConstraint
class SMTModel:
def __init__(self, name):
self.name = name
self.domain = None
self.variables = None
self.loss = None
def add_constraint(self, constraint):
if self.domain is None:
self.domain = constraint
else:
self.domain = And(self.domain, constraint)
def add_constraints(self, constraints):
for c in constraints:
self.add_constraint(c)
def optimize(self, alpha=0.2, max_iter=20):
"""
Heuristic search for the optimal value of the SMT.
We proceed as follows:
- check for feasibility: if not, break, else go on.
- get a (random) solution to the CP, that will be the current.
- evaluate the loss on the current solution.
- impose a new constraint on the value of the loss.
"""
it = 0
domain = self.domain
loss = self.loss
while it < max_iter:
model = get_model(domain)
print("Solving problem")
current_loss = model.get_py_value(loss)
print("It: %d; current loss: %.2f" % (it, current_loss))
if it == 0:
ub = current_loss
lb = (1-alpha) * current_loss
print("Bounds: [%.2f - %.2f]" % (lb, ub))
if is_sat(And(domain, LT(loss, Real(lb)))):
print("SAT!")
domain = And(domain, LT(loss, Real(lb)))
ub = lb
lb = (1 - alpha) * ub
else:
print("UNSAT!")
ub = current_loss
alpha = alpha / 2
lb = (1-alpha) * ub
if np.abs(ub-lb) > 0.01:
it += 1
else:
break
self.domain = domain
model = get_model(self.domain)
y_opt = [float(model.get_py_value(y)) for y in self.variables]
return y_opt
class MovingTargetRegSMT(MovingTarget):
x = None
y = None
variables = list()
constraints = list()
""" REMOVED FOR NOW
def set_loss_function(self, M, lf):
pass
"""
def add_constraints(self, M: SMTModel, C, y_s):
"""
Constraints are expressed with a list/set? of tuples, i.e.:
C = [
('>', y, -30),
('fairness', y, 1.0),
]
"""
for c in C:
ctype, cvar, cval = c
# Store the constraint in the class.
cstr = InequalityGlobalConstraint('ct', ctype, cval)
self.constraints.append(cstr)
# Translate the constraint to SMT language.
# todo: indexing of variables contraint <-> model
x = M.variables
if ctype == '>=':
M.add_constraints([GE(_x, Real(cval)) for _x in x])
elif ctype == '<=':
M.add_constraints([LE(_x, Real(cval)) for _x in x])
elif ctype == '<':
M.add_constraints([LT(_x, Real(cval)) for _x in x])
elif ctype == '>':
M.add_constraints([GT(_x, Real(cval)) for _x in x])
else:
raise NotImplementedError("Constraint type not recognized " + str(ctype))
print("Constraint added: " + str(c))
def m_alpha(self, M, L, y_k, alpha):
n_points = len(y_k)
x = M.variables
y_k = y_k.flatten()
if L == 'MSE' or L == 'MeanSquaredError':
y_loss = (1.0 / n_points) * Plus([Times(Minus(Real(float(self.y[i])), x[i]), Minus(Real(float(self.y[i])), x[i]))
for i in range(n_points)])
p_loss = (1.0 / n_points) * Plus([Times(Minus(Real(float(y_k[i])), x[i]), Minus(Real(float(y_k[i])), x[i]))
for i in range(n_points)])
elif L == 'MAE' or L == 'MeanAbsoluteError':
delta = [Minus(Real(float(self.y[i])), x[i]) for i in range(n_points)]
slack = [Ite(d<0, -d, d) for d in delta]
y_loss = (1.0 / n_points) * Plus(slack)
p_loss = (1.0 / n_points) * Plus(slack)
else:
raise NotImplementedError("Loss function not recognized!")
obj_func = Plus(y_loss, Div(p_loss, Real(alpha)))
M.loss = obj_func
def m_beta(self, M, L, y_k, beta):
pass
def check_constraints_ext(self, M, C, d):
sat = True
for c in self.constraints:
sat = sat & c.is_satisfied(*d)
print("Constraint satisfaction: " + str(sat))
return sat
def solve_ext(self, M):
# Check solution.
assert is_sat(M.domain)
# Retrieve solution.
y_opt = M.optimize()
return y_opt
def initialize_ext(self, d, name='smt_model'):
x, y = d
# Store the input data.
self.x = x
self.y = y.flatten()
# Model declaration.
mod = SMTModel(name)
# Variable declaration.
n_points = len(y)
idx_var = [i for i in range(n_points)]
mod.variables = [Symbol("y_%d" % i, REAL) for i in range(n_points)]
# Store variable names.
self.variables = ['y_%d' %i for i in idx_var]
print("Variables added: " + str(self.variables))
return mod
if __name__ == '__main__':
# Generation of synthetic data.
d = 10 * np.random.rand(10, 1), 10 * np.random.rand(10, 1)
# Loss function.
L = 'MAE'
# Costraints declaration.
C = [
("<=", 'y', 5),
(">", 'y', 1.2),
]
# Params.
alpha = 1
beta = 1
print("Original data: ")
x, y = d
for i in range(len(x)):
print("%.2f \t %.2f" % (x[i], y[i]))
m = MovingTargetRegSMT(n=1)
# CPLEX model initialization.
smt_mod = m.initialize_ext(d)
# Add constraints.
m.add_constraints(smt_mod, C, y)
# Check constraint satisfaction.
sat = m.check_constraints_ext(smt_mod, C, d)
if not sat:
# Alpha step.
m.m_alpha(smt_mod, L, y, alpha)
else:
# Beta step.
m.m_beta(smt_mod, L, y, beta)
# Model solving.
y_new = m.solve_ext(smt_mod)
print("Final data: ")
print("x \t y \t y_new")
for i in range(len(x)):
print("%.2f \t %.2f \t %.2f" % (x[i][0], y[i], y_new[i]))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment