Commit 366df144 authored by Nicholas Shindler's avatar Nicholas Shindler
Browse files

add missing scripts

parent 63a9a014
# mp-test.py -- tests for the mindprobe running under the BTB
from mpclient import *
import sys
import unittest
class TestParameters(unittest.TestCase):
def setUp(self):
btb.clear_world()
btb.set_default_params()
def testSimpleParams(self):
"""Test that simple parameters can be set to non-default values."""
simple_params = ("robot_crash_radius",
"robot_wheelbase",
"robot_wheel_diameter",
"robot_wheel_clicks",
"robot_max_acceleration",
"robot_motor_command_max",
"max_run_time",
"ticks_per_sec",
"field_radius",
"pot_radius")
for param in simple_params:
exec "x0 = get_%s()" % param
self.assert_(x0 > 0)
exec "set_%s(x0 + 1)" % param
exec "x1 = get_%s()" % param
self.assertEqual(x1, x0 + 1)
def testBoundaryParams(self):
"""Test that the boundary can be set."""
(x0, y0, x1, y1) = get_boundary_location()
set_boundary_location(x0 + 2, y0 - 2, x1 + 3, y1 - 4)
(x0b, y0b, x1b, y1b) = get_boundary_location()
self.assertEqual(x0b, x0 + 2)
self.assertEqual(y0b, y0 - 2)
self.assertEqual(x1b, x1 + 3)
self.assertEqual(y1b, y1 - 4)
class TestRobotMotion(unittest.TestCase):
def setUp(self):
btb.clear_world()
btb.set_default_params()
self.x0 = 1
self.y0 = 1
self.h0 = deg2rad(45)
self.r = create_robot(x = self.x0, y = self.y0, heading = self.h0)
def testRobotStopped(self):
"""Test that a robot stopped does not move."""
send_magic_message(self.r, "stop")
run_world(seconds = 1)
(x, y, heading) = get_robot_position(self.r)
self.assertEqual(x, self.x0)
self.assertEqual(y, self.y0)
self.assertEqual(heading, deg2rad(45))
def testRobotForward(self):
"""Test that a robot moves forward"""
send_magic_message(self.r, "forward")
send_magic_message(self.r, "slow")
run_world(seconds = 1)
(x, y, heading) = get_robot_position(self.r)
self.assertEqual(x, y)
self.assert_(x > self.x0)
self.assertEqual(heading, self.h0)
def testRobotTurnLeft(self):
"""Test that a robot turns left"""
send_magic_message(self.r, "left")
send_magic_message(self.r, "slow")
run_world(seconds = 1)
(x, y, heading) = get_robot_position(self.r)
self.assert_(heading > self.h0)
self.assert_(x > self.x0)
self.assert_(y > self.y0)
def testRobotTurnRight(self):
"""Test that a robot turns left"""
send_magic_message(self.r, "right")
send_magic_message(self.r, "slow")
run_world(seconds = 1)
(x, y, heading) = get_robot_position(self.r)
self.assert_(heading < self.h0)
self.assert_(x > self.x0)
self.assert_(y > self.y0)
class TestRobotErrors(unittest.TestCase):
def setUp(self):
btb.clear_world()
btb.set_default_params()
def testRobotRobotCrash(self):
"""Test that robots crashing generates an exception"""
r0 = create_robot(x = 0, y = 0, heading = 0)
r1 = create_robot(x = 2, y = 0, heading = 0)
send_magic_message(r0, "forward")
send_magic_message(r0, "slow")
send_magic_message(r1, "reverse")
send_magic_message(r1, "fast")
try:
run_world() # kablam!
except RuntimeError, (message,):
self.assertEqual(message, "robot-robot crash detected")
(x0, y0, h0) = get_robot_position(r0)
(x1, y1, h1) = get_robot_position(r1)
self.assertEqual(h0, 0)
self.assertEqual(h1, 0)
radius = get_robot_crash_radius()
self.assert_(distance(x0, y0, x1, y1) <= 2 * radius)
def testRobotAwol(self):
"""Test that BTB detects robot wandering off"""
field_radius = 3
r0 = create_robot(x = 0, y = 0, heading = 0)
set_field_radius(field_radius)
send_magic_message(r0, "forward")
send_magic_message(r0, "fast")
try:
run_world()
except RuntimeError, (message,):
self.assertEqual(message, "robot wandered off")
(x0, y0, h0) = get_robot_position(r0)
self.assertEqual(h0, 0)
self.assert_(distance(0, 0, x0, y0) >= field_radius)
if __name__ == '__main__':
unittest.main()
......@@ -20,26 +20,5 @@ print host, port
connect(host, port)
# enable_probes(1125)
# start_probes()
# write_probe(1125,1)
# enable_probes((1125,1077,1080,1206,1207))
# time.sleep(0.3)
# start_probes()
# time.sleep(0.3)
# write_probe(1125,1)
# time.sleep(0.5)
# for i in range(0, 100, 5):
# n = float(i) / 100.0
# print n
# write_probes([(1206,n),(1207,-n)])
# time.sleep(0.1)
# for i in range(0, 100, 2):
# n = float(i) / 100.0
# print n
# write_probes([(1206,n),(1207,n)])
# time.sleep(0.1)
code.interact("", local=globals())
#
# mp-client.py -- dummy mindprobe client for testing
#
......@@ -13,46 +14,45 @@ import os
import sys
import subprocess
MP_TLV_NULL = 0
MP_TLV_CONNECTED = 1
MP_TLV_DISCONNECTED = 2
MP_TLV_PROTOCOL_VERSION = 3
MP_TLV_DISCOVER_PROBES = 4
MP_TLV_HZ = 5
MP_TLV_PROBE_DEF = 6
MP_TLV_ENABLE_PROBES = 7
MP_TLV_START_PROBES = 8
MP_TLV_STOP_PROBES = 9
MP_TLV_PROBE_DATA = 10
MP_TLV_CURRENT_TICK = 11
MP_TLV_MISSED_DATA = 12
MP_TLV_DEBUG_MESSAGE = 13
MP_TLV_MESSAGE_TEXT = 14
MP_TLV_NULL = 0
MP_TLV_CONNECTED = 1
MP_TLV_DISCONNECTED = 2
MP_TLV_PROTOCOL_VERSION = 3
MP_TLV_DISCOVER_PROBES = 4
MP_TLV_HZ = 5
MP_TLV_PROBE_DEF = 6
MP_TLV_ENABLE_PROBES = 7
MP_TLV_START_PROBES = 8
MP_TLV_STOP_PROBES = 9
MP_TLV_PROBE_DATA = 10
MP_TLV_CURRENT_TICK = 11
MP_TLV_MISSED_DATA = 12
MP_TLV_DEBUG_MESSAGE = 13
MP_TLV_MESSAGE_TEXT = 14
MP_TLV_MISSED_DEBUG_MESSAGES = 15
MP_TLV_WRITE_PROBES = 16
MP_TLV_WRITE_PROBES = 16
# Type names for probe type enumerations, and pack/unpack directives:
type_table = {
1: ("uint8", "<B"), # 1-byte unsigned integer
2: ("int8", "<b"), # 1-byte signed integer
3: ("uint16", "<H"), # 2-byte unsigned integer
4: ("int16", "<h"), # 2-byte signed integer
5: ("uint32", "<I"), # 4-byte unsigned integer
6: ("int32", "<i"), # 4-byte signed integer
7: ("uint64", "<Q"), # 8-byte unsigned integer
8: ("int64", "<q"), # 8-byte signed integer
9: ("float", "<f"), # 4-byte float (IEEE 754)
10: ("double", "<d"), # 8-byte float (IEEE 754)
11: ("tlv", ""), # Variable-length TLV data
12: ("bool", "<B"), # 1-byte boolean integer (1 or 0)
13: ("string", ""), # variable-size null-terminated char string
1: ('uint8', '<B'), # 1-byte unsigned integer
2: ('int8', '<b'), # 1-byte signed integer
3: ('uint16', '<H'), # 2-byte unsigned integer
4: ('int16', '<h'), # 2-byte signed integer
5: ('uint32', '<I'), # 4-byte unsigned integer
6: ('int32', '<i'), # 4-byte signed integer
7: ('uint64', '<Q'), # 8-byte unsigned integer
8: ('int64', '<q'), # 8-byte signed integer
9: ('float', '<f'), # 4-byte float (IEEE 754)
10: ('double', '<d'), # 8-byte float (IEEE 754)
11: ('tlv', ''), # Variable-length TLV data
12: ('bool', '<B'), # 1-byte boolean integer (1 or 0)
13: ('string', ''), # variable-size null-terminated char string
}
capture_buffer = ()
def connect(host, port=4400):
def connect(host, port = 4400):
global s, recvbuf, probe_defs, hz, version
global capture_buffer_lock
......@@ -60,7 +60,7 @@ def connect(host, port=4400):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((host, port))
recvbuf = ""
recvbuf = ''
print "Connected"
start_listener()
......@@ -68,80 +68,63 @@ def connect(host, port=4400):
# Expect the server to send us the protocol version, unsolicited:
(t, l, v) = get_next_message()
if (t != MP_TLV_PROTOCOL_VERSION) or (l != 2):
raise RuntimeError("unexpected message t=%d l=%d" % (t, l))
raise RuntimeError('unexpected message t=%d l=%d' % (t, l))
version = struct.unpack("<H", v)[0]
# Query the tick rate:
send_message(make_tlv(MP_TLV_HZ))
(t, l, v) = get_next_message()
while t != MP_TLV_HZ:
print ("message id {}").format(t)
time.sleep(0.1)
(t, l, v) = get_next_message()
if (t != MP_TLV_HZ) or (l != 2):
raise RuntimeError("unexpected message t=%d l=%d" % (t, l))
raise RuntimeError('unexpected message t=%d l=%d' % (t, l))
hz = struct.unpack("<H", v)[0]
# Discover the probes:
discover_probes()
print "Protocol version = 0x%04x, sample rate is %d Hz, %d probes" % (
version,
hz,
len(probe_defs.keys()),
)
print "Protocol version = 0x%04x, sample rate is %d Hz, %d probes" % \
(version, hz, len(probe_defs.keys()))
def disconnect():
global s
stop_listener()
s.close()
def discover_probes():
global probe_defs, probe_names, version
new_probe_defs = {} # dictionary of id: (name, type)
new_probe_names = {} # dictionary of name: id
try:
probe_names
except NameError:
probe_names = {}
probe_defs = {}
new_probe_names = {} # dictionary of name: id
send_message(make_tlv(MP_TLV_DISCOVER_PROBES))
while True:
(t, l, def_items) = get_next_message()
if t != MP_TLV_DISCOVER_PROBES:
raise RuntimeError("unexpected message t=%d l=%d" % (t, l))
if (t != MP_TLV_DISCOVER_PROBES):
raise RuntimeError('unexpected message t=%d l=%d' % (t, l))
while len(def_items):
(t, l, v, def_items) = next_tlv(def_items)
if t != MP_TLV_PROBE_DEF:
raise RuntimeError("unexpected message t=%d l=%d" % (t, l))
raise RuntimeError('unexpected message t=%d l=%d' % (t, l))
(probe_id, probe_type, probe_length) = struct.unpack("<HHH", v[0:6])
# Name has null terminator, which Python doesn't need.
probe_name = v[6 : len(v) - 1]
# new_probe_names[probe_name] = probe_id
# new_probe_defs[probe_id] = (probe_name, probe_type, probe_length)
probe_names[probe_name] = probe_id
probe_defs[probe_id] = (probe_name, probe_type, probe_length)
probe_name = v[6:len(v)-1]
new_probe_names[probe_name] = probe_id
new_probe_defs[probe_id] = (probe_name, probe_type, probe_length)
pass
if len(def_items) == 0 or version < 0x0104:
# probe_defs = new_probe_defs
# probe_names = new_probe_names
if (len(def_items) == 0 or
version < 0x0104):
probe_defs = new_probe_defs
probe_names = new_probe_names
break
pass
return
def show_probes():
global probe_defs
......@@ -150,33 +133,26 @@ def show_probes():
type_name = type_table[type][0]
print "probe id %d: %s type %s length %d" % (id, name, type_name, length)
def make_tlv_uint8(t, v):
return struct.pack("<HHH", t, 1, v)
def make_tlv_uint16(t, v):
return struct.pack("<HHH", t, 2, v)
def make_tlv_uint32(t, v):
return struct.pack("<HHI", t, 4, v)
def make_tlv(t, v=""):
def make_tlv(t, v = ''):
return struct.pack("<HH", t, len(v)) + v
def tlv_len(message):
(t, l) = struct.unpack("<HH", message[0:4])
return l
def next_tlv(message):
(t, l) = struct.unpack("<HH", message[0:4])
v = message[4 : 4 + l]
return (t, l, v, message[4 + l :])
v = message[4:4+l]
return (t, l, v, message[4+l:])
def send_message(message):
global s
......@@ -184,59 +160,53 @@ def send_message(message):
# print struct.unpack("%dB" % len(message), message)
s.send(message)
def lookup_probe_id(probe):
global probe_names
if type(probe) == str:
if not probe in probe_names:
raise ValueError("unknown probe %s" % probe)
raise ValueError('unknown probe %s' % probe)
probe_id = probe_names[probe]
elif type(probe) == int:
probe_id = probe
else:
raise ValueError("probe must be int or string")
raise ValueError('probe must be int or string')
return probe_id
def enable_probes(probes):
# allow for singleton argument
if type(probes) != tuple:
probes = ((probes),)
v = ""
probes = (probes),
v = ''
for probe in probes:
probe_id = lookup_probe_id(probe)
v += struct.pack("<H", probe_id)
send_message(make_tlv(MP_TLV_ENABLE_PROBES, v))
def start_probes():
global capturing
capturing = True
send_message(make_tlv(MP_TLV_START_PROBES))
def stop_probes():
global capturing
send_message(make_tlv(MP_TLV_STOP_PROBES))
capturing = False
def do_debug_message(message):
(t, l, v, message) = next_tlv(message)
if t != MP_TLV_CURRENT_TICK:
raise RuntimeError("unexpected type = %d" % t)
tick = struct.unpack("<I", v)[0]
raise RuntimeError('unexpected type = %d' % t)
tick = struct.unpack('<I', v)[0]
(t, l, v, message) = next_tlv(message)
if t != MP_TLV_MESSAGE_TEXT:
raise RuntimeError("unexpected type = %d" % t)
message_text = v[0 : len(v) - 1]
if message_text[-1] == "\n":
message_text = message_text[0 : len(message_text) - 1]
print ("Debug message (t = %d): %s" % (tick, message_text))
raise RuntimeError('unexpected type = %d' % t)
message_text = v[0:len(v)-1]
if message_text[-1] == '\n':
message_text = message_text[0:len(message_text)-1]
print("Debug message (t = %d): %s" % (tick, message_text))
def do_async_message(t, l, v):
if t == MP_TLV_PROBE_DATA:
capture_data(v)
......@@ -245,20 +215,19 @@ def do_async_message(t, l, v):
do_debug_message(v)
return True
elif t == MP_TLV_MISSED_DATA:
print ("Missed %d probe messages" % struct.unpack("<I", v))
print("Missed %d probe messages" % struct.unpack('<I', v))
return True
elif t == MP_TLV_MISSED_DEBUG_MESSAGES:
print ("Missed %d debug messages" % struct.unpack("<I", v))
print("Missed %d debug messages" % struct.unpack('<I', v))
return True
else:
return False # not an asynchronous message
# Receive one message from the socket:
def receive_next_message():
global recvbuf, s
while (len(recvbuf) < 4) or (len(recvbuf) < tlv_len(recvbuf)):
while ((len(recvbuf) < 4) or (len(recvbuf) < tlv_len(recvbuf))):
recvbuf += s.recv(65536)
# print "recvbuf is: ",
# print struct.unpack("%dB" % len(recvbuf), recvbuf)
......@@ -267,7 +236,6 @@ def receive_next_message():
return (t, l, v)
def get_next_message():
global listener_q
(t, l, v) = listener_q.get()
......@@ -281,7 +249,6 @@ def capture_buffer_len():
capture_buffer_lock.release()
return l
def capture(t):
global capture_buffer, hz
......@@ -295,19 +262,17 @@ def capture(t):
time.sleep(t) # let the listener thread do the capturing
stop_probes()
def capture_data(data):
global capturing
global capture_buffer
# Add the data an item in the caputre buffer.
if capturing:
capture_buffer_lock.acquire()
capture_buffer += (data,)
capture_buffer += (data, )
capture_buffer_lock.release()
def show_capture(interval=None):
def show_capture(interval = None):
global capture_buffer, hz
global probe_defs
......@@ -329,23 +294,20 @@ def show_capture(interval=None):
print "bad capture item, first element not tick"
continue
(val,) = struct.unpack("<I", v)
print "tick =", val,
(val, ) = struct.unpack('<I', v)
print 'tick =', val,
while len(item):
(t, l, v, item) = next_tlv(item)
name = probe_defs[t][0]
type = probe_defs[t][1]
print (len(v))
print (v)
val = decode_probe_data(type, v)
print name, "=", val,
print name, '=', val,
print ""
capture_buffer_lock.release()
def write_probe(probe, value):
global probe_defs
......@@ -355,11 +317,10 @@ def write_probe(probe, value):
v = make_tlv(probe_id, encode_probe_value(probe_type, value))
send_message(make_tlv(MP_TLV_WRITE_PROBES, v))
def write_probes(probevals):
global probe_defs
v = ""
v = ''
for (probe, value) in probevals:
probe_id = lookup_probe_id(probe)
......@@ -368,62 +329,55 @@ def write_probes(probevals):
v += make_tlv(probe_id, encode_probe_value(probe_type, value))
send_message(make_tlv(MP_TLV_WRITE_PROBES, v))
def encode_probe_value(probe_type, value):
global type_table
if type_table[probe_type][0] == "tlv":