Example 6
Example 6
In this example, we will review a Python script that prints out and stores the effort values from a specified joint. If you are looking for a continuous print of the joint state efforts while Stretch is in action, then you can use the rostopic command-line tool shown in the Internal State of Stretch Tutorial.
Begin by running the following command in the terminal in a terminal.
# Terminal 1
roslaunch stretch_core stretch_driver.launch
# Terminal 2
rosservice call /switch_to_position_mode
cd catkin_ws/src/stretch_tutorials/src/
python effort_sensing.py
FollowJointTrajectory
command to move Stretch's arm or head while also printing the effort of the lift.
The Code
#!/usr/bin/env python
import rospy
import time
import actionlib
import os
import csv
from datetime import datetime
from control_msgs.msg import FollowJointTrajectoryGoal
from trajectory_msgs.msg import JointTrajectoryPoint
from sensor_msgs.msg import JointState
import hello_helpers.hello_misc as hm
class JointActuatorEffortSensor(hm.HelloNode):
"""
A class that sends multiple joint trajectory goals to a single joint.
"""
def __init__(self, export_data=False):
"""
Function that initializes the subscriber,and other features.
:param self: The self reference.
:param export_data: A boolean message type.
"""
hm.HelloNode.__init__(self)
self.sub = rospy.Subscriber('joint_states', JointState, self.callback)
self.joints = ['joint_lift']
self.joint_effort = []
self.save_path = '/home/hello-robot/catkin_ws/src/stretch_tutorials/stored_data'
self.export_data = export_data
def callback(self, msg):
"""
Callback function to update and store JointState messages.
:param self: The self reference.
:param msg: The JointState message.
"""
self.joint_states = msg
def issue_command(self):
"""
Function that makes an action call and sends joint trajectory goals
to a single joint.
:param self: The self reference.
"""
trajectory_goal = FollowJointTrajectoryGoal()
trajectory_goal.trajectory.joint_names = self.joints
point0 = JointTrajectoryPoint()
point0.positions = [0.9]
trajectory_goal.trajectory.points = [point0]
trajectory_goal.trajectory.header.stamp = rospy.Time(0.0)
trajectory_goal.trajectory.header.frame_id = 'base_link'
self.trajectory_client.send_goal(trajectory_goal, feedback_cb=self.feedback_callback, done_cb=self.done_callback)
rospy.loginfo('Sent position goal = {0}'.format(trajectory_goal))
self.trajectory_client.wait_for_result()
def feedback_callback(self,feedback):
"""
The feedback_callback function deals with the incoming feedback messages
from the trajectory_client. Although, in this function, we do not use the
feedback information.
:param self: The self reference.
:param feedback: FollowJointTrajectoryActionFeedback message.
"""
if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0')
current_effort = []
for joint in self.joints:
index = self.joint_states.name.index(joint)
current_effort.append(self.joint_states.effort[index])
if not self.export_data:
print("name: " + str(self.joints))
print("effort: " + str(current_effort))
else:
self.joint_effort.append(current_effort)
def done_callback(self, status, result):
"""
The done_callback function will be called when the joint action is complete.
Within this function we export the data to a .txt file in the /stored_data directory.
:param self: The self reference.
:param status: status attribute from FollowJointTrajectoryActionResult message.
:param result: result attribute from FollowJointTrajectoryActionResult message.
"""
if status == actionlib.GoalStatus.SUCCEEDED:
rospy.loginfo('Succeeded')
else:
rospy.loginfo('Failed')
if self.export_data:
file_name = datetime.now().strftime("%Y-%m-%d_%I:%M:%S-%p")
completeName = os.path.join(self.save_path, file_name)
with open(completeName, "w") as f:
writer = csv.writer(f)
writer.writerow(self.joints)
writer.writerows(self.joint_effort)
def main(self):
"""
Function that initiates the issue_command function.
:param self: The self reference.
"""
hm.HelloNode.main(self, 'issue_command', 'issue_command', wait_for_first_pointcloud=False)
rospy.loginfo('issuing command...')
self.issue_command()
time.sleep(2)
if __name__ == '__main__':
try:
node = JointActuatorEffortSensor(export_data=True)
node.main()
except KeyboardInterrupt:
rospy.loginfo('interrupt received, so shutting down')
The Code Explained
This code is similar to that of the multipoint_command and joint_state_printer node. Therefore, this example will highlight sections that are different from those tutorials. Now let's break the code down.
#!/usr/bin/env python
import rospy
import time
import actionlib
import os
import csv
from datetime import datetime
from control_msgs.msg import FollowJointTrajectoryGoal
from trajectory_msgs.msg import JointTrajectoryPoint
from sensor_msgs.msg import JointState
import hello_helpers.hello_misc as hm
FollowJointTrajectoryGoal
from the control_msgs.msg
package to control the Stretch robot. Import JointTrajectoryPoint
from the trajectory_msgs
package to define robot trajectories. The hello_helpers
package consists of a module that provides various Python scripts used across stretch_ros. In this instance, we are importing the hello_misc
script.
class JointActuatorEffortSensor(hm.HelloNode):
"""
A class that sends multiple joint trajectory goals to a single joint.
"""
def __init__(self, export_data=False):
"""
Function that initializes the subscriber,and other features.
:param self: The self reference.
:param export_data: A boolean message type.
"""
hm.HelloNode.__init__(self)
JointActuatorEffortSensor
class inherits the HelloNode
class from hm
and is initialized.
self.sub = rospy.Subscriber('joint_states', JointState, self.callback)
self.joints = ['joint_lift']
JointState
messages. When a message comes in, ROS is going to pass it to the function "callback" automatically. Create a list of the desired joints you want to print.
self.joint_effort = []
self.save_path = '/home/hello-robot/catkin_ws/src/stretch_tutorials/stored_data'
self.export_data = export_data
self.trajectory_client.send_goal(trajectory_goal, feedback_cb=self.feedback_callback, done_cb=self.done_callback)
def feedback_callback(self,feedback):
"""
The feedback_callback function deals with the incoming feedback messages
from the trajectory_client. Although, in this function, we do not use the
feedback information.
:param self: The self reference.
:param feedback: FollowJointTrajectoryActionFeedback message.
"""
FollowJointTrajectoryActionFeedback
message as its argument.
if 'wrist_extension' in self.joints:
self.joints.remove('wrist_extension')
self.joints.append('joint_arm_l0')
wrist_extenstion
to joint_arm_l0
. This is because joint_arm_l0
has the effort values that the wrist_extension
is experiencing.
current_effort = []
for joint in self.joints:
index = self.joint_states.name.index(joint)
current_effort.append(self.joint_states.effort[index])
if not self.export_data:
print("name: " + str(self.joints))
print("effort: " + str(current_effort))
else:
self.joint_effort.append(current_effort)
def done_callback(self, status, result):
"""
The done_callback function will be called when the joint action is complete.
Within this function we export the data to a .txt file in the /stored_data directory.
:param self: The self reference.
:param status: status attribute from FollowJointTrajectoryActionResult message.
:param result: result attribute from FollowJointTrajectoryActionResult message.
"""
FollowJointTrajectoryActionResult
messages as its arguments.
if status == actionlib.GoalStatus.SUCCEEDED:
rospy.loginfo('Succeeded')
else:
rospy.loginfo('Failed')
FollowJointTrajectoryActionResult
succeeded or failed.
if self.export_data:
file_name = datetime.now().strftime("%Y-%m-%d_%I:%M:%S-%p")
completeName = os.path.join(self.save_path, file_name)
with open(completeName, "w") as f:
writer = csv.writer(f)
writer.writerow(self.joints)
writer.writerows(self.joint_effort)
Plotting/Animating Effort Data
We added a simple python script, stored_data_plotter.py, to this package for plotting the stored data. Note you have to change the name of the file you wish to see in the python script. This is shown below:
####################### Copy the file name here! #######################
file_name = '2022-06-30_11:26:20-AM'
cd catkin_ws/src/stretch_tutorials/src/
python3 stored_data_plotter.py
roscore
to run this script. Please review the comments in the python script for additional guidance.