Skip to content

Stretch Body API Reference

Stretch Body is the Python interface for working with Stretch. This page serves as a reference for the interfaces defined in the stretch_body library. See the Stretch Body Tutorials for additional information on working with this library.

The Robot Class

The most common interface to Stretch is the Robot class. This class encapsulates all devices on the robot. It is typically initialized as:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
import stretch_body.robot

r = stretch_body.robot.Robot()
if not r.startup():
    exit() # failed to start robot!

# home the joints to find zero, if necessary
if not r.is_calibrated():
    r.home()

# interact with the robot here

The startup() and home() methods start communication with and home each of the robot's devices, respectively. Through the Robot class, users can interact with all devices on the robot. For example, continuing the example above:

12
13
14
15
16
17
18
19
20
21
22
23
# moving joints on the robot
r.arm.pretty_print()
r.lift.pretty_print()
r.base.pretty_print()
r.head.pretty_print()
r.end_of_arm.pretty_print()

# other devices on the robot
r.wacc.pretty_print()
r.pimu.pretty_print()

r.stop()

Each of these devices is defined in separate modules within stretch_body. In the following section, we'll look at the API of these classes. The stop() method shuts down communication with the robot's devices.

All methods in the Robot class are documented below.

API to the Stretch Robot

Source code in stretch_body/robot.py
class Robot(Device):
    """
    API to the Stretch Robot
    """
    def __init__(self):
        Device.__init__(self, 'robot')
        self.monitor = RobotMonitor(self)
        self.trace = RobotTrace(self)
        self.collision = RobotCollisionMgmt(self)
        self.dirty_push_command = False
        self.lock = threading.RLock() #Prevent status thread from triggering motor sync prematurely
        self.status = {'pimu': {}, 'base': {}, 'lift': {}, 'arm': {}, 'head': {}, 'wacc': {}, 'end_of_arm': {}}
        self.async_event_loop = None

        self.pimu=pimu.Pimu()
        self.status['pimu']=self.pimu.status

        self.base=base.Base()
        self.status['base']=self.base.status

        self.lift=lift.Lift()
        self.status['lift']=self.lift.status

        self.arm=arm.Arm()
        self.status['arm']=self.arm.status

        self.head=head.Head()
        self.status['head']=self.head.status

        if 'custom_wacc' in self.params:
            module_name = self.params['custom_wacc']['py_module_name']
            class_name = self.params['custom_wacc']['py_class_name']
            self.wacc=getattr(importlib.import_module(module_name), class_name)(self)
        else:
            self.wacc=wacc.Wacc()
        self.status['wacc']=self.wacc.status

        self.non_dxl_thread = None
        self.dxl_end_of_arm_thread = None
        self.sys_thread = None
        self.dxl_head_thread = None
        self.event_loop_thread = None
        self.collision_mgmt_thread = None

        self.eoa_name= self.params['tool']
        module_name = self.robot_params[self.eoa_name]['py_module_name']
        class_name = self.robot_params[self.eoa_name]['py_class_name']
        self.end_of_arm = getattr(importlib.import_module(module_name), class_name)()
        self.status['end_of_arm'] = self.end_of_arm.status
        self.devices={ 'pimu':self.pimu, 'base':self.base, 'lift':self.lift, 'arm': self.arm, 'head': self.head, 'wacc':self.wacc, 'end_of_arm':self.end_of_arm}

        self.GLOBAL_EXCEPTIONS_LIST = []
        threading.excepthook = self.custom_excepthook


    def custom_excepthook(self, args):
        thread_name = args.thread.name
        Exec = {}
        Exec[thread_name] = {
            'thread': args.thread,
            'exception': {
                'type': args.exc_type,
                'value': args.exc_value,
                'traceback': args.exc_traceback
            }
        }

        # Filter RuntimeError
        if Exec[thread_name]['exception']['type'] == RuntimeError:
            pass
        else:
            print(f"Caught Exception in Thread: {thread_name}")
            traceback.print_exception(args.exc_value)

        self.logger.debug(f"Caught Exception in Thread: {thread_name}")
        self.logger.debug(traceback.format_exception(args.exc_value))

        self.GLOBAL_EXCEPTIONS_LIST.append(Exec[thread_name])

    # ###########  Device Methods #############

    def startup(self,start_non_dxl_thread=True,start_dxl_thread=True,start_sys_mon_thread=True):
        """
        To be called once after class instantiation.
        Prepares devices for communications and motion

        Returns
        -------
        bool
            true if startup of robot succeeded
        """
        did_acquire, self._file_lock = hello_utils.acquire_body_filelock()
        if not did_acquire:
            print('Another process is already using Stretch. Try running "stretch_free_robot_process.py"')
            return False

        self.logger.debug('Starting up Robot {0} of batch {1}'.format(self.params['serial_no'], self.params['batch_name']))
        success = True
        for k in self.devices:
            if self.devices[k] is not None:
                if not self.devices[k].startup(threaded=False):
                    success = False

        if (self.arm.motor.transport.version==0 \
                or self.lift.motor.transport.version==0 \
                or self.base.right_wheel.transport.version == 0 \
                or self.base.left_wheel.transport.version == 0 \
                or self.wacc.transport.version==0 \
                or self.pimu.transport.version==0) and self.params['use_asyncio'] :
            self.logger.warning('Not able to use asyncio for transport communications. Defaulting to sync.')
            self.params['use_asyncio']=0
        else:
            self.start_event_loop()

        #Always startup to load URDFs now and not while thread is running
        self.collision.startup()
        if not self.params['use_collision_manager']: #Turn it off here but allow user to enable it via SW later
            self.disable_collision_mgmt()
        else:
            self.enable_collision_mgmt()

        # Register the signal handlers
        signal.signal(signal.SIGTERM, hello_utils.thread_service_shutdown)
        signal.signal(signal.SIGINT, hello_utils.thread_service_shutdown)

        self.non_dxl_thread = NonDXLStatusThread(self, target_rate_hz=self.params['rates']['NonDXLStatusThread_Hz'])
        self.dxl_end_of_arm_thread = DXLEndOfArmStatusThread(self,target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
        self.sys_thread = SystemMonitorThread(self, target_rate_hz=self.params['rates']['SystemMonitorThread_Hz'])
        self.dxl_head_thread = DXLHeadStatusThread(self, target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
        self.collision_mgmt_thread = CollisionMonitorThread(self, target_rate_hz=100)

        if start_non_dxl_thread:
            self.non_dxl_thread.daemon = True
            self.non_dxl_thread.start()
            ts = time.time()
            while not self.non_dxl_thread.first_status and time.time() - ts < 3.0:
                time.sleep(0.01)

        if start_dxl_thread:
            self.dxl_head_thread.daemon = True
            self.dxl_head_thread.start()
            self.dxl_end_of_arm_thread.daemon = True
            self.dxl_end_of_arm_thread.start()

        if start_sys_mon_thread:
            self.sys_thread.daemon = True
            self.sys_thread.start()

        if start_sys_mon_thread and self.collision_mgmt_thread:
            self.collision_mgmt_thread.daemon = True
            self.collision_mgmt_thread.start()

        return success

    def stop(self):
        """
        To be called once before exiting a program
        Cleanly stops down motion and communication
        """
        self.logger.debug('---- Shutting down robot ----')
        self._file_lock.release()
        if self.non_dxl_thread:
            if self.non_dxl_thread.running:
                self.non_dxl_thread.shutdown_flag.set()
                self.non_dxl_thread.join(1)
        if self.dxl_head_thread:
            if self.dxl_head_thread.running:
                self.dxl_head_thread.shutdown_flag.set()
                self.dxl_head_thread.join(1)
        if self.dxl_end_of_arm_thread:
            if self.dxl_end_of_arm_thread.running:
                self.dxl_end_of_arm_thread.shutdown_flag.set()
                self.dxl_end_of_arm_thread.join(1)
        if self.sys_thread:
            if self.sys_thread.running:
                self.sys_thread.shutdown_flag.set()
                self.sys_thread.join(1)
        if self.collision_mgmt_thread:
            if self.collision_mgmt_thread.running:
                self.collision_mgmt_thread.shutdown_flag.set()
                self.collision_mgmt_thread.join(1)
                self.collision.stop()
        for k in self.devices:
            if self.devices[k] is not None:
                self.logger.debug('Shutting down %s'%k)
                self.devices[k].stop()
        self.stop_event_loop()
        self.logger.debug('---- Shutdown complete ----')

    def get_status(self):
        """
        Thread safe and atomic read of current Robot status data
        Returns as a dict.
        """
        with self.lock:
            return self.status.copy()

    def pretty_print(self):
        s=self.get_status()
        print('##################### HELLO ROBOT ##################### ')
        print('Time',time.time())
        print('Serial No',self.params['serial_no'])
        print('Batch', self.params['batch_name'])
        hello_utils.pretty_print_dict('Status',s)

    async def push_command_coro(self):
        tasks = [self.base.left_wheel.push_command_async(),
                 self.base.right_wheel.push_command_async(),
                 self.arm.push_command_async(),
                 self.lift.push_command_async(),
                 self.pimu.push_command_async(),
                 self.wacc.push_command_async()]
        results = await asyncio.gather(*tasks)
        return results

    def push_command(self):
        """
        Cause all queued up RPC commands to be sent down to Devices
        """

        with self.lock:
            ready = self.pimu.is_ready_for_sync()
            if self.params['use_asyncio'] and self.async_event_loop is not None:
                future = asyncio.run_coroutine_threadsafe(self.push_command_coro(),self.async_event_loop)
                future.result()
            else:
                self.base.push_command()
                self.arm.push_command()
                self.lift.push_command()
                self.pimu.push_command()
                self.wacc.push_command()

            # Check if need to do a motor sync by looking at if there's been a pimu sync signal sent
            # since the last stepper.set_command for each joint
            sync_required = (self.pimu.ts_last_motor_sync is not None) and (
                    self.arm.motor.is_sync_required(self.pimu.ts_last_motor_sync)
                    or self.lift.motor.is_sync_required(self.pimu.ts_last_motor_sync)
                    or self.base.right_wheel.is_sync_required(self.pimu.ts_last_motor_sync)
                        or self.base.left_wheel.is_sync_required(self.pimu.ts_last_motor_sync))

            if (self.pimu.ts_last_motor_sync is None or ( ready and sync_required)):
                self.pimu.trigger_motor_sync()

    def enable_collision_mgmt(self):
        self.collision.enable()

    def disable_collision_mgmt(self):
        self.collision.disable()

    def wait_command(self, timeout=15.0, use_motion_generator=True):
        """Pause program execution until all motion complete.

        Queuing up motion and pushing it to the hardware with
        push_command() is designed to be asynchronous, enabling
        reactive control of the robot. However, you might want
        sychronous control, where each command's motion is completed
        entirely before the program moves on to the next command.
        This is where you would use wait_command()

        Parameters
        ----------
        timeout : float
            How long to wait for motion to complete. Must be > 0.1 sec.

        Returns
        -------
        bool
            True if motion completed, False if timed out before motion completed
        """
        time.sleep(0.1)
        timeout = max(0.0, timeout - 0.1)
        done = []
        def check_wait(wait_method):
            done.append(wait_method(timeout, use_motion_generator))
        start = time.time()
        threads = []
        threads.append(threading.Thread(target=check_wait, args=(self.base.wait_while_is_moving,)))
        threads.append(threading.Thread(target=check_wait, args=(self.arm.wait_while_is_moving,)))
        threads.append(threading.Thread(target=check_wait, args=(self.lift.wait_while_is_moving,)))
        threads.append(threading.Thread(target=check_wait, args=(self.head.wait_until_at_setpoint,)))
        threads.append(threading.Thread(target=check_wait, args=(self.end_of_arm.wait_until_at_setpoint,)))
        [thread.start() for thread in threads]
        [thread.join() for thread in threads]
        return all(done)

    # ######### Waypoint Trajectory Interface ##############################


    def is_trajectory_active(self):
        return self.arm.is_trajectory_active() or self.lift.is_trajectory_active() or \
               self.base.is_trajectory_active() or self.end_of_arm.is_trajectory_active() or \
               self.head.is_trajectory_active()


    def follow_trajectory(self):
        success = True
        success = success and self.arm.follow_trajectory(move_to_start_point=False)
        success = success and self.lift.follow_trajectory(move_to_start_point=False)
        success = success and self.base.follow_trajectory()

        # Check if need to do a motor sync by looking at if there's been a pimu sync signal sent
        # since the last stepper.set_command for each joint
        sync_required = (self.pimu.ts_last_motor_sync is not None) and (
                self.arm.motor.ts_last_syncd_motion > self.pimu.ts_last_motor_sync
                or self.lift.motor.ts_last_syncd_motion > self.pimu.ts_last_motor_sync
                or self.base.right_wheel.ts_last_syncd_motion > self.pimu.ts_last_motor_sync
                or self.base.left_wheel.ts_last_syncd_motion > self.pimu.ts_last_motor_sync)
        if self.pimu.ts_last_motor_sync is None or sync_required:
            self.pimu.trigger_motor_sync()

        success = success and self.end_of_arm.follow_trajectory(move_to_start_point=False)
        success = success and self.head.follow_trajectory(move_to_start_point=False)
        return success

    def stop_trajectory(self):
        self.arm.stop_trajectory()
        self.lift.stop_trajectory()
        self.base.stop_trajectory()
        self.end_of_arm.stop_trajectory()
        self.head.stop_trajectory()

# ##################Home and Stow #######################################

    def is_calibrated(self):
        """
        DEPRECATED: replaced by Robot.is_homed()
        """
        self.logger.warn('is_calibrated() has been replaced by is_homed()')
        return self.is_homed()

    def is_homed(self):
        """
        Returns true if homing has been run all joints that require it
        """
        ready = self.lift.motor.status['pos_calibrated']
        ready = ready and self.arm.motor.status['pos_calibrated']
        for j in self.end_of_arm.joints:
            req = self.end_of_arm.motors[j].params['req_calibration'] and not self.end_of_arm.motors[j].is_calibrated
            ready = ready and not req
        return ready

    def get_stow_pos(self,joint):
        """
        Return the stow position of a joint.
        Allow the end_of_arm to override the defaults in order to accomodate stowing different tools
        """
        if 'stow' in self.end_of_arm.params:
            if joint in self.end_of_arm.params['stow']:
                return self.end_of_arm.params['stow'][joint]
        return self.params['stow'][joint]

    def stow(self):
        """
        Cause the robot to move to its stow position
        Blocking.
        """
        self.disable_collision_mgmt()
        self.head.move_to('head_pan', self.get_stow_pos('head_pan'))
        self.head.move_to('head_tilt',self.get_stow_pos('head_pan'))

        lift_stowed=False
        pos_lift = self.get_stow_pos('lift')
        if self.lift.status['pos']<=pos_lift: #Needs to come up before bring in arm
            print('--------- Stowing Lift ----')
            self.lift.move_to(pos_lift)
            self.push_command()
            time.sleep(0.25)
            ts = time.time()
            while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 4.0:
                time.sleep(0.1)
            lift_stowed=True

        # Run pre stow specific to each end of arm
        self.end_of_arm.pre_stow(self)

        #Bring in arm before bring down
        print('--------- Stowing Arm ----')
        self.arm.move_to(self.get_stow_pos('arm'))
        self.push_command()
        time.sleep(0.25)
        ts = time.time()
        while not self.arm.motor.status['near_pos_setpoint'] and time.time() - ts < 6.0:
            time.sleep(0.1)

        self.end_of_arm.stow()
        time.sleep(0.25)

        #Now bring lift down
        if not lift_stowed:
            print('--------- Stowing Lift ----')
            self.lift.move_to(pos_lift)
            self.push_command()
            time.sleep(0.25)
            ts = time.time()
            while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 12.0:
                time.sleep(0.1)
        #Make sure wrist yaw is done before exiting
        while self.end_of_arm.motors['wrist_yaw'].motor.is_moving():
            time.sleep(0.1)
        self.enable_collision_mgmt()

    def home(self):
        """
        Cause the robot to home its joints by moving to hardstops
        Blocking.
        """
        self.disable_collision_mgmt()
        if self.head is not None:
            print('--------- Homing Head ----')
            self.head.home()

        # Wrist pitch should be angled somewhere in between the lift leaving
        # the base of its range and before it reaches the top of its range.
        def _angle_pitch(angle):
            time.sleep(1) # wait 1 sec to leave bottom of lift's range
            if 'wrist_pitch' in self.end_of_arm.joints:
                self.end_of_arm.move_to('wrist_pitch', angle)

        # If gripper present
        if 'stretch_gripper' in self.end_of_arm.joints and 'wrist_pitch' in self.end_of_arm.joints:
            threading.Thread(target=_angle_pitch,args=(self.end_of_arm.params['stow']['wrist_pitch'],)).start()

        # No gripper but tablet or accessory present in dw3 wrist, wrist pitch should face downward
        elif 'wrist_pitch' in self.end_of_arm.joints:
            threading.Thread(target=_angle_pitch,args=(-1.57,)).start()

        # Home the lift
        if self.lift is not None:
            print('--------- Homing Lift ----')
            self.lift.home()

        # Home the arm
        if self.arm is not None:
            print('--------- Homing Arm ----')
            self.arm.home()

        # Home the end-of-arm
        if self.end_of_arm is not None:
            self.end_of_arm.home()

        self.enable_collision_mgmt()
        #Let user know it is done
        self.pimu.trigger_beep()
        self.push_command()
    # ################ Helpers #################################

    def _pull_status_head_dynamixel(self):
        try:
            self.head.pull_status()
        except SerialException:
            self.logger.warning('Serial Exception on Robot._pull_status_head_dynamixel')

    def _update_trajectory_head_dynamixel(self):
        try:
            self.head.update_trajectory()
        except SerialException:
            self.logger.warning('Serial Exception on Robot._update_trajectory_head_dynamixel()')


    def _pull_status_end_of_arm_dynamixel(self):
        try:
            self.end_of_arm.pull_status()
        except SerialException:
            self.logger.warning('Serial Exception on Robot._pull_status_end_of_arm_dynamixel')

    def _update_trajectory_end_of_arm_dynamixel(self):
        try:
            self.end_of_arm.update_trajectory()
        except SerialException:
            self.logger.warning('Serial Exception on Robot._update_trajectory_end_of_arm_dynamixel()')

    def _pull_status_non_dynamixel(self):
        self.wacc.pull_status()
        self.base.pull_status()
        self.lift.pull_status()
        self.arm.pull_status()
        self.pimu.pull_status()

    def _update_trajectory_non_dynamixel(self):
        self.arm.update_trajectory()
        self.lift.update_trajectory()
        self.base.update_trajectory()

    def _step_sentry(self):
        self.head.step_sentry(self)
        self.base.step_sentry(self)
        self.arm.step_sentry(self)
        self.lift.step_sentry(self)
        self.end_of_arm.step_sentry(self)

    def start_event_loop(self):
        self.async_event_loop = asyncio.new_event_loop()
        self.event_loop_thread = threading.Thread(target=self._event_loop, name='AsyncEvenLoopThread')
        self.event_loop_thread.daemon = True
        self.event_loop_thread.start()

    def _event_loop(self):
            asyncio.set_event_loop(self.async_event_loop)
            self.async_event_loop.run_forever()

    def stop_event_loop(self):
        try:
            if self.event_loop_thread:
                asyncio.run_coroutine_threadsafe(self.async_event_loop.stop())
                self.event_loop_thread.join()
        except TypeError as e:
            pass

get_status(self)

Thread safe and atomic read of current Robot status data Returns as a dict.

Source code in stretch_body/robot.py
def get_status(self):
    """
    Thread safe and atomic read of current Robot status data
    Returns as a dict.
    """
    with self.lock:
        return self.status.copy()

get_stow_pos(self, joint)

Return the stow position of a joint. Allow the end_of_arm to override the defaults in order to accomodate stowing different tools

Source code in stretch_body/robot.py
def get_stow_pos(self,joint):
    """
    Return the stow position of a joint.
    Allow the end_of_arm to override the defaults in order to accomodate stowing different tools
    """
    if 'stow' in self.end_of_arm.params:
        if joint in self.end_of_arm.params['stow']:
            return self.end_of_arm.params['stow'][joint]
    return self.params['stow'][joint]

home(self)

Cause the robot to home its joints by moving to hardstops Blocking.

Source code in stretch_body/robot.py
def home(self):
    """
    Cause the robot to home its joints by moving to hardstops
    Blocking.
    """
    self.disable_collision_mgmt()
    if self.head is not None:
        print('--------- Homing Head ----')
        self.head.home()

    # Wrist pitch should be angled somewhere in between the lift leaving
    # the base of its range and before it reaches the top of its range.
    def _angle_pitch(angle):
        time.sleep(1) # wait 1 sec to leave bottom of lift's range
        if 'wrist_pitch' in self.end_of_arm.joints:
            self.end_of_arm.move_to('wrist_pitch', angle)

    # If gripper present
    if 'stretch_gripper' in self.end_of_arm.joints and 'wrist_pitch' in self.end_of_arm.joints:
        threading.Thread(target=_angle_pitch,args=(self.end_of_arm.params['stow']['wrist_pitch'],)).start()

    # No gripper but tablet or accessory present in dw3 wrist, wrist pitch should face downward
    elif 'wrist_pitch' in self.end_of_arm.joints:
        threading.Thread(target=_angle_pitch,args=(-1.57,)).start()

    # Home the lift
    if self.lift is not None:
        print('--------- Homing Lift ----')
        self.lift.home()

    # Home the arm
    if self.arm is not None:
        print('--------- Homing Arm ----')
        self.arm.home()

    # Home the end-of-arm
    if self.end_of_arm is not None:
        self.end_of_arm.home()

    self.enable_collision_mgmt()
    #Let user know it is done
    self.pimu.trigger_beep()
    self.push_command()

is_calibrated(self)

DEPRECATED: replaced by Robot.is_homed()

Source code in stretch_body/robot.py
def is_calibrated(self):
    """
    DEPRECATED: replaced by Robot.is_homed()
    """
    self.logger.warn('is_calibrated() has been replaced by is_homed()')
    return self.is_homed()

is_homed(self)

Returns true if homing has been run all joints that require it

Source code in stretch_body/robot.py
def is_homed(self):
    """
    Returns true if homing has been run all joints that require it
    """
    ready = self.lift.motor.status['pos_calibrated']
    ready = ready and self.arm.motor.status['pos_calibrated']
    for j in self.end_of_arm.joints:
        req = self.end_of_arm.motors[j].params['req_calibration'] and not self.end_of_arm.motors[j].is_calibrated
        ready = ready and not req
    return ready

push_command(self)

Cause all queued up RPC commands to be sent down to Devices

Source code in stretch_body/robot.py
def push_command(self):
    """
    Cause all queued up RPC commands to be sent down to Devices
    """

    with self.lock:
        ready = self.pimu.is_ready_for_sync()
        if self.params['use_asyncio'] and self.async_event_loop is not None:
            future = asyncio.run_coroutine_threadsafe(self.push_command_coro(),self.async_event_loop)
            future.result()
        else:
            self.base.push_command()
            self.arm.push_command()
            self.lift.push_command()
            self.pimu.push_command()
            self.wacc.push_command()

        # Check if need to do a motor sync by looking at if there's been a pimu sync signal sent
        # since the last stepper.set_command for each joint
        sync_required = (self.pimu.ts_last_motor_sync is not None) and (
                self.arm.motor.is_sync_required(self.pimu.ts_last_motor_sync)
                or self.lift.motor.is_sync_required(self.pimu.ts_last_motor_sync)
                or self.base.right_wheel.is_sync_required(self.pimu.ts_last_motor_sync)
                    or self.base.left_wheel.is_sync_required(self.pimu.ts_last_motor_sync))

        if (self.pimu.ts_last_motor_sync is None or ( ready and sync_required)):
            self.pimu.trigger_motor_sync()

startup(self, start_non_dxl_thread=True, start_dxl_thread=True, start_sys_mon_thread=True)

To be called once after class instantiation. Prepares devices for communications and motion

Returns

bool true if startup of robot succeeded

Source code in stretch_body/robot.py
def startup(self,start_non_dxl_thread=True,start_dxl_thread=True,start_sys_mon_thread=True):
    """
    To be called once after class instantiation.
    Prepares devices for communications and motion

    Returns
    -------
    bool
        true if startup of robot succeeded
    """
    did_acquire, self._file_lock = hello_utils.acquire_body_filelock()
    if not did_acquire:
        print('Another process is already using Stretch. Try running "stretch_free_robot_process.py"')
        return False

    self.logger.debug('Starting up Robot {0} of batch {1}'.format(self.params['serial_no'], self.params['batch_name']))
    success = True
    for k in self.devices:
        if self.devices[k] is not None:
            if not self.devices[k].startup(threaded=False):
                success = False

    if (self.arm.motor.transport.version==0 \
            or self.lift.motor.transport.version==0 \
            or self.base.right_wheel.transport.version == 0 \
            or self.base.left_wheel.transport.version == 0 \
            or self.wacc.transport.version==0 \
            or self.pimu.transport.version==0) and self.params['use_asyncio'] :
        self.logger.warning('Not able to use asyncio for transport communications. Defaulting to sync.')
        self.params['use_asyncio']=0
    else:
        self.start_event_loop()

    #Always startup to load URDFs now and not while thread is running
    self.collision.startup()
    if not self.params['use_collision_manager']: #Turn it off here but allow user to enable it via SW later
        self.disable_collision_mgmt()
    else:
        self.enable_collision_mgmt()

    # Register the signal handlers
    signal.signal(signal.SIGTERM, hello_utils.thread_service_shutdown)
    signal.signal(signal.SIGINT, hello_utils.thread_service_shutdown)

    self.non_dxl_thread = NonDXLStatusThread(self, target_rate_hz=self.params['rates']['NonDXLStatusThread_Hz'])
    self.dxl_end_of_arm_thread = DXLEndOfArmStatusThread(self,target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
    self.sys_thread = SystemMonitorThread(self, target_rate_hz=self.params['rates']['SystemMonitorThread_Hz'])
    self.dxl_head_thread = DXLHeadStatusThread(self, target_rate_hz=self.params['rates']['DXLStatusThread_Hz'])
    self.collision_mgmt_thread = CollisionMonitorThread(self, target_rate_hz=100)

    if start_non_dxl_thread:
        self.non_dxl_thread.daemon = True
        self.non_dxl_thread.start()
        ts = time.time()
        while not self.non_dxl_thread.first_status and time.time() - ts < 3.0:
            time.sleep(0.01)

    if start_dxl_thread:
        self.dxl_head_thread.daemon = True
        self.dxl_head_thread.start()
        self.dxl_end_of_arm_thread.daemon = True
        self.dxl_end_of_arm_thread.start()

    if start_sys_mon_thread:
        self.sys_thread.daemon = True
        self.sys_thread.start()

    if start_sys_mon_thread and self.collision_mgmt_thread:
        self.collision_mgmt_thread.daemon = True
        self.collision_mgmt_thread.start()

    return success

stop(self)

To be called once before exiting a program Cleanly stops down motion and communication

Source code in stretch_body/robot.py
def stop(self):
    """
    To be called once before exiting a program
    Cleanly stops down motion and communication
    """
    self.logger.debug('---- Shutting down robot ----')
    self._file_lock.release()
    if self.non_dxl_thread:
        if self.non_dxl_thread.running:
            self.non_dxl_thread.shutdown_flag.set()
            self.non_dxl_thread.join(1)
    if self.dxl_head_thread:
        if self.dxl_head_thread.running:
            self.dxl_head_thread.shutdown_flag.set()
            self.dxl_head_thread.join(1)
    if self.dxl_end_of_arm_thread:
        if self.dxl_end_of_arm_thread.running:
            self.dxl_end_of_arm_thread.shutdown_flag.set()
            self.dxl_end_of_arm_thread.join(1)
    if self.sys_thread:
        if self.sys_thread.running:
            self.sys_thread.shutdown_flag.set()
            self.sys_thread.join(1)
    if self.collision_mgmt_thread:
        if self.collision_mgmt_thread.running:
            self.collision_mgmt_thread.shutdown_flag.set()
            self.collision_mgmt_thread.join(1)
            self.collision.stop()
    for k in self.devices:
        if self.devices[k] is not None:
            self.logger.debug('Shutting down %s'%k)
            self.devices[k].stop()
    self.stop_event_loop()
    self.logger.debug('---- Shutdown complete ----')

stow(self)

Cause the robot to move to its stow position Blocking.

Source code in stretch_body/robot.py
def stow(self):
    """
    Cause the robot to move to its stow position
    Blocking.
    """
    self.disable_collision_mgmt()
    self.head.move_to('head_pan', self.get_stow_pos('head_pan'))
    self.head.move_to('head_tilt',self.get_stow_pos('head_pan'))

    lift_stowed=False
    pos_lift = self.get_stow_pos('lift')
    if self.lift.status['pos']<=pos_lift: #Needs to come up before bring in arm
        print('--------- Stowing Lift ----')
        self.lift.move_to(pos_lift)
        self.push_command()
        time.sleep(0.25)
        ts = time.time()
        while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 4.0:
            time.sleep(0.1)
        lift_stowed=True

    # Run pre stow specific to each end of arm
    self.end_of_arm.pre_stow(self)

    #Bring in arm before bring down
    print('--------- Stowing Arm ----')
    self.arm.move_to(self.get_stow_pos('arm'))
    self.push_command()
    time.sleep(0.25)
    ts = time.time()
    while not self.arm.motor.status['near_pos_setpoint'] and time.time() - ts < 6.0:
        time.sleep(0.1)

    self.end_of_arm.stow()
    time.sleep(0.25)

    #Now bring lift down
    if not lift_stowed:
        print('--------- Stowing Lift ----')
        self.lift.move_to(pos_lift)
        self.push_command()
        time.sleep(0.25)
        ts = time.time()
        while not self.lift.motor.status['near_pos_setpoint'] and time.time() - ts < 12.0:
            time.sleep(0.1)
    #Make sure wrist yaw is done before exiting
    while self.end_of_arm.motors['wrist_yaw'].motor.is_moving():
        time.sleep(0.1)
    self.enable_collision_mgmt()

wait_command(self, timeout=15.0, use_motion_generator=True)

Pause program execution until all motion complete.

Queuing up motion and pushing it to the hardware with push_command() is designed to be asynchronous, enabling reactive control of the robot. However, you might want sychronous control, where each command's motion is completed entirely before the program moves on to the next command. This is where you would use wait_command()

Parameters

timeout : float How long to wait for motion to complete. Must be > 0.1 sec.

Returns

bool True if motion completed, False if timed out before motion completed

Source code in stretch_body/robot.py
def wait_command(self, timeout=15.0, use_motion_generator=True):
    """Pause program execution until all motion complete.

    Queuing up motion and pushing it to the hardware with
    push_command() is designed to be asynchronous, enabling
    reactive control of the robot. However, you might want
    sychronous control, where each command's motion is completed
    entirely before the program moves on to the next command.
    This is where you would use wait_command()

    Parameters
    ----------
    timeout : float
        How long to wait for motion to complete. Must be > 0.1 sec.

    Returns
    -------
    bool
        True if motion completed, False if timed out before motion completed
    """
    time.sleep(0.1)
    timeout = max(0.0, timeout - 0.1)
    done = []
    def check_wait(wait_method):
        done.append(wait_method(timeout, use_motion_generator))
    start = time.time()
    threads = []
    threads.append(threading.Thread(target=check_wait, args=(self.base.wait_while_is_moving,)))
    threads.append(threading.Thread(target=check_wait, args=(self.arm.wait_while_is_moving,)))
    threads.append(threading.Thread(target=check_wait, args=(self.lift.wait_while_is_moving,)))
    threads.append(threading.Thread(target=check_wait, args=(self.head.wait_until_at_setpoint,)))
    threads.append(threading.Thread(target=check_wait, args=(self.end_of_arm.wait_until_at_setpoint,)))
    [thread.start() for thread in threads]
    [thread.join() for thread in threads]
    return all(done)

The Device Class

The stretch_body library is modular in design. Each subcomponent of Stretch is defined in its class and the Robot class provides an interface that ties all of these classes together. This modularity allows users to plug in new/modified subcomponents into the Robot interface by extending the Device class.

It is possible to interface with a single subcomponent of Stretch by initializing its device class directly. In this section, we'll look at the API of seven subclasses of the Device class: the Arm, Lift, Base, Head, EndOfArm, Wacc, and Pimu subcomponents of Stretch.

Using the Arm class

Top down Stretch arm blueprint Top down Stretch arm blueprint

The interface to Stretch's telescoping arm is the Arm class. It is typically initialized as:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import stretch_body.arm

a = stretch_body.arm.Arm()
a.motor.disable_sync_mode()
if not a.startup():
    exit() # failed to start arm!

a.home()

# interact with the arm here

Since both Arm and Robot are subclasses of the Device class, the same startup() and stop() methods are available here, as well as other Device methods such as home(). Using the Arm class, we can read the arm's current state and send commands to the joint. For example, continuing the example above:

11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
starting_position = a.status['pos']

# move out by 10cm
a.move_to(starting_position + 0.1)
a.push_command()
a.motor.wait_until_at_setpoint()

# move back to starting position quickly
a.move_to(starting_position, v_m=0.2, a_m=0.25)
a.push_command()
a.motor.wait_until_at_setpoint()

a.move_by(0.1) # move out by 10cm
a.push_command()
a.motor.wait_until_at_setpoint()

The move_to() and move_by() methods queue absolute and relative position commands to the arm, respectively, while the nonblocking push_command() method pushes the queued position commands to the hardware for execution.

The attribute motor, an instance of the Stepper class, has the method wait_until_at_setpoint() which blocks program execution until the joint reaches the commanded goal. With firmware P1 or greater installed, it is also possible to queue a waypoint trajectory for the arm to follow:

26
27
28
29
30
31
32
33
34
35
36
starting_position = a.status['pos']

# queue a trajectory consisting of four waypoints
a.trajectory.add(t_s=0, x_m=starting_position)
a.trajectory.add(t_s=3, x_m=0.15)
a.trajectory.add(t_s=6, x_m=0.1)
a.trajectory.add(t_s=9, x_m=0.2)

# trigger trajectory execution
a.follow_trajectory()
import time; time.sleep(9)

The attribute trajectory, an instance of the PrismaticTrajectory class, has the method add() which adds a single waypoint in a linear sliding trajectory. For a well-formed trajectory (see is_valid()), the follow_trajectory() method starts tracking the trajectory for the telescoping arm.

It is also possible to dynamically restrict the arm joint range:

37
38
39
40
41
42
43
44
45
46
47
48
49
50
range_upper_limit = 0.3 # meters

# set soft limits on arm's range
a.set_soft_motion_limit_min(0)
a.set_soft_motion_limit_max(range_upper_limit)
a.push_command()

# command the arm outside the valid range
a.move_to(0.4)
a.push_command()
a.motor.wait_until_at_setpoint()
print(a.status['pos']) # we should expect to see ~0.3

a.stop()

The set_soft_motion_limit_min/max() methods form the basis of an experimental self-collision avoidance system built into Stretch Body.

All methods in the Arm class are documented below.

API to the Stretch Arm

Source code in stretch_body/arm.py
class Arm(PrismaticJoint):
    """
    API to the Stretch Arm
    """
    def __init__(self,usb=None):
        PrismaticJoint.__init__(self, name='arm',usb=usb)

    # ######### Utilties ##############################

    def motor_rad_to_translate_m(self,ang): #input in rad, output m
        return (self.params['chain_pitch']*self.params['chain_sprocket_teeth']/self.params['gr_spur']/(math.pi*2))*ang

    def translate_m_to_motor_rad(self, x):
        return x/(self.params['chain_pitch']*self.params['chain_sprocket_teeth']/self.params['gr_spur']/(math.pi*2))


    def home(self, end_pos=0.1,to_positive_stop=False, measuring=False):
        return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)

home(self, end_pos=0.1, to_positive_stop=False, measuring=False)

end_pos: position to end on to_positive_stop: -- True: Move to the positive direction stop and mark to range_m[1] -- False: Move to the negative direction stop and mark to range_m[0] measuring: After homing to stop, move to opposite stop and report back measured distance return measured range-of-motion if measuring. Return None if not a valide measurement

Source code in stretch_body/arm.py
def home(self, end_pos=0.1,to_positive_stop=False, measuring=False):
    return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)

Using the Lift class

Stretch lift blueprint Stretch lift blueprint

The interface to Stretch's lift is the Lift class. It is typically initialized as:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import stretch_body.lift

l = stretch_body.lift.Lift()
l.motor.disable_sync_mode()
if not l.startup():
    exit() # failed to start lift!

l.home()

# interact with the lift here

The startup() and home() methods are extended from the Device class. Reading the lift's current state and sending commands to the joint occurs similarly to the Arm class:

11
12
13
14
15
16
starting_position = l.status['pos']

# move up by 10cm
l.move_to(starting_position + 0.1)
l.push_command()
l.motor.wait_until_at_setpoint()

The attribute status is a dictionary of the joint's current status. This state information is updated in the background in real-time by default (disable by initializing as startup(threading=False)). Use the pretty_print() method to print out this state info in a human-interpretable format. Setting up waypoint trajectories for the lift is also similar to the Arm:

17
18
19
20
21
22
23
24
25
26
starting_position = l.status['pos']

# queue a trajectory consisting of three waypoints
l.trajectory.add(t_s=0, x_m=starting_position, v_m=0.0)
l.trajectory.add(t_s=3, x_m=0.5,               v_m=0.0)
l.trajectory.add(t_s=6, x_m=0.6,               v_m=0.0)

# trigger trajectory execution
l.follow_trajectory()
import time; time.sleep(6)

The attribute trajectory is also an instance of the PrismaticTrajectory class, and by providing the instantaneous velocity argument v_m to the add() method, a cubic spline can be loaded into the joint's trajectory. The call to follow_trajectory() begins hardware tracking of the spline.

Finally, setting soft motion limits for the lift's range can be done using:

27
28
29
30
31
32
# cut out 0.2m from the top and bottom of the lift's range
l.set_soft_motion_limit_min(0.2)
l.set_soft_motion_limit_max(0.8)
l.push_command()

l.stop()

The set_soft_motion_limit_min/max() methods perform clipping of the joint's range at the firmware level (can persist across reboots).

All methods in the Lift class are documented below.

API to the Stretch Lift

Source code in stretch_body/lift.py
class Lift(PrismaticJoint):
    """
    API to the Stretch Lift
    """
    def __init__(self,usb=None):
        PrismaticJoint.__init__(self, name='lift',usb=usb)

    # ######### Utilties ##############################

    def motor_rad_to_translate_m(self,ang): #input in rad
        d=self.params['pinion_t']*self.params['belt_pitch_m']/math.pi
        lift_m = (math.degrees(ang)/180.0)*math.pi*(d/2)
        return lift_m

    def translate_m_to_motor_rad(self, x):
        d = self.params['pinion_t'] * self.params['belt_pitch_m'] / math.pi
        ang = 180*x/((d/2)*math.pi)
        return math.radians(ang)

    def home(self, end_pos=0.6,to_positive_stop=True, measuring=False):
        return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)

home(self, end_pos=0.6, to_positive_stop=True, measuring=False)

end_pos: position to end on to_positive_stop: -- True: Move to the positive direction stop and mark to range_m[1] -- False: Move to the negative direction stop and mark to range_m[0] measuring: After homing to stop, move to opposite stop and report back measured distance return measured range-of-motion if measuring. Return None if not a valide measurement

Source code in stretch_body/lift.py
def home(self, end_pos=0.6,to_positive_stop=True, measuring=False):
    return PrismaticJoint.home(self,end_pos=end_pos,to_positive_stop=to_positive_stop,measuring=measuring)

Using the Base class

Stretch mobile base diagram Stretch mobile base diagram

Item Notes
A Drive wheels 4 inch diameter, urethane rubber shore 60A
B Cliff sensors Sharp GP2Y0A51SK0F, Analog, range 2-15 cm
C Mecanum wheel Diameter 50mm

The interface to Stretch's mobile base is the Base class. It is typically initialized as:

1
2
3
4
5
6
7
8
9
import stretch_body.base

b = stretch_body.base.Base()
b.left_wheel.disable_sync_mode()
b.right_wheel.disable_sync_mode()
if not b.startup():
    exit() # failed to start base!

# interact with the base here

Stretch's mobile base is a differential drive configuration. The left and right wheels are accessible through Base left_wheel and right_wheel attributes, both of which are instances of the Stepper class. The startup() method is extended from the Device class. Since the mobile base is unconstrained, there is no homing method. The pretty_print() method prints out mobile base state information in a human-interpretable format. We can read the base's current state and send commands using:

10
11
12
13
14
15
16
17
18
19
20
b.pretty_print()

# translate forward by 10cm
b.translate_by(0.1)
b.push_command()
b.left_wheel.wait_until_at_setpoint()

# rotate counter-clockwise by 90 degrees
b.rotate_by(1.57)
b.push_command()
b.left_wheel.wait_until_at_setpoint()

The translate_by() and rotate_by() methods send relative commands similar to the way move_by() behaves for the single degree of freedom joints.

The mobile base also supports velocity control:

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# command the base to translate forward at 5cm / second
b.set_translate_velocity(0.05)
b.push_command()
import time; time.sleep(1)

# command the base to rotate counter-clockwise at 0.1rad / second
b.set_rotational_velocity(0.1)
b.push_command()
time.sleep(1)

# command the base with translational and rotational velocities
b.set_velocity(0.05, 0.1)
b.push_command()
time.sleep(1)

# stop base motion
b.enable_freewheel_mode()
b.push_command()

The set_translate_velocity() and set_rotational_velocity() methods give velocity control over the translational and rotational components of the mobile base independently. The set_velocity() method gives control over both of these components simultaneously.

To halt motion, you can command zero velocities or command the base into freewheel mode using enable_freewheel_mode(). The mobile base also supports waypoint trajectory following, but the waypoints are part of the SE2 group, where a desired waypoint is defined as an (x, y) point and a theta orientation:

39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# reset odometry calculation
b.first_step = True
b.pull_status()

# queue a trajectory consisting of three waypoints
b.trajectory.add(time=0, x=0.0, y=0.0, theta=0.0)
b.trajectory.add(time=3, x=0.1, y=0.0, theta=0.0)
b.trajectory.add(time=6, x=0.0, y=0.0, theta=0.0)

# trigger trajectory execution
b.follow_trajectory()
import time; time.sleep(6)
print(b.status['x'], b.status['y'], b.status['theta']) # we should expect to see around (0.0, 0.0, 0.0 or 6.28)

b.stop()

Warning

The Base waypoint trajectory following has no notion of obstacles in the environment. It will blindly follow the commanded waypoints. For obstacle avoidance, we recommend employing perception and a path planner.

The attribute trajectory is an instance of the DiffDriveTrajectory class. The call to follow_trajectory() begins hardware tracking of the spline.

All methods of the Base class are documented below.

API to the Stretch Mobile Base

Source code in stretch_body/base.py
class Base(Device):
    """
    API to the Stretch Mobile Base
    """
    def __init__(self, usb_left=None, usb_right=None):
        Device.__init__(self, 'base')
        if usb_left is None:
            usb_left = self.params['usb_name_left_wheel']
        if usb_right is None:
            usb_right = self.params['usb_name_right_wheel']
        self.left_wheel = Stepper(usb=usb_left, name='hello-motor-left-wheel')
        self.right_wheel = Stepper(usb=usb_right, name='hello-motor-right-wheel')
        self.status = {'timestamp_pc':0,'x':0,'y':0,'theta':0,'x_vel':0,'y_vel':0,'theta_vel':0, 'pose_time_s':0,'effort': [0, 0], 'left_wheel': self.left_wheel.status, 'right_wheel': self.right_wheel.status, 'translation_force': 0, 'rotation_torque': 0}
        self.trajectory = DiffDriveTrajectory()
        self._waypoint_lwpos = None
        self._waypoint_rwpos = None
        self.thread_rate_hz = 5.0
        self.first_step=True
        wheel_circumference_m = self.params['wheel_diameter_m'] * pi
        self.meters_per_motor_rad = (wheel_circumference_m / (2.0 * pi)) / self.params['gr']
        self.wheel_separation_m = self.params['wheel_separation_m']

        # Default controller params
        self.stiffness=1.0
        self.vel_mr=self.translate_to_motor_rad(self.params['motion']['default']['vel_m'])
        self.accel_mr=self.translate_to_motor_rad(self.params['motion']['default']['accel_m'])
        self.fast_motion_allowed = True
    # ###########  Device Methods #############

    def startup(self, threaded=True):
        #Startup steppers first so that status is populated before this Device thread begins (if threaded==true)
        success = self.left_wheel.startup(threaded=False) and self.right_wheel.startup(threaded=False)
        if success:
            Device.startup(self, threaded=threaded)
            self.__update_status()
        return success

    def _thread_loop(self):
        self.pull_status()
        self.update_trajectory()

    def stop(self):
        Device.stop(self)
        if self.left_wheel.hw_valid and int(str(self.left_wheel.board_info['protocol_version'])[1:]) >= 1:
            self.left_wheel.stop_waypoint_trajectory()
            self._waypoint_lwpos = None
        if self.right_wheel.hw_valid and int(str(self.right_wheel.board_info['protocol_version'])[1:]) >= 1:
            self.right_wheel.stop_waypoint_trajectory()
            self._waypoint_rwpos = None
        self.left_wheel.stop()
        self.right_wheel.stop()

    def pretty_print(self):
        print('----------Base------')
        print('X (m)',self.status['x'])
        print('Y (m)',self.status['y'])
        print('Theta (rad)',self.status['theta'])
        print('X_vel (m/s)', self.status['x_vel'])
        print('Y_vel (m/s)', self.status['y_vel'])
        print('Theta_vel (rad/s)', self.status['theta_vel'])
        print('Pose time (s)', self.status['pose_time_s'])
        print('Timestamp PC (s):', self.status['timestamp_pc'])
        print('-----Left-Wheel-----')
        self.left_wheel.pretty_print()
        print('-----Right-Wheel-----')
        self.right_wheel.pretty_print()

    # ###################################################
    def enable_freewheel_mode(self):
        """
        Force motors into freewheel
        """
        self.left_wheel.enable_freewheel()
        self.right_wheel.enable_freewheel()

    def enable_pos_incr_mode(self):
        """
                Force motors into incremental position mode
        """
        self.left_wheel.enable_pos_traj_incr()
        self.right_wheel.enable_pos_traj_incr()

    # ###################################################

    def wait_for_contact(self, timeout=5.0):
        ts = time.time()
        while (time.time() - ts < timeout):
            self.pull_status()
            if self.left_wheel.status['in_guarded_event'] or self.right_wheel.status['in_guarded_event']:
                return True
            time.sleep(0.01)
        return False

    def wait_while_is_moving(self,timeout=15.0, use_motion_generator=True):
        done = []
        def check_wait(wait_method):
            done.append(wait_method(timeout,use_motion_generator))
        threads = []
        threads.append(threading.Thread(target=check_wait, args=(self.left_wheel.wait_while_is_moving,)))
        threads.append(threading.Thread(target=check_wait, args=(self.right_wheel.wait_while_is_moving,)))
        [thread.start() for thread in threads]
        [thread.join() for thread in threads]
        return all(done)

    def wait_until_at_setpoint(self, timeout=15.0):
        #Assume both are in motion. This will exit once both are at setpoints
        at_setpoint = []
        def check_wait(wait_method):
            at_setpoint.append(wait_method(timeout))
        threads = []
        threads.append(threading.Thread(target=check_wait, args=(self.left_wheel.wait_until_at_setpoint,)))
        threads.append(threading.Thread(target=check_wait, args=(self.right_wheel.wait_until_at_setpoint,)))
        [done_thread.start() for done_thread in threads]
        [done_thread.join() for done_thread in threads]
        return all(at_setpoint)

    def contact_thresh_to_motor_current(self,is_translate,contact_thresh):
        """
        This model converts from a specified percentage effort (-100 to 100) of translate/rotational effort to motor currents
        """
        if is_translate:
            e_c = self.params['contact_models']['effort_pct']['contact_thresh_translate_default'] if contact_thresh is None else contact_thresh
            m=self.params['contact_models']['effort_pct']['contact_thresh_translate_max']
            i_l, i_r = self.translation_effort_pct_to_motor_current(min(m,max(e_c,-1*m)))
        else:
            e_c = self.params['contact_models']['effort_pct']['contact_thresh_rotate_default'] if contact_thresh is None else contact_thresh
            m = self.params['contact_models']['effort_pct']['contact_thresh_rotate_max']
            i_l, i_r = self.rotation_effort_pct_to_motor_current(min(m, max(e_c, -1 * m)))
        return i_l,i_r

    def translate_by(self, x_m, v_m=None, a_m=None, stiffness=None, contact_thresh_N=None,contact_thresh=None):
        """
        Incremental translation of the base
        x_m: desired motion (m)
        v_m: velocity for trapezoidal motion profile (m/s)
        a_m: acceleration for trapezoidal motion profile (m/s^2)
        stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
        contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
        contact_thresh: effort to stop at (units of effort_pct (-100, 100))
        """
        check_deprecated_contact_model_base(self,'translate_by', contact_thresh_N, contact_thresh)

        x_mr = self.translate_to_motor_rad(x_m)

        if v_m is not None:
            v_m = min(abs(v_m), self.params['motion']['max']['vel_m'])
            v_mr = self.translate_to_motor_rad(v_m)
        else:
            v_mr = self.vel_mr

        if a_m is not None:
            a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
            a_mr = self.translate_to_motor_rad(a_m)
        else:
            a_mr = self.accel_mr

        if not self.fast_motion_allowed:
            v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
            a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)

        i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True, contact_thresh=contact_thresh)

        stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness


        self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
                                    v_des=v_mr,
                                    a_des=a_mr,
                                    stiffness=stiffness,
                                    i_feedforward=0,
                                    i_contact_pos=i_contact_l,
                                    i_contact_neg=-1*i_contact_l)
        self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
                                    v_des=v_mr,
                                    a_des=a_mr,
                                    stiffness=stiffness,
                                    i_feedforward=0,
                                    i_contact_pos=i_contact_r,
                                    i_contact_neg=-1*i_contact_r)


    def rotate_by(self, x_r, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
        """
        Incremental rotation of the base
        x_r: desired motion (radians)
        v_r: velocity for trapezoidal motion profile (rad/s)
        a_r: acceleration for trapezoidal motion profile (rad/s^2)
        stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
        contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
        contact_thresh: effort to stop at (units of effort_pct (-100, 100))
        """

        check_deprecated_contact_model_base(self,'rotate_by', contact_thresh_N, contact_thresh)

        x_mr = self.rotate_to_motor_rad(x_r)

        if v_r is not None:
            v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
            v_mr = self.rotate_to_motor_rad(v_r)
            v_mr=min(abs(v_mr),v_mr_max)
        else:
            v_mr = self.vel_mr

        if a_r is not None:
            a_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
            a_mr = self.rotate_to_motor_rad(a_r)
            a_mr = min(abs(a_mr), a_mr_max)
        else:
            a_mr = self.accel_mr

        if not self.fast_motion_allowed:
            v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
            a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)


        i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False, contact_thresh=contact_thresh)

        stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
        self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=-1*x_mr,
                                    v_des=v_mr,
                                    a_des=a_mr,
                                    stiffness=stiffness,
                                    i_feedforward=0,
                                    i_contact_pos=i_contact_l,
                                    i_contact_neg=-1 * i_contact_l)
        self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=x_mr,
                                     v_des=v_mr,
                                     a_des=a_mr,
                                     stiffness=stiffness,
                                     i_feedforward=0,
                                     i_contact_pos=i_contact_r,
                                     i_contact_neg=-1 * i_contact_r)



    def set_translate_velocity(self, v_m, a_m=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
        """
        Command the bases translational velocity.
        Use care to prevent collisions / avoid runaways
        v_m: desired velocity (m/s)
        a_m: acceleration of motion profile (m/s^2)
        contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
        contact_thresh: effort to stop at (units of effort_pct (-100, 100))
        """

        check_deprecated_contact_model_base(self,'set_translate_velocity', contact_thresh_N, contact_thresh)

        if a_m is not None:
            a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
            a_mr = self.translate_to_motor_rad(a_m)
        else:
            a_mr = self.accel_mr
        v_sign = numpy.sign(v_m)
        v_m = v_sign * min(abs(v_m), self.params['motion']['max']['vel_m'])
        v_mr = self.translate_to_motor_rad(v_m)
        stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
        i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)

        self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,stiffness=stiffness,
                                    i_feedforward=0,
                                    i_contact_pos=i_contact_l,
                                    i_contact_neg=-1 * i_contact_l)
        self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
                                     stiffness=stiffness,
                                     i_feedforward=0,
                                     i_contact_pos=i_contact_r,
                                     i_contact_neg=-1 * i_contact_r)

    def set_rotational_velocity(self, v_r, a_r=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
        """
        Command the bases rotational velocity.
        Use care to prevent collisions / avoid runaways
        v_r: desired rotational velocity (rad/s)
        a_r: acceleration of motion profile (rad/s^2)
        contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
        contact_thresh: effort to stop at (units of effort_pct (-100, 100))
        """

        check_deprecated_contact_model_base(self,'set_rotational_velocity', contact_thresh_N, contact_thresh)

        if a_r is not None:
            a_mr_max=self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
            a_mr = self.rotate_to_motor_rad(a_r)
            a_mr = min(abs(a_mr), a_mr_max)
        else:
            a_mr = self.accel_mr

        w_sign = numpy.sign(v_r)
        v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
        v_mr = self.rotate_to_motor_rad(v_r)
        v_mr = w_sign * min(abs(v_mr), v_mr_max)
        stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
        i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False,
                                                                        contact_thresh=contact_thresh)
        self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=-1*v_mr, a_des=a_mr,
                                    i_feedforward=0,
                                    i_contact_pos=i_contact_l,
                                    i_contact_neg=-1 * i_contact_l)
        self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
                                     stiffness=stiffness,
                                     i_feedforward=0,
                                     i_contact_pos=i_contact_r,
                                     i_contact_neg=-1 * i_contact_r)

    def set_velocity(self, v_m, w_r, a=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
        """
        Command the bases translational and rotational
        velocities simultaneously.
        Use care to prevent collisions / avoid runaways
        v_m: desired velocity (m/s)
        w_r: desired rotational velocity (rad/s)
        a:   acceleration of motion profile (m/s^2 and rad/s^2)
        contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
        contact_thresh: effort to stop at (units of effort_pct (-100, 100))
        """

        check_deprecated_contact_model_base(self,'set_velocity', contact_thresh_N, contact_thresh)


        if a is not None:
            a = min(abs(a), self.params['motion']['max']['accel_m'])
            a_mr = self.translate_to_motor_rad(a)
        else:
            a_mr = self.accel_mr

        # Unicycle dynamics w/o R because
        # translate_to_motor_rad accounts for R and gear ratio
        wl_m = ((2 * v_m) - (w_r * self.params['wheel_separation_m'])) / 2.0
        wr_m = ((2 * v_m) + (w_r * self.params['wheel_separation_m'])) / 2.0

        wl_sign = numpy.sign(wl_m)
        wl_m = wl_sign * min(abs(wl_m), self.params['motion']['max']['vel_m'])
        wl_r = self.translate_to_motor_rad(wl_m)

        wr_sign = numpy.sign(wr_m)
        wr_m = wr_sign * min(abs(wr_m), self.params['motion']['max']['vel_m'])
        wr_r = self.translate_to_motor_rad(wr_m)

        stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness

        i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,
                                                                        contact_thresh=contact_thresh)

        if self.params['use_vel_traj']:
            ctrl_mode =Stepper.MODE_VEL_TRAJ
        else:
            ctrl_mode =Stepper.MODE_VEL_PID

        self.left_wheel.set_command(mode=ctrl_mode, v_des=wl_r, a_des=a_mr,
                                    i_feedforward=0,
                                    i_contact_pos=i_contact_l,
                                    i_contact_neg=-1 * i_contact_l)
        self.right_wheel.set_command(mode=ctrl_mode, v_des=wr_r, a_des=a_mr,
                                     stiffness=stiffness,
                                     i_feedforward=0,
                                     i_contact_pos=i_contact_r,
                                     i_contact_neg=-1 * i_contact_r)

    # ######### Waypoint Trajectory Interface ##############################
    def follow_trajectory(self, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
        """Starts executing a waypoint trajectory

        `self.trajectory` must be populated with a valid trajectory before calling
        this method.

        Parameters
        ----------
        v_r : float
            velocity limit for trajectory in motor space in meters per second
        a_r : float
            acceleration limit for trajectory in motor space in meters per second squared
        stiffness : float
            stiffness of motion. Range 0.0 (min) to 1.0 (max)
        contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
        contact_thresh: effort to stop at (units of effort_pct (-100, 100))
        """

        check_deprecated_contact_model_base(self,'follow_trajectory', contact_thresh_N, contact_thresh)

        # check if joint valid, traj active, and right protocol
        if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
            self.logger.warning('Base connection to hardware not valid')
            return False
        if self._waypoint_lwpos is not None or self._waypoint_rwpos is not None:
            self.logger.warning('Base waypoint trajectory already active')
            return False
        if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
            self.logger.warning("Base left motor firmware version doesn't support waypoint trajectories")
            return False
        if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
            self.logger.warning("Base right motor firmware version doesn't support waypoint trajectories")
            return False

        # check if trajectory valid
        vel_limit = v_r if v_r is not None else self.params['motion']['trajectory_max']['vel_r']
        acc_limit = a_r if a_r is not None else self.params['motion']['trajectory_max']['accel_r']
        valid, reason = self.trajectory.is_valid(vel_limit, acc_limit, self.translate_to_motor_rad, self.rotate_to_motor_rad)
        if not valid:
            self.logger.warning('Base trajectory not valid: {0}'.format(reason))
            return False
        if valid and reason == "must have at least two waypoints":
            # skip this device
            return True

        # set defaults
        stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness

        v = self.translate_to_motor_rad(min(abs(v_r), self.params['motion']['trajectory_max']['vel_r'])) \
            if v_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['vel_r'])
        a = self.translate_to_motor_rad(min(abs(a_r), self.params['motion']['trajectory_max']['accel_r'])) \
            if a_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['accel_r'])

        i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)

        # start trajectory
        self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
                                    v_des=v,
                                    a_des=a,
                                    stiffness=stiffness,
                                    i_contact_pos=i_contact_l,
                                    i_contact_neg=-i_contact_l*-1)
        self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
                                     v_des=v,
                                     a_des=a,
                                     stiffness=stiffness,
                                     i_contact_pos=i_contact_r,
                                     i_contact_neg=-i_contact_r*-1)
        self.left_wheel.push_command()
        self.right_wheel.push_command()
        self.left_wheel.pull_status()
        self.right_wheel.pull_status()
        self._waypoint_lwpos = self.left_wheel.status['pos']
        self._waypoint_rwpos = self.right_wheel.status['pos']
        ls0, rs0 = self.trajectory.get_wheel_segments(0, self.translate_to_motor_rad, self.rotate_to_motor_rad,
            self._waypoint_lwpos, self._waypoint_rwpos)
        return self.left_wheel.start_waypoint_trajectory(ls0.to_array()) and \
            self.right_wheel.start_waypoint_trajectory(rs0.to_array())

    def reset_odometry(self):
        """
        Reset X/Y/Theta to report 0
        """
        self.first_step = True

    def is_trajectory_active(self):
        return (self.left_wheel.status['waypoint_traj']['state'] == 'active' or self.right_wheel.status['waypoint_traj']['state'] == 'active')

    def get_trajectory_ts(self):
        # Return trajectory execution time
        if self.is_trajectory_active() and self.left_wheel._waypoint_ts is not None and self.right_wheel._waypoint_ts:
            return max(time.time()-self.left_wheel._waypoint_ts,time.time()-self.right_wheel._waypoint_ts)
        elif len(self.trajectory.waypoints):
            return self.trajectory.waypoints[-1].time
        else:
            return 0

    def get_trajectory_time_remaining(self):
        if not self.is_trajectory_active():
            return 0
        else:
            return max(0,self.trajectory.waypoints[-1].time - self.get_trajectory_ts())

    def update_trajectory(self):
        """Updates hardware with the next segment of `self.trajectory`

        This method must be called frequently to enable complete trajectory execution
        and preemption of future segments. If used with `stretch_body.robot.Robot` or
        with `self.startup(threaded=True)`, a background thread is launched for this.
        Otherwise, the user must handle calling this method.
        """
        # check if joint valid, right protocol, and right mode
        if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
            return
        if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
            return
        if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
            return
        if self.left_wheel.status['mode'] != self.left_wheel.MODE_POS_TRAJ_WAYPOINT:
            return
        if self.right_wheel.status['mode'] != self.right_wheel.MODE_POS_TRAJ_WAYPOINT:
            return
        if self._waypoint_lwpos is None or self._waypoint_rwpos is None:
            return

        if self.left_wheel.status['waypoint_traj']['state'] == 'active' and self.right_wheel.status['waypoint_traj']['state'] == 'active':
            next_segment_id = self.left_wheel.status['waypoint_traj']['segment_id'] - 2 + 1 # subtract 2 due to IDs 0 & 1 being reserved by firmware
            if next_segment_id < self.trajectory.get_num_segments():
                ls1, rs1 = self.trajectory.get_wheel_segments(next_segment_id, self.translate_to_motor_rad, self.rotate_to_motor_rad,
                    self._waypoint_lwpos, self._waypoint_rwpos)
                self.left_wheel.set_next_trajectory_segment(ls1.to_array())
                self.right_wheel.set_next_trajectory_segment(rs1.to_array())
        elif self.left_wheel.status['waypoint_traj']['state'] == 'idle' and self.left_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT and \
            self.right_wheel.status['waypoint_traj']['state'] == 'idle' and self.right_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT:
            self._waypoint_lwpos = None
            self._waypoint_rwpos = None
            self.left_wheel.enable_pos_traj()
            self.right_wheel.enable_pos_traj()
            self.push_command()

    def stop_trajectory(self):
        """Stop waypoint trajectory immediately and resets hardware
        """
        self.left_wheel.stop_waypoint_trajectory()
        self.right_wheel.stop_waypoint_trajectory()

    def step_sentry(self,robot):
        """
        Only allow fast mobile base motion if the lift is low,
        the arm is retracted, and the wrist is stowed. This is
        intended to keep the center of mass low for increased
        stability and avoid catching the arm or tool on
        something.
        """
        if self.robot_params['robot_sentry']['base_max_velocity']:
            x_lift=robot.lift.status['pos']
            x_arm =robot.arm.status['pos']
            x_wrist =robot.end_of_arm.motors['wrist_yaw'].status['pos']

            if ((x_lift < self.params['sentry_max_velocity']['max_lift_height_m']) and
                    (x_arm < self.params['sentry_max_velocity']['max_arm_extension_m']) and
                    (x_wrist > self.params['sentry_max_velocity']['min_wrist_yaw_rad'])):
                if not self.fast_motion_allowed:
                    self.logger.debug('Fast motion turned on')
                self.fast_motion_allowed = True
            else:
                if self.fast_motion_allowed:
                    self.logger.debug('Fast motion turned off')
                self.fast_motion_allowed = False

        self.left_wheel.step_sentry(robot)
        self.right_wheel.step_sentry(robot)

    # ###################################################
    def push_command(self):
        self.left_wheel.push_command()
        self.right_wheel.push_command()

    def pull_status(self):
        """
        Computes base odometery based on stepper positions / velocities
        """
        self.left_wheel.pull_status()
        self.right_wheel.pull_status()
        self.__update_status()

    async def pull_status_async(self):
        """
        Computes base odometery based on stepper positions / velocities
        """
        await self.left_wheel.pull_status_async()
        await self.right_wheel.pull_status_async()
        self.__update_status()

    def __update_status(self):

        self.status['timestamp_pc'] = time.time()
        p0 = self.status['left_wheel']['pos']
        p1 = self.status['right_wheel']['pos']
        t0 = self.status['left_wheel']['timestamp']
        t1 = self.status['right_wheel']['timestamp']
        self.status['translation_force'] = 0 #Deprecated
        self.status['rotation_torque'] = 0 #Deprecated

        if self.first_step:
            # Upon the first step, simply set the initial pose, since
            # no movement has yet been recorded.
            self.first_step = False

            self.p0 = p0
            self.p1 = p1

            self.t0_s = t0
            self.t1_s = t1

            self.status['x'] = 0.0
            self.status['y'] = 0.0
            self.status['theta'] = 0.0

            self.status['x_vel'] = 0.0
            self.status['y_vel'] = 0.0
            self.status['theta_vel'] = 0.0

        else:
            ######################################################
            # The odometry related was wrtten starting on January 14,
            # 2019 while looking at the following BSD-3-Clause licensed
            # code for reference.

            # https://github.com/merose/diff_drive/blob/master/src/diff_drive/odometry.py

            # The code uses standard calculations. The reference code
            # specifically cites a document with the following link,
            # which appears to be broken.

            # https://chess.eecs.berkeley.edu/eecs149/documentation/differentialDrive.pdf

            # There are many sources on the internet for these
            # equations. For example, the following document is
            # helpful:

            # http://www8.cs.umu.se/kurser/5DV122/HT13/material/Hellstrom-ForwardKinematics.pdf

            prev_t0_s = self.t0_s
            prev_t1_s = self.t1_s
            t0_s = t0
            t1_s = t1

            delta_t0_s = t0_s - prev_t0_s
            delta_t1_s = t1_s - prev_t1_s

            if (delta_t0_s > 0.0) and (delta_t1_s > 0.0):
                # update if time has passed for both motor readings, otherwise do nothing

                average_delta_t_s = (delta_t0_s + delta_t1_s) / 2.0

                # update the times, since both motors have new readings
                self.prev_t0_s = self.t0_s
                self.prev_t1_s = self.t1_s
                self.t0_s = t0_s
                self.t1_s = t1_s

                # need to check on wrap around / rollover for wheel positions
                self.prev_p0 = self.p0
                self.prev_p1 = self.p1
                self.p0 = p0
                self.p1 = p1

                # Transform the wheel rotations so that left and right
                # wheel distances in meters have the convention that
                # positive values for each wheel corresponds with forward
                # motion of the mobile base.
                prev_left_m = self.prev_p0 * self.meters_per_motor_rad
                left_m = self.p0 * self.meters_per_motor_rad

                prev_right_m = self.prev_p1 * self.meters_per_motor_rad
                right_m = self.p1 * self.meters_per_motor_rad

                delta_left_m = left_m - prev_left_m
                delta_right_m = right_m - prev_right_m

                delta_travel = (delta_right_m + delta_left_m) / 2.0
                delta_theta = (delta_right_m - delta_left_m) / self.wheel_separation_m

                prev_x = self.status['x']
                prev_y = self.status['y']
                prev_theta = self.status['theta']

                if delta_left_m == delta_right_m:
                    # delta_theta is 0.0, which would result in a divide
                    # by zero error and corresponds with an infinite
                    # radius of curvature (0 curvature).
                    delta_x = delta_travel * cos(prev_theta)
                    delta_y = delta_travel * sin(prev_theta)
                else:
                    # calculate the instantaneous center of curvature (ICC)
                    icc_radius = delta_travel / delta_theta
                    icc_x = prev_x - (icc_radius * sin(prev_theta))
                    icc_y = prev_y + (icc_radius * cos(prev_theta))

                    # calculate the change in position based on the ICC
                    delta_x = ((cos(delta_theta) * (prev_x - icc_x))
                               - (sin(delta_theta) * (prev_y - icc_y))
                               + icc_x - prev_x)

                    delta_y = ((sin(delta_theta) * (prev_x - icc_x))
                               + (cos(delta_theta) * (prev_y - icc_y))
                               + icc_y - prev_y)

                # update the estimated total time passed since odometry started
                self.status['pose_time_s'] = self.status['pose_time_s'] + average_delta_t_s

                # update the robot's velocity estimates
                self.status['x_vel'] = delta_travel / average_delta_t_s
                self.status['y_vel'] = 0.0
                self.status['theta_vel'] = delta_theta / average_delta_t_s

                # update the robot's pose estimates
                self.status['x'] = prev_x + delta_x
                self.status['y'] = prev_y + delta_y
                self.status['theta'] = (prev_theta + delta_theta) % (2.0 * pi)

    # ############## Deprecated Contact API ##################

    def motor_current_to_translation_force(self, il, ir):
        raise DeprecationWarning('Method motor_current_to_translate_force has been deprecated since v0.3.5')

    def motor_current_to_rotation_torque(self, il, ir):
        raise DeprecationWarning('Method motor_current_to_rotation_torque has been deprecated since v0.3.5')

    def translation_force_to_motor_current(self, f_N):  # Assume evenly balanced
        raise DeprecationWarning('Method translation_force_to_motor_current has been deprecated since v0.3.5')

    def rotation_torque_to_motor_current(self, tq_Nm):
        raise DeprecationWarning('Method translation_force_to_motor_current has been deprecated since v0.3.5')


    # ########### Effort Contact Conversions ####################
    # Rotation effort is -100 to 100 (where 100 =  L/R motors at +iMax / +iMax)
    # Translation effort is -100 to 100 (where 100 =  L/R motors at +iMax / -iMax)

    def rotation_effort_pct_to_motor_current(self,e_pct):
        il=self.left_wheel.effort_pct_to_current(e_pct)
        ir= -1*self.right_wheel.effort_pct_to_current(e_pct)
        return il, ir

    def translation_effort_pct_to_motor_current(self,e_pct):
        il = self.left_wheel.effort_pct_to_current(e_pct)
        ir = self.right_wheel.effort_pct_to_current(e_pct)
        return il, ir

    def motor_current_to_translate_effort_pct(self,il,ir):
        el=self.left_wheel.current_to_effort_pct(il)
        er = self.right_wheel.current_to_effort_pct(ir)
        return (el+er)/2

    def motor_current_to_rotation_effort_pct(self,il,ir):
        el = self.left_wheel.current_to_effort_pct(il)
        er = self.right_wheel.current_to_effort_pct(ir)
        return (el-er)/2

    # ########### Kinematic Conversions ####################

    def translate_to_motor_rad(self,x_m):
        circ=self.params['wheel_diameter_m']*math.pi
        return deg_to_rad(360*self.params['gr']*x_m/circ)

    def motor_rad_to_translate(self,x_r):
        circ = self.params['wheel_diameter_m'] * math.pi
        return rad_to_deg(x_r)*circ/360/self.params['gr']

    def rotate_to_motor_rad(self,x_r):
        r = self.params['wheel_separation_m'] / 2.0
        c = r * x_r #distance wheel travels (m)
        return self.translate_to_motor_rad(c)

    def motor_rad_to_rotate(self, x_r):
        c = self.motor_rad_to_translate(x_r)
        r = self.params['wheel_separation_m'] / 2.0
        ang_rad = c /r
        return ang_rad

    def translation_to_rotation(self,x_m):
        x_mr=self.translate_to_motor_rad(x_m)
        x_r=self.motor_rad_to_rotate(x_mr)
        return x_r

    def rotation_to_translation(self,x_r):
        x_mr=self.rotate_to_motor_rad(x_r)
        x_m=self.motor_rad_to_translate(x_mr)
        return x_m

contact_thresh_to_motor_current(self, is_translate, contact_thresh)

This model converts from a specified percentage effort (-100 to 100) of translate/rotational effort to motor currents

Source code in stretch_body/base.py
def contact_thresh_to_motor_current(self,is_translate,contact_thresh):
    """
    This model converts from a specified percentage effort (-100 to 100) of translate/rotational effort to motor currents
    """
    if is_translate:
        e_c = self.params['contact_models']['effort_pct']['contact_thresh_translate_default'] if contact_thresh is None else contact_thresh
        m=self.params['contact_models']['effort_pct']['contact_thresh_translate_max']
        i_l, i_r = self.translation_effort_pct_to_motor_current(min(m,max(e_c,-1*m)))
    else:
        e_c = self.params['contact_models']['effort_pct']['contact_thresh_rotate_default'] if contact_thresh is None else contact_thresh
        m = self.params['contact_models']['effort_pct']['contact_thresh_rotate_max']
        i_l, i_r = self.rotation_effort_pct_to_motor_current(min(m, max(e_c, -1 * m)))
    return i_l,i_r

enable_freewheel_mode(self)

Force motors into freewheel

Source code in stretch_body/base.py
def enable_freewheel_mode(self):
    """
    Force motors into freewheel
    """
    self.left_wheel.enable_freewheel()
    self.right_wheel.enable_freewheel()

enable_pos_incr_mode(self)

Force motors into incremental position mode

Source code in stretch_body/base.py
def enable_pos_incr_mode(self):
    """
            Force motors into incremental position mode
    """
    self.left_wheel.enable_pos_traj_incr()
    self.right_wheel.enable_pos_traj_incr()

follow_trajectory(self, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)

Starts executing a waypoint trajectory

self.trajectory must be populated with a valid trajectory before calling this method.

Parameters

v_r : float velocity limit for trajectory in motor space in meters per second a_r : float acceleration limit for trajectory in motor space in meters per second squared stiffness : float stiffness of motion. Range 0.0 (min) to 1.0 (max) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))

Source code in stretch_body/base.py
def follow_trajectory(self, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
    """Starts executing a waypoint trajectory

    `self.trajectory` must be populated with a valid trajectory before calling
    this method.

    Parameters
    ----------
    v_r : float
        velocity limit for trajectory in motor space in meters per second
    a_r : float
        acceleration limit for trajectory in motor space in meters per second squared
    stiffness : float
        stiffness of motion. Range 0.0 (min) to 1.0 (max)
    contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
    contact_thresh: effort to stop at (units of effort_pct (-100, 100))
    """

    check_deprecated_contact_model_base(self,'follow_trajectory', contact_thresh_N, contact_thresh)

    # check if joint valid, traj active, and right protocol
    if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
        self.logger.warning('Base connection to hardware not valid')
        return False
    if self._waypoint_lwpos is not None or self._waypoint_rwpos is not None:
        self.logger.warning('Base waypoint trajectory already active')
        return False
    if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
        self.logger.warning("Base left motor firmware version doesn't support waypoint trajectories")
        return False
    if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
        self.logger.warning("Base right motor firmware version doesn't support waypoint trajectories")
        return False

    # check if trajectory valid
    vel_limit = v_r if v_r is not None else self.params['motion']['trajectory_max']['vel_r']
    acc_limit = a_r if a_r is not None else self.params['motion']['trajectory_max']['accel_r']
    valid, reason = self.trajectory.is_valid(vel_limit, acc_limit, self.translate_to_motor_rad, self.rotate_to_motor_rad)
    if not valid:
        self.logger.warning('Base trajectory not valid: {0}'.format(reason))
        return False
    if valid and reason == "must have at least two waypoints":
        # skip this device
        return True

    # set defaults
    stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness

    v = self.translate_to_motor_rad(min(abs(v_r), self.params['motion']['trajectory_max']['vel_r'])) \
        if v_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['vel_r'])
    a = self.translate_to_motor_rad(min(abs(a_r), self.params['motion']['trajectory_max']['accel_r'])) \
        if a_r is not None else self.translate_to_motor_rad(self.params['motion']['trajectory_max']['accel_r'])

    i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)

    # start trajectory
    self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
                                v_des=v,
                                a_des=a,
                                stiffness=stiffness,
                                i_contact_pos=i_contact_l,
                                i_contact_neg=-i_contact_l*-1)
    self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_WAYPOINT,
                                 v_des=v,
                                 a_des=a,
                                 stiffness=stiffness,
                                 i_contact_pos=i_contact_r,
                                 i_contact_neg=-i_contact_r*-1)
    self.left_wheel.push_command()
    self.right_wheel.push_command()
    self.left_wheel.pull_status()
    self.right_wheel.pull_status()
    self._waypoint_lwpos = self.left_wheel.status['pos']
    self._waypoint_rwpos = self.right_wheel.status['pos']
    ls0, rs0 = self.trajectory.get_wheel_segments(0, self.translate_to_motor_rad, self.rotate_to_motor_rad,
        self._waypoint_lwpos, self._waypoint_rwpos)
    return self.left_wheel.start_waypoint_trajectory(ls0.to_array()) and \
        self.right_wheel.start_waypoint_trajectory(rs0.to_array())

pull_status(self)

Computes base odometery based on stepper positions / velocities

Source code in stretch_body/base.py
def pull_status(self):
    """
    Computes base odometery based on stepper positions / velocities
    """
    self.left_wheel.pull_status()
    self.right_wheel.pull_status()
    self.__update_status()

pull_status_async(self) async

Computes base odometery based on stepper positions / velocities

Source code in stretch_body/base.py
async def pull_status_async(self):
    """
    Computes base odometery based on stepper positions / velocities
    """
    await self.left_wheel.pull_status_async()
    await self.right_wheel.pull_status_async()
    self.__update_status()

reset_odometry(self)

Reset X/Y/Theta to report 0

Source code in stretch_body/base.py
def reset_odometry(self):
    """
    Reset X/Y/Theta to report 0
    """
    self.first_step = True

rotate_by(self, x_r, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)

Incremental rotation of the base x_r: desired motion (radians) v_r: velocity for trapezoidal motion profile (rad/s) a_r: acceleration for trapezoidal motion profile (rad/s^2) stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))

Source code in stretch_body/base.py
def rotate_by(self, x_r, v_r=None, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None):
    """
    Incremental rotation of the base
    x_r: desired motion (radians)
    v_r: velocity for trapezoidal motion profile (rad/s)
    a_r: acceleration for trapezoidal motion profile (rad/s^2)
    stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
    contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
    contact_thresh: effort to stop at (units of effort_pct (-100, 100))
    """

    check_deprecated_contact_model_base(self,'rotate_by', contact_thresh_N, contact_thresh)

    x_mr = self.rotate_to_motor_rad(x_r)

    if v_r is not None:
        v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
        v_mr = self.rotate_to_motor_rad(v_r)
        v_mr=min(abs(v_mr),v_mr_max)
    else:
        v_mr = self.vel_mr

    if a_r is not None:
        a_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
        a_mr = self.rotate_to_motor_rad(a_r)
        a_mr = min(abs(a_mr), a_mr_max)
    else:
        a_mr = self.accel_mr

    if not self.fast_motion_allowed:
        v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
        a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)


    i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False, contact_thresh=contact_thresh)

    stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
    self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=-1*x_mr,
                                v_des=v_mr,
                                a_des=a_mr,
                                stiffness=stiffness,
                                i_feedforward=0,
                                i_contact_pos=i_contact_l,
                                i_contact_neg=-1 * i_contact_l)
    self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR,x_des=x_mr,
                                 v_des=v_mr,
                                 a_des=a_mr,
                                 stiffness=stiffness,
                                 i_feedforward=0,
                                 i_contact_pos=i_contact_r,
                                 i_contact_neg=-1 * i_contact_r)

set_rotational_velocity(self, v_r, a_r=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)

Command the bases rotational velocity. Use care to prevent collisions / avoid runaways v_r: desired rotational velocity (rad/s) a_r: acceleration of motion profile (rad/s^2) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))

Source code in stretch_body/base.py
def set_rotational_velocity(self, v_r, a_r=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
    """
    Command the bases rotational velocity.
    Use care to prevent collisions / avoid runaways
    v_r: desired rotational velocity (rad/s)
    a_r: acceleration of motion profile (rad/s^2)
    contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
    contact_thresh: effort to stop at (units of effort_pct (-100, 100))
    """

    check_deprecated_contact_model_base(self,'set_rotational_velocity', contact_thresh_N, contact_thresh)

    if a_r is not None:
        a_mr_max=self.translate_to_motor_rad(self.params['motion']['max']['accel_m'])
        a_mr = self.rotate_to_motor_rad(a_r)
        a_mr = min(abs(a_mr), a_mr_max)
    else:
        a_mr = self.accel_mr

    w_sign = numpy.sign(v_r)
    v_mr_max = self.translate_to_motor_rad(self.params['motion']['max']['vel_m'])
    v_mr = self.rotate_to_motor_rad(v_r)
    v_mr = w_sign * min(abs(v_mr), v_mr_max)
    stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
    i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=False,
                                                                    contact_thresh=contact_thresh)
    self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=-1*v_mr, a_des=a_mr,
                                i_feedforward=0,
                                i_contact_pos=i_contact_l,
                                i_contact_neg=-1 * i_contact_l)
    self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
                                 stiffness=stiffness,
                                 i_feedforward=0,
                                 i_contact_pos=i_contact_r,
                                 i_contact_neg=-1 * i_contact_r)

set_translate_velocity(self, v_m, a_m=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)

Command the bases translational velocity. Use care to prevent collisions / avoid runaways v_m: desired velocity (m/s) a_m: acceleration of motion profile (m/s^2) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))

Source code in stretch_body/base.py
def set_translate_velocity(self, v_m, a_m=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
    """
    Command the bases translational velocity.
    Use care to prevent collisions / avoid runaways
    v_m: desired velocity (m/s)
    a_m: acceleration of motion profile (m/s^2)
    contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
    contact_thresh: effort to stop at (units of effort_pct (-100, 100))
    """

    check_deprecated_contact_model_base(self,'set_translate_velocity', contact_thresh_N, contact_thresh)

    if a_m is not None:
        a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
        a_mr = self.translate_to_motor_rad(a_m)
    else:
        a_mr = self.accel_mr
    v_sign = numpy.sign(v_m)
    v_m = v_sign * min(abs(v_m), self.params['motion']['max']['vel_m'])
    v_mr = self.translate_to_motor_rad(v_m)
    stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness
    i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,contact_thresh=contact_thresh)

    self.left_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,stiffness=stiffness,
                                i_feedforward=0,
                                i_contact_pos=i_contact_l,
                                i_contact_neg=-1 * i_contact_l)
    self.right_wheel.set_command(mode=Stepper.MODE_VEL_TRAJ, v_des=v_mr, a_des=a_mr,
                                 stiffness=stiffness,
                                 i_feedforward=0,
                                 i_contact_pos=i_contact_r,
                                 i_contact_neg=-1 * i_contact_r)

set_velocity(self, v_m, w_r, a=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)

Command the bases translational and rotational velocities simultaneously. Use care to prevent collisions / avoid runaways v_m: desired velocity (m/s) w_r: desired rotational velocity (rad/s) a: acceleration of motion profile (m/s^2 and rad/s^2) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))

Source code in stretch_body/base.py
def set_velocity(self, v_m, w_r, a=None,stiffness=None, contact_thresh_N=None,contact_thresh=None):
    """
    Command the bases translational and rotational
    velocities simultaneously.
    Use care to prevent collisions / avoid runaways
    v_m: desired velocity (m/s)
    w_r: desired rotational velocity (rad/s)
    a:   acceleration of motion profile (m/s^2 and rad/s^2)
    contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
    contact_thresh: effort to stop at (units of effort_pct (-100, 100))
    """

    check_deprecated_contact_model_base(self,'set_velocity', contact_thresh_N, contact_thresh)


    if a is not None:
        a = min(abs(a), self.params['motion']['max']['accel_m'])
        a_mr = self.translate_to_motor_rad(a)
    else:
        a_mr = self.accel_mr

    # Unicycle dynamics w/o R because
    # translate_to_motor_rad accounts for R and gear ratio
    wl_m = ((2 * v_m) - (w_r * self.params['wheel_separation_m'])) / 2.0
    wr_m = ((2 * v_m) + (w_r * self.params['wheel_separation_m'])) / 2.0

    wl_sign = numpy.sign(wl_m)
    wl_m = wl_sign * min(abs(wl_m), self.params['motion']['max']['vel_m'])
    wl_r = self.translate_to_motor_rad(wl_m)

    wr_sign = numpy.sign(wr_m)
    wr_m = wr_sign * min(abs(wr_m), self.params['motion']['max']['vel_m'])
    wr_r = self.translate_to_motor_rad(wr_m)

    stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness

    i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True,
                                                                    contact_thresh=contact_thresh)

    if self.params['use_vel_traj']:
        ctrl_mode =Stepper.MODE_VEL_TRAJ
    else:
        ctrl_mode =Stepper.MODE_VEL_PID

    self.left_wheel.set_command(mode=ctrl_mode, v_des=wl_r, a_des=a_mr,
                                i_feedforward=0,
                                i_contact_pos=i_contact_l,
                                i_contact_neg=-1 * i_contact_l)
    self.right_wheel.set_command(mode=ctrl_mode, v_des=wr_r, a_des=a_mr,
                                 stiffness=stiffness,
                                 i_feedforward=0,
                                 i_contact_pos=i_contact_r,
                                 i_contact_neg=-1 * i_contact_r)

startup(self, threaded=True)

Starts machinery required to interface with this device

Parameters

threaded : bool whether a thread manages hardware polling/pushing in the background

Returns

bool whether the startup procedure succeeded

Source code in stretch_body/base.py
def startup(self, threaded=True):
    #Startup steppers first so that status is populated before this Device thread begins (if threaded==true)
    success = self.left_wheel.startup(threaded=False) and self.right_wheel.startup(threaded=False)
    if success:
        Device.startup(self, threaded=threaded)
        self.__update_status()
    return success

step_sentry(self, robot)

Only allow fast mobile base motion if the lift is low, the arm is retracted, and the wrist is stowed. This is intended to keep the center of mass low for increased stability and avoid catching the arm or tool on something.

Source code in stretch_body/base.py
def step_sentry(self,robot):
    """
    Only allow fast mobile base motion if the lift is low,
    the arm is retracted, and the wrist is stowed. This is
    intended to keep the center of mass low for increased
    stability and avoid catching the arm or tool on
    something.
    """
    if self.robot_params['robot_sentry']['base_max_velocity']:
        x_lift=robot.lift.status['pos']
        x_arm =robot.arm.status['pos']
        x_wrist =robot.end_of_arm.motors['wrist_yaw'].status['pos']

        if ((x_lift < self.params['sentry_max_velocity']['max_lift_height_m']) and
                (x_arm < self.params['sentry_max_velocity']['max_arm_extension_m']) and
                (x_wrist > self.params['sentry_max_velocity']['min_wrist_yaw_rad'])):
            if not self.fast_motion_allowed:
                self.logger.debug('Fast motion turned on')
            self.fast_motion_allowed = True
        else:
            if self.fast_motion_allowed:
                self.logger.debug('Fast motion turned off')
            self.fast_motion_allowed = False

    self.left_wheel.step_sentry(robot)
    self.right_wheel.step_sentry(robot)

stop(self)

Shuts down machinery started in startup()

Source code in stretch_body/base.py
def stop(self):
    Device.stop(self)
    if self.left_wheel.hw_valid and int(str(self.left_wheel.board_info['protocol_version'])[1:]) >= 1:
        self.left_wheel.stop_waypoint_trajectory()
        self._waypoint_lwpos = None
    if self.right_wheel.hw_valid and int(str(self.right_wheel.board_info['protocol_version'])[1:]) >= 1:
        self.right_wheel.stop_waypoint_trajectory()
        self._waypoint_rwpos = None
    self.left_wheel.stop()
    self.right_wheel.stop()

stop_trajectory(self)

Stop waypoint trajectory immediately and resets hardware

Source code in stretch_body/base.py
def stop_trajectory(self):
    """Stop waypoint trajectory immediately and resets hardware
    """
    self.left_wheel.stop_waypoint_trajectory()
    self.right_wheel.stop_waypoint_trajectory()

translate_by(self, x_m, v_m=None, a_m=None, stiffness=None, contact_thresh_N=None, contact_thresh=None)

Incremental translation of the base x_m: desired motion (m) v_m: velocity for trapezoidal motion profile (m/s) a_m: acceleration for trapezoidal motion profile (m/s^2) stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max) contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N) contact_thresh: effort to stop at (units of effort_pct (-100, 100))

Source code in stretch_body/base.py
def translate_by(self, x_m, v_m=None, a_m=None, stiffness=None, contact_thresh_N=None,contact_thresh=None):
    """
    Incremental translation of the base
    x_m: desired motion (m)
    v_m: velocity for trapezoidal motion profile (m/s)
    a_m: acceleration for trapezoidal motion profile (m/s^2)
    stiffness: stiffness of motion. Range 0.0 (min) to 1.0 (max)
    contact_thresh_N: (deprecated) effort to stop at (units of pseudo_N)
    contact_thresh: effort to stop at (units of effort_pct (-100, 100))
    """
    check_deprecated_contact_model_base(self,'translate_by', contact_thresh_N, contact_thresh)

    x_mr = self.translate_to_motor_rad(x_m)

    if v_m is not None:
        v_m = min(abs(v_m), self.params['motion']['max']['vel_m'])
        v_mr = self.translate_to_motor_rad(v_m)
    else:
        v_mr = self.vel_mr

    if a_m is not None:
        a_m = min(abs(a_m), self.params['motion']['max']['accel_m'])
        a_mr = self.translate_to_motor_rad(a_m)
    else:
        a_mr = self.accel_mr

    if not self.fast_motion_allowed:
        v_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_vel_m']),v_mr)
        a_mr=min(self.translate_to_motor_rad(self.params['sentry_max_velocity']['limit_accel_m']),a_mr)

    i_contact_l, i_contact_r = self.contact_thresh_to_motor_current(is_translate=True, contact_thresh=contact_thresh)

    stiffness = max(0.0, min(1.0, stiffness)) if stiffness is not None else self.stiffness


    self.left_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
                                v_des=v_mr,
                                a_des=a_mr,
                                stiffness=stiffness,
                                i_feedforward=0,
                                i_contact_pos=i_contact_l,
                                i_contact_neg=-1*i_contact_l)
    self.right_wheel.set_command(mode=Stepper.MODE_POS_TRAJ_INCR, x_des=x_mr,
                                v_des=v_mr,
                                a_des=a_mr,
                                stiffness=stiffness,
                                i_feedforward=0,
                                i_contact_pos=i_contact_r,
                                i_contact_neg=-1*i_contact_r)

update_trajectory(self)

Updates hardware with the next segment of self.trajectory

This method must be called frequently to enable complete trajectory execution and preemption of future segments. If used with stretch_body.robot.Robot or with self.startup(threaded=True), a background thread is launched for this. Otherwise, the user must handle calling this method.

Source code in stretch_body/base.py
def update_trajectory(self):
    """Updates hardware with the next segment of `self.trajectory`

    This method must be called frequently to enable complete trajectory execution
    and preemption of future segments. If used with `stretch_body.robot.Robot` or
    with `self.startup(threaded=True)`, a background thread is launched for this.
    Otherwise, the user must handle calling this method.
    """
    # check if joint valid, right protocol, and right mode
    if not self.left_wheel.hw_valid or not self.right_wheel.hw_valid:
        return
    if int(str(self.left_wheel.board_info['protocol_version'])[1:]) < 1:
        return
    if int(str(self.right_wheel.board_info['protocol_version'])[1:]) < 1:
        return
    if self.left_wheel.status['mode'] != self.left_wheel.MODE_POS_TRAJ_WAYPOINT:
        return
    if self.right_wheel.status['mode'] != self.right_wheel.MODE_POS_TRAJ_WAYPOINT:
        return
    if self._waypoint_lwpos is None or self._waypoint_rwpos is None:
        return

    if self.left_wheel.status['waypoint_traj']['state'] == 'active' and self.right_wheel.status['waypoint_traj']['state'] == 'active':
        next_segment_id = self.left_wheel.status['waypoint_traj']['segment_id'] - 2 + 1 # subtract 2 due to IDs 0 & 1 being reserved by firmware
        if next_segment_id < self.trajectory.get_num_segments():
            ls1, rs1 = self.trajectory.get_wheel_segments(next_segment_id, self.translate_to_motor_rad, self.rotate_to_motor_rad,
                self._waypoint_lwpos, self._waypoint_rwpos)
            self.left_wheel.set_next_trajectory_segment(ls1.to_array())
            self.right_wheel.set_next_trajectory_segment(rs1.to_array())
    elif self.left_wheel.status['waypoint_traj']['state'] == 'idle' and self.left_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT and \
        self.right_wheel.status['waypoint_traj']['state'] == 'idle' and self.right_wheel.status['mode'] == Stepper.MODE_POS_TRAJ_WAYPOINT:
        self._waypoint_lwpos = None
        self._waypoint_rwpos = None
        self.left_wheel.enable_pos_traj()
        self.right_wheel.enable_pos_traj()
        self.push_command()

Using the Head class

Stretch head blueprint Stretch head blueprint

The interface to Stretch's head is the Head class. The head contains an Intel Realsense D435i depth camera. The pan and tilt joints in the head allow Stretch to swivel and capture depth imagery of its surrounding. The head is typically initialized as:

1
2
3
4
5
6
7
8
9
import stretch_body.head

h = stretch_body.head.Head()
if not h.startup():
    exit() # failed to start head!

h.home()

# interact with the head here

Head is a subclass of the DynamixelXChain class, which in turn is a subclass of the Device class. Therefore, some of Head's methods, such as startup() and home() are extended from the Device class, while others come from the DynamixelXChain class. Reading the head's current state and sending commands to its revolute joints (head pan and tilt) can be achieved using:

10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
starting_position = h.status['head_pan']['pos']

# look right by 90 degrees
h.move_to('head_pan', starting_position + 1.57)
h.get_joint('head_pan').wait_until_at_setpoint()

# tilt up by 30 degrees
h.move_by('head_tilt', -1.57 / 3)
h.get_joint('head_tilt').wait_until_at_setpoint()

# look down towards the wheels
h.pose('wheels')
import time; time.sleep(3)

# look ahead
h.pose('ahead')
time.sleep(3)

The attribute status is a dictionary of dictionaries, where each subdictionary is the status of one of the head's joints. This state information is updated in the background in real-time by default (disable by initializing as startup(threading=False)). Use the pretty_print() method to print out this state information in a human-interpretable format.

Commanding the head's revolute joints is done through the move_to() and move_by() methods. Notice that, unlike the previous joints, no push command call is required here. These joints are Dynamixel servos, which behave differently than the Hello Robot steppers. Their commands are not queued and are executed as soon as they're received.

Head's two joints, the 'head_pan' and 'head_tilt' are instances of the DynamixelHelloXL430 class and are retrievable using the get_joint() method. They have the wait_until_at_setpoint() method, which blocks program execution until the joint reaches the commanded goal.

The pose() method makes it easy to command the head to common head poses (e.g. looking 'ahead', at the end-of-arm 'tool', obstacles in front of the 'wheels', or 'up').

The head supports waypoint trajectories as well:

27
28
29
30
31
32
33
34
35
36
37
# queue a trajectory consisting of three waypoints
h.get_joint('head_tilt').trajectory.add(t_s=0, x_r=0.0)
h.get_joint('head_tilt').trajectory.add(t_s=3, x_r=-1.0)
h.get_joint('head_tilt').trajectory.add(t_s=6, x_r=0.0)
h.get_joint('head_pan').trajectory.add(t_s=0, x_r=0.1)
h.get_joint('head_pan').trajectory.add(t_s=3, x_r=-0.9)
h.get_joint('head_pan').trajectory.add(t_s=6, x_r=0.1)

# trigger trajectory execution
h.follow_trajectory()
import time; time.sleep(6)

The head pan and tilt DynamixelHelloXL430 instances have an attribute trajectory, which is an instance of the RevoluteTrajectory class. The call to follow_trajectory() begins software tracking of the spline.

Finally, setting soft motion limits for the head's pan and tilt range can be achieved using:

38
39
40
41
42
43
44
45
46
# clip the head_pan's range
h.get_joint('head_pan').set_soft_motion_limit_min(-1.0)
h.get_joint('head_pan').set_soft_motion_limit_max(1.0)

# clip the head_tilt's range
h.get_joint('head_tilt').set_soft_motion_limit_min(-1.0)
h.get_joint('head_tilt').set_soft_motion_limit_max(0.1)

h.stop()

The set_soft_motion_limit_min/max() methods perform clipping of the joint's range at the software level (cannot persist across reboots).

All methods of the Head class are documented below.

API to the Stretch RE1 Head

Source code in stretch_body/head.py
class Head(DynamixelXChain):
    """
    API to the Stretch RE1 Head
    """
    def __init__(self, usb=None):
        if usb is None:
            usb = RobotParams.get_params()[1]['head']['usb_name']
        DynamixelXChain.__init__(self, usb=usb, name='head')
        self.joints = ['head_pan', 'head_tilt']
        for j in self.joints:
            self.add_motor(DynamixelHelloXL430(name=j, chain=self))
        self.poses = {'ahead': [0, 0], 'back': [deg_to_rad(-180), deg_to_rad(0)],
                      'tool': [deg_to_rad(-90), deg_to_rad(-45)],
                      'wheels': [deg_to_rad(0), deg_to_rad(-90)], 'left': [deg_to_rad(90), deg_to_rad(0)],
                      'up': [deg_to_rad(0), deg_to_rad(30)]}

    def startup(self, threaded=True):
        return DynamixelXChain.startup(self, threaded=threaded)

    def get_joint(self, joint_name):
        """Retrieves joint by name.

        Parameters
        ----------
        joint_name : str
            valid joints defined in ``joints``

        Returns
        -------
        DynamixelHelloXL430 or None
            Motor object on valid joint name, else None
        """
        return self.get_motor(joint_name)

    def move_to(self, joint, x_r, v_r=None, a_r=None):
        """
        joint: Name of the joint to move ('head_pan' or 'head_tilt')
        x_r: commanded absolute position (radians).
        v_r: velocity for trapezoidal motion profile (rad/s).
        a_r: acceleration for trapezoidal motion profile (rad/s^2)
        """
        self.motors[joint].move_to(x_r,v_r,a_r)

    def move_by(self, joint,  x_r, v_r=None, a_r=None):
        """
        joint: Name of the joint to move ('head_pan' or 'head_tilt')
        x_r: commanded incremental motion (radians).
        v_r: velocity for trapezoidal motion profile (rad/s).
        a_r: acceleration for trapezoidal motion profile (rad/s^2)
        """
        self.motors[joint].move_by(x_r,v_r,a_r)

    def set_velocity(self, joint, v_r, a_r=None):
        """
        joint: name of joint (string)
        v_r: commanded velocity (rad/s).
        a_r: acceleration motion profile (rad/s^2)
        """
        self.motors[joint].set_velocity(v_r, a_r)

    def home(self):
        if self.motors['head_pan'].params['req_calibration']:
            with self.pt_lock:
                self.motors['head_pan'].home(single_stop=True)

        if self.motors['head_tilt'].params['req_calibration']:
            with self.pt_lock:
                self.motors['head_tilt'].home(single_stop=True)

        self.move_to('head_pan', deg_to_rad(0))
        self.move_to('head_tilt', deg_to_rad(0))


    def pose(self, p, v_r=[None, None], a_r=[None, None]):
        """
        p: Dictionary key to named pose (eg 'ahead')
        v_r: list, velocities for trapezoidal motion profile (rad/s).
        a_r: list, accelerations for trapezoidal motion profile (rad/s^2)
        """
        self.move_to('head_pan', self.poses[p][0], v_r[0], a_r[0])
        self.move_to('head_tilt', self.poses[p][1], v_r[0], a_r[1])

get_joint(self, joint_name)

Retrieves joint by name.

Parameters

joint_name : str valid joints defined in joints

Returns

DynamixelHelloXL430 or None Motor object on valid joint name, else None

Source code in stretch_body/head.py
def get_joint(self, joint_name):
    """Retrieves joint by name.

    Parameters
    ----------
    joint_name : str
        valid joints defined in ``joints``

    Returns
    -------
    DynamixelHelloXL430 or None
        Motor object on valid joint name, else None
    """
    return self.get_motor(joint_name)

move_by(self, joint, x_r, v_r=None, a_r=None)

joint: Name of the joint to move ('head_pan' or 'head_tilt') x_r: commanded incremental motion (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)

Source code in stretch_body/head.py
def move_by(self, joint,  x_r, v_r=None, a_r=None):
    """
    joint: Name of the joint to move ('head_pan' or 'head_tilt')
    x_r: commanded incremental motion (radians).
    v_r: velocity for trapezoidal motion profile (rad/s).
    a_r: acceleration for trapezoidal motion profile (rad/s^2)
    """
    self.motors[joint].move_by(x_r,v_r,a_r)

move_to(self, joint, x_r, v_r=None, a_r=None)

joint: Name of the joint to move ('head_pan' or 'head_tilt') x_r: commanded absolute position (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)

Source code in stretch_body/head.py
def move_to(self, joint, x_r, v_r=None, a_r=None):
    """
    joint: Name of the joint to move ('head_pan' or 'head_tilt')
    x_r: commanded absolute position (radians).
    v_r: velocity for trapezoidal motion profile (rad/s).
    a_r: acceleration for trapezoidal motion profile (rad/s^2)
    """
    self.motors[joint].move_to(x_r,v_r,a_r)

pose(self, p, v_r=[None, None], a_r=[None, None])

p: Dictionary key to named pose (eg 'ahead') v_r: list, velocities for trapezoidal motion profile (rad/s). a_r: list, accelerations for trapezoidal motion profile (rad/s^2)

Source code in stretch_body/head.py
def pose(self, p, v_r=[None, None], a_r=[None, None]):
    """
    p: Dictionary key to named pose (eg 'ahead')
    v_r: list, velocities for trapezoidal motion profile (rad/s).
    a_r: list, accelerations for trapezoidal motion profile (rad/s^2)
    """
    self.move_to('head_pan', self.poses[p][0], v_r[0], a_r[0])
    self.move_to('head_tilt', self.poses[p][1], v_r[0], a_r[1])

set_velocity(self, joint, v_r, a_r=None)

joint: name of joint (string) v_r: commanded velocity (rad/s). a_r: acceleration motion profile (rad/s^2)

Source code in stretch_body/head.py
def set_velocity(self, joint, v_r, a_r=None):
    """
    joint: name of joint (string)
    v_r: commanded velocity (rad/s).
    a_r: acceleration motion profile (rad/s^2)
    """
    self.motors[joint].set_velocity(v_r, a_r)

startup(self, threaded=True)

Starts machinery required to interface with this device

Parameters

threaded : bool whether a thread manages hardware polling/pushing in the background

Returns

bool whether the startup procedure succeeded

Source code in stretch_body/head.py
def startup(self, threaded=True):
    return DynamixelXChain.startup(self, threaded=threaded)

Using the EndOfArm class

The interface to Stretch's end-of-arm is the EndOfArm class. It is typically initialized as:

1
2
3
4
5
6
7
8
9
import stretch_body.end_of_arm

e = stretch_body.end_of_arm.EndOfArm()
if not e.startup(threaded=True):
    exit() # failed to start end of arm!

# interact with the end of arm here

e.stop()

All methods of the EndOfArm class are documented below.

The EndOfArm class allows for an extensible serial chain of Dynamixel X series devices It allows the specific type of device to be declared at runtime via the Yaml parameters In this way, a user can add their own custom Dynamixel based tools to the robot end-of-arm by simply deriving it from DynamixelHelloXL430 and declaring the class name / Python module name in the User YAML file

Source code in stretch_body/end_of_arm.py
class EndOfArm(DynamixelXChain):
    """
    The EndOfArm class allows for an extensible serial chain of Dynamixel X series devices
    It allows the specific type of device to be declared at runtime via the Yaml parameters
    In this way, a user can add their own custom Dynamixel based tools to the robot end-of-arm by
    simply deriving it from DynamixelHelloXL430 and declaring the class name / Python module name
    in the User YAML file
    """
    def __init__(self, name='end_of_arm', usb=None):
        if usb is None:
            usb = RobotParams.get_params()[1]['end_of_arm']['usb_name']
        DynamixelXChain.__init__(self, usb=usb, name=name)
        self.joints = self.params.get('devices', {}).keys()
        for j in self.joints:
            module_name = self.params['devices'][j]['py_module_name']
            class_name = self.params['devices'][j]['py_class_name']
            dynamixel_device = getattr(importlib.import_module(module_name), class_name)(chain=self)
            self.add_motor(dynamixel_device)
        self.urdf_map={} #Override

    def startup(self, threaded=True):
        return DynamixelXChain.startup(self, threaded=threaded)

    def get_joint(self, joint_name):
        """Retrieves joint by name.

        Parameters
        ----------
        joint_name : str
            valid joints defined as defined in params['devices']

        Returns
        -------
        DynamixelHelloXL430 or None
            Motor object on valid joint name, else None
        """
        return self.get_motor(joint_name)

    def move_to(self, joint,x_r, v_r=None, a_r=None):
        """
        joint: name of joint (string)
        x_r: commanded absolute position (radians).
        v_r: velocity for trapezoidal motion profile (rad/s).
        a_r: acceleration for trapezoidal motion profile (rad/s^2)
        """
        with  self.pt_lock:
            self.motors[joint].move_to(x_r, v_r, a_r)

    def move_by(self, joint, x_r, v_r=None, a_r=None):
        """
        joint: name of joint (string)
        x_r: commanded incremental motion (radians).
        v_r: velocity for trapezoidal motion profile (rad/s).
        a_r: acceleration for trapezoidal motion profile (rad/s^2)
        """
        with self.pt_lock:
            self.motors[joint].move_by(x_r, v_r, a_r)

    def set_velocity(self, joint, v_r, a_r=None):
        """
        joint: name of joint (string)
        v_r: commanded velocity (rad/s).
        a_r: acceleration motion profile (rad/s^2)
        """
        with self.pt_lock:
            self.motors[joint].set_velocity(v_r, a_r)

    def pose(self,joint, p,v_r=None, a_r=None):
        """
                joint: name of joint (string)
                p: named pose of joint
                v_r: velocity for trapezoidal motion profile (rad/s).
                a_r: acceleration for trapezoidal motion profile (rad/s^2)
                """
        with self.pt_lock:
            self.motors[joint].pose(p, v_r, a_r)

    def stow(self):
        pass #Override by specific tool

    def pre_stow(self,robot=None):
        pass #Override by specific tool

    def home(self, joint=None):
        """
        Home to hardstops
        """
        if joint is None:
            for j in self.joints:
                print('--------- Homing %s ----'%j)
                self.motors[j].home()
        else:
            with self.pt_lock:
                self.motors[joint].home()

    def is_tool_present(self,class_name):
        """
        Return true if the given tool type is present (eg. StretchGripper)
        Allows for conditional logic when switching end-of-arm tools
        """
        for j in self.joints:
            if class_name == self.params['devices'][j]['py_class_name']:
                return True
        return False

    def get_joint_configuration(self,brake_joints={}):
        """
        Construct a dictionary of tools current pose (for robot_collision_mgmt)
        Keys match joint names in URDF
        Specific tools should define urdf_map
        """
        ret = {}
        for j in self.urdf_map:
            jn = self.urdf_map[j]
            motor = self.get_joint(jn)
            dx = 0.0
            try:
                if brake_joints[j]:
                    dx = self.params['collision_mgmt']['k_brake_distance'][jn] * motor.get_braking_distance()
            except KeyError:
                dx=0
            ret[j] = motor.status['pos'] + dx

        gripper_joint = None
        for j in self.joints:
            if 'gripper' in j:
                gripper_joint = j

        if gripper_joint:
            dx = 0
            if brake_joints:
                for j in brake_joints:
                    if 'gripper' in j:
                        dx = self.params['collision_mgmt']['k_brake_distance'][j]
            finger_angle = self.get_joint(gripper_joint).status['gripper_conversion']['finger_rad'] + dx
            ret['joint_gripper_finger_left'] = finger_angle/2
            ret['joint_gripper_finger_right'] = finger_angle/2

        return ret

    def pre_stow(self,robot=None):
        pass

get_joint(self, joint_name)

Retrieves joint by name.

Parameters

joint_name : str valid joints defined as defined in params['devices']

Returns

DynamixelHelloXL430 or None Motor object on valid joint name, else None

Source code in stretch_body/end_of_arm.py
def get_joint(self, joint_name):
    """Retrieves joint by name.

    Parameters
    ----------
    joint_name : str
        valid joints defined as defined in params['devices']

    Returns
    -------
    DynamixelHelloXL430 or None
        Motor object on valid joint name, else None
    """
    return self.get_motor(joint_name)

get_joint_configuration(self, brake_joints={})

Construct a dictionary of tools current pose (for robot_collision_mgmt) Keys match joint names in URDF Specific tools should define urdf_map

Source code in stretch_body/end_of_arm.py
def get_joint_configuration(self,brake_joints={}):
    """
    Construct a dictionary of tools current pose (for robot_collision_mgmt)
    Keys match joint names in URDF
    Specific tools should define urdf_map
    """
    ret = {}
    for j in self.urdf_map:
        jn = self.urdf_map[j]
        motor = self.get_joint(jn)
        dx = 0.0
        try:
            if brake_joints[j]:
                dx = self.params['collision_mgmt']['k_brake_distance'][jn] * motor.get_braking_distance()
        except KeyError:
            dx=0
        ret[j] = motor.status['pos'] + dx

    gripper_joint = None
    for j in self.joints:
        if 'gripper' in j:
            gripper_joint = j

    if gripper_joint:
        dx = 0
        if brake_joints:
            for j in brake_joints:
                if 'gripper' in j:
                    dx = self.params['collision_mgmt']['k_brake_distance'][j]
        finger_angle = self.get_joint(gripper_joint).status['gripper_conversion']['finger_rad'] + dx
        ret['joint_gripper_finger_left'] = finger_angle/2
        ret['joint_gripper_finger_right'] = finger_angle/2

    return ret

home(self, joint=None)

Home to hardstops

Source code in stretch_body/end_of_arm.py
def home(self, joint=None):
    """
    Home to hardstops
    """
    if joint is None:
        for j in self.joints:
            print('--------- Homing %s ----'%j)
            self.motors[j].home()
    else:
        with self.pt_lock:
            self.motors[joint].home()

is_tool_present(self, class_name)

Return true if the given tool type is present (eg. StretchGripper) Allows for conditional logic when switching end-of-arm tools

Source code in stretch_body/end_of_arm.py
def is_tool_present(self,class_name):
    """
    Return true if the given tool type is present (eg. StretchGripper)
    Allows for conditional logic when switching end-of-arm tools
    """
    for j in self.joints:
        if class_name == self.params['devices'][j]['py_class_name']:
            return True
    return False

move_by(self, joint, x_r, v_r=None, a_r=None)

joint: name of joint (string) x_r: commanded incremental motion (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)

Source code in stretch_body/end_of_arm.py
def move_by(self, joint, x_r, v_r=None, a_r=None):
    """
    joint: name of joint (string)
    x_r: commanded incremental motion (radians).
    v_r: velocity for trapezoidal motion profile (rad/s).
    a_r: acceleration for trapezoidal motion profile (rad/s^2)
    """
    with self.pt_lock:
        self.motors[joint].move_by(x_r, v_r, a_r)

move_to(self, joint, x_r, v_r=None, a_r=None)

joint: name of joint (string) x_r: commanded absolute position (radians). v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)

Source code in stretch_body/end_of_arm.py
def move_to(self, joint,x_r, v_r=None, a_r=None):
    """
    joint: name of joint (string)
    x_r: commanded absolute position (radians).
    v_r: velocity for trapezoidal motion profile (rad/s).
    a_r: acceleration for trapezoidal motion profile (rad/s^2)
    """
    with  self.pt_lock:
        self.motors[joint].move_to(x_r, v_r, a_r)

pose(self, joint, p, v_r=None, a_r=None)

joint: name of joint (string) p: named pose of joint v_r: velocity for trapezoidal motion profile (rad/s). a_r: acceleration for trapezoidal motion profile (rad/s^2)

Source code in stretch_body/end_of_arm.py
def pose(self,joint, p,v_r=None, a_r=None):
    """
            joint: name of joint (string)
            p: named pose of joint
            v_r: velocity for trapezoidal motion profile (rad/s).
            a_r: acceleration for trapezoidal motion profile (rad/s^2)
            """
    with self.pt_lock:
        self.motors[joint].pose(p, v_r, a_r)

set_velocity(self, joint, v_r, a_r=None)

joint: name of joint (string) v_r: commanded velocity (rad/s). a_r: acceleration motion profile (rad/s^2)

Source code in stretch_body/end_of_arm.py
def set_velocity(self, joint, v_r, a_r=None):
    """
    joint: name of joint (string)
    v_r: commanded velocity (rad/s).
    a_r: acceleration motion profile (rad/s^2)
    """
    with self.pt_lock:
        self.motors[joint].set_velocity(v_r, a_r)

startup(self, threaded=True)

Starts machinery required to interface with this device

Parameters

threaded : bool whether a thread manages hardware polling/pushing in the background

Returns

bool whether the startup procedure succeeded

Source code in stretch_body/end_of_arm.py
def startup(self, threaded=True):
    return DynamixelXChain.startup(self, threaded=threaded)

Using the Wacc class

The interface to Stretch's wrist board is the Wacc (wrist + accelerometer) class. This board provides an Arduino and accelerometer sensor that is accessible from the Wacc class. It is typically initialized as:

1
2
3
4
5
6
7
8
9
import stretch_body.wacc

w = stretch_body.wacc.Wacc()
if not w.startup(threaded=True):
    exit() # failed to start wacc!

# interact with the wacc here

w.stop()

All methods of the Wacc class are documented below.

API to the Stretch Wrist Accelerometer (Wacc) Board

Source code in stretch_body/wacc.py
class Wacc(WaccBase):
    """
    API to the Stretch Wrist Accelerometer (Wacc) Board
    """
    def __init__(self, usb=None, ext_status_cb=None, ext_command_cb=None):
        WaccBase.__init__(self, usb=usb, ext_status_cb=ext_status_cb, ext_command_cb=ext_command_cb)
        #Order in descending order so more recent protocols/methods override less recent
        self.supported_protocols = {'p0': (Wacc_Protocol_P0,), 'p1': (Wacc_Protocol_P1, Wacc_Protocol_P0),'p2': (Wacc_Protocol_P2, Wacc_Protocol_P1, Wacc_Protocol_P0,),
                                    'p3': (Wacc_Protocol_P3, Wacc_Protocol_P2, Wacc_Protocol_P1, Wacc_Protocol_P0,),}

    def startup(self, threaded=False):
        """
        First determine which protocol version the uC firmware is running.
        Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
        """
        WaccBase.startup(self, threaded=threaded)
        if self.hw_valid:
            if self.board_info['protocol_version'] in self.supported_protocols:
                Wacc.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
            else:
                if self.board_info['protocol_version'] is None:
                    protocol_msg = """
                                    ----------------
                                    Failure in communications for {0} on startup.
                                    Please power cycle the robot and try again.
                                    ----------------
                                    """.format(self.name)
                else:
                    protocol_msg = """
                    ----------------
                    Firmware protocol mismatch on {0}.
                    Protocol on board is {1}.
                    Valid protocols are: {2}.
                    Disabling device.
                    Please upgrade the firmware and/or version of Stretch Body.
                    ----------------
                    """.format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
                self.logger.warning(textwrap.dedent(protocol_msg))
                self.hw_valid = False
                self.transport.stop()
        if self.hw_valid:
            self.push_command()
            self.pull_status()
        return self.hw_valid

startup(self, threaded=False)

First determine which protocol version the uC firmware is running. Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol

Source code in stretch_body/wacc.py
def startup(self, threaded=False):
    """
    First determine which protocol version the uC firmware is running.
    Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
    """
    WaccBase.startup(self, threaded=threaded)
    if self.hw_valid:
        if self.board_info['protocol_version'] in self.supported_protocols:
            Wacc.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
        else:
            if self.board_info['protocol_version'] is None:
                protocol_msg = """
                                ----------------
                                Failure in communications for {0} on startup.
                                Please power cycle the robot and try again.
                                ----------------
                                """.format(self.name)
            else:
                protocol_msg = """
                ----------------
                Firmware protocol mismatch on {0}.
                Protocol on board is {1}.
                Valid protocols are: {2}.
                Disabling device.
                Please upgrade the firmware and/or version of Stretch Body.
                ----------------
                """.format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
            self.logger.warning(textwrap.dedent(protocol_msg))
            self.hw_valid = False
            self.transport.stop()
    if self.hw_valid:
        self.push_command()
        self.pull_status()
    return self.hw_valid

Using the Pimu class

The interface to Stretch's power board is the Pimu (power + IMU) class. This board provides a 9-DOF IMU that is accessible from the Pimu class. It is typically initialized as:

1
2
3
4
5
6
7
8
9
import stretch_body.pimu

p = stretch_body.pimu.Pimu()
if not p.startup(threaded=True):
    exit() # failed to start pimu!

# interact with the pimu here

p.stop()

All methods of the Pimu class are documented below.

API to the Stretch Power and IMU board (Pimu)

Source code in stretch_body/pimu.py
class Pimu(PimuBase):
    """
    API to the Stretch Power and IMU board (Pimu)
    """
    def __init__(self, event_reset=False, usb=None):
        PimuBase.__init__(self, event_reset,usb)
        # Order in descending order so more recent protocols/methods override less recent
        self.supported_protocols = {'p0': (Pimu_Protocol_P0,),
                                    'p1': (Pimu_Protocol_P1, Pimu_Protocol_P0,),
                                    'p2': (Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
                                    'p3': (Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
                                    'p4': (Pimu_Protocol_P4, Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
                                    'p5': (Pimu_Protocol_P5, Pimu_Protocol_P4, Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,),
                                    'p6': (Pimu_Protocol_P6, Pimu_Protocol_P5, Pimu_Protocol_P4, Pimu_Protocol_P3, Pimu_Protocol_P2, Pimu_Protocol_P1, Pimu_Protocol_P0,)
                                    }

    def startup(self, threaded=False):
        """
        First determine which protocol version the uC firmware is running.
        Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
        """
        PimuBase.startup(self, threaded=threaded)
        if self.hw_valid:
            if self.board_info['protocol_version'] in self.supported_protocols:
                Pimu.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
                IMU.__bases__= self.imu.supported_protocols[self.board_info['protocol_version']]
            else:
                if self.board_info['protocol_version'] is None:
                    protocol_msg = """
                                    ----------------
                                    Failure in communications for {0} on startup.
                                    Please power cycle the robot and try again.
                                    ----------------
                                    """.format(self.name)
                else:
                    protocol_msg = """
                    ----------------
                    Firmware protocol mismatch on {0}.
                    Protocol on board is {1}.
                    Valid protocols are: {2}.
                    Disabling device.
                    Please upgrade the firmware and/or version of Stretch Body.
                    ----------------
                    """.format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
                self.logger.warning(textwrap.dedent(protocol_msg))
                self.hw_valid = False
                self.transport.stop()
        if self.hw_valid:
            self.push_command()
            self.pull_status()
        return self.hw_valid

startup(self, threaded=False)

First determine which protocol version the uC firmware is running. Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol

Source code in stretch_body/pimu.py
def startup(self, threaded=False):
    """
    First determine which protocol version the uC firmware is running.
    Based on that version, replaces PimuBase class inheritance with a inheritance to a child class of PimuBase that supports that protocol
    """
    PimuBase.startup(self, threaded=threaded)
    if self.hw_valid:
        if self.board_info['protocol_version'] in self.supported_protocols:
            Pimu.__bases__ = self.supported_protocols[self.board_info['protocol_version']]
            IMU.__bases__= self.imu.supported_protocols[self.board_info['protocol_version']]
        else:
            if self.board_info['protocol_version'] is None:
                protocol_msg = """
                                ----------------
                                Failure in communications for {0} on startup.
                                Please power cycle the robot and try again.
                                ----------------
                                """.format(self.name)
            else:
                protocol_msg = """
                ----------------
                Firmware protocol mismatch on {0}.
                Protocol on board is {1}.
                Valid protocols are: {2}.
                Disabling device.
                Please upgrade the firmware and/or version of Stretch Body.
                ----------------
                """.format(self.name, self.board_info['protocol_version'], self.supported_protocols.keys())
            self.logger.warning(textwrap.dedent(protocol_msg))
            self.hw_valid = False
            self.transport.stop()
    if self.hw_valid:
        self.push_command()
        self.pull_status()
    return self.hw_valid

All materials are Copyright 2022 by Hello Robot Inc. Hello Robot and Stretch are registered trademarks.