Stretch Body API Reference
Stretch Body is the Python interface for working with Stretch. This page serves as a reference for the interfaces defined in the stretch_body
library.
See the Stretch Body Tutorials for additional information on working with this library.
The Robot Class
The most common interface to Stretch is the Robot class. This class encapsulates all devices on the robot. It is typically initialized as:
1 2 3 4 5 6 7 8 9 10 11 |
|
The startup()
and home()
methods start communication with and home each of the robot's devices, respectively. Through the Robot class, users can interact with all devices on the robot. For example, continuing the example above:
12 13 14 15 16 17 18 19 20 21 22 23 |
|
Each of these devices is defined in separate modules within stretch_body
. In the following section, we'll look at the API of these classes. The stop()
method shuts down communication with the robot's devices.
All methods in the Robot class are documented below.
API to the Stretch Robot
Source code in stretch_body/robot.py
class Robot(Device):
"""
API to the Stretch Robot
"""
def __init__(self):
Device.__init__(self, 'robot')
self.monitor = RobotMonitor(self)
self.trace = RobotTrace(self)
self.collision = RobotCollisionMgmt(self)
self.dirty_push_command = False
self.lock = threading.RLock() #Prevent status thread from triggering motor sync prematurely
self.status = {'pimu': {}, 'base': {}, 'lift': {}, 'arm': {}, 'head': {}, 'wacc': {}, 'end_of_arm': {}}
self.async_event_loop = None
self.pimu=pimu.Pimu()
self.status['pimu']=self.pimu.status
self.base=base.Base()
self.status['base']=self.base.status
self.lift=lift.Lift()
self.status['lift']=self.lift.status
self.arm=arm.Arm()
self.status['arm']=self.arm.status
self.head=head.Head()
self.status['head']=self.head.status
if 'custom_wacc' in self.params:
module_name = self.params['custom_wacc']['py_module_name']
class_name = self.params['custom_wacc']['py_class_name']
self.wacc=getattr(importlib.import_module(module_name), class_name)(self)
else:
self.wacc=wacc.Wacc()
self.status['wacc']=self.wacc.status
self.non_dxl_thread = None
self.dxl_end_of_arm_thread = None
self.sys_thread = None
self.dxl_head_thread = None
self.event_loop_thread = None
self.collision_mgmt_thread = None
self.eoa_name= self.params['tool']
module_name = self.robot_params[self.eoa_name]['py_module_name']
class_name = self.robot_params[self.eoa_name]['py_class_name']
self.end_of_arm = getattr(importlib.import_module(module_name), class_name)()
self.status['end_of_arm'] = self.end_of_arm.status
self.devices={ 'pimu':self.pimu, 'base':self.base, 'lift':self.lift, 'arm': self.arm, 'head': self.head, 'wacc':self.wacc, 'end_of_arm':self.end_of_arm}
self.GLOBAL_EXCEPTIONS_LIST = []
threading.excepthook = self.custom_excepthook
def custom_excepthook(self, args):
thread_name = args.thread.name
Exec = {}
Exec[thread_name] = {
'thread': args.thread,
'exception': {
'type': args.exc_type,
'value': args.exc_value,
'traceback': args.exc_traceback
}
}
# Filter RuntimeError
if Exec[thread_name]['exception']['type'] == RuntimeError:
pass
else:
print(f"Caught Exception in Thread: {thread_name}")
traceback.print_exception(args.exc_value)
self.logger.debug(f"Caught Exception in Thread: {thread_name}")
self.logger.debug(traceback.format_exception(args.exc_value))
self.GLOBAL_EXCEPTIONS_LIST.append(Exec[thread_name])
# ########### Device Methods #############
def startup(self,start_non_dxl_thread=True,start_dxl_thread=True,start_sys_mon_thread=True):
"""
To be called once after class instantiation.
Prepares devices for communications and motion
Returns
-------
bool
true if startup of robot succeeded
"""
did_acquire, self._file_lock = hello_utils.acquire_body_filelock()
if not did_acquire:
print('Another process is already using Stretch. Try running "stretch_free_robot_process.py"')
return False
self.logger.debug('Starting up Robot {0} of batch {1}'.format(self.params['serial_no'], self.params['batch_name']))
success = True
for k in self.devices:
if self.devices[k] is not None:
if not self.devices[k].startup(threaded=False):
success = False
if (self.arm.motor.transport.version==0 \
or self.lift.motor.transport.version==0 \
or self.base.right_wheel.transport.version == 0 \
or self.base.left_wheel.transport.version == 0 \
or self.wacc.transport.version==0 \
or self.pimu.transport.version==0) and self.params['use_asyncio'] :
self.logger.warning('Not able to use asyncio for transport communications. Defaulting to sync.')
self.params['use_asyncio']=0
else:
self.start_event_loop()
#Always startup to load URDFs now and not while thread is running
self.collision.startup()
if not self.params['use_collision_manager']: #Turn it off here but allow user to enable it via SW later
self.disable_collision_mgmt()
else:
self.enable_collision_mgmt()
# Register the signal handlers
signal.signal(signal.SIGTERM, hello_utils.thread_service_shutdown)
signal.signal(signal.SIGINT, hello_utils.thread_service_shutdown)
self.non_dxl_thread = NonDXLStatusThread(self, target_rate_hz=self.params['rates']['NonDXLStatusThread_Hz'])
self.dxl_end_of_arm_thread = DXLEndOfArmStatusThread(self,target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
self.sys_thread = SystemMonitorThread(self, target_rate_hz=self.params['rates']['SystemMonitorThread_Hz'])
self.dxl_head_thread = DXLHeadStatusThread(self, target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
self.collision_mgmt_thread = CollisionMonitorThread(self, target_rate_hz=100)
if start_non_dxl_thread:
self.non_dxl_thread.daemon = True
self.non_dxl_thread.start()
ts = time.time()
while not self.non_dxl_thread.first_status and time.time() - ts < 3.0:
time.sleep(0.01)
if start_dxl_thread:
self.dxl_head_thread.daemon = True
self.dxl_head_thread.start()
self.dxl_end_of_arm_thread.daemon = True
self.dxl_end_of_arm_thread.start()
if start_sys_mon_thread:
self.sys_thread.daemon = True
self.sys_thread.start()
if start_sys_mon_thread and self.collision_mgmt_thread:
self.collision_mgmt_thread.daemon = True
self.collision_mgmt_thread.start()
return success
def stop(self):
"""
To be called once before exiting a program
Cleanly stops down motion and communication
"""
self.logger.debug('---- Shutting down robot ----')
self._file_lock.release()
if self.non_dxl_thread:
if self.non_dxl_thread.running:
self.non_dxl_thread.shutdown_flag.set()
self.non_dxl_thread.join(1)
if self.dxl_head_thread:
if self.dxl_head_thread.running:
self.dxl_head_thread.shutdown_flag.set()
self.dxl_head_thread.join(1)
if self.dxl_end_of_arm_thread:
if self.dxl_end_of_arm_thread.running:
self.dxl_end_of_arm_thread.shutdown_flag.set()
self.dxl_end_of_arm_thread.join(1)
if self.sys_thread:
if self.sys_thread.running:
self.sys_thread.shutdown_flag.set()
self.sys_thread.join(1)
if self.collision_mgmt_thread:
if self.collision_mgmt_thread.running:
self.collision_mgmt_thread.shutdown_flag.set()
self.collision_mgmt_thread.join(1)
self.collision.stop()
for k in self.devices:
if self.devices[k] is not None:
self.logger.debug('Shutting down %s'%k)
self.devices[k].stop()
self.stop_event_loop()
self.logger.debug('---- Shutdown complete ----')
def get_status(self):
"""
Thread safe and atomic read of current Robot status data
Returns as a dict.
"""
with self.lock:
return self.status.copy()
def pretty_print(self):
s=self.get_status()
print('##################### HELLO ROBOT ##################### ')
print('Time',time.time())
print('Serial No',self.params['serial_no'])
print('Batch', self.params['batch_name'])
hello_utils.pretty_print_dict('Status',s)
async def push_command_coro(self):
tasks = [self.base.left_wheel.push_command_async(),
self.base.right_wheel.push_command_async(),
self.arm.push_command_async(),
self.lift.push_command_async(),
self.pimu.push_command_async(),
self.wacc.push_command_async()]
results = await asyncio.gather(*tasks)
return results
def push_command(self):
"""
Cause all queued up RPC commands to be sent down to Devices
"""
with self.lock:
ready = self.pimu.is_ready_for_sync()
if self.params['use_asyncio'] and self.async_event_loop is not None:
future = asyncio.run_coroutine_threadsafe(self.push_command_coro(),self.async_event_loop)
future.result()
else:
self.base.push_command()
self.arm.push_command()
self.lift.push_command()
self.pimu.push_command()
self.wacc.push_command()
# Check if need to do a motor sync by looking at if there's been a pimu sync signal sent
# since the last stepper.set_command for each joint
sync_required = (self.pimu.ts_last_motor_sync is not None) and (
self.arm.motor.is_sync_required(self.pimu.ts_last_motor_sync)
or self.lift.motor.is_sync_required(self.pimu.ts_last_motor_sync)
or self.base.right_wheel.is_sync_required(self.pimu.ts_last_motor_sync)
or self.base.left_wheel.is_sync_required(self.pimu.ts_last_motor_sync))
if (self.pimu.ts_last_motor_sync is None or ( ready and sync_required)):
self.pimu.trigger_motor_sync()
def enable_collision_mgmt(self):
self.collision.enable()
def disable_collision_mgmt(self):
self.collision.disable()
def wait_command(self, timeout=15.0, use_motion_generator=True):
"""Pause program execution until all motion complete.
Queuing up motion and pushing it to the hardware with
push_command() is designed to be asynchronous, enabling
reactive control of the robot. However, you might want
sychronous control, where each command's motion is completed
entirely before the program moves on to the next command.
This is where you would use wait_command()
Parameters
----------
timeout : float
How long to wait for motion to complete. Must be > 0.1 sec.
Returns
-------
bool
True if motion completed, False if timed out before motion completed
"""
time.sleep(0.1)
timeout = max(0.0, timeout - 0.1)
done = []
def check_wait(wait_method):
done.append(wait_method(timeout, use_motion_generator))
start = time.time()
threads = []
threads.append(threading.Thread(target=check_wait, args=(self.base.wait_while_is_moving,)))
threads.append(threading.Thread(target=check_wait, args=(self.arm.wait_while_is_moving,)))
threads.append(threading.Thread(target=check_wait, args=(self.lift.wait_while_is_moving,)))
threads.append(threading.Thread(target=check_wait, args=(self.head.wait_until_at_setpoint,)))
threads.append(threading.Thread(target=check_wait, args=(self.end_of_arm.wait_until_at_setpoint,)))
[thread.start() for thread in threads]
[thread.join() for thread in threads]
return all(done)
# ######### Waypoint Trajectory Interface ##############################
def is_trajectory_active(self):
return self.arm.is_trajectory_active() or self.lift.is_trajectory_active() or \
self.base.is_trajectory_active() or self.end_of_arm.is_trajectory_active() or \
self.head.is_trajectory_active()
def follow_trajectory(self):
success = True
success = success and self.arm.follow_trajectory(move_to_start_point=False)
success = success and self.lift.follow_trajectory(move_to_start_point=False)
success = success and self.base.follow_trajectory()
# Check if need to do a motor sync by looking at if there's been a pimu sync signal sent
# since the last stepper.set_command for each joint
sync_required = (self.pimu.ts_last_motor_sync is not None) and (
self.arm.motor.ts_last_syncd_motion > self.pimu.ts_last_motor_sync
or self.lift.motor.ts_last_syncd_motion > self.pimu.ts_last_motor_sync
or self.base.right_wheel.ts_last_syncd_motion > self.pimu.ts_last_motor_sync
or self.base.left_wheel.ts_last_syncd_motion > self.pimu.ts_last_motor_sync)
if self.pimu.ts_last_motor_sync is None or sync_required:
self.pimu.trigger_motor_sync()
success = success and self.end_of_arm.follow_trajectory(move_to_start_point=False)
success = success and self.head.follow_trajectory(move_to_start_point=False)
return success
def stop_trajectory(self):
self.arm.stop_trajectory()
self.lift.stop_trajectory()
self.base.stop_trajectory()
self.end_of_arm.stop_trajectory()
self.head.stop_trajectory()
# ##################Home and Stow #######################################
def is_calibrated(self):
"""
DEPRECATED: replaced by Robot.is_homed()
"""
self.logger.warn('is_calibrated() has been replaced by is_homed()')
return self.is_homed()
def is_homed(self):
"""
Returns true if homing has been run all joints that require it
"""
ready = self.lift.motor.status['pos_calibrated']
ready = ready and self.arm.motor.status['pos_calibrated']
for j in self.end_of_arm.joints:
req = self.end_of_arm.motors[j].params['req_calibration'] and not self.end_of_arm.motors[j].is_calibrated
ready = ready and not req
return ready
def get_stow_pos(self,joint):
"""
Return the stow position of a joint.
Allow the end_of_arm to override the defaults in order to accomodate stowing different tools
"""
if 'stow' in self.end_of_arm.params:
if joint in self.end_of_arm.params['stow']:
return self.end_of_arm.params['stow'][joint]
return self.params['stow'][joint]
def stow(self):
"""
Cause the robot to move to its stow position
Blocking.
"""
self.disable_collision_mgmt()
self.head.move_to('head_pan', self.get_stow_pos('head_pan'))
self.head.move_to('head_tilt',self.get_stow_pos('head_pan'))
lift_stowed=False
pos_lift = self.get_stow_pos('lift')
if self.lift.status['pos']<=pos_lift: #Needs to come up before bring in arm
print('--------- Stowing Lift ----')
self.lift.move_to(pos_lift)
self.push_command()
time.sleep(0.25)
ts = time.time()
while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 4.0:
time.sleep(0.1)
lift_stowed=True
# Run pre stow specific to each end of arm
self.end_of_arm.pre_stow(self)
#Bring in arm before bring down
print('--------- Stowing Arm ----')
self.arm.move_to(self.get_stow_pos('arm'))
self.push_command()
time.sleep(0.25)
ts = time.time()
while not self.arm.motor.status['near_pos_setpoint'] and time.time() - ts < 6.0:
time.sleep(0.1)
self.end_of_arm.stow()
time.sleep(0.25)
#Now bring lift down
if not lift_stowed:
print('--------- Stowing Lift ----')
self.lift.move_to(pos_lift)
self.push_command()
time.sleep(0.25)
ts = time.time()
while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 12.0:
time.sleep(0.1)
#Make sure wrist yaw is done before exiting
while self.end_of_arm.motors['wrist_yaw'].motor.is_moving():
time.sleep(0.1)
self.enable_collision_mgmt()
def home(self):
"""
Cause the robot to home its joints by moving to hardstops
Blocking.
"""
self.disable_collision_mgmt()
if self.head is not None:
print('--------- Homing Head ----')
self.head.home()
# Wrist pitch should be angled somewhere in between the lift leaving
# the base of its range and before it reaches the top of its range.
def _angle_pitch(angle):
time.sleep(1) # wait 1 sec to leave bottom of lift's range
if 'wrist_pitch' in self.end_of_arm.joints:
self.end_of_arm.move_to('wrist_pitch', angle)
# If gripper present
if 'stretch_gripper' in self.end_of_arm.joints and 'wrist_pitch' in self.end_of_arm.joints:
threading.Thread(target=_angle_pitch,args=(self.end_of_arm.params['stow']['wrist_pitch'],)).start()
# No gripper but tablet or accessory present in dw3 wrist, wrist pitch should face downward
elif 'wrist_pitch' in self.end_of_arm.joints:
threading.Thread(target=_angle_pitch,args=(-1.57,)).start()
# Home the lift
if self.lift is not None:
print('--------- Homing Lift ----')
self.lift.home()
# Home the arm
if self.arm is not None:
print('--------- Homing Arm ----')
self.arm.home()
# Home the end-of-arm
if self.end_of_arm is not None:
self.end_of_arm.home()
self.enable_collision_mgmt()
#Let user know it is done
self.pimu.trigger_beep()
self.push_command()
# ################ Helpers #################################
def _pull_status_head_dynamixel(self):
try:
self.head.pull_status()
except SerialException:
self.logger.warning('Serial Exception on Robot._pull_status_head_dynamixel')
def _update_trajectory_head_dynamixel(self):
try:
self.head.update_trajectory()
except SerialException:
self.logger.warning('Serial Exception on Robot._update_trajectory_head_dynamixel()')
def _pull_status_end_of_arm_dynamixel(self):
try:
self.end_of_arm.pull_status()
except SerialException:
self.logger.warning('Serial Exception on Robot._pull_status_end_of_arm_dynamixel')
def _update_trajectory_end_of_arm_dynamixel(self):
try:
self.end_of_arm.update_trajectory()
except SerialException:
self.logger.warning('Serial Exception on Robot._update_trajectory_end_of_arm_dynamixel()')
def _pull_status_non_dynamixel(self):
self.wacc.pull_status()
self.base.pull_status()
self.lift.pull_status()
self.arm.pull_status()
self.pimu.pull_status()
def _update_trajectory_non_dynamixel(self):
self.arm.update_trajectory()
self.lift.update_trajectory()
self.base.update_trajectory()
def _step_sentry(self):
self.head.step_sentry(self)
self.base.step_sentry(self)
self.arm.step_sentry(self)
self.lift.step_sentry(self)
self.end_of_arm.step_sentry(self)
def start_event_loop(self):
self.async_event_loop = asyncio.new_event_loop()
self.event_loop_thread = threading.Thread(target=self._event_loop, name='AsyncEvenLoopThread')
self.event_loop_thread.daemon = True
self.event_loop_thread.start()
def _event_loop(self):
asyncio.set_event_loop(self.async_event_loop)
self.async_event_loop.run_forever()
def stop_event_loop(self):
try:
if self.event_loop_thread:
asyncio.run_coroutine_threadsafe(self.async_event_loop.stop())
self.event_loop_thread.join()
except TypeError as e:
pass
get_status(self)
Thread safe and atomic read of current Robot status data Returns as a dict.
Source code in stretch_body/robot.py
def get_status(self):
"""
Thread safe and atomic read of current Robot status data
Returns as a dict.
"""
with self.lock:
return self.status.copy()
get_stow_pos(self, joint)
Return the stow position of a joint. Allow the end_of_arm to override the defaults in order to accomodate stowing different tools
Source code in stretch_body/robot.py
def get_stow_pos(self,joint):
"""
Return the stow position of a joint.
Allow the end_of_arm to override the defaults in order to accomodate stowing different tools
"""
if 'stow' in self.end_of_arm.params:
if joint in self.end_of_arm.params['stow']:
return self.end_of_arm.params['stow'][joint]
return self.params['stow'][joint]
home(self)
Cause the robot to home its joints by moving to hardstops Blocking.
Source code in stretch_body/robot.py
def home(self):
"""
Cause the robot to home its joints by moving to hardstops
Blocking.
"""
self.disable_collision_mgmt()
if self.head is not None:
print('--------- Homing Head ----')
self.head.home()
# Wrist pitch should be angled somewhere in between the lift leaving
# the base of its range and before it reaches the top of its range.
def _angle_pitch(angle):
time.sleep(1) # wait 1 sec to leave bottom of lift's range
if 'wrist_pitch' in self.end_of_arm.joints:
self.end_of_arm.move_to('wrist_pitch', angle)
# If gripper present
if 'stretch_gripper' in self.end_of_arm.joints and 'wrist_pitch' in self.end_of_arm.joints:
threading.Thread(target=_angle_pitch,args=(self.end_of_arm.params['stow']['wrist_pitch'],)).start()
# No gripper but tablet or accessory present in dw3 wrist, wrist pitch should face downward
elif 'wrist_pitch' in self.end_of_arm.joints:
threading.Thread(target=_angle_pitch,args=(-1.57,)).start()
# Home the lift
if self.lift is not None:
print('--------- Homing Lift ----')
self.lift.home()
# Home the arm
if self.arm is not None:
print('--------- Homing Arm ----')
self.arm.home()
# Home the end-of-arm
if self.end_of_arm is not None:
self.end_of_arm.home()
self.enable_collision_mgmt()
#Let user know it is done
self.pimu.trigger_beep()
self.push_command()
is_calibrated(self)
DEPRECATED: replaced by Robot.is_homed()
Source code in stretch_body/robot.py
def is_calibrated(self):
"""
DEPRECATED: replaced by Robot.is_homed()
"""
self.logger.warn('is_calibrated() has been replaced by is_homed()')
return self.is_homed()
is_homed(self)
Returns true if homing has been run all joints that require it
Source code in stretch_body/robot.py
def is_homed(self):
"""
Returns true if homing has been run all joints that require it
"""
ready = self.lift.motor.status['pos_calibrated']
ready = ready and self.arm.motor.status['pos_calibrated']
for j in self.end_of_arm.joints:
req = self.end_of_arm.motors[j].params['req_calibration'] and not self.end_of_arm.motors[j].is_calibrated
ready = ready and not req
return ready
push_command(self)
Cause all queued up RPC commands to be sent down to Devices
Source code in stretch_body/robot.py
def push_command(self):
"""
Cause all queued up RPC commands to be sent down to Devices
"""
with self.lock:
ready = self.pimu.is_ready_for_sync()
if self.params['use_asyncio'] and self.async_event_loop is not None:
future = asyncio.run_coroutine_threadsafe(self.push_command_coro(),self.async_event_loop)
future.result()
else:
self.base.push_command()
self.arm.push_command()
self.lift.push_command()
self.pimu.push_command()
self.wacc.push_command()
# Check if need to do a motor sync by looking at if there's been a pimu sync signal sent
# since the last stepper.set_command for each joint
sync_required = (self.pimu.ts_last_motor_sync is not None) and (
self.arm.motor.is_sync_required(self.pimu.ts_last_motor_sync)
or self.lift.motor.is_sync_required(self.pimu.ts_last_motor_sync)
or self.base.right_wheel.is_sync_required(self.pimu.ts_last_motor_sync)
or self.base.left_wheel.is_sync_required(self.pimu.ts_last_motor_sync))
if (self.pimu.ts_last_motor_sync is None or ( ready and sync_required)):
self.pimu.trigger_motor_sync()
startup(self, start_non_dxl_thread=True, start_dxl_thread=True, start_sys_mon_thread=True)
To be called once after class instantiation. Prepares devices for communications and motion
Returns
bool true if startup of robot succeeded
Source code in stretch_body/robot.py
def startup(self,start_non_dxl_thread=True,start_dxl_thread=True,start_sys_mon_thread=True):
"""
To be called once after class instantiation.
Prepares devices for communications and motion
Returns
-------
bool
true if startup of robot succeeded
"""
did_acquire, self._file_lock = hello_utils.acquire_body_filelock()
if not did_acquire:
print('Another process is already using Stretch. Try running "stretch_free_robot_process.py"')
return False
self.logger.debug('Starting up Robot {0} of batch {1}'.format(self.params['serial_no'], self.params['batch_name']))
success = True
for k in self.devices:
if self.devices[k] is not None:
if not self.devices[k].startup(threaded=False):
success = False
if (self.arm.motor.transport.version==0 \
or self.lift.motor.transport.version==0 \
or self.base.right_wheel.transport.version == 0 \
or self.base.left_wheel.transport.version == 0 \
or self.wacc.transport.version==0 \
or self.pimu.transport.version==0) and self.params['use_asyncio'] :
self.logger.warning('Not able to use asyncio for transport communications. Defaulting to sync.')
self.params['use_asyncio']=0
else:
self.start_event_loop()
#Always startup to load URDFs now and not while thread is running
self.collision.startup()
if not self.params['use_collision_manager']: #Turn it off here but allow user to enable it via SW later
self.disable_collision_mgmt()
else:
self.enable_collision_mgmt()
# Register the signal handlers
signal.signal(signal.SIGTERM, hello_utils.thread_service_shutdown)
signal.signal(signal.SIGINT, hello_utils.thread_service_shutdown)
self.non_dxl_thread = NonDXLStatusThread(self, target_rate_hz=self.params['rates']['NonDXLStatusThread_Hz'])
self.dxl_end_of_arm_thread = DXLEndOfArmStatusThread(self,target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
self.sys_thread = SystemMonitorThread(self, target_rate_hz=self.params['rates']['SystemMonitorThread_Hz'])
self.dxl_head_thread = DXLHeadStatusThread(self, target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
self.collision_mgmt_thread = CollisionMonitorThread(self, target_rate_hz=100)
if start_non_dxl_thread:
self.non_dxl_thread.daemon = True
self.non_dxl_thread.start()
ts = time.time()
while not self.non_dxl_thread.first_status and time.time() - ts < 3.0:
time.sleep(0.01)
if start_dxl_thread:
self.dxl_head_thread.daemon = True
self.dxl_head_thread.start()
self.dxl_end_of_arm_thread.daemon = True
self.dxl_end_of_arm_thread.start()
if start_sys_mon_thread:
self.sys_thread.daemon = True
self.sys_thread.start()
if start_sys_mon_thread and self.collision_mgmt_thread:
self.collision_mgmt_thread.daemon = True
self.collision_mgmt_thread.start()
return success
stop(self)
To be called once before exiting a program Cleanly stops down motion and communication
Source code in stretch_body/robot.py
def stop(self):
"""
To be called once before exiting a program
Cleanly stops down motion and communication
"""
self.logger.debug('---- Shutting down robot ----')
self._file_lock.release()
if self.non_dxl_thread:
if self.non_dxl_thread.running:
self.non_dxl_thread.shutdown_flag.set()
self.non_dxl_thread.join(1)
if self.dxl_head_thread:
if self.dxl_head_thread.running:
self.dxl_head_thread.shutdown_flag.set()
self.dxl_head_thread.join(1)
if self.dxl_end_of_arm_thread:
if self.dxl_end_of_arm_thread.running:
self.dxl_end_of_arm_thread.shutdown_flag.set()
self.dxl_end_of_arm_thread.join(1)
if self.sys_thread:
if self.sys_thread.running:
self.sys_thread.shutdown_flag.set()
self.sys_thread.join(1)
if self.collision_mgmt_thread:
if self.collision_mgmt_thread.running:
self.collision_mgmt_thread.shutdown_flag.set()
self.collision_mgmt_thread.join(1)
self.collision.stop()
for k in self.devices:
if self.devices[k] is not None:
self.logger.debug('Shutting down %s'%k)
self.devices[k].stop()
self.stop_event_loop()
self.logger.debug('---- Shutdown complete ----')
stow(self)
Cause the robot to move to its stow position Blocking.
Source code in stretch_body/robot.py
def stow(self):
"""
Cause the robot to move to its stow position
Blocking.
"""
self.disable_collision_mgmt()
self.head.move_to('head_pan', self.get_stow_pos('head_pan'))
self.head.move_to('head_tilt',self.get_stow_pos('head_pan'))
lift_stowed=False
pos_lift = self.get_stow_pos('lift')
if self.lift.status['pos']<=pos_lift: #Needs to come up before bring in arm
print('--------- Stowing Lift ----')
self.lift.move_to(pos_lift)
self.push_command()
time.sleep(0.25)
ts = time.time()
while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 4.0:
time.sleep(0.1)
lift_stowed=True
# Run pre stow specific to each end of arm
self.end_of_arm.pre_stow(self)
#Bring in arm before bring down
print('--------- Stowing Arm ----')
self.arm.move_to(self.get_stow_pos('arm'))
self.push_command()
time.sleep(0.25)
ts = time.time()
while not self.arm.motor.status['near_pos_setpoint'] and time.time() - ts < 6.0:
time.sleep(0.1)
self.end_of_arm.stow()
time.sleep(0.25)
#Now bring lift down
if not lift_stowed:
print('--------- Stowing Lift ----')
self.lift.move_to(pos_lift)
self.push_command()
time.sleep(0.25)
ts = time.time()
while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 12.0:
time.sleep(0.1)
#Make sure wrist yaw is done before exiting
while self.end_of_arm.motors['wrist_yaw'].motor.is_moving():
time.sleep(0.1)
self.enable_collision_mgmt()
wait_command(self, timeout=15.0, use_motion_generator=True)
Pause program execution until all motion complete.
Queuing up motion and pushing it to the hardware with push_command() is designed to be asynchronous, enabling reactive control of the robot. However, you might want sychronous control, where each command's motion is completed entirely before the program moves on to the next command. This is where you would use wait_command()
Parameters
timeout : float How long to wait for motion to complete. Must be > 0.1 sec.
Returns
bool True if motion completed, False if timed out before motion completed
Source code in stretch_body/robot.py
def wait_command(self, timeout=15.0, use_motion_generator=True):
"""Pause program execution until all motion complete.
Queuing up motion and pushing it to the hardware with
push_command() is designed to be asynchronous, enabling
reactive control of the robot. However, you might want
sychronous control, where each command's motion is completed
entirely before the program moves on to the next command.
This is where you would use wait_command()
Parameters
----------
timeout : float
How long to wait for motion to complete. Must be > 0.1 sec.
Returns
-------
bool
True if motion completed, False if timed out before motion completed
"""
time.sleep(0.1)
timeout = max(0.0, timeout - 0.1)
done = []
def check_wait(wait_method):
done.append(wait_method(timeout, use_motion_generator))
start = time.time()
threads = []
threads.append(threading.Thread(target=check_wait, args=(self.base.wait_while_is_moving,)))
threads.append(threading.Thread(target=check_wait, args=(self.arm.wait_while_is_moving,)))
threads.append(threading.Thread(target=check_wait, args=(self.lift.wait_while_is_moving,)))
threads.append(threading.Thread(target=check_wait, args=(self.head.wait_until_at_setpoint,)))
threads.append(threading.Thread(target=check_wait, args=(self.end_of_arm.wait_until_at_setpoint,)))
[thread.start() for thread in threads]
[thread.join() for thread in threads]
return all(done)
The Device Class
The stretch_body
library is modular in design. Each subcomponent of Stretch is defined in its class and the Robot class provides an interface that ties all of these classes together. This modularity allows users to plug in new/modified subcomponents into the Robot interface by extending the Device class.
It is possible to interface with a single subcomponent of Stretch by initializing its device class directly. In this section, we'll look at the API of seven subclasses of the Device class: the Arm, Lift, Base, Head, EndOfArm, Wacc, and Pimu subcomponents of Stretch.
Using the Arm class
The interface to Stretch's telescoping arm is the Arm class. It is typically initialized as:
1 2 3 4 5 6 7 8 9 10 |
|
Since both Arm and Robot are subclasses of the Device class, the same startup()
and stop()
methods are available here, as well as other Device methods such as home()
. Using the Arm class, we can read the arm's current state and send commands to the joint. For example, continuing the example above:
11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
|
The move_to()
and move_by()
methods queue absolute and relative position commands to the arm, respectively, while the nonblocking push_command()
method pushes the queued position commands to the hardware for execution.
The attribute motor
, an instance of the Stepper class, has the method wait_until_at_setpoint()
which blocks program execution until the joint reaches the commanded goal. With firmware P1 or greater installed, it is also possible to queue a waypoint trajectory for the arm to follow:
26 27 28 29 30 31 32 33 34 35 36 |
|
The attribute trajectory
, an instance of the PrismaticTrajectory class, has the method add()
which adds a single waypoint in a linear sliding trajectory. For a well-formed trajectory
(see is_valid()
), the follow_trajectory()
method starts tracking the trajectory for the telescoping arm.
It is also possible to dynamically restrict the arm joint range:
37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
|
The set_soft_motion_limit_min/max()
methods form the basis of an experimental self-collision avoidance system built into Stretch Body.
All methods in the Arm class are documented below.
API to the Stretch Arm
Source code in stretch_body/arm.py
class Arm(PrismaticJoint):
"""
API to the Stretch Arm
"""
def __init__(self,usb=None):
PrismaticJoint.__init__(self, name='arm',usb=usb)
# ######### Utilties ##############################
def motor_rad_to_translate_m(self,ang): #input in rad, output m
return (self.params['chain_pitch']*self.params['chain_sprocket_teeth']/self.params['gr_spur']/(math.pi*2))*ang
def translate_m_to_motor_rad(self, x):
return x/(self.params['chain_pitch']*self.params['chain_sprocket_teeth']/self.params['gr_spur']/(math.pi*2))
def home(self, end_pos=0.1,to_positive_stop=False, measuring=False):
return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)
home(self, end_pos=0.1, to_positive_stop=False, measuring=False)
end_pos: position to end on to_positive_stop: -- True: Move to the positive direction stop and mark to range_m[1] -- False: Move to the negative direction stop and mark to range_m[0] measuring: After homing to stop, move to opposite stop and report back measured distance return measured range-of-motion if measuring. Return None if not a valide measurement
Source code in stretch_body/arm.py
def home(self, end_pos=0.1,to_positive_stop=False, measuring=False):
return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)
Using the Lift class
The interface to Stretch's lift is the Lift class. It is typically initialized as:
1 2 3 4 5 6 7 8 9 10 |
|
The startup()
and home()
methods are extended from the Device class. Reading the lift's current state and sending commands to the joint occurs similarly to the Arm class:
11 12 13 14 15 16 |
|
The attribute status
is a dictionary of the joint's current status. This state information is updated in the background in real-time by default (disable by initializing as startup(threading=False)
). Use the pretty_print()
method to print out this state info in a human-interpretable format. Setting up waypoint trajectories for the lift is also similar to the Arm:
17 18 19 20 21 22 23 24 25 26 |
|
The attribute trajectory
is also an instance of the PrismaticTrajectory class, and by providing the instantaneous velocity argument v_m
to the add()
method, a cubic spline can be loaded into the joint's trajectory
. The call to follow_trajectory()
begins hardware tracking of the spline.
Finally, setting soft motion limits for the lift's range can be done using:
27 28 29 30 31 32 |
|
The set_soft_motion_limit_min/max()
methods perform clipping of the joint's range at the firmware level (can persist across reboots).
All methods in the Lift class are documented below.
API to the Stretch Lift
Source code in stretch_body/lift.py
class Lift(PrismaticJoint):
"""
API to the Stretch Lift
"""
def __init__(self,usb=None):
PrismaticJoint.__init__(self, name='lift',usb=usb)
# ######### Utilties ##############################
def motor_rad_to_translate_m(self,ang): #input in rad
d=self.params['pinion_t']*self.params['belt_pitch_m']/math.pi
lift_m = (math.degrees(ang)/180.0)*math.pi*(d/2)
return lift_m
def translate_m_to_motor_rad(self, x):
d = self.params['pinion_t'] * self.params['belt_pitch_m'] / math.pi
ang = 180*x/((d/2)*math.pi)
return math.radians(ang)
def home(self, end_pos=0.6,to_positive_stop=True, measuring=False):
return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)
home(self, end_pos=0.6, to_positive_stop=True, measuring=False)
end_pos: position to end on to_positive_stop: -- True: Move to the positive direction stop and mark to range_m[1] -- False: Move to the negative direction stop and mark to range_m[0] measuring: After homing to stop, move to opposite stop and report back measured distance return measured range-of-motion if measuring. Return None if not a valide measurement
Source code in stretch_body/lift.py
def home(self, end_pos=0.6,to_positive_stop=True, measuring=False):
return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)
Using the Base class
Item | Notes | |
---|---|---|
A | Drive wheels | 4 inch diameter, urethane rubber shore 60A |
B | Cliff sensors | Sharp GP2Y0A51SK0F, Analog, range 2-15 cm |
C | Mecanum wheel | Diameter 50mm |
The interface to Stretch's mobile base is the Base class. It is typically initialized as:
1 2 3 4 5 6 7 8 9 |
|
Stretch's mobile base is a differential drive configuration. The left and right wheels are accessible through Base left_wheel
and right_wheel
attributes, both of which are instances of the Stepper class. The startup()
method is extended from the Device class. Since the mobile base is unconstrained, there is no homing method. The pretty_print()
method prints out mobile base state information in a human-interpretable format. We can read the base's current state and send commands using:
10 11 12 13 14 15 16 17 18 19 20 |
|
The translate_by()
and rotate_by()
methods send relative commands similar to the way move_by()
behaves for the single degree of freedom joints.
The mobile base also supports velocity control:
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
|
The set_translate_velocity()
and set_rotational_velocity()
methods give velocity control over the translational and rotational components of the mobile base independently. The set_velocity()
method gives control over both of these components simultaneously.
To halt motion, you can command zero velocities or command the base into freewheel mode using enable_freewheel_mode()
. The mobile base also supports waypoint trajectory following, but the waypoints are part of the SE2 group, where a desired waypoint is defined as an (x, y) point and a theta orientation:
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
|
Warning
The Base waypoint trajectory following has no notion of obstacles in the environment. It will blindly follow the commanded waypoints. For obstacle avoidance, we recommend employing perception and a path planner.
The attribute trajectory
is an instance of the DiffDriveTrajectory class. The call to follow_trajectory()
begins hardware tracking of the spline.
All methods of the Base class are documented below.
API to the Stretch Mobile Base
Source code in stretch_body/base.py
class Base(Device):
"""
API to the Stretch Mobile Base
"""
def __init__(self, usb_left=None, usb_right=None):
Device.__init__(self, 'base')
if usb_left is None:
usb_left = self.params['usb_name_left_wheel']
if usb_right is None:
usb_right = self.params['usb_name_right_wheel']
self.left_wheel = Stepper(usb=usb_left, name='hello-motor-left-wheel')
self.right_wheel = Stepper(usb=usb_right, name='hello-motor-right-wheel')
self.status = {'timestamp_pc':0,'x':0,'y':0,'theta':0,'x_vel':0,'y_vel':0,'theta_vel':0, 'pose_time_s':0,'effort': [0, 0], 'left_wheel': self.left_wheel.status, 'right_wheel': self.right_wheel.status, 'translation_force': 0, 'rotation_torque': 0}
self.trajectory = DiffDriveTrajectory()
self._waypoint_lwpos = None
self._waypoint_rwpos = None
self.thread_rate_hz = 5.0
self.first_step=True
wheel_circumference_m = self.params['wheel_diameter_m'] * pi
self.meters_per_motor_rad = (wheel_circumference_m / (2.0 * pi)) / self.params['gr']
self.wheel_separation_m = self.params['wheel_separation_m']
# Default controller params
self.stiffness=1.0
self.vel_mr=self.translate_to_motor_rad(self.params['motion']['default']['vel_m'])
self.accel_mr=self.translate_to_motor_rad(self.params['motion']['default']['accel_m'])
self.fast_motion_allowed = True
# ########### Device Methods #############
def startup(self, threaded=True):
#Startup steppers first so that status is populated before this Device thread begins (if threaded==true)
success = self.left_wheel.startup(threaded=False) and self.right_wheel.startup(threaded=False)
if success:
Device.startup(self, threaded=threaded)
self.__update_status()
return success
def _thread_loop(self):
self.pull_status()
self.update_trajectory()
def stop(self):
Device.stop(self)
if self.left_wheel.hw_valid and int(str(self.left_wheel.board_info['protocol_version'])[1:]) >= 1:
self.left_wheel.stop_waypoint_trajectory()
self._waypoint_lwpos = None
if self.right_wheel.hw_valid and int(str(self.right_wheel.board_info['protocol_version'])[1:]) >= 1:
self.right_wheel.stop_waypoint_trajectory()
self._waypoint_rwpos = None
self.left_wheel.stop()
self.right_wheel.stop()
def pretty_print(self):
print('----------Base------')
print('X (m)',self.status['x'])
print('Y (m)',self.status['y'])
print('Theta (rad)',self.status['theta'])
print('X_vel (m/s)', self.status['x_vel'])
print('Y_vel (m/s)', self.status['y_vel'])
print('Theta_vel (rad/s)', self.status['theta_vel'])
print('Pose time (s)', self.status['pose_time_s'])
print('Timestamp PC (s):', self.status['timestamp_pc'])
print('-----Left-Wheel-----')
self.left_wheel.pretty_print()
print('-----Right-Wheel-----')
self.right_wheel.pretty_print()
# ###################################################
def enable_freewheel_mode(self):
"""
Force motors into freewheel
"""
self.left_wheel.enable_freewheel()
self.right_wheel.enable_freewheel()
def enable_pos_incr_mode(self):
"""
Force motors into incremental position mode
"""
self.left_wheel.enable_pos_traj_incr()
self.right_wheel.enable_pos_traj_incr()
# ###################################################
def wait_for_contact(self, timeout=5.0):
ts = time.time()
while (time.time() - ts < timeout):
self.pull_status()
if self.left_wheel.status['in_guarded_event'] or self.right_wheel.status['in_guarded_event']:
return True
time.sleep(0.01)
return False
def wait_while_is_moving(self,timeout=15.0, use_motion_generator=True):
done = []
def check_wait(wait_method):
done.append(wait_method(timeout,use_motion_generator))
threads = []
threads.append(threading.Thread(target=check_wait, args=(self.left_wheel.wait_while_is_moving,)))
threads.append(threading.Thread(target=check_wait, args=(self.right_wheel.wait_while_is_moving,)))
[thread.start() for thread in threads]
[thread.join() for thread in threads]
return all(done)
def wait_until_at_setpoint(self, timeout=15.0):
#Assume both are in motion. This will exit once both are at setpoints
at_setpoint = []
def check_wait(wait_method):
at_setpoint.append(wait_method(timeout))
threads = []
threads.append(threading.Thread(target=check_wait, args=(self.left_wheel.wait_until_at_setpoint,)))
threads.append(threading.Thread(target=check_wait, args=(self.right_wheel.wait_until_at_setpoint,)))
[done_thread.start() for done_thread in threads]
[done_thread.join() for done_thread in threads]
return all(at_setpoint)
def contact_thresh_to_motor_current(self,is_translate,contact_thresh):
"""
This model converts from a specified percentage effort (-100 to 100) of translate/rotational effort to motor currents
"""
if is_translate:
e_c = self.params['contact_models']['effort_pct']['contact_thresh_translate_default'] if contact_thresh is None else contact_thresh
m=self.params['contact_models']['effort_pct']['contact_thresh_translate_max']
i_l, i_r = self.translation_effort_pct_to_motor_current(min(m,max(e_c,-1*m)))
else:
e_c = self.params['contact_models']['effort_pct']['contact_thresh_rotate_default'] if contact_thresh is None else contact_thresh
m = self.params['contact_models']['effort_pct']['contact_thresh_rotate_max']
i_l, i_r = self.rotation_effort_pct_to_motor_current(min(m, max(e_c, -1 * m)))
return i_l,i_r
def translate_by(self, x_m, v_m=None, a_m=None, stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Incremental translation of the base
x_m: desired motion (m)
v_m: velocity for trapezoidal motion profile (m/s)
a_m: acceleration for trapezoidal motion profile (m/s^2)
stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'translate_by', contact_thresh_N, contact_thresh)
x_mr = self.translate_to_motor_rad(x_m)
if v_m is not None:
v_m = min(abs(v_m), self.params['motion']['max']['vel_m'])
v_mr = self.translate_to_motor_rad(v_m)
else:
v_mr = self.vel_mr
if a_m is not None:
a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
a_mr = self.translate_to_motor_rad(a_m)
else:
a_mr = self.accel_mr
if not self.fast_motion_allowed:
v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True, contact_thresh=contact_thresh)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1*i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1*i_contact_r)
def rotate_by(self, x_r, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
"""
Incremental rotation of the base
x_r: desired motion (radians)
v_r: velocity for trapezoidal motion profile (rad/s)
a_r: acceleration for trapezoidal motion profile (rad/s^2)
stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'rotate_by', contact_thresh_N, contact_thresh)
x_mr = self.rotate_to_motor_rad(x_r)
if v_r is not None:
v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
v_mr = self.rotate_to_motor_rad(v_r)
v_mr=min(abs(v_mr),v_mr_max)
else:
v_mr = self.vel_mr
if a_r is not None:
a_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
a_mr = self.rotate_to_motor_rad(a_r)
a_mr = min(abs(a_mr), a_mr_max)
else:
a_mr = self.accel_mr
if not self.fast_motion_allowed:
v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False, contact_thresh=contact_thresh)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=-1*x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
def set_translate_velocity(self, v_m, a_m=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Command the bases translational velocity.
Use care to prevent collisions / avoid runaways
v_m: desired velocity (m/s)
a_m: acceleration of motion profile (m/s^2)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'set_translate_velocity', contact_thresh_N, contact_thresh)
if a_m is not None:
a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
a_mr = self.translate_to_motor_rad(a_m)
else:
a_mr = self.accel_mr
v_sign = numpy.sign(v_m)
v_m = v_sign * min(abs(v_m), self.params['motion']['max']['vel_m'])
v_mr = self.translate_to_motor_rad(v_m)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)
self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
def set_rotational_velocity(self, v_r, a_r=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Command the bases rotational velocity.
Use care to prevent collisions / avoid runaways
v_r: desired rotational velocity (rad/s)
a_r: acceleration of motion profile (rad/s^2)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'set_rotational_velocity', contact_thresh_N, contact_thresh)
if a_r is not None:
a_mr_max=self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
a_mr = self.rotate_to_motor_rad(a_r)
a_mr = min(abs(a_mr), a_mr_max)
else:
a_mr = self.accel_mr
w_sign = numpy.sign(v_r)
v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
v_mr = self.rotate_to_motor_rad(v_r)
v_mr = w_sign * min(abs(v_mr), v_mr_max)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False,
contact_thresh=contact_thresh)
self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=-1*v_mr, a_des=a_mr,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
def set_velocity(self, v_m, w_r, a=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Command the bases translational and rotational
velocities simultaneously.
Use care to prevent collisions / avoid runaways
v_m: desired velocity (m/s)
w_r: desired rotational velocity (rad/s)
a: acceleration of motion profile (m/s^2 and rad/s^2)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'set_velocity', contact_thresh_N, contact_thresh)
if a is not None:
a = min(abs(a), self.params['motion']['max']['accel_m'])
a_mr = self.translate_to_motor_rad(a)
else:
a_mr = self.accel_mr
# Unicycle dynamics w/o R because
# translate_to_motor_rad accounts for R and gear ratio
wl_m = ((2 * v_m) - (w_r * self.params['wheel_separation_m'])) / 2.0
wr_m = ((2 * v_m) + (w_r * self.params['wheel_separation_m'])) / 2.0
wl_sign = numpy.sign(wl_m)
wl_m = wl_sign * min(abs(wl_m), self.params['motion']['max']['vel_m'])
wl_r = self.translate_to_motor_rad(wl_m)
wr_sign = numpy.sign(wr_m)
wr_m = wr_sign * min(abs(wr_m), self.params['motion']['max']['vel_m'])
wr_r = self.translate_to_motor_rad(wr_m)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,
contact_thresh=contact_thresh)
if self.params['use_vel_traj']:
ctrl_mode =Stepper.MODE_VEL_TRAJ
else:
ctrl_mode =Stepper.MODE_VEL_PID
self.left_wheel.set_command(mode=ctrl_mode, v_des=wl_r, a_des=a_mr,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=ctrl_mode, v_des=wr_r, a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
# ######### Waypoint Trajectory Interface ##############################
def follow_trajectory(self, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
"""Starts executing a waypoint trajectory
`self.trajectory` must be populated with a valid trajectory before calling
this method.
Parameters
----------
v_r : float
velocity limit for trajectory in motor space in meters per second
a_r : float
acceleration limit for trajectory in motor space in meters per second squared
stiffness : float
stiffness of motion. Range 0.0 (min) to 1.0 (max)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'follow_trajectory', contact_thresh_N, contact_thresh)
# check if joint valid, traj active, and right protocol
if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
self.logger.warning('Base connection to hardware not valid')
return False
if self._waypoint_lwpos is not None or self._waypoint_rwpos is not None:
self.logger.warning('Base waypoint trajectory already active')
return False
if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
self.logger.warning("Base left motor firmware version doesn't support waypoint trajectories")
return False
if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
self.logger.warning("Base right motor firmware version doesn't support waypoint trajectories")
return False
# check if trajectory valid
vel_limit = v_r if v_r is not None else self.params['motion']['trajectory_max']['vel_r']
acc_limit = a_r if a_r is not None else self.params['motion']['trajectory_max']['accel_r']
valid, reason = self.trajectory.is_valid(vel_limit, acc_limit, self.translate_to_motor_rad, self.rotate_to_motor_rad)
if not valid:
self.logger.warning('Base trajectory not valid: {0}'.format(reason))
return False
if valid and reason == "must have at least two waypoints":
# skip this device
return True
# set defaults
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
v = self.translate_to_motor_rad(min(abs(v_r), self.params['motion']['trajectory_max']['vel_r'])) \
if v_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['vel_r'])
a = self.translate_to_motor_rad(min(abs(a_r), self.params['motion']['trajectory_max']['accel_r'])) \
if a_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['accel_r'])
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)
# start trajectory
self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
v_des=v,
a_des=a,
stiffness=stiffness,
i_contact_pos=i_contact_l,
i_contact_neg=-i_contact_l*-1)
self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
v_des=v,
a_des=a,
stiffness=stiffness,
i_contact_pos=i_contact_r,
i_contact_neg=-i_contact_r*-1)
self.left_wheel.push_command()
self.right_wheel.push_command()
self.left_wheel.pull_status()
self.right_wheel.pull_status()
self._waypoint_lwpos = self.left_wheel.status['pos']
self._waypoint_rwpos = self.right_wheel.status['pos']
ls0, rs0 = self.trajectory.get_wheel_segments(0, self.translate_to_motor_rad, self.rotate_to_motor_rad,
self._waypoint_lwpos, self._waypoint_rwpos)
return self.left_wheel.start_waypoint_trajectory(ls0.to_array()) and \
self.right_wheel.start_waypoint_trajectory(rs0.to_array())
def reset_odometry(self):
"""
Reset X/Y/Theta to report 0
"""
self.first_step = True
def is_trajectory_active(self):
return (self.left_wheel.status['waypoint_traj']['state'] == 'active' or self.right_wheel.status['waypoint_traj']['state'] == 'active')
def get_trajectory_ts(self):
# Return trajectory execution time
if self.is_trajectory_active() and self.left_wheel._waypoint_ts is not None and self.right_wheel._waypoint_ts:
return max(time.time()-self.left_wheel._waypoint_ts,time.time()-self.right_wheel._waypoint_ts)
elif len(self.trajectory.waypoints):
return self.trajectory.waypoints[-1].time
else:
return 0
def get_trajectory_time_remaining(self):
if not self.is_trajectory_active():
return 0
else:
return max(0,self.trajectory.waypoints[-1].time - self.get_trajectory_ts())
def update_trajectory(self):
"""Updates hardware with the next segment of `self.trajectory`
This method must be called frequently to enable complete trajectory execution
and preemption of future segments. If used with `stretch_body.robot.Robot` or
with `self.startup(threaded=True)`, a background thread is launched for this.
Otherwise, the user must handle calling this method.
"""
# check if joint valid, right protocol, and right mode
if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
return
if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
return
if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
return
if self.left_wheel.status['mode'] != self.left_wheel.MODE_POS_TRAJ_WAYPOINT:
return
if self.right_wheel.status['mode'] != self.right_wheel.MODE_POS_TRAJ_WAYPOINT:
return
if self._waypoint_lwpos is None or self._waypoint_rwpos is None:
return
if self.left_wheel.status['waypoint_traj']['state'] == 'active' and self.right_wheel.status['waypoint_traj']['state'] == 'active':
next_segment_id = self.left_wheel.status['waypoint_traj']['segment_id'] - 2 + 1 # subtract 2 due to IDs 0 & 1 being reserved by firmware
if next_segment_id < self.trajectory.get_num_segments():
ls1, rs1 = self.trajectory.get_wheel_segments(next_segment_id, self.translate_to_motor_rad, self.rotate_to_motor_rad,
self._waypoint_lwpos, self._waypoint_rwpos)
self.left_wheel.set_next_trajectory_segment(ls1.to_array())
self.right_wheel.set_next_trajectory_segment(rs1.to_array())
elif self.left_wheel.status['waypoint_traj']['state'] == 'idle' and self.left_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT and \
self.right_wheel.status['waypoint_traj']['state'] == 'idle' and self.right_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT:
self._waypoint_lwpos = None
self._waypoint_rwpos = None
self.left_wheel.enable_pos_traj()
self.right_wheel.enable_pos_traj()
self.push_command()
def stop_trajectory(self):
"""Stop waypoint trajectory immediately and resets hardware
"""
self.left_wheel.stop_waypoint_trajectory()
self.right_wheel.stop_waypoint_trajectory()
def step_sentry(self,robot):
"""
Only allow fast mobile base motion if the lift is low,
the arm is retracted, and the wrist is stowed. This is
intended to keep the center of mass low for increased
stability and avoid catching the arm or tool on
something.
"""
if self.robot_params['robot_sentry']['base_max_velocity']:
x_lift=robot.lift.status['pos']
x_arm =robot.arm.status['pos']
x_wrist =robot.end_of_arm.motors['wrist_yaw'].status['pos']
if ((x_lift < self.params['sentry_max_velocity']['max_lift_height_m']) and
(x_arm < self.params['sentry_max_velocity']['max_arm_extension_m']) and
(x_wrist > self.params['sentry_max_velocity']['min_wrist_yaw_rad'])):
if not self.fast_motion_allowed:
self.logger.debug('Fast motion turned on')
self.fast_motion_allowed = True
else:
if self.fast_motion_allowed:
self.logger.debug('Fast motion turned off')
self.fast_motion_allowed = False
self.left_wheel.step_sentry(robot)
self.right_wheel.step_sentry(robot)
# ###################################################
def push_command(self):
self.left_wheel.push_command()
self.right_wheel.push_command()
def pull_status(self):
"""
Computes base odometery based on stepper positions / velocities
"""
self.left_wheel.pull_status()
self.right_wheel.pull_status()
self.__update_status()
async def pull_status_async(self):
"""
Computes base odometery based on stepper positions / velocities
"""
await self.left_wheel.pull_status_async()
await self.right_wheel.pull_status_async()
self.__update_status()
def __update_status(self):
self.status['timestamp_pc'] = time.time()
p0 = self.status['left_wheel']['pos']
p1 = self.status['right_wheel']['pos']
t0 = self.status['left_wheel']['timestamp']
t1 = self.status['right_wheel']['timestamp']
self.status['translation_force'] = 0 #Deprecated
self.status['rotation_torque'] = 0 #Deprecated
if self.first_step:
# Upon the first step, simply set the initial pose, since
# no movement has yet been recorded.
self.first_step = False
self.p0 = p0
self.p1 = p1
self.t0_s = t0
self.t1_s = t1
self.status['x'] = 0.0
self.status['y'] = 0.0
self.status['theta'] = 0.0
self.status['x_vel'] = 0.0
self.status['y_vel'] = 0.0
self.status['theta_vel'] = 0.0
else:
######################################################
# The odometry related was wrtten starting on January 14,
# 2019 while looking at the following BSD-3-Clause licensed
# code for reference.
# https://github.com/merose/diff_drive/blob/master/src/diff_drive/odometry.py
# The code uses standard calculations. The reference code
# specifically cites a document with the following link,
# which appears to be broken.
# https://chess.eecs.berkeley.edu/eecs149/documentation/differentialDrive.pdf
# There are many sources on the internet for these
# equations. For example, the following document is
# helpful:
# http://www8.cs.umu.se/kurser/5DV122/HT13/material/Hellstrom-ForwardKinematics.pdf
prev_t0_s = self.t0_s
prev_t1_s = self.t1_s
t0_s = t0
t1_s = t1
delta_t0_s = t0_s - prev_t0_s
delta_t1_s = t1_s - prev_t1_s
if (delta_t0_s > 0.0) and (delta_t1_s > 0.0):
# update if time has passed for both motor readings, otherwise do nothing
average_delta_t_s = (delta_t0_s + delta_t1_s) / 2.0
# update the times, since both motors have new readings
self.prev_t0_s = self.t0_s
self.prev_t1_s = self.t1_s
self.t0_s = t0_s
self.t1_s = t1_s
# need to check on wrap around / rollover for wheel positions
self.prev_p0 = self.p0
self.prev_p1 = self.p1
self.p0 = p0
self.p1 = p1
# Transform the wheel rotations so that left and right
# wheel distances in meters have the convention that
# positive values for each wheel corresponds with forward
# motion of the mobile base.
prev_left_m = self.prev_p0 * self.meters_per_motor_rad
left_m = self.p0 * self.meters_per_motor_rad
prev_right_m = self.prev_p1 * self.meters_per_motor_rad
right_m = self.p1 * self.meters_per_motor_rad
delta_left_m = left_m - prev_left_m
delta_right_m = right_m - prev_right_m
delta_travel = (delta_right_m + delta_left_m) / 2.0
delta_theta = (delta_right_m - delta_left_m) / self.wheel_separation_m
prev_x = self.status['x']
prev_y = self.status['y']
prev_theta = self.status['theta']
if delta_left_m == delta_right_m:
# delta_theta is 0.0, which would result in a divide
# by zero error and corresponds with an infinite
# radius of curvature (0 curvature).
delta_x = delta_travel * cos(prev_theta)
delta_y = delta_travel * sin(prev_theta)
else:
# calculate the instantaneous center of curvature (ICC)
icc_radius = delta_travel / delta_theta
icc_x = prev_x - (icc_radius * sin(prev_theta))
icc_y = prev_y + (icc_radius * cos(prev_theta))
# calculate the change in position based on the ICC
delta_x = ((cos(delta_theta) * (prev_x - icc_x))
- (sin(delta_theta) * (prev_y - icc_y))
+ icc_x - prev_x)
delta_y = ((sin(delta_theta) * (prev_x - icc_x))
+ (cos(delta_theta) * (prev_y - icc_y))
+ icc_y - prev_y)
# update the estimated total time passed since odometry started
self.status['pose_time_s'] = self.status['pose_time_s'] + average_delta_t_s
# update the robot's velocity estimates
self.status['x_vel'] = delta_travel / average_delta_t_s
self.status['y_vel'] = 0.0
self.status['theta_vel'] = delta_theta / average_delta_t_s
# update the robot's pose estimates
self.status['x'] = prev_x + delta_x
self.status['y'] = prev_y + delta_y
self.status['theta'] = (prev_theta + delta_theta) % (2.0 * pi)
# ############## Deprecated Contact API ##################
def motor_current_to_translation_force(self, il, ir):
raise DeprecationWarning('Method motor_current_to_translate_force has been deprecated since v0.3.5')
def motor_current_to_rotation_torque(self, il, ir):
raise DeprecationWarning('Method motor_current_to_rotation_torque has been deprecated since v0.3.5')
def translation_force_to_motor_current(self, f_N): # Assume evenly balanced
raise DeprecationWarning('Method translation_force_to_motor_current has been deprecated since v0.3.5')
def rotation_torque_to_motor_current(self, tq_Nm):
raise DeprecationWarning('Method translation_force_to_motor_current has been deprecated since v0.3.5')
# ########### Effort Contact Conversions ####################
# Rotation effort is -100 to 100 (where 100 = L/R motors at +iMax / +iMax)
# Translation effort is -100 to 100 (where 100 = L/R motors at +iMax / -iMax)
def rotation_effort_pct_to_motor_current(self,e_pct):
il=self.left_wheel.effort_pct_to_current(e_pct)
ir= -1*self.right_wheel.effort_pct_to_current(e_pct)
return il, ir
def translation_effort_pct_to_motor_current(self,e_pct):
il = self.left_wheel.effort_pct_to_current(e_pct)
ir = self.right_wheel.effort_pct_to_current(e_pct)
return il, ir
def motor_current_to_translate_effort_pct(self,il,ir):
el=self.left_wheel.current_to_effort_pct(il)
er = self.right_wheel.current_to_effort_pct(ir)
return (el+er)/2
def motor_current_to_rotation_effort_pct(self,il,ir):
el = self.left_wheel.current_to_effort_pct(il)
er = self.right_wheel.current_to_effort_pct(ir)
return (el-er)/2
# ########### Kinematic Conversions ####################
def translate_to_motor_rad(self,x_m):
circ=self.params['wheel_diameter_m']*math.pi
return deg_to_rad(360*self.params['gr']*x_m/circ)
def motor_rad_to_translate(self,x_r):
circ = self.params['wheel_diameter_m'] * math.pi
return rad_to_deg(x_r)*circ/360/self.params['gr']
def rotate_to_motor_rad(self,x_r):
r = self.params['wheel_separation_m'] / 2.0
c = r * x_r #distance wheel travels (m)
return self.translate_to_motor_rad(c)
def motor_rad_to_rotate(self, x_r):
c = self.motor_rad_to_translate(x_r)
r = self.params['wheel_separation_m'] / 2.0
ang_rad = c /r
return ang_rad
def translation_to_rotation(self,x_m):
x_mr=self.translate_to_motor_rad(x_m)
x_r=self.motor_rad_to_rotate(x_mr)
return x_r
def rotation_to_translation(self,x_r):
x_mr=self.rotate_to_motor_rad(x_r)
x_m=self.motor_rad_to_translate(x_mr)
return x_m
contact_thresh_to_motor_current(self, is_translate, contact_thresh)
This model converts from a specified percentage effort (-100 to 100) of translate/rotational effort to motor currents
Source code in stretch_body/base.py
def contact_thresh_to_motor_current(self,is_translate,contact_thresh):
"""
This model converts from a specified percentage effort (-100 to 100) of translate/rotational effort to motor currents
"""
if is_translate:
e_c = self.params['contact_models']['effort_pct']['contact_thresh_translate_default'] if contact_thresh is None else contact_thresh
m=self.params['contact_models']['effort_pct']['contact_thresh_translate_max']
i_l, i_r = self.translation_effort_pct_to_motor_current(min(m,max(e_c,-1*m)))
else:
e_c = self.params['contact_models']['effort_pct']['contact_thresh_rotate_default'] if contact_thresh is None else contact_thresh
m = self.params['contact_models']['effort_pct']['contact_thresh_rotate_max']
i_l, i_r = self.rotation_effort_pct_to_motor_current(min(m, max(e_c, -1 * m)))
return i_l,i_r
enable_freewheel_mode(self)
Force motors into freewheel
Source code in stretch_body/base.py
def enable_freewheel_mode(self):
"""
Force motors into freewheel
"""
self.left_wheel.enable_freewheel()
self.right_wheel.enable_freewheel()
enable_pos_incr_mode(self)
Force motors into incremental position mode
Source code in stretch_body/base.py
def enable_pos_incr_mode(self):
"""
Force motors into incremental position mode
"""
self.left_wheel.enable_pos_traj_incr()
self.right_wheel.enable_pos_traj_incr()
follow_trajectory(self, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)
Starts executing a waypoint trajectory
self.trajectory
must be populated with a valid trajectory before calling
this method.
Parameters
v_r : float velocity limit for trajectory in motor space in meters per second a_r : float acceleration limit for trajectory in motor space in meters per second squared stiffness : float stiffness of motion. Range 0.0 (min) to 1.0 (max) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))
Source code in stretch_body/base.py
def follow_trajectory(self, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
"""Starts executing a waypoint trajectory
`self.trajectory` must be populated with a valid trajectory before calling
this method.
Parameters
----------
v_r : float
velocity limit for trajectory in motor space in meters per second
a_r : float
acceleration limit for trajectory in motor space in meters per second squared
stiffness : float
stiffness of motion. Range 0.0 (min) to 1.0 (max)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'follow_trajectory', contact_thresh_N, contact_thresh)
# check if joint valid, traj active, and right protocol
if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
self.logger.warning('Base connection to hardware not valid')
return False
if self._waypoint_lwpos is not None or self._waypoint_rwpos is not None:
self.logger.warning('Base waypoint trajectory already active')
return False
if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
self.logger.warning("Base left motor firmware version doesn't support waypoint trajectories")
return False
if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
self.logger.warning("Base right motor firmware version doesn't support waypoint trajectories")
return False
# check if trajectory valid
vel_limit = v_r if v_r is not None else self.params['motion']['trajectory_max']['vel_r']
acc_limit = a_r if a_r is not None else self.params['motion']['trajectory_max']['accel_r']
valid, reason = self.trajectory.is_valid(vel_limit, acc_limit, self.translate_to_motor_rad, self.rotate_to_motor_rad)
if not valid:
self.logger.warning('Base trajectory not valid: {0}'.format(reason))
return False
if valid and reason == "must have at least two waypoints":
# skip this device
return True
# set defaults
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
v = self.translate_to_motor_rad(min(abs(v_r), self.params['motion']['trajectory_max']['vel_r'])) \
if v_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['vel_r'])
a = self.translate_to_motor_rad(min(abs(a_r), self.params['motion']['trajectory_max']['accel_r'])) \
if a_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['accel_r'])
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)
# start trajectory
self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
v_des=v,
a_des=a,
stiffness=stiffness,
i_contact_pos=i_contact_l,
i_contact_neg=-i_contact_l*-1)
self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
v_des=v,
a_des=a,
stiffness=stiffness,
i_contact_pos=i_contact_r,
i_contact_neg=-i_contact_r*-1)
self.left_wheel.push_command()
self.right_wheel.push_command()
self.left_wheel.pull_status()
self.right_wheel.pull_status()
self._waypoint_lwpos = self.left_wheel.status['pos']
self._waypoint_rwpos = self.right_wheel.status['pos']
ls0, rs0 = self.trajectory.get_wheel_segments(0, self.translate_to_motor_rad, self.rotate_to_motor_rad,
self._waypoint_lwpos, self._waypoint_rwpos)
return self.left_wheel.start_waypoint_trajectory(ls0.to_array()) and \
self.right_wheel.start_waypoint_trajectory(rs0.to_array())
pull_status(self)
Computes base odometery based on stepper positions / velocities
Source code in stretch_body/base.py
def pull_status(self):
"""
Computes base odometery based on stepper positions / velocities
"""
self.left_wheel.pull_status()
self.right_wheel.pull_status()
self.__update_status()
pull_status_async(self)
async
Computes base odometery based on stepper positions / velocities
Source code in stretch_body/base.py
async def pull_status_async(self):
"""
Computes base odometery based on stepper positions / velocities
"""
await self.left_wheel.pull_status_async()
await self.right_wheel.pull_status_async()
self.__update_status()
reset_odometry(self)
Reset X/Y/Theta to report 0
Source code in stretch_body/base.py
def reset_odometry(self):
"""
Reset X/Y/Theta to report 0
"""
self.first_step = True
rotate_by(self, x_r, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)
Incremental rotation of the base x_r: desired motion (radians) v_r: velocity for trapezoidal motion profile (rad/s) a_r: acceleration for trapezoidal motion profile (rad/s^2) stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))
Source code in stretch_body/base.py
def rotate_by(self, x_r, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
"""
Incremental rotation of the base
x_r: desired motion (radians)
v_r: velocity for trapezoidal motion profile (rad/s)
a_r: acceleration for trapezoidal motion profile (rad/s^2)
stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'rotate_by', contact_thresh_N, contact_thresh)
x_mr = self.rotate_to_motor_rad(x_r)
if v_r is not None:
v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
v_mr = self.rotate_to_motor_rad(v_r)
v_mr=min(abs(v_mr),v_mr_max)
else:
v_mr = self.vel_mr
if a_r is not None:
a_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
a_mr = self.rotate_to_motor_rad(a_r)
a_mr = min(abs(a_mr), a_mr_max)
else:
a_mr = self.accel_mr
if not self.fast_motion_allowed:
v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False, contact_thresh=contact_thresh)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=-1*x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
set_rotational_velocity(self, v_r, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)
Command the bases rotational velocity. Use care to prevent collisions / avoid runaways v_r: desired rotational velocity (rad/s) a_r: acceleration of motion profile (rad/s^2) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))
Source code in stretch_body/base.py
def set_rotational_velocity(self, v_r, a_r=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Command the bases rotational velocity.
Use care to prevent collisions / avoid runaways
v_r: desired rotational velocity (rad/s)
a_r: acceleration of motion profile (rad/s^2)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'set_rotational_velocity', contact_thresh_N, contact_thresh)
if a_r is not None:
a_mr_max=self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
a_mr = self.rotate_to_motor_rad(a_r)
a_mr = min(abs(a_mr), a_mr_max)
else:
a_mr = self.accel_mr
w_sign = numpy.sign(v_r)
v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
v_mr = self.rotate_to_motor_rad(v_r)
v_mr = w_sign * min(abs(v_mr), v_mr_max)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False,
contact_thresh=contact_thresh)
self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=-1*v_mr, a_des=a_mr,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
set_translate_velocity(self, v_m, a_m=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)
Command the bases translational velocity. Use care to prevent collisions / avoid runaways v_m: desired velocity (m/s) a_m: acceleration of motion profile (m/s^2) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))
Source code in stretch_body/base.py
def set_translate_velocity(self, v_m, a_m=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Command the bases translational velocity.
Use care to prevent collisions / avoid runaways
v_m: desired velocity (m/s)
a_m: acceleration of motion profile (m/s^2)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'set_translate_velocity', contact_thresh_N, contact_thresh)
if a_m is not None:
a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
a_mr = self.translate_to_motor_rad(a_m)
else:
a_mr = self.accel_mr
v_sign = numpy.sign(v_m)
v_m = v_sign * min(abs(v_m), self.params['motion']['max']['vel_m'])
v_mr = self.translate_to_motor_rad(v_m)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)
self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
set_velocity(self, v_m, w_r, a=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)
Command the bases translational and rotational velocities simultaneously. Use care to prevent collisions / avoid runaways v_m: desired velocity (m/s) w_r: desired rotational velocity (rad/s) a: acceleration of motion profile (m/s^2 and rad/s^2) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))
Source code in stretch_body/base.py
def set_velocity(self, v_m, w_r, a=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Command the bases translational and rotational
velocities simultaneously.
Use care to prevent collisions / avoid runaways
v_m: desired velocity (m/s)
w_r: desired rotational velocity (rad/s)
a: acceleration of motion profile (m/s^2 and rad/s^2)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'set_velocity', contact_thresh_N, contact_thresh)
if a is not None:
a = min(abs(a), self.params['motion']['max']['accel_m'])
a_mr = self.translate_to_motor_rad(a)
else:
a_mr = self.accel_mr
# Unicycle dynamics w/o R because
# translate_to_motor_rad accounts for R and gear ratio
wl_m = ((2 * v_m) - (w_r * self.params['wheel_separation_m'])) / 2.0
wr_m = ((2 * v_m) + (w_r * self.params['wheel_separation_m'])) / 2.0
wl_sign = numpy.sign(wl_m)
wl_m = wl_sign * min(abs(wl_m), self.params['motion']['max']['vel_m'])
wl_r = self.translate_to_motor_rad(wl_m)
wr_sign = numpy.sign(wr_m)
wr_m = wr_sign * min(abs(wr_m), self.params['motion']['max']['vel_m'])
wr_r = self.translate_to_motor_rad(wr_m)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,
contact_thresh=contact_thresh)
if self.params['use_vel_traj']:
ctrl_mode =Stepper.MODE_VEL_TRAJ
else:
ctrl_mode =Stepper.MODE_VEL_PID
self.left_wheel.set_command(mode=ctrl_mode, v_des=wl_r, a_des=a_mr,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1 * i_contact_l)
self.right_wheel.set_command(mode=ctrl_mode, v_des=wr_r, a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1 * i_contact_r)
startup(self, threaded=True)
Starts machinery required to interface with this device
Parameters
threaded : bool whether a thread manages hardware polling/pushing in the background
Returns
bool whether the startup procedure succeeded
Source code in stretch_body/base.py
def startup(self, threaded=True):
#Startup steppers first so that status is populated before this Device thread begins (if threaded==true)
success = self.left_wheel.startup(threaded=False) and self.right_wheel.startup(threaded=False)
if success:
Device.startup(self, threaded=threaded)
self.__update_status()
return success
step_sentry(self, robot)
Only allow fast mobile base motion if the lift is low, the arm is retracted, and the wrist is stowed. This is intended to keep the center of mass low for increased stability and avoid catching the arm or tool on something.
Source code in stretch_body/base.py
def step_sentry(self,robot):
"""
Only allow fast mobile base motion if the lift is low,
the arm is retracted, and the wrist is stowed. This is
intended to keep the center of mass low for increased
stability and avoid catching the arm or tool on
something.
"""
if self.robot_params['robot_sentry']['base_max_velocity']:
x_lift=robot.lift.status['pos']
x_arm =robot.arm.status['pos']
x_wrist =robot.end_of_arm.motors['wrist_yaw'].status['pos']
if ((x_lift < self.params['sentry_max_velocity']['max_lift_height_m']) and
(x_arm < self.params['sentry_max_velocity']['max_arm_extension_m']) and
(x_wrist > self.params['sentry_max_velocity']['min_wrist_yaw_rad'])):
if not self.fast_motion_allowed:
self.logger.debug('Fast motion turned on')
self.fast_motion_allowed = True
else:
if self.fast_motion_allowed:
self.logger.debug('Fast motion turned off')
self.fast_motion_allowed = False
self.left_wheel.step_sentry(robot)
self.right_wheel.step_sentry(robot)
stop(self)
Shuts down machinery started in startup()
Source code in stretch_body/base.py
def stop(self):
Device.stop(self)
if self.left_wheel.hw_valid and int(str(self.left_wheel.board_info['protocol_version'])[1:]) >= 1:
self.left_wheel.stop_waypoint_trajectory()
self._waypoint_lwpos = None
if self.right_wheel.hw_valid and int(str(self.right_wheel.board_info['protocol_version'])[1:]) >= 1:
self.right_wheel.stop_waypoint_trajectory()
self._waypoint_rwpos = None
self.left_wheel.stop()
self.right_wheel.stop()
stop_trajectory(self)
Stop waypoint trajectory immediately and resets hardware
Source code in stretch_body/base.py
def stop_trajectory(self):
"""Stop waypoint trajectory immediately and resets hardware
"""
self.left_wheel.stop_waypoint_trajectory()
self.right_wheel.stop_waypoint_trajectory()
translate_by(self, x_m, v_m=None, a_m=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)
Incremental translation of the base x_m: desired motion (m) v_m: velocity for trapezoidal motion profile (m/s) a_m: acceleration for trapezoidal motion profile (m/s^2) stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))
Source code in stretch_body/base.py
def translate_by(self, x_m, v_m=None, a_m=None, stiffness=None, contact_thresh_N=None,contact_thresh=None):
"""
Incremental translation of the base
x_m: desired motion (m)
v_m: velocity for trapezoidal motion profile (m/s)
a_m: acceleration for trapezoidal motion profile (m/s^2)
stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
contact_thresh: effort to stop at (units of effort_pct (-100, 100))
"""
check_deprecated_contact_model_base(self,'translate_by', contact_thresh_N, contact_thresh)
x_mr = self.translate_to_motor_rad(x_m)
if v_m is not None:
v_m = min(abs(v_m), self.params['motion']['max']['vel_m'])
v_mr = self.translate_to_motor_rad(v_m)
else:
v_mr = self.vel_mr
if a_m is not None:
a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
a_mr = self.translate_to_motor_rad(a_m)
else:
a_mr = self.accel_mr
if not self.fast_motion_allowed:
v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)
i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True, contact_thresh=contact_thresh)
stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_l,
i_contact_neg=-1*i_contact_l)
self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
v_des=v_mr,
a_des=a_mr,
stiffness=stiffness,
i_feedforward=0,
i_contact_pos=i_contact_r,
i_contact_neg=-1*i_contact_r)
update_trajectory(self)
Updates hardware with the next segment of self.trajectory
This method must be called frequently to enable complete trajectory execution
and preemption of future segments. If used with stretch_body.robot.Robot
or
with self.startup(threaded=True)
, a background thread is launched for this.
Otherwise, the user must handle calling this method.
Source code in stretch_body/base.py
def update_trajectory(self):
"""Updates hardware with the next segment of `self.trajectory`
This method must be called frequently to enable complete trajectory execution
and preemption of future segments. If used with `stretch_body.robot.Robot` or
with `self.startup(threaded=True)`, a background thread is launched for this.
Otherwise, the user must handle calling this method.
"""
# check if joint valid, right protocol, and right mode
if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
return
if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
return
if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
return
if self.left_wheel.status['mode'] != self.left_wheel.MODE_POS_TRAJ_WAYPOINT:
return
if self.right_wheel.status['mode'] != self.right_wheel.MODE_POS_TRAJ_WAYPOINT:
return
if self._waypoint_lwpos is None or self._waypoint_rwpos is None:
return
if self.left_wheel.status['waypoint_traj']['state'] == 'active' and self.right_wheel.status['waypoint_traj']['state'] == 'active':
next_segment_id = self.left_wheel.status['waypoint_traj']['segment_id'] - 2 + 1 # subtract 2 due to IDs 0 & 1 being reserved by firmware
if next_segment_id < self.trajectory.get_num_segments():
ls1, rs1 = self.trajectory.get_wheel_segments(next_segment_id, self.translate_to_motor_rad, self.rotate_to_motor_rad,
self._waypoint_lwpos, self._waypoint_rwpos)
self.left_wheel.set_next_trajectory_segment(ls1.to_array())
self.right_wheel.set_next_trajectory_segment(rs1.to_array())
elif self.left_wheel.status['waypoint_traj']['state'] == 'idle' and self.left_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT and \
self.right_wheel.status['waypoint_traj']['state'] == 'idle' and self.right_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT:
self._waypoint_lwpos = None
self._waypoint_rwpos = None
self.left_wheel.enable_pos_traj()
self.right_wheel.enable_pos_traj()
self.push_command()
Using the Head class
The interface to Stretch's head is the Head class. The head contains an Intel Realsense D435i depth camera. The pan and tilt joints in the head allow Stretch to swivel and capture depth imagery of its surrounding. The head is typically initialized as:
1 2 3 4 5 6 7 8 9 |
|
Head is a subclass of the DynamixelXChain class, which in turn is a subclass of the Device class. Therefore, some of Head's methods, such as startup()
and home()
are extended from the Device class, while others come from the DynamixelXChain class. Reading the head's current state and sending commands to its revolute joints (head pan and tilt) can be achieved using:
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
|
The attribute status
is a dictionary of dictionaries, where each subdictionary is the status of one of the head's joints. This state information is updated in the background in real-time by default (disable by initializing as startup(threading=False)
). Use the pretty_print()
method to print out this state information in a human-interpretable format.
Commanding the head's revolute joints is done through the move_to()
and move_by()
methods. Notice that, unlike the previous joints, no push command call is required here. These joints are Dynamixel servos, which behave differently than the Hello Robot steppers. Their commands are not queued and are executed as soon as they're received.
Head's two joints, the 'head_pan' and 'head_tilt' are instances of the DynamixelHelloXL430 class and are retrievable using the get_joint()
method. They have the wait_until_at_setpoint()
method, which blocks program execution until the joint reaches the commanded goal.
The pose()
method makes it easy to command the head to common head poses (e.g. looking 'ahead', at the end-of-arm 'tool', obstacles in front of the 'wheels', or 'up').
The head supports waypoint trajectories as well:
27 28 29 30 31 32 33 34 35 36 37 |
|
The head pan and tilt DynamixelHelloXL430 instances have an attribute trajectory
, which is an instance of the RevoluteTrajectory class. The call to follow_trajectory()
begins software tracking of the spline.
Finally, setting soft motion limits for the head's pan and tilt range can be achieved using:
38 39 40 41 42 43 44 45 46 |
|
The set_soft_motion_limit_min/max()
methods perform clipping of the joint's range at the software level (cannot persist across reboots).
All methods of the Head class are documented below.
API to the Stretch RE1 Head
Source code in stretch_body/head.py
class Head(DynamixelXChain):
"""
API to the Stretch RE1 Head
"""
def __init__(self, usb=None):
if usb is None:
usb = RobotParams.get_params()[1]['head']['usb_name']
DynamixelXChain.__init__(self, usb=usb, name='head')
self.joints = ['head_pan', 'head_tilt']
for j in self.joints:
self.add_motor(DynamixelHelloXL430(name=j, chain=self))
self.poses = {'ahead': [0, 0], 'back': [deg_to_rad(-180), deg_to_rad(0)],
'tool': [deg_to_rad(-90), deg_to_rad(-45)],
'wheels': [deg_to_rad(0), deg_to_rad(-90)], 'left': [deg_to_rad(90), deg_to_rad(0)],
'up': [deg_to_rad(0), deg_to_rad(30)]}
def startup(self, threaded=True):
return DynamixelXChain.startup(self, threaded=threaded)
def get_joint(self, joint_name):
"""Retrieves joint by name.
Parameters
----------
joint_name : str
valid joints defined in ``joints``
Returns
-------
DynamixelHelloXL430 or None
Motor object on valid joint name, else None
"""
return self.get_motor(joint_name)
def move_to(self, joint, x_r, v_r=None, a_r=None):
"""
joint: Name of the joint to move ('head_pan' or 'head_tilt')
x_r: commanded absolute position (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
self.motors[joint].move_to(x_r,v_r,a_r)
def move_by(self, joint, x_r, v_r=None, a_r=None):
"""
joint: Name of the joint to move ('head_pan' or 'head_tilt')
x_r: commanded incremental motion (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
self.motors[joint].move_by(x_r,v_r,a_r)
def set_velocity(self, joint, v_r, a_r=None):
"""
joint: name of joint (string)
v_r: commanded velocity (rad/s).
a_r: acceleration motion profile (rad/s^2)
"""
self.motors[joint].set_velocity(v_r, a_r)
def home(self):
if self.motors['head_pan'].params['req_calibration']:
with self.pt_lock:
self.motors['head_pan'].home(single_stop=True)
if self.motors['head_tilt'].params['req_calibration']:
with self.pt_lock:
self.motors['head_tilt'].home(single_stop=True)
self.move_to('head_pan', deg_to_rad(0))
self.move_to('head_tilt', deg_to_rad(0))
def pose(self, p, v_r=[None, None], a_r=[None, None]):
"""
p: Dictionary key to named pose (eg 'ahead')
v_r: list, velocities for trapezoidal motion profile (rad/s).
a_r: list, accelerations for trapezoidal motion profile (rad/s^2)
"""
self.move_to('head_pan', self.poses[p][0], v_r[0], a_r[0])
self.move_to('head_tilt', self.poses[p][1], v_r[0], a_r[1])
get_joint(self, joint_name)
Retrieves joint by name.
Parameters
joint_name : str
valid joints defined in joints
Returns
DynamixelHelloXL430 or None Motor object on valid joint name, else None
Source code in stretch_body/head.py
def get_joint(self, joint_name):
"""Retrieves joint by name.
Parameters
----------
joint_name : str
valid joints defined in ``joints``
Returns
-------
DynamixelHelloXL430 or None
Motor object on valid joint name, else None
"""
return self.get_motor(joint_name)
move_by(self, joint, x_r, v_r=None, a_r=None)
joint: Name of the joint to move ('head_pan' or 'head_tilt') x_r: commanded incremental motion (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)
Source code in stretch_body/head.py
def move_by(self, joint, x_r, v_r=None, a_r=None):
"""
joint: Name of the joint to move ('head_pan' or 'head_tilt')
x_r: commanded incremental motion (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
self.motors[joint].move_by(x_r,v_r,a_r)
move_to(self, joint, x_r, v_r=None, a_r=None)
joint: Name of the joint to move ('head_pan' or 'head_tilt') x_r: commanded absolute position (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)
Source code in stretch_body/head.py
def move_to(self, joint, x_r, v_r=None, a_r=None):
"""
joint: Name of the joint to move ('head_pan' or 'head_tilt')
x_r: commanded absolute position (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
self.motors[joint].move_to(x_r,v_r,a_r)
pose(self, p, v_r=[None, None], a_r=[None, None])
p: Dictionary key to named pose (eg 'ahead') v_r: list, velocities for trapezoidal motion profile (rad/s). a_r: list, accelerations for trapezoidal motion profile (rad/s^2)
Source code in stretch_body/head.py
def pose(self, p, v_r=[None, None], a_r=[None, None]):
"""
p: Dictionary key to named pose (eg 'ahead')
v_r: list, velocities for trapezoidal motion profile (rad/s).
a_r: list, accelerations for trapezoidal motion profile (rad/s^2)
"""
self.move_to('head_pan', self.poses[p][0], v_r[0], a_r[0])
self.move_to('head_tilt', self.poses[p][1], v_r[0], a_r[1])
set_velocity(self, joint, v_r, a_r=None)
joint: name of joint (string) v_r: commanded velocity (rad/s). a_r: acceleration motion profile (rad/s^2)
Source code in stretch_body/head.py
def set_velocity(self, joint, v_r, a_r=None):
"""
joint: name of joint (string)
v_r: commanded velocity (rad/s).
a_r: acceleration motion profile (rad/s^2)
"""
self.motors[joint].set_velocity(v_r, a_r)
startup(self, threaded=True)
Starts machinery required to interface with this device
Parameters
threaded : bool whether a thread manages hardware polling/pushing in the background
Returns
bool whether the startup procedure succeeded
Source code in stretch_body/head.py
def startup(self, threaded=True):
return DynamixelXChain.startup(self, threaded=threaded)
Using the EndOfArm class
The interface to Stretch's end-of-arm is the EndOfArm class. It is typically initialized as:
1 2 3 4 5 6 7 8 9 |
|
All methods of the EndOfArm class are documented below.
The EndOfArm class allows for an extensible serial chain of Dynamixel X series devices It allows the specific type of device to be declared at runtime via the Yaml parameters In this way, a user can add their own custom Dynamixel based tools to the robot end-of-arm by simply deriving it from DynamixelHelloXL430 and declaring the class name / Python module name in the User YAML file
Source code in stretch_body/end_of_arm.py
class EndOfArm(DynamixelXChain):
"""
The EndOfArm class allows for an extensible serial chain of Dynamixel X series devices
It allows the specific type of device to be declared at runtime via the Yaml parameters
In this way, a user can add their own custom Dynamixel based tools to the robot end-of-arm by
simply deriving it from DynamixelHelloXL430 and declaring the class name / Python module name
in the User YAML file
"""
def __init__(self, name='end_of_arm', usb=None):
if usb is None:
usb = RobotParams.get_params()[1]['end_of_arm']['usb_name']
DynamixelXChain.__init__(self, usb=usb, name=name)
self.joints = self.params.get('devices', {}).keys()
for j in self.joints:
module_name = self.params['devices'][j]['py_module_name']
class_name = self.params['devices'][j]['py_class_name']
dynamixel_device = getattr(importlib.import_module(module_name), class_name)(chain=self)
self.add_motor(dynamixel_device)
self.urdf_map={} #Override
def startup(self, threaded=True):
return DynamixelXChain.startup(self, threaded=threaded)
def get_joint(self, joint_name):
"""Retrieves joint by name.
Parameters
----------
joint_name : str
valid joints defined as defined in params['devices']
Returns
-------
DynamixelHelloXL430 or None
Motor object on valid joint name, else None
"""
return self.get_motor(joint_name)
def move_to(self, joint,x_r, v_r=None, a_r=None):
"""
joint: name of joint (string)
x_r: commanded absolute position (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].move_to(x_r, v_r, a_r)
def move_by(self, joint, x_r, v_r=None, a_r=None):
"""
joint: name of joint (string)
x_r: commanded incremental motion (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].move_by(x_r, v_r, a_r)
def set_velocity(self, joint, v_r, a_r=None):
"""
joint: name of joint (string)
v_r: commanded velocity (rad/s).
a_r: acceleration motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].set_velocity(v_r, a_r)
def pose(self,joint, p,v_r=None, a_r=None):
"""
joint: name of joint (string)
p: named pose of joint
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].pose(p, v_r, a_r)
def stow(self):
pass #Override by specific tool
def pre_stow(self,robot=None):
pass #Override by specific tool
def home(self, joint=None):
"""
Home to hardstops
"""
if joint is None:
for j in self.joints:
print('--------- Homing %s ----'%j)
self.motors[j].home()
else:
with self.pt_lock:
self.motors[joint].home()
def is_tool_present(self,class_name):
"""
Return true if the given tool type is present (eg. StretchGripper)
Allows for conditional logic when switching end-of-arm tools
"""
for j in self.joints:
if class_name == self.params['devices'][j]['py_class_name']:
return True
return False
def get_joint_configuration(self,brake_joints={}):
"""
Construct a dictionary of tools current pose (for robot_collision_mgmt)
Keys match joint names in URDF
Specific tools should define urdf_map
"""
ret = {}
for j in self.urdf_map:
jn = self.urdf_map[j]
motor = self.get_joint(jn)
dx = 0.0
try:
if brake_joints[j]:
dx = self.params['collision_mgmt']['k_brake_distance'][jn] * motor.get_braking_distance()
except KeyError:
dx=0
ret[j] = motor.status['pos'] + dx
gripper_joint = None
for j in self.joints:
if 'gripper' in j:
gripper_joint = j
if gripper_joint:
dx = 0
if brake_joints:
for j in brake_joints:
if 'gripper' in j:
dx = self.params['collision_mgmt']['k_brake_distance'][j]
finger_angle = self.get_joint(gripper_joint).status['gripper_conversion']['finger_rad'] + dx
ret['joint_gripper_finger_left'] = finger_angle/2
ret['joint_gripper_finger_right'] = finger_angle/2
return ret
def pre_stow(self,robot=None):
pass
get_joint(self, joint_name)
Retrieves joint by name.
Parameters
joint_name : str valid joints defined as defined in params['devices']
Returns
DynamixelHelloXL430 or None Motor object on valid joint name, else None
Source code in stretch_body/end_of_arm.py
def get_joint(self, joint_name):
"""Retrieves joint by name.
Parameters
----------
joint_name : str
valid joints defined as defined in params['devices']
Returns
-------
DynamixelHelloXL430 or None
Motor object on valid joint name, else None
"""
return self.get_motor(joint_name)
get_joint_configuration(self, brake_joints={})
Construct a dictionary of tools current pose (for robot_collision_mgmt) Keys match joint names in URDF Specific tools should define urdf_map
Source code in stretch_body/end_of_arm.py
def get_joint_configuration(self,brake_joints={}):
"""
Construct a dictionary of tools current pose (for robot_collision_mgmt)
Keys match joint names in URDF
Specific tools should define urdf_map
"""
ret = {}
for j in self.urdf_map:
jn = self.urdf_map[j]
motor = self.get_joint(jn)
dx = 0.0
try:
if brake_joints[j]:
dx = self.params['collision_mgmt']['k_brake_distance'][jn] * motor.get_braking_distance()
except KeyError:
dx=0
ret[j] = motor.status['pos'] + dx
gripper_joint = None
for j in self.joints:
if 'gripper' in j:
gripper_joint = j
if gripper_joint:
dx = 0
if brake_joints:
for j in brake_joints:
if 'gripper' in j:
dx = self.params['collision_mgmt']['k_brake_distance'][j]
finger_angle = self.get_joint(gripper_joint).status['gripper_conversion']['finger_rad'] + dx
ret['joint_gripper_finger_left'] = finger_angle/2
ret['joint_gripper_finger_right'] = finger_angle/2
return ret
home(self, joint=None)
Home to hardstops
Source code in stretch_body/end_of_arm.py
def home(self, joint=None):
"""
Home to hardstops
"""
if joint is None:
for j in self.joints:
print('--------- Homing %s ----'%j)
self.motors[j].home()
else:
with self.pt_lock:
self.motors[joint].home()
is_tool_present(self, class_name)
Return true if the given tool type is present (eg. StretchGripper) Allows for conditional logic when switching end-of-arm tools
Source code in stretch_body/end_of_arm.py
def is_tool_present(self,class_name):
"""
Return true if the given tool type is present (eg. StretchGripper)
Allows for conditional logic when switching end-of-arm tools
"""
for j in self.joints:
if class_name == self.params['devices'][j]['py_class_name']:
return True
return False
move_by(self, joint, x_r, v_r=None, a_r=None)
joint: name of joint (string) x_r: commanded incremental motion (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)
Source code in stretch_body/end_of_arm.py
def move_by(self, joint, x_r, v_r=None, a_r=None):
"""
joint: name of joint (string)
x_r: commanded incremental motion (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].move_by(x_r, v_r, a_r)
move_to(self, joint, x_r, v_r=None, a_r=None)
joint: name of joint (string) x_r: commanded absolute position (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)
Source code in stretch_body/end_of_arm.py
def move_to(self, joint,x_r, v_r=None, a_r=None):
"""
joint: name of joint (string)
x_r: commanded absolute position (radians).
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].move_to(x_r, v_r, a_r)
pose(self, joint, p, v_r=None, a_r=None)
joint: name of joint (string) p: named pose of joint v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)
Source code in stretch_body/end_of_arm.py
def pose(self,joint, p,v_r=None, a_r=None):
"""
joint: name of joint (string)
p: named pose of joint
v_r: velocity for trapezoidal motion profile (rad/s).
a_r: acceleration for trapezoidal motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].pose(p, v_r, a_r)
set_velocity(self, joint, v_r, a_r=None)
joint: name of joint (string) v_r: commanded velocity (rad/s). a_r: acceleration motion profile (rad/s^2)
Source code in stretch_body/end_of_arm.py
def set_velocity(self, joint, v_r, a_r=None):
"""
joint: name of joint (string)
v_r: commanded velocity (rad/s).
a_r: acceleration motion profile (rad/s^2)
"""
with self.pt_lock:
self.motors[joint].set_velocity(v_r, a_r)
startup(self, threaded=True)
Starts machinery required to interface with this device
Parameters
threaded : bool whether a thread manages hardware polling/pushing in the background
Returns
bool whether the startup procedure succeeded
Source code in stretch_body/end_of_arm.py
def startup(self, threaded=True):
return DynamixelXChain.startup(self, threaded=threaded)
Using the Wacc class
The interface to Stretch's wrist board is the Wacc (wrist + accelerometer) class. This board provides an Arduino and accelerometer sensor that is accessible from the Wacc class. It is typically initialized as:
1 2 3 4 5 6 7 8 9 |
|
All methods of the Wacc class are documented below.
API to the Stretch Wrist Accelerometer (Wacc) Board
Source code in stretch_body/wacc.py
class Wacc(WaccBase):
"""
API to the Stretch Wrist Accelerometer (Wacc) Board
"""
def __init__(self, usb=None, ext_status_cb=None, ext_command_cb=None):
WaccBase.__init__(self, usb=usb, ext_status_cb=ext_status_cb, ext_command_cb=ext_command_cb)
#Order in descending order so more recent protocols/methods override less recent
self.supported_protocols = {'p0': (Wacc_Protocol_P0,), 'p1': (Wacc_Protocol_P1, Wacc_Protocol_P0),'p2': (Wacc_Protocol_P2, Wacc_Protocol_P1, Wacc_Protocol_P0,),
'p3': (Wacc_Protocol_P3, Wacc_Protocol_P2, Wacc_Protocol_P1, Wacc_Protocol_P0,),}
def startup(self, threaded=False):
"""
First determine which protocol version the uC firmware is running.
Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
"""
WaccBase.startup(self, threaded=threaded)
if self.hw_valid:
if self.board_info['protocol_version'] in self.supported_protocols:
Wacc.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
else:
if self.board_info['protocol_version'] is None:
protocol_msg = """
----------------
Failure in communications for {0} on startup.
Please power cycle the robot and try again.
----------------
""".format(self.name)
else:
protocol_msg = """
----------------
Firmware protocol mismatch on {0}.
Protocol on board is {1}.
Valid protocols are: {2}.
Disabling device.
Please upgrade the firmware and/or version of Stretch Body.
----------------
""".format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
self.logger.warning(textwrap.dedent(protocol_msg))
self.hw_valid = False
self.transport.stop()
if self.hw_valid:
self.push_command()
self.pull_status()
return self.hw_valid
startup(self, threaded=False)
First determine which protocol version the uC firmware is running. Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
Source code in stretch_body/wacc.py
def startup(self, threaded=False):
"""
First determine which protocol version the uC firmware is running.
Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
"""
WaccBase.startup(self, threaded=threaded)
if self.hw_valid:
if self.board_info['protocol_version'] in self.supported_protocols:
Wacc.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
else:
if self.board_info['protocol_version'] is None:
protocol_msg = """
----------------
Failure in communications for {0} on startup.
Please power cycle the robot and try again.
----------------
""".format(self.name)
else:
protocol_msg = """
----------------
Firmware protocol mismatch on {0}.
Protocol on board is {1}.
Valid protocols are: {2}.
Disabling device.
Please upgrade the firmware and/or version of Stretch Body.
----------------
""".format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
self.logger.warning(textwrap.dedent(protocol_msg))
self.hw_valid = False
self.transport.stop()
if self.hw_valid:
self.push_command()
self.pull_status()
return self.hw_valid
Using the Pimu class
The interface to Stretch's power board is the Pimu (power + IMU) class. This board provides a 9-DOF IMU that is accessible from the Pimu class. It is typically initialized as:
1 2 3 4 5 6 7 8 9 |
|
All methods of the Pimu class are documented below.
API to the Stretch Power and IMU board (Pimu)
Source code in stretch_body/pimu.py
class Pimu(PimuBase):
"""
API to the Stretch Power and IMU board (Pimu)
"""
def __init__(self, event_reset=False, usb=None):
PimuBase.__init__(self, event_reset,usb)
# Order in descending order so more recent protocols/methods override less recent
self.supported_protocols = {'p0': (Pimu_Protocol_P0,),
'p1': (Pimu_Protocol_P1, Pimu_Protocol_P0,),
'p2': (Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
'p3': (Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
'p4': (Pimu_Protocol_P4, Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
'p5': (Pimu_Protocol_P5, Pimu_Protocol_P4, Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
'p6': (Pimu_Protocol_P6, Pimu_Protocol_P5, Pimu_Protocol_P4, Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,)
}
def startup(self, threaded=False):
"""
First determine which protocol version the uC firmware is running.
Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
"""
PimuBase.startup(self, threaded=threaded)
if self.hw_valid:
if self.board_info['protocol_version'] in self.supported_protocols:
Pimu.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
IMU.__bases__= self.imu.supported_protocols[self.board_info['protocol_version']]
else:
if self.board_info['protocol_version'] is None:
protocol_msg = """
----------------
Failure in communications for {0} on startup.
Please power cycle the robot and try again.
----------------
""".format(self.name)
else:
protocol_msg = """
----------------
Firmware protocol mismatch on {0}.
Protocol on board is {1}.
Valid protocols are: {2}.
Disabling device.
Please upgrade the firmware and/or version of Stretch Body.
----------------
""".format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
self.logger.warning(textwrap.dedent(protocol_msg))
self.hw_valid = False
self.transport.stop()
if self.hw_valid:
self.push_command()
self.pull_status()
return self.hw_valid
startup(self, threaded=False)
First determine which protocol version the uC firmware is running. Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
Source code in stretch_body/pimu.py
def startup(self, threaded=False):
"""
First determine which protocol version the uC firmware is running.
Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
"""
PimuBase.startup(self, threaded=threaded)
if self.hw_valid:
if self.board_info['protocol_version'] in self.supported_protocols:
Pimu.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
IMU.__bases__= self.imu.supported_protocols[self.board_info['protocol_version']]
else:
if self.board_info['protocol_version'] is None:
protocol_msg = """
----------------
Failure in communications for {0} on startup.
Please power cycle the robot and try again.
----------------
""".format(self.name)
else:
protocol_msg = """
----------------
Firmware protocol mismatch on {0}.
Protocol on board is {1}.
Valid protocols are: {2}.
Disabling device.
Please upgrade the firmware and/or version of Stretch Body.
----------------
""".format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
self.logger.warning(textwrap.dedent(protocol_msg))
self.hw_valid = False
self.transport.stop()
if self.hw_valid:
self.push_command()
self.pull_status()
return self.hw_valid