Commit 68058584 authored by Alessandro Saffiotti's avatar Alessandro Saffiotti
Browse files

Created Lab 3

parent abde0b95
"""
============== UniBo: AI and Robotics 2025 ==============
Base code: skeleton controller, performs the action decision parts of the main loop
To be customized in an incremental way for the different labs
(c) 2024-2025 Alessandro Saffiotti
"""
from fcontrol import Behavior
from fcontrol import ramp_up, ramp_down, triangle, trapezoid
from fcontrol import global_to_local, local_to_global
from math import atan2, degrees, sqrt
class Still (Behavior):
"""
Just stay still. This is the default behavior.
"""
def __init__(self, noparam):
super().__init__()
def update_state(self, state):
"""
No local state variables to set
"""
def setup(self):
"""
Predicates, linguistic variables and rules of our fuzzy behavior
"""
self.fpreds = {
# No fuzzy predicates used in the LHS of rules
}
self.flvars = {
# Definition of the fuzzy linguistic variables, used in the rules' RHS
'Move' : ({'Fast':0.5, 'Slow':0.1, 'None':0, 'Back':-0.1}, 'Vlin'),
'Turn' : ({'Left':30, 'MLeft':10, 'None':0, 'MRight':-10, 'Right':-30}, 'Vrot')
}
self.frules = {
'NoTurn' : ("True", 'Turn', 'None'),
'NoGo' : ("True", 'Move', 'None')
}
# The degree of achievement is given by the built-in predicate 'False' to run forever
self.fgoal = "False"
# Finally, initialize the fuzzy predicats and fuzzy output sets
self.init_flsets()
self.init_fpreds()
class GoToTarget (Behavior):
"""
Navigate to a target object at (x,y,th,radius), where 'x,y' is
the object's center in global frame, and 'radius' is its size
The behavior stops at about 'radius' mt from the object's center
The object's orientation th is ignored
It assumes that there are no obstacles on the way
"""
def __init__(self, target = (0.0, 0.0, 0.0, 0.0)):
self.tpoint = [target[0], target[1], 0.0] # target center point in global coordinates
self.tlocal = [0, 0, 0] # target center point in robot's coordinates
print("Target: " , target)
self.radius = target[3] # distance to stop from target's centerpoint
super().__init__()
def update_state(self, state):
"""
Set the local state variables phi and rho
Use the 'mypose' estimate, passed as part of the global state
"""
# 'phi' and 'rho' are radial coordinates to target
# 'phi' is in degrees, 'rho' is in meters
mypose = state['mypose']
global_to_local(self.tpoint, mypose, self.tlocal)
xt = self.tlocal[0]
yt = self.tlocal[1]
self.state['phi'] = degrees(atan2(yt, xt))
self.state['rho'] = sqrt(xt * xt + yt * yt)
def setup(self):
"""
Predicates, linguistic variables and rules of our fuzzy behavior
"""
robot_radius = 0.5
stop_dist = self.radius + robot_radius
self.fpreds = {
# Fuzzy predicates used in the LHS of rules
'TargetLeft' : (ramp_up(30.0, 60.0), 'phi'),
'TargetRight' : (ramp_down(-60.0, -30.0), 'phi'),
'TargetAhead' : (triangle(-30.0, 0.0, 30.0), 'phi'),
'TargetHere' : (ramp_down(stop_dist, stop_dist+1.0), 'rho'),
}
self.flvars = {
# Definition of the fuzzy linguistic variables, used in the rules' RHS
'Move' : ({'Fast':0.5, 'Slow':0.1, 'None':0, 'Back':-0.1}, 'Vlin'),
'Turn' : ({'Left':30, 'MLeft':10, 'None':0, 'MRight':-10, 'Right':-30}, 'Vrot')
}
self.frules = {
'ToLeft' : ("TargetLeft AND NOT(TargetHere)", 'Turn', 'Left'),
'ToRight' : ("TargetRight AND NOT(TargetHere)", 'Turn', 'Right'),
'Straight' : ("TargetAhead", 'Turn', 'None'),
'Far' : ("TargetAhead AND NOT(TargetHere)", 'Move', 'Fast'),
'Stop' : ("TargetHere OR NOT(TargetAhead)", 'Move', 'None')
}
# The degree of achievement is given by the predicate 'TargetHere'
self.fgoal = "TargetHere"
# Finally, initialize the fuzzy predicats and fuzzy output sets
self.init_flsets()
self.init_fpreds()
class Wander (Behavior):
def __init__(self, noparam):
super().__init__()
def update_state(self, state):
"""
Set the local state variables for clearance (left, right, front, overall)
Uses the 'sdata' sonar ranges, passed as part of the global state
"""
ranges = [s[1] for s in state['sdata']]
self.state['lft_clr'] = min(ranges[1], ranges[2])
self.state['rgt_clr'] = min(ranges[10], ranges[11])
self.state['frt_clr'] = ranges[0]
self.state['all_clr'] = min(ranges)
def setup(self):
"""
Predicates, linguistic variables and rules of our fuzzy behavior
"""
self.fpreds = {
# Definition of the fuzzy predicates, used in the rules' LHS
'ObstacleLeft' : (ramp_down(0.0, 1.0), 'lft_clr'),
'ObstacleRight' : (ramp_down(0.0, 1.0), 'rgt_clr'),
'ObstacleAhead' : (ramp_down(0.0, 1.0), 'frt_clr'),
# Fuzzy predicate do decide between avoid and go
'Danger' : (ramp_down(0.0, 1.0), 'all_clr')
}
self.flvars = {
# Definition of the fuzzy linguistic variables, used in the rules' RHS
'Move' : ({'Fast':0.5, 'Slow':0.1, 'None':0, 'Back':-0.5}, 'Vlin'),
'Turn' : ({'Left':30, 'MLeft':10, 'None':0, 'MRight':-10, 'Right':-30}, 'Vrot')
}
self.frules = {
# Lastly, definition of the actual fuzzy rules
'GoLeft' : ("ObstacleRight AND NOT(ObstacleLeft)", 'Turn', 'Left'),
'GoRight' : ("ObstacleLeft AND NOT(ObstacleRight)", 'Turn', 'Right'),
'Tunnel' : ("ObstacleRight AND ObstacleLeft", 'Turn', 'None'),
'Front' : ("ObstacleAhead", 'Move', 'Back'),
'Slow' : ("Danger", 'Move', 'Slow'),
'Fast' : ("NOT(Danger)", 'Move', 'Fast')
}
# The degree of achievement is set to false: this behavior never ends
self.fgoal = "False"
# Finally, initialize the fuzzy predicats and fuzzy output sets
self.init_flsets()
self.init_fpreds()
class Controller ():
def __init__(self, robot_pars):
self.behavior = None # top-level fuzzy behavior run by the controller
self.achieved = 0.0 # level of achievement of current behavior
self.vlin = 0.0 # current value for vlin control variable
self.vrot = 0.0 # current value for vrot control variable
def set_behavior (self, bname = None, bparam = None):
"""
Initialize the controller, setting the behavior to be executed
Return True if the inizialization is successful
"""
if bname:
self.behavior = globals()[bname](bparam)
return True
def run (self, state, debug):
"""
Action decision. Compute control values (vlin, vrot) given the current robot's pose
by running the current behavior; return the level of achievement of that behavior
"""
self.achieved = self.behavior.run(state, debug)
self.vlin = self.behavior.get_vlin()
self.vrot = self.behavior.get_vrot()
if debug > 1:
print('Goal achievement: {:.2f}'.format(self.achieved))
if debug > 1:
print('(vlin, vrot) = ({:.2f}, {:.2f})'.format(self.vlin, degrees(self.vrot)))
return self.achieved
def get_vlin (self):
return self.vlin
def get_vrot (self):
return self.vrot
""" ========== UniBo: AI and Robotics 2025 ==========
A class to build fuzzy controllers. A fuzzy controller is defined by a set of fuzzy rules
together with the corresponding fuzzy predicates and fuzzy linguistic variables.
A fuzzy rule has the form
{ Name : (Antecedent, Linguistic variable, Linguistic value) }
where:
- Antecedent is an expression in fuzzy propositional logic, buid from a set
of fuzzy predicates (see below) and the fuzzy connectives AND, OR, NOT.
NOT is a prefix unary operator; AND and NOT are infix binary operators
- Linguistic variable is the name of a linguistic variable (see below)
- Linguistic value is a linguistic value for the linguistic variable
Example of a fuzzy rule:
{ 'ToLeft' : ("TargetLeft AND NOT(TargetHere)", 'Turn', 'Left') }
A fuzzy predicate has the form
{ Name : (Membership function, Input variable) }
where:
- Membership function is a function returning a value in [0,1]
- Input variable is the argument to the membership function
Example of a fuzzy predicate:
{ 'TargetLeft' : (ramp_down(-30.0, -5.0), 'phi') }
where ramp_down is a predefined membership function.
A linguistic variable (a.k.a. "fuzzy action") has the form
{ Name : (Linguistic values, Target variable)}
where:
- Linguistic values is a dictionary of {label : value}
- Target variable is the control variable to which the values refer to
Example of a linguistic variable:
{ 'Turn' : ({'Left':-30, 'MLeft':-10, 'None':0, 'MRight':10, 'Right':30}, 'Vrot') }
(c) 2024-2025 Alessandro Saffiotti
"""
from collections import deque
from math import sin, cos, radians
class FController:
"""
This class implements a generic fuzzy rule-based controller
It provides the basic mechanisms for the "FBehavior" class
It uses the "FEval" class to evaluate the truth value of the rule antecedents
"""
def __init__(self):
self.fevaluator = FEval()
self.state = {} # input state, used to compute all the truth values
self.output = {} # output variables, computed by the fuzzy controller
self.frules = {} # fuzzy rules
self.fpreds = {} # fuzzy predicates, used in the LHS of the rules
self.flvars = {} # fuzzy linguistic variables, used in the RHS of the rules
self.flsets = {} # fuzzy sets, hold the values of the linguistic variables
self.fpvals = {} # truth values of the fuzzy pedicates
self.fgoal = "" # goal condition, as a statement in fuzzy logic
def init_flsets(self):
for name in self.flvars:
flvar = self.flvars[name]
cvar = flvar[1]
self.flsets[cvar] = ({}, name)
self.output[cvar] = 0.0
for label in flvar[0]:
self.flsets[cvar][0][label] = 0.0
def init_fpreds(self):
for pred in self.fpreds:
self.fpvals[pred] = 0.0
def eval_fpred(self, name):
fpred = self.fpreds[name]
return(fpred[0](self.state[fpred[1]]))
def eval_fpreds(self):
for pred in self.fpreds:
self.fpvals[pred] = self.eval_fpred(pred)
return self.fpvals
def eval_frule(self, name, debug = 0):
frule = self.frules[name]
ante = frule[0]
flvar = self.flvars[frule[1]]
label = frule[2]
cvar = flvar[1]
flset = self.flsets[cvar][0]
self.fevaluator.set_stmt(ante)
level = self.fevaluator.eval()
flset[label] = max(flset[label], level)
if debug > 2:
print(' {} [{:.2f}] -> {}'.format(name, level, frule))
def eval_frules(self, debug = 0):
if debug > 2:
self.print_fpreds()
self.init_flsets()
if debug > 2:
print('Rules:')
for frule in self.frules:
self.eval_frule(frule, debug)
def eval_goal(self, debug = 0):
self.fevaluator.set_stmt(self.fgoal)
return self.fevaluator.eval()
def defuzzify_var(self, cvar, debug = 0):
lvar = self.flsets[cvar][1]
vals = self.flvars[lvar][0]
mus = self.flsets[cvar][0]
sumv = 0.0
sumu = 0.0
if debug > 2:
print("defuzzify:", cvar)
print(" vals:", vals)
print(" mu's:", mus)
for label in vals:
sumv += mus[label] * vals[label]
sumu += mus[label]
if sumu == 0.0:
return None
if debug > 2:
print(" -->:", sumv / sumu)
return sumv / sumu
def defuzzify(self, debug = 0):
for cvar in self.output:
val = self.defuzzify_var(cvar, debug)
if val is not None:
self.output[cvar] = val
def run(self, state, debug = 0):
self.update_state(state)
if debug > 2:
self.print_state()
self.eval_fpreds()
self.fevaluator.set_interpretation(self.fpvals)
self.eval_frules(debug)
self.defuzzify(debug)
return self.eval_goal()
def print_state(self):
print("Input state:")
for var in self.state:
print(' {}: {:.3f}'.format(var, self.state[var]))
def print_output(self):
print("Output variables:")
for var in self.output:
print(' {}: {:.3f}'.format(var, self.output[var]))
def print_fpreds(self):
print("Fuzzy predicates:")
for pred in self.fpreds:
print(' {}: {:.3f}'.format(pred, self.eval_fpred(pred)))
def print_flvars(self):
print("Fuzzy linguistic variables:")
for name in self.flvars:
flvar = self.flvars[name]
print('', name + ':', flvar[0], '(' + flvar[1] + ')')
def print_flsets(self):
print("Fuzzy linguistic sets:")
for name in self.flsets:
flset = self.flsets[name]
print('', name + ':', flset[0], '(' + flset[1] + ')')
def print_frules(self):
print("Fuzzy Rules:")
for name in self.frules:
frule = self.frules[name]
print("{: <12}".format(name + ": "), end = "")
print("IF " + "{: <50}".format(frule[0]) + " THEN " + frule[1] + "(" + frule[2] + ")")
def print_all(self):
self.print_fpreds()
self.print_flvars()
self.print_flsets()
self.print_frules()
self.print_state()
self.print_output()
class Behavior (FController):
"""
Top level class for defining a fuzzy "behavior" using a fuzzy rule-based controller
Behaviors are specialized fuzzy controllers that realize one specific task.
Each is created by defining its specific rules, fuzzy predicates and fuzzy
linguistic variables (actions), in the function "setup"; as well as the specific
way to compute the relevant state variables, in the function "update_state".
Each specific behavior, like "GoToTarget", must be a sub-classe of this one,
and it must specialize these two functions:
- setup (where all fuzzy predicates, actions and rules are defined)
- update_state (how to compute the relevant state variables)
All behaviors have two control variables: 'Vlin' and 'Vrot'
To execute a behavior, call its run() function inherited from FController:
after that, Vlin and Vrot can be retrieved by get_vlin() and get_vrot().
An example of this is in the step() function in the class TopLevelLoop.
"""
def __init__(self):
super().__init__()
# default for all robot behaviors
self.flvars = {
'Move' : ({}, 'Vlin'),
'Turn' : ({}, 'Vrot')
}
self.setup()
def setup(self):
"""
Define the set of fuzzy rules, fuzzy predicates and fuzzy linguistic variables
Must be instantiated for any specific subclass of FBehavior
"""
pass
def update_state(self, state):
"""
Set the local variables depending on the current robot's state
Must be instantiated for any specific subclass of FBehavior
"""
pass
def get_vlin(self):
return self.output['Vlin']
def get_vrot(self):
return radians(self.output['Vrot'])
class FEval:
"""
A class to evaluate the truth value of statement in propositional fuzzy logic,
using a simple LR(1) parser based on the following BNF:
<stmt> ::= <term> | <term> "OR" <term> | <term> "AND" <term>
<term> ::= <atom> | <encl> | "NOT" <encl>
<encl> ::= "(" <stmt> ")"
Truth values are computed with respect to a given interpretation, that is, an
assignment of a truth value to each fuzzy predicate, given as a dict 'fpreds'.
"""
def __init__(self, debug=0):
self.debug = debug
self.stmt = ""
self.input = None
self.fpreds = {}
self.nest = 1
def set_stmt(self, string):
self.expr = string
self.input = deque(string.replace('(',' ( ').replace(')',' ) ').split())
def set_interpretation(self, dict):
self.fpreds = dict
def this(self):
if len(self.input) < 1:
return None
return self.input[0]
def next(self):
if len(self.input) < 2:
return None
return self.input[1]
def eval(self):
self.value = 0.0
result = self.parse_stmt()
if self.debug > 0:
print("> FEval interp:", self.fpreds)
print("> FEval input:", self.input)
print("> FEval value:", result)
assert result != None, "Invalid syntax in fuzzy expression: " + self.expr
return result
def parse_stmt(self):
self.debug_enter("stmt")
lhs = self.parse_term()
if lhs == None:
self.debug_exit("stmt", None)
return None
op = self.this()
if op == 'AND':
self.input.popleft()
rhs = self.parse_term()
if rhs == None:
self.debug_exit("stmt", None)
return None
result = self.eval_and(lhs, rhs)
elif op == 'OR':
self.input.popleft()
rhs = self.parse_term()
if rhs == None:
self.debug_exit("stmt", None)
return None
result = self.eval_or(lhs, rhs)
self.debug_exit("stmt", result)
return result
else:
result = lhs
self.debug_exit("stmt", result)
return result
def parse_term(self):
self.debug_enter("term")
if self.this() == 'NOT':
self.input.popleft()
term = self.parse_encl()
if term == None:
self.debug_exit("term", None)
return None
result = self.eval_not(term)
elif self.is_atom(self.this()):
result = self.parse_atom()
else:
result = self.parse_encl()
self.debug_exit("term", result)
return result
def parse_encl(self):
self.debug_enter("encl")
if self.this() == '(':
self.input.popleft()
expr = self.parse_stmt()
if expr == None:
self.debug_exit("encl", None)
return None
if self.this() == ')':
self.input.popleft()
result = self.eval_paren(expr)
self.debug_exit("encl", result)
return result
self.debug_exit("encl", None)
return None
def parse_atom(self):
self.debug_enter("atom")
token = self.input.popleft()
if self.is_atom(token):
result = self.eval_pred(token)
self.debug_exit("atom", result)
return result
self.debug_exit("atom", None)
return None
def is_atom(self, token):
if token in ['NOT', 'AND', 'OR', '(', ')']:
return False
return True
def debug_enter(self, category):
self.nest = self.nest + 1
if self.debug > 1:
print("".ljust(self.nest), category, ">", list(self.input))
def debug_exit(self, category, val):
self.nest = self.nest - 1
if self.debug > 1:
print("".ljust(self.nest), category, "<", val)
def eval_pred(self, pred):
if pred == 'True':
return 1.0
if pred == 'False':
return 0.0
if pred not in self.fpreds:
raise LookupError("No truth value provided for fuzzy predicate: " + pred)
if self.debug == 0:
return self.fpreds[pred]
else:
return '[' + str(self.fpreds[pred]) + ']'
def eval_and(self, lhs, rhs):
if self.debug == 0:
return min(lhs, rhs)
else:
return 'min(' + lhs + ', ' + rhs
def eval_or(self, lhs, rhs):
if self.debug == 0:
return max(lhs, rhs)
else:
return 'max(' + lhs + ', ' + rhs
def eval_not(self, term):
if self.debug == 0:
return 1.0 - term
else:
return '1 - ' + term
def eval_paren(self, stmt):
if self.debug == 0:
return stmt
else:
return '(' + stmt + ')'
"""
Some common builders for membership functions
"""
def ramp_up(a, b):
"""
zero until a, then raise linearly until b, then one
"""
assert a <= b, f"ramp_up called with a > b ({a}, {b})"
def mu(x):
if x <= a:
return 0.0
if x > b:
return 1.0
return (x-a)/(b-a)
return mu
def ramp_down(a, b):
"""
one until a, then decrease linearly until b, then zero
"""
assert a <= b, f"ramp_down called with a > b ({a}, {b})"
def mu(x):
if x <= a:
return 1.0
if x > b:
return 0.0
return (b-x)/(b-a)
return mu
def triangle(a, b, c):
"""
zero until a, then raise linearly until b, then decrease linearly until c, then zero
"""
assert a <= b, f"triangle called with a > b ({a}, {b})"
assert b <= c, f"triangle called with b > c ({b}, {c})"
def mu(x):
if x <= a:
return 0.0
if x > c:
return 0.0
if x <= b:
return (x-a)/(b-a)
return (c-x)/(c-b)
return mu
def trapezoid(a, b, c, d):
"""
zero until a, raise linearly until b, one unti c, decrease linearly until d, then zero
"""
assert a <= b, f"trapezoid called with a > b ({a}, {b})"
assert b <= c, f"trapezoid called with b > c ({b}, {c})"
assert c <= d, f"trapezoid called with c > d ({c}, {d})"
def mu(x):
if x <= a:
return 0.0
if x > d:
return 0.0
if x <= b:
return (x-a)/(b-a)
if x <= c:
return 1.0
return (d-x)/(d-c)
return mu
"""
Some basic geometry
"""
def global_to_local(pose, frame, newpose):
"""
Transform pose from global to local frame, store it in newpose
Frame are coordinates of local frame in global one
Poses are triples [x, y, th], in mt and rad
"""
x = pose[0]
y = pose[1]
th = pose[2]
x0 = frame[0]
y0 = frame[1]
th0 = frame[2]
newpose[0] = x * cos(th0) + y * sin(th0) - (x0 * cos(th0) + y0 * sin(th0))
newpose[1] = y * cos(th0) - x * sin(th0) + (x0 * sin(th0) - y0 * cos(th0))
newpose[2] = th - th0
def local_to_global(pose, frame, newpose):
"""
Transform pose from local to global frame, store it in newpose
Frame are coordinates of local frame in global one
Poses are triples [x, y, th], in mt and rad
"""
x = pose[0]
y = pose[1]
th = pose[2]
x0 = frame[0]
y0 = frame[1]
th0 = frame[2]
newpose[0] = x * cos(th0) - y * sin(th0) + x0
newpose[1] = x * sin(th0) + y * cos(th0) + y0
newpose[2] = th + th0
"""
============== UniBo: AI and Robotics 2025 ==============
Base code: HTN planning domain for Pyhop planner
To be customized in an incremental way for the different labs
(c) 2024-2025 Alessandro Saffiotti
"""
import pyhop
class State(pyhop.State):
def __init__(self):
self.__name__ = "s1"
self.pos = {} # named position of robot ('me') or of an objet
self.room = {} # room of robot or of an objet
self.connects = {} # doors' connectivity: pair of rooms
###############################################################################
# OPERATORS
# First argument is current state, then come the operator's parameters
# Return a new state, of False if the operator is not applicable
###############################################################################
def GoTo (state, target):
state.pos['me'] = target
return state
def Cross (state, door):
if (state.pos['me'] != door):
return False
if (state.room['me'] == state.connects[door][0]):
state.room['me'] = state.connects[door][1]
return state
if (state.room['me'] == state.connects[door][1]):
state.room['me'] = state.connects[door][0]
return state
return False
pyhop.declare_operators(GoTo, Cross)
###############################################################################
# METHODS
# First argument is current state, then come the method's parameters
# Return a decomposition into a list of sub-tasks, or False if not applicable
# Primitive sub-tasks will match operators; others will match other methods
###############################################################################
# Method to navigate when we are already at the target
def move_in_place (state, target):
if state.pos['me'] == target:
return []
else:
return False
# Method to navigate when the target is in the same room
def move_in_room(state, target):
if state.room['me'] == state.room[target]:
return [('GoTo', target)]
else:
return False
# Helper function to find connecting doors
def doors_between (state, room1, room2):
doors = []
for d in state.connects:
if (state.connects[d][0] == room1 and state.connects[d][1] == room2) or (state.connects[d][0] == room2 and state.connects[d][1] == room1) :
doors.append(d)
return doors
# Method to navigate when the target is in an adjacent room
def move_across_rooms (state, target):
room1 = state.room['me']
room2 = state.room[target]
if room1 == room2:
return False
doors = doors_between(state, room1, room2)
if doors == []:
return False
door = doors[0]
return [('GoTo', door), ('Cross', door), ('GoTo', target)]
pyhop.declare_methods('navigate_to', move_in_place, move_in_room, move_across_rooms)
#!/usr/bin/env python3
"""
============== UniBo: AI and Robotics 2025 ==============
Base code: start code for Lab 3 (action planning)
(c) 2024-2025 Alessandro Saffiotti
"""
import toplevel, pyhop, htn_domain
# Create an initial state
s1 = htn_domain.State()
s1.room['bed1'] = 'Room1'
s1.room['wardrobe'] = 'Room1'
s1.room['fridge'] = 'Room2'
s1.room['stove'] = 'Room2'
s1.room['sink'] = 'Room2'
s1.room['table1'] = 'Room2'
s1.room['table2'] = 'Room4'
s1.room['table3'] = 'Room4'
s1.connects['D1'] = ('Room3', 'Room4')
s1.connects['D2'] = ('Room1', 'Room4')
s1.connects['D3'] = ('Room1', 'Room3')
s1.connects['D4'] = ('Room2', 'Room3')
s1.pos['me'] = 'table2'
s1.room['me'] = 'Room4'
if __name__ == '__main__':
toplevel = toplevel.TopLevelLoop(mypose = (2.0, -2.0, 0.0), debug = 1)
print("Initial state:")
pyhop.print_state(s1)
myplan = pyhop.pyhop(s1, [('navigate_to', 'table3')], verbose=2)
# myplan = pyhop.pyhop(s1, [('navigate_to', 'bed1')], verbose=2)
# myplan = pyhop.pyhop(s1, [('navigate_to', 'stove')], verbose=2)
print('')
print("Plan:", myplan)
# toplevel.start(behavior = 'GoTo', behavior_params = (6.0, 4.0, 0.0, 0.0))
"""
============== UniBo: AI and Robotics 2025 ==============
Base code: dummy observer, performs the state estimation parts of the main loop
To be customized in an incremental way for the different labs
(c) 2024-2025 Alessandro Saffiotti
"""
from math import sin, cos
class Observer:
"""
Initialize the observer, using the given robot's parameters
"""
def __init__ (self, robot_pars):
self.WHEEL_RADIUS = robot_pars['wheel_radius']
self.WHEEL_AXIS = robot_pars['wheel_axis']
self.gx = 0.0 # current estimate of robot's global position
self.gy = 0.0
self.th = 0.0
# set initial pose
def init_pose (self, pose = None):
"""
Set initial pose of the robot
"""
if pose != None:
self.gx = pose[0]
self.gy = pose[1]
self.th = pose[2]
def update_pose (self, wl, wr):
"""
Incremental position estimation: update the current pose (x, y, th) of the robot
taking into account the newly read positions (rotations) of the left and right wheels
Returns the new robot's pose, as a triple (x, y, th)
"""
return (self.gx, self.gy, self.th)
"""
Pyhop, version 1.2.2 -- a simple SHOP-like planner written in Python.
Author: Dana S. Nau, 2013.05.31
Copyright 2013 Dana S. Nau - http://www.cs.umd.edu/~nau
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Pyhop should work correctly in both Python 2.7 and Python 3.2.
For examples of how to use it, see the example files that come with Pyhop.
Pyhop provides the following classes and functions:
- foo = State('foo') tells Pyhop to create an empty state object named 'foo'.
To put variables and values into it, you should do assignments such as
foo.var1 = val1
- bar = Goal('bar') tells Pyhop to create an empty goal object named 'bar'.
To put variables and values into it, you should do assignments such as
bar.var1 = val1
- print_state(foo) will print the variables and values in the state foo.
- print_goal(foo) will print the variables and values in the goal foo.
- declare_operators(o1, o2, ..., ok) tells Pyhop that o1, o2, ..., ok
are all of the planning operators; this supersedes any previous call
to declare_operators.
- print_operators() will print out the list of available operators.
- declare_methods('foo', m1, m2, ..., mk) tells Pyhop that m1, m2, ..., mk
are all of the methods for tasks having 'foo' as their taskname; this
supersedes any previous call to declare_methods('foo', ...).
- print_methods() will print out a list of all declared methods.
- pyhop(state1,tasklist) tells Pyhop to find a plan for accomplishing tasklist
(a list of tasks), starting from an initial state state1, using whatever
methods and operators you declared previously.
- In the above call to pyhop, you can add an optional 3rd argument called
'verbose' that tells pyhop how much debugging printout it should provide:
- if verbose = 0 (the default), pyhop returns the solution but prints nothing;
- if verbose = 1, it prints the initial parameters and the answer;
- if verbose = 2, it also prints a message on each recursive call;
- if verbose = 3, it also prints info about what it's computing.
"""
# Pyhop's planning algorithm is very similar to the one in SHOP and JSHOP
# (see http://www.cs.umd.edu/projects/shop). Like SHOP and JSHOP, Pyhop uses
# HTN methods to decompose tasks into smaller and smaller subtasks, until it
# finds tasks that correspond directly to actions. But Pyhop differs from
# SHOP and JSHOP in several ways that should make it easier to use Pyhop
# as part of other programs:
#
# (1) In Pyhop, one writes methods and operators as ordinary Python functions
# (rather than using a special-purpose language, as in SHOP and JSHOP).
#
# (2) Instead of representing states as collections of logical assertions,
# Pyhop uses state-variable representation: a state is a Python object
# that contains variable bindings. For example, to define a state in
# which box b is located in room r1, you might write something like this:
# s = State()
# s.loc['b'] = 'r1'
#
# (3) You also can define goals as Python objects. For example, to specify
# that a goal of having box b in room r2, you might write this:
# g = Goal()
# g.loc['b'] = 'r2'
# Like most HTN planners, Pyhop will ignore g unless you explicitly
# tell it what to do with g. You can do that by referring to g in
# your methods and operators, and passing g to them as an argument.
# In the same fashion, you could tell Pyhop to achieve any one of
# several different goals, or to achieve them in some desired sequence.
#
# (4) Unlike SHOP and JSHOP, Pyhop doesn't include a Horn-clause inference
# engine for evaluating preconditions of operators and methods. So far,
# I've seen no need for it; I've found it easier to write precondition
# evaluations directly in Python. But I could consider adding such a
# feature if someone convinces me that it's really necessary.
#
# Accompanying this file are several files that give examples of how to use
# Pyhop. To run them, launch python and type "import blocks_world_examples"
# or "import simple_travel_example".
from __future__ import print_function
import copy,sys, pprint
############################################################
# States and goals
class State():
"""A state is just a collection of variable bindings."""
def __init__(self,name):
self.__name__ = name
class Goal():
"""A goal is just a collection of variable bindings."""
def __init__(self,name):
self.__name__ = name
### print_state and print_goal are identical except for the name
def print_state(state,indent=4):
"""Print each variable in state, indented by indent spaces."""
if state != False:
for (name,val) in vars(state).items():
if name != '__name__':
for x in range(indent): sys.stdout.write(' ')
sys.stdout.write(state.__name__ + '.' + name)
print(' =', val)
else: print('False')
def print_goal(goal,indent=4):
"""Print each variable in goal, indented by indent spaces."""
if goal != False:
for (name,val) in vars(goal).items():
if name != '__name__':
for x in range(indent): sys.stdout.write(' ')
sys.stdout.write(goal.__name__ + '.' + name)
print(' =', val)
else: print('False')
############################################################
# Helper functions that may be useful in domain models
def forall(seq,cond):
"""True if cond(x) holds for all x in seq, otherwise False."""
for x in seq:
if not cond(x): return False
return True
def find_if(cond,seq):
"""
Return the first x in seq such that cond(x) holds, if there is one.
Otherwise return None.
"""
for x in seq:
if cond(x): return x
return None
############################################################
# Commands to tell Pyhop what the operators and methods are
operators = {}
methods = {}
def declare_operators(*op_list):
"""
Call this after defining the operators, to tell Pyhop what they are.
op_list must be a list of functions, not strings.
"""
operators.update({op.__name__:op for op in op_list})
return operators
def declare_methods(task_name,*method_list):
"""
Call this once for each task, to tell Pyhop what the methods are.
task_name must be a string.
method_list must be a list of functions, not strings.
"""
methods.update({task_name:list(method_list)})
return methods[task_name]
############################################################
# Commands to find out what the operators and methods are
def print_operators(olist=operators):
"""Print out the names of the operators"""
print('OPERATORS:', ', '.join(olist))
def print_methods(mlist=methods):
"""Print out a table of what the methods are for each task"""
print('{:<14}{}'.format('TASK:','METHODS:'))
for task in mlist:
print('{:<14}'.format(task) + ', '.join([f.__name__ for f in mlist[task]]))
############################################################
# The actual planner
def pyhop(state,tasks,verbose=0):
"""
Try to find a plan that accomplishes tasks in state.
If successful, return the plan. Otherwise return False.
"""
if verbose>0: print('** pyhop, verbose={}: **\n state = {}\n tasks = {}'.format(verbose, state.__name__, tasks))
result = seek_plan(state,tasks,[],0,verbose)
if verbose>0: print('** result =',result,'\n')
return result
def seek_plan(state,tasks,plan,depth,verbose=0):
"""
Workhorse for pyhop. state and tasks are as in pyhop.
- plan is the current partial plan.
- depth is the recursion depth, for use in debugging
- verbose is whether to print debugging messages
"""
if verbose>1: print('depth {} tasks {}'.format(depth,tasks))
if tasks == []:
if verbose>2: print('depth {} returns plan {}'.format(depth,plan))
return plan
task1 = tasks[0]
if task1[0] in operators:
if verbose>2: print('depth {} action {}'.format(depth,task1))
operator = operators[task1[0]]
newstate = operator(copy.deepcopy(state),*task1[1:])
if verbose>2:
print('depth {} new state:'.format(depth))
print_state(newstate)
if newstate:
solution = seek_plan(newstate,tasks[1:],plan+[task1],depth+1,verbose)
if solution != False:
return solution
if task1[0] in methods:
if verbose>2: print('depth {} method instance {}'.format(depth,task1))
relevant = methods[task1[0]]
for method in relevant:
subtasks = method(state,*task1[1:])
# Can't just say "if subtasks:", because that's wrong if subtasks == []
if verbose>2:
print('depth {} new tasks: {}'.format(depth,subtasks))
if subtasks != False:
solution = seek_plan(state,subtasks+tasks[1:],plan,depth+1,verbose)
if solution != False:
return solution
if verbose>2: print('depth {} returns failure'.format(depth))
return False
"""
============== UniBo: AI and Robotics 2025 ==============
Base code: gateway to the robot (or simulator)
This is a base version, it needs to be completed by the students with new
subscribers, publishers or services needed for the different labs
(c) 2024-2025 Alessandro Saffiotti
"""
import math, time
import numpy as np
import rclpy
import rclpy.executors
from geometry_msgs.msg import Twist
from sensor_msgs.msg import JointState, Range
node = None
robot_subs = None
sonar_subs = []
vel_publisher_node = None
cmd_vel_publisher = None
encoders = None
sonars = [0.0] * 12
class Quaternion:
"""
ROS measures angles in quaternions, this class provides a convenience
function to convert quaternions into Euler angles
"""
def __init__ (self, x, y, z, w):
self.x = x
self.y = y
self.z = z
self.w = w
def to_euler (self):
x = self.x
y = self.y
z = self.z
w = self.w
# Normalize quaternion to avoid errors
norm = np.sqrt(x**2 + y**2 + z**2 + w**2)
x /= norm
y /= norm
z /= norm
w /= norm
# Roll
roll = np.arctan2(2.0 * (w * x + y * z), 1.0 - 2.0 * (x * x + y * y))
# Pitch
sin_pitch = 2.0 * (w * y - z * x)
sin_pitch = np.clip(sin_pitch, -1.0, 1.0)
pitch = np.arcsin(sin_pitch)
# Yaw
yaw = np.arctan2(2.0 * (w * z + x * y), 1.0 - 2.0 * (y * y + z * z))
return roll, pitch, yaw
def sonar_ring_pose (bearing):
"""
This is a convenience function to compute the pose of each sonar sensor in the robot base
"""
rho = 0.260
phi = math.radians(bearing)
x = rho * math.cos(phi)
y = rho * math.sin(phi)
return (x, y, phi)
parameters = { # robot's parameters, these are for the Tiago base
'wheel_radius' : 0.0985,
'wheel_axis' : 0.4044,
'sonar_num' : 12,
'sonar_maxrange' : 3.0,
'sonar_delta' : math.radians(25.0),
'sonar_poses' : [sonar_ring_pose(0.0),
sonar_ring_pose(30.0),
sonar_ring_pose(60.0),
sonar_ring_pose(90.0),
sonar_ring_pose(120.0),
sonar_ring_pose(150.0),
sonar_ring_pose(180.0),
sonar_ring_pose(210.0),
sonar_ring_pose(240.0),
sonar_ring_pose(270.0),
sonar_ring_pose(300.0),
sonar_ring_pose(330.0)]
}
def encoders_callback (msg):
"""
Called when a new wheel encoder message is received
"""
global encoders
encoders = (msg.position[1], msg.position[0])
#node.get_logger().info('Encoders: "%f, %f"' % (msg.position[12], msg.position[13]))
def sonar_callback (i):
"""
Called when a new sonar range message is received
"""
def cb(msg):
global sonars
sonars[i] = msg.range
#robot_subs.get_logger().info('Range: %f' % (msg.range))
return cb
def is_ready ():
"""
Check if ROS has already started publishing messages
"""
executor.spin_once(timeout_sec=0.01)
return encoders != None
def robot_alive ():
"""
Check if ROS is still running
"""
return rclpy.ok()
def init_ros ():
"""
Initialize ROS
"""
rclpy.init()
print("ROS initialized")
def startup_robot ():
"""
Set up the communication channels with the robot (or simulator)
and perform any initialization needed
"""
global parameters
global cmd_vel_publisher, robot_subs
global node, vel_publisher_node, executor
node = rclpy.create_node('robot_client_nodes')
vel_publisher_node = rclpy.create_node('vel_publisher_node')
cmd_vel_publisher = vel_publisher_node.create_publisher(Twist, '/mobile_base_controller/cmd_vel_unstamped', 1000)
robot_subs = init_subs()
executor = rclpy.executors.MultiThreadedExecutor()
executor.add_node(robot_subs)
executor.add_node(vel_publisher_node)
print("Robot started")
return parameters
def init_subs ():
"""
Create subscriptions to all the relevant topics, and link them
to the corresponding callback functions. You may need to extend
this if you want to subscribe to additional topics.
"""
global robot_subs, enc_sub, executor
robot_subs = rclpy.create_node('robot_subs_nodes')
enc_sub = robot_subs.create_subscription(
JointState,
'/joint_states',
encoders_callback,
1)
for i in range(parameters['sonar_num']):
sonar_sub = robot_subs.create_subscription(
Range,
'/sonar{:02d}_base'.format(i+1),
sonar_callback(i),
1)
sonar_subs.append(sonar_sub)
return robot_subs
def get_sonar_data ():
"""
Get the latest received values from the sonar ring
Returns an array of readings in the form (sonar pose, range)
Sonar pose is (x, y, th) in robot's base frame
"""
duration = 0.01 # spin for this duration
start_time = time.time()
executor.add_node(robot_subs)
while rclpy.ok():
executor.spin_once(timeout_sec=0.01)
if time.time() - start_time > duration: # duration exceeded
break
res = []
for i in range(parameters['sonar_num']):
res.append((parameters['sonar_poses'][i], sonars[i]))
#node.get_logger().info('Data %f' % (sonars[i]))
return res
def get_wheel_encoders ():
"""
Get the latest received values of wheel encoders, which give the current
position of each wheel, in radiants
"""
global encoders
duration = 0.01 # spin for this duration
start_time = time.time()
executor.add_node(robot_subs)
while rclpy.ok():
executor.spin_once(timeout_sec=0.01)
if time.time() - start_time > duration: # duration exceeded
break
if encoders == None:
return (0.0, 0.0)
return encoders
def set_vel_values (vlin, vrot):
"""
Set the new linear and rotational velocities for the robot's base
vlin is m/sec, vrot is rad/sec
Returns True if successful
"""
msg = Twist()
msg.linear.x = vlin
msg.angular.z = vrot
cmd_vel_publisher.publish(msg)
return True
def shutdown_robot ():
"""
This should perform any finalization needed on the robot,
and close all the communication channels.
"""
node.destroy_node()
robot_subs.destroy_node()
vel_publisher_node.destroy_node()
rclpy.shutdown()
print("Robot shut")
"""
============== UniBo: AI and Robotics 2025 ==============
Base code: top level execution loop
To be customized in an incremental way for the different labs
(c) 2024-2025 Alessandro Saffiotti
"""
from time import perf_counter, sleep
import robot_gwy, observer, controller
class Timer:
"""
Timing functions for synchronizing the control loop
"""
def __init__(self):
self.last = perf_counter()
def reset(self):
self.last = perf_counter()
def elapsed(self):
"""Retun the time elapsed since last sync"""
return perf_counter() - self.last
def sync(self, cycle):
"""Sleep until elapsed time is cycle"""
current = perf_counter()
elapsed = current - self.last
if cycle > elapsed:
sleep(cycle - elapsed)
class TopLevelLoop:
"""
Top level execution loop
"""
def __init__(self,
mypose = None, # robot's starting pose, takes from the map if None
tcycle = 0.1, # cycle time, in sec
debug = 0 # debug level, use to decide what debug info to print
):
self.tcycle = tcycle
self.debug = debug
self.mypose = mypose # estimated robot's pose
self.obs = None # observer instance
self.ctr = None # controller instance
self.clock = None # timer instance
robot_gwy.init_ros()
def start (self, maxsteps = 0, behavior = 'Wander', behavior_params = None):
"""
Start the Top Level loop, initialize all the components, and run it
Times out with failure after 'maxsteps' steps (zero = no timeout)
Debug level can be used to selectively print debug information
"""
robot_params = robot_gwy.startup_robot() # start robot and get its parameters
self.clock = Timer() # create timer
self.obs = observer.Observer(robot_params) # create observer
self.obs.init_pose(self.mypose) # give inital position to observer
self.ctr = controller.Controller(robot_params) # create controller
self.ctr.set_behavior(bname = behavior, bparam = behavior_params)
while not robot_gwy.is_ready(): # wait until robot is ready
pass
self.run(maxsteps) # run the main loop
robot_gwy.shutdown_robot() # shut down robot
def run (self, maxsteps = 0, threshold = 0.7):
"""
Run this instance of the Top Level loop until completion
Return True for successful execution, False for failure
together with the degree of achievement of the running behavior
"""
if self.debug > 0:
print("Top level loop started")
nsteps = 0
done = 0.0
while True:
if not robot_gwy.robot_alive(): # ROS was killed
if self.debug > 0:
print("ROS killed: exiting")
break
# execute control pipeline
result, done = self.step()
# and check the results
if result == False: # action failure
if self.debug > 0:
print("Execution failed")
break
if done > threshold: # degree of achievement above acceptance
if self.debug > 0:
print("Execution completed")
return True, done # we are done, return True
nsteps += 1
if maxsteps > 0 and nsteps > maxsteps: # timeout
if self.debug > 0:
print("Max number of steps reached:", nsteps, ">", maxsteps, ", exiting")
break
return False, 0.0 # something wrong, return False
def step (self):
"""
The basic control pipeline: read sensors, estimate state, decide controls, send controls
Return True for successful execution, plus the current degree of achievement
"""
# Read proprioceptive sensors (wheel rotations)
wl, wr = robot_gwy.get_wheel_encoders() # read wheel encoders
self.mypose = self.obs.update_pose(wl, wr) # estimate state (robot's pose)
if self.debug > 1:
self.print_pose()
# Read exteroceptive sensors (sonars)
sdata = robot_gwy.get_sonar_data()
if self.debug > 1:
print("sonars =", " ".join(["{:.2f}".format(s[1]) for s in sdata]))
# Compute control values
state = { # state passed to the controller
"mypose": self.mypose,
"sdata": sdata,
}
done = self.ctr.run(state, self.debug) # decide controls (robot's vels)
vlin = self.ctr.get_vlin() # retrieve computed controls (vlin)
vrot = self.ctr.get_vrot() # retrieve computed controls (vrot)
robot_gwy.set_vel_values(vlin, vrot) # send controls (comment this out to use kbd teleop)
# sync on cycle time
self.clock.sync(self.tcycle)
if self.debug == -1:
print("Cycle = {:.3f}".format(self.clock.elapsed()))
self.clock.reset()
return True, done # return False to exit the loop
def print_pose (self):
print("Pose = ({:.2f}, {:.2f}, {:.2f})".format(self.mypose[0], self.mypose[1], self.mypose[2]))
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment