Source code for franka_interface.arm

# /***************************************************************************

#
# @package: franka_interface
# @metapackage: franka_ros_interface
# @author: Saif Sidhik <sxs1412@bham.ac.uk>
#

# **************************************************************************/

# /***************************************************************************
# Copyright (c) 2019-2021, Saif Sidhik

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# **************************************************************************/

"""
:info: 
   Inteface Class for Franka robot arm.

"""

from builtins import dict # for python2&3 efficient compatibility
from future.utils import iteritems # for python2&3 efficient compatibility

import enum
import rospy
import warnings
import quaternion
import numpy as np
from copy import deepcopy
from rospy_message_converter import message_converter

from franka_core_msgs.msg import JointCommand
from franka_core_msgs.msg import RobotState, EndPointState
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64

import franka_dataflow
from .robot_params import RobotParams

from franka_moveit import PandaMoveGroupInterface
from franka_moveit.utils import create_pose_msg
from franka_tools import FrankaFramesInterface, FrankaControllerManagerInterface, JointTrajectoryActionClient, CollisionBehaviourInterface


class TipState():

    def __init__(self, timestamp, pose, vel, O_effort, K_effort):
        self.timestamp = timestamp
        self._pose = pose
        self._velocity = vel
        self._effort = O_effort
        self._effort_in_K_frame = K_effort

    @property
    def pose(self):
        return self._pose

    @property
    def velocity(self):
        return self._velocity

    @property
    def effort(self):
        return self._effort

    @property
    def effort_in_K_frame(self):
        return self._effort_in_K_frame


[docs]class ArmInterface(object): """ Interface Class for an arm of Franka Panda robot. Constructor. :type synchronous_pub: bool :param synchronous_pub: designates the JointCommand Publisher as Synchronous if True and Asynchronous if False. Synchronous Publishing means that all joint_commands publishing to the robot's joints will block until the message has been serialized into a buffer and that buffer has been written to the transport of every current Subscriber. This yields predicable and consistent timing of messages being delivered from this Publisher. However, when using this mode, it is possible for a blocking Subscriber to prevent the joint_command functions from exiting. Unless you need exact JointCommand timing, default to Asynchronous Publishing (False). """ # Containers
[docs] @enum.unique class RobotMode(enum.IntEnum): """ Enum class for specifying and retrieving the current robot mode. """ # ----- access using parameters name or value # ----- eg. RobotMode(0).name & RobotMode(0).value # ----- or RobotMode['ROBOT_MODE_OTHER'].name & RobotMode['ROBOT_MODE_OTHER'].value ROBOT_MODE_OTHER = 0 ROBOT_MODE_IDLE = 1 ROBOT_MODE_MOVE = 2 ROBOT_MODE_GUIDING = 3 ROBOT_MODE_REFLEX = 4 ROBOT_MODE_USER_STOPPED = 5 ROBOT_MODE_AUTOMATIC_ERROR_RECOVERY = 6
def __init__(self, synchronous_pub=False): self._params = RobotParams() self._ns = self._params.get_base_namespace() self._joint_limits = self._params.get_joint_limits() joint_names = self._joint_limits.joint_names if not joint_names: rospy.logerr("{}: Cannot detect joint names for arm on this " "robot. Exiting Arm.init().".format(self.__class__.__name__)) return self._joint_names = joint_names self.name = self._params.get_robot_name() self._joint_angle = dict() self._joint_velocity = dict() self._joint_effort = dict() self._cartesian_pose = dict() self._cartesian_velocity = dict() self._cartesian_effort = dict() self._stiffness_frame_effort = dict() self._errors = dict() self._collision_state = False self._tip_states = None self._jacobian = None self._cartesian_contact = None self._robot_mode = False self._command_msg = JointCommand() # neutral pose joint positions self._neutral_pose_joints = self._params.get_neutral_pose() self._frames_interface = FrankaFramesInterface() try: self._collision_behaviour_interface = CollisionBehaviourInterface() except rospy.ROSException: rospy.loginfo("{}: Collision Service Not found. It will not be possible to change collision behaviour of robot!".format( self.__class__.__name__)) self._collision_behaviour_interface = None self._ctrl_manager = FrankaControllerManagerInterface( ns=self._ns, sim=self._params._in_sim) self._speed_ratio = 0.15 self._F_T_NE = np.eye(1).flatten().tolist() self._NE_T_EE = np.eye(1).flatten().tolist() queue_size = None if synchronous_pub else 1 with warnings.catch_warnings(): warnings.simplefilter("ignore") self._joint_command_publisher = rospy.Publisher( self._ns + '/motion_controller/arm/joint_commands', JointCommand, tcp_nodelay=True, queue_size=queue_size) self._pub_joint_cmd_timeout = rospy.Publisher( self._ns + '/motion_controller/arm/joint_command_timeout', Float64, latch=True, queue_size=10) self._robot_state_subscriber = rospy.Subscriber( self._ns + '/custom_franka_state_controller/robot_state', RobotState, self._on_robot_state, queue_size=1, tcp_nodelay=True) joint_state_topic = self._ns + '/custom_franka_state_controller/joint_states' self._joint_state_sub = rospy.Subscriber( joint_state_topic, JointState, self._on_joint_states, queue_size=1, tcp_nodelay=True) self._cartesian_state_sub = rospy.Subscriber( self._ns + '/custom_franka_state_controller/tip_state', EndPointState, self._on_endpoint_state, queue_size=1, tcp_nodelay=True) rospy.on_shutdown(self._clean_shutdown) err_msg = ("%s arm init failed to get current joint_states " "from %s") % (self.name.capitalize(), joint_state_topic) franka_dataflow.wait_for(lambda: len(list(self._joint_angle.keys())) > 0, timeout_msg=err_msg, timeout=5.0) err_msg = ("%s arm, init failed to get current tip_state " "from %s") % (self.name.capitalize(), self._ns + 'tip_state') franka_dataflow.wait_for(lambda: len(list(self._cartesian_pose.keys())) > 0, timeout_msg=err_msg, timeout=5.0) err_msg = ("%s arm, init failed to get current robot_state " "from %s") % (self.name.capitalize(), self._ns + 'robot_state') franka_dataflow.wait_for(lambda: self._jacobian is not None, timeout_msg=err_msg, timeout=5.0) # start moveit server with panda_link8 (flange) as the end-effector, unless it is in simulation. However, using move_to_cartesian_pose() method compensates for this and moves the robot's defined end-effector frame (EE frame; see set_EE_frame() and set_EE_at_frame()) to the target pose. try: self._movegroup_interface = PandaMoveGroupInterface( use_panda_hand_link=True if self._params._in_sim else False) except: rospy.loginfo("{}: MoveGroup was not found! This is okay if moveit service is not required!".format( self.__class__.__name__)) self._movegroup_interface = None self.set_joint_position_speed(self._speed_ratio) def _clean_shutdown(self): self._joint_state_sub.unregister() self._cartesian_state_sub.unregister() self._pub_joint_cmd_timeout.unregister() self._robot_state_subscriber.unregister() self._joint_command_publisher.unregister()
[docs] def get_movegroup_interface(self): """ :return: the movegroup interface instance associated with the robot. :rtype: franka_moveit.PandaMoveGroupInterface """ return self._movegroup_interface
[docs] def get_robot_params(self): """ :return: Useful parameters from the ROS parameter server. :rtype: franka_interface.RobotParams """ return self._params
[docs] def get_joint_limits(self): """ Return the joint limits (defined in the parameter server) :rtype: franka_core_msgs.msg.JointLimits :return: JointLimits """ return self._joint_limits
[docs] def joint_names(self): """ Return the names of the joints for the specified limb. :rtype: [str] :return: ordered list of joint names from proximal to distal (i.e. shoulder to wrist). """ return self._joint_names
def _on_joint_states(self, msg): for idx, name in enumerate(msg.name): if name in self._joint_names: self._joint_angle[name] = msg.position[idx] self._joint_velocity[name] = msg.velocity[idx] self._joint_effort[name] = msg.effort[idx] def _on_robot_state(self, msg): self._robot_mode = self.RobotMode(msg.robot_mode) self._robot_mode_ok = (self._robot_mode.value != self.RobotMode.ROBOT_MODE_REFLEX) and ( self._robot_mode.value != self.RobotMode.ROBOT_MODE_USER_STOPPED) self._jacobian = np.asarray(msg.O_Jac_EE).reshape(6, 7, order='F') self._cartesian_velocity = { 'linear': np.asarray([msg.O_dP_EE[0], msg.O_dP_EE[1], msg.O_dP_EE[2]]), 'angular': np.asarray([msg.O_dP_EE[3], msg.O_dP_EE[4], msg.O_dP_EE[5]])} self._cartesian_contact = msg.cartesian_contact self._cartesian_collision = msg.cartesian_collision self._joint_contact = msg.joint_contact self._joint_collision = msg.joint_collision if not self._params._in_sim: self._F_T_NE = msg.F_T_NE # should be constant normally self._NE_T_EE = msg.NE_T_EE self._F_T_EE = msg.F_T_EE if self._frames_interface: self._frames_interface._update_frame_data( self._NE_T_EE, msg.EE_T_K) self._joint_inertia = np.asarray( msg.mass_matrix).reshape(7, 7, order='F') self.q_d = msg.q_d self.dq_d = msg.dq_d self._gravity = np.asarray(msg.gravity) self._coriolis = np.asarray(msg.coriolis) self._errors = message_converter.convert_ros_message_to_dictionary( msg.current_errors)
[docs] def coriolis_comp(self): """ Return coriolis compensation torques. Useful for compensating coriolis when performing direct torque control of the robot. :rtype: np.ndarray :return: 7D joint torques compensating for coriolis. """ return self._coriolis
[docs] def gravity_comp(self): """ Return gravity compensation torques. :rtype: np.ndarray :return: 7D joint torques compensating for gravity. """ return self._gravity
[docs] def get_robot_status(self): """ Return dict with all robot status information. :rtype: dict :return: ['robot_mode' (RobotMode object), 'robot_status' (bool), 'errors' (dict() of errors and their truth value), 'error_in_curr_status' (bool)] """ return {'robot_mode': self._robot_mode, 'robot_status': self._robot_mode_ok, 'errors': self._errors, 'error_in_current_state': self.error_in_current_state()}
[docs] def in_safe_state(self): """ Return True if the specified limb is in safe state (no collision, reflex, errors etc.). :rtype: bool :return: True if the arm is in safe state, False otherwise. """ return self._robot_mode_ok and not self.error_in_current_state()
[docs] def error_in_current_state(self): """ Return True if the specified limb has experienced an error. :rtype: bool :return: True if the arm has error, False otherwise. """ return not all([e == False for e in list(self._errors.values())])
[docs] def what_errors(self): """ Return list of error messages if there is error in robot state :rtype: [str] :return: list of names of current errors in robot state """ return [e for e in self._errors if self._errors[e] == True] if self.error_in_current_state() else None
def _on_endpoint_state(self, msg): cart_pose_trans_mat = np.asarray(msg.O_T_EE).reshape(4, 4, order='F') self._cartesian_pose = { 'position': cart_pose_trans_mat[:3, 3], 'orientation': quaternion.from_rotation_matrix(cart_pose_trans_mat[:3, :3]), 'ori_mat': cart_pose_trans_mat[:3,:3]} self._cartesian_effort = { 'force': np.asarray([msg.O_F_ext_hat_K.wrench.force.x, msg.O_F_ext_hat_K.wrench.force.y, msg.O_F_ext_hat_K.wrench.force.z]), 'torque': np.asarray([msg.O_F_ext_hat_K.wrench.torque.x, msg.O_F_ext_hat_K.wrench.torque.y, msg.O_F_ext_hat_K.wrench.torque.z]) } self._stiffness_frame_effort = { 'force': np.asarray([msg.K_F_ext_hat_K.wrench.force.x, msg.K_F_ext_hat_K.wrench.force.y, msg.K_F_ext_hat_K.wrench.force.z]), 'torque': np.asarray([msg.K_F_ext_hat_K.wrench.torque.x, msg.K_F_ext_hat_K.wrench.torque.y, msg.K_F_ext_hat_K.wrench.torque.z]) } self._tip_states = TipState(msg.header.stamp, deepcopy(self._cartesian_pose), deepcopy( self._cartesian_velocity), deepcopy(self._cartesian_effort), deepcopy(self._stiffness_frame_effort))
[docs] def joint_angle(self, joint): """ Return the requested joint angle. :type joint: str :param joint: name of a joint :rtype: float :return: angle in radians of individual joint """ return self._joint_angle[joint]
[docs] def joint_angles(self): """ Return all joint angles. :rtype: dict({str:float}) :return: unordered dict of joint name Keys to angle (rad) Values """ return deepcopy(self._joint_angle)
[docs] def joint_ordered_angles(self): """ Return all joint angles. :rtype: [float] :return: joint angles (rad) orded by joint_names from proximal to distal (i.e. shoulder to wrist). """ return [self._joint_angle[name] for name in self._joint_names]
[docs] def joint_velocity(self, joint): """ Return the requested joint velocity. :type joint: str :param joint: name of a joint :rtype: float :return: velocity in radians/s of individual joint """ return self._joint_velocity[joint]
[docs] def joint_velocities(self): """ Return all joint velocities. :rtype: dict({str:float}) :return: unordered dict of joint name Keys to velocity (rad/s) Values """ return deepcopy(self._joint_velocity)
[docs] def joint_effort(self, joint): """ Return the requested joint effort. :type joint: str :param joint: name of a joint :rtype: float :return: effort in Nm of individual joint """ return self._joint_effort[joint]
[docs] def joint_efforts(self): """ Return all joint efforts. :rtype: dict({str:float}) :return: unordered dict of joint name Keys to effort (Nm) Values """ return deepcopy(self._joint_effort)
[docs] def endpoint_pose(self): """ Return Cartesian endpoint pose {position, orientation}. :rtype: dict({str:np.ndarray (shape:(3,)), str:quaternion.quaternion}) :return: position and orientation as named tuples in a dict - 'position': np.array of x, y, z - 'orientation': quaternion x,y,z,w in quaternion format """ return deepcopy(self._cartesian_pose)
[docs] def endpoint_velocity(self): """ Return Cartesian endpoint twist {linear, angular}. :rtype: dict({str:np.ndarray (shape:(3,)),str:np.ndarray (shape:(3,))}) :return: linear and angular velocities as named tuples in a dict - 'linear': np.array of x, y, z - 'angular': np.array of x, y, z (angular velocity along the axes) """ return deepcopy(self._cartesian_velocity)
[docs] def endpoint_effort(self, in_base_frame=True): """ Return Cartesian endpoint wrench {force, torque}. :param in_base_frame: if True, returns end-effector effort with respect to base frame, else in stiffness frame [default: True] :type in_base_frame: bool :rtype: dict({str:np.ndarray (shape:(3,)),str:np.ndarray (shape:(3,))}) :return: force and torque at endpoint as named tuples in a dict in the base frame of the robot or in the stiffness frame (wrist) - 'force': Cartesian force on x,y,z axes in np.ndarray format - 'torque': Torque around x,y,z axes in np.ndarray format """ return deepcopy(self._cartesian_effort) if in_base_frame else deepcopy(self._stiffness_frame_effort)
[docs] def exit_control_mode(self, timeout=5, velocity_tolerance=1e-2): """ Clean exit from advanced control modes (joint torque or velocity). Resets control to joint position mode with current positions until further advanced control commands are sent to the robot. .. note:: In normal cases, this method is not required as the interface automatically switches to position control mode if advanced control commands (velocity/torque) are not sent at regular intervals. Therefore it is enough to stop sending the commands to disable advanced control modes. .. note:: In sim, this method does nothing. :type timeout: float :param timeout: seconds to wait for robot to stop moving before giving up [default: 5] :type velocity_tolerance: float :param velocity_tolerance: tolerance """ if self._params._in_sim: return self.set_command_timeout(0.05) rospy.sleep(0.5) def check_stop(): return np.allclose(np.asarray(list(self._joint_velocity.values())), 0., atol=velocity_tolerance) rospy.loginfo("{}: Waiting for robot to stop moving to exit control mode...".format( self.__class__.__name__)) franka_dataflow.wait_for( test=lambda: check_stop(), timeout=timeout, timeout_msg="{}: FAILED to exit control mode! The robot may be still moving. Controllers might not switch correctly".format( self.__class__.__name__), rate=20, raise_on_error=False ) rospy.loginfo("{}: Done. Setting position control target to current position.".format( self.__class__.__name__)) self.set_joint_positions(self.joint_angles())
[docs] def tip_states(self): """ Return Cartesian endpoint state for a given tip name :rtype: TipState object :return: pose, velocity, effort, effort_in_K_frame """ return deepcopy(self._tip_states)
[docs] def joint_inertia_matrix(self): """ Returns the current joint inertia matrix given by libfranka. :return: joint inertia matrix (7,7) :rtype: np.ndarray [7x7] """ return deepcopy(self._joint_inertia)
[docs] def zero_jacobian(self): """ Returns the current jacobian matrix given by libfranka. :return: end-effector jacobian (6,7) :rtype: np.ndarray [6x7] """ return deepcopy(self._jacobian)
[docs] def set_command_timeout(self, timeout): """ Set the timeout in seconds for the joint controller. Advanced control commands (vel, torque) should be sent within this time, or else the controller will switch back to default control mode. :type timeout: float :param timeout: timeout in seconds """ self._pub_joint_cmd_timeout.publish(Float64(timeout))
[docs] def set_joint_position_speed(self, speed=0.3): """ Set ratio of max joint speed to use during joint position moves (only for move_to_joint_positions). Set the proportion of maximum controllable velocity to use during joint position control execution. The default ratio is `0.3`, and can be set anywhere from [0.0-1.0] (clipped). Once set, a speed ratio will persist until a new execution speed is set. :type speed: float :param speed: ratio of maximum joint speed for execution default= 0.3; range= [0.0-1.0] """ if speed > 0.3: rospy.logwarn("{}: Setting speed above 0.3 could be risky!! Be extremely careful.".format( self.__class__.__name__)) if self._movegroup_interface: self._movegroup_interface.set_velocity_scale(speed * 2) self._speed_ratio = speed
[docs] def set_joint_positions(self, positions): """ Commands the joints of this limb to the specified positions. :type positions: dict({str:float} :param positions: dict of {'joint_name':joint_position,} """ self._command_msg.names = self._joint_names self._command_msg.position = [positions[j] for j in self._joint_names] self._command_msg.mode = JointCommand.POSITION_MODE self._command_msg.header.stamp = rospy.Time.now() self._joint_command_publisher.publish(self._command_msg)
[docs] def set_joint_velocities(self, velocities): """ Commands the joints of this limb to the specified velocities. :type velocities: dict({str:float}) :param velocities: dict of {'joint_name':joint_velocity,} """ self._command_msg.names = self._joint_names self._command_msg.velocity = [velocities[j] for j in self._joint_names] self._command_msg.mode = JointCommand.VELOCITY_MODE self._command_msg.header.stamp = rospy.Time.now() self._joint_command_publisher.publish(self._command_msg)
[docs] def set_joint_torques(self, torques): """ Commands the joints of this limb with the specified torques. :type torques: dict({str:float}) :param torques: dict of {'joint_name':joint_torque,} """ self._command_msg.names = self._joint_names self._command_msg.effort = [torques[j] for j in self._joint_names] self._command_msg.mode = JointCommand.TORQUE_MODE self._command_msg.header.stamp = rospy.Time.now() self._joint_command_publisher.publish(self._command_msg)
[docs] def set_joint_positions_velocities(self, positions, velocities): """ Commands the joints of this limb using specified positions and velocities using impedance control. Command at time t is computed as: :math:`u_t= coriolis\_factor * coriolis\_t + K\_p * (positions - curr\_positions) + K\_d * (velocities - curr\_velocities)` :type positions: [float] :param positions: desired joint positions as an ordered list corresponding to joints given by self.joint_names() :type velocities: [float] :param velocities: desired joint velocities as an ordered list corresponding to joints given by self.joint_names() """ self._command_msg.names = self._joint_names self._command_msg.position = positions self._command_msg.velocity = velocities self._command_msg.mode = JointCommand.IMPEDANCE_MODE self._command_msg.header.stamp = rospy.Time.now() self._joint_command_publisher.publish(self._command_msg)
[docs] def has_collided(self): """ Returns true if either joint collision or cartesian collision is detected. Collision thresholds can be set using instance of :py:class:`franka_tools.CollisionBehaviourInterface`. """ return any(self._joint_collision) or any(self._cartesian_collision)
[docs] def move_to_neutral(self, timeout=15.0, speed=0.15): """ Command the Limb joints to a predefined set of "neutral" joint angles. From rosparam /franka_control/neutral_pose. :type timeout: float :param timeout: seconds to wait for move to finish [15] :type speed: float :param speed: ratio of maximum joint speed for execution default= 0.15; range= [0.0-1.0] """ self.set_joint_position_speed(speed) self.move_to_joint_positions(self._neutral_pose_joints, timeout)
[docs] def move_to_joint_positions(self, positions, timeout=10.0, threshold=0.00085, test=None, use_moveit=True): """ (Blocking) Commands the limb to the provided positions. Waits until the reported joint state matches that specified. This function uses a low-pass filter using JointTrajectoryService to smooth the movement or optionally uses MoveIt! to plan and execute a trajectory. :type positions: dict({str:float}) :param positions: joint_name:angle command :type timeout: float :param timeout: seconds to wait for move to finish [15] :type threshold: float :param threshold: position threshold in radians across each joint when move is considered successful [0.00085] :param test: optional function returning True if motion must be aborted :type use_moveit: bool :param use_moveit: if set to True, and movegroup interface is available, move to the joint positions using moveit planner. """ curr_controller = self._ctrl_manager.set_motion_controller( self._ctrl_manager.joint_trajectory_controller) if use_moveit and self._movegroup_interface: self._movegroup_interface.go_to_joint_positions( [positions[n] for n in self._joint_names], tolerance=threshold) else: if use_moveit: rospy.logwarn("{}: MoveGroupInterface was not found! Using JointTrajectoryActionClient instead.".format( self.__class__.__name__)) min_traj_dur = 0.5 traj_client = JointTrajectoryActionClient( joint_names=self.joint_names()) traj_client.clear() dur = [] for j in range(len(self._joint_names)): dur.append(max(abs(positions[self._joint_names[j]] - self._joint_angle[self._joint_names[j]] ) / self._joint_limits.velocity[j], min_traj_dur)) traj_client.add_point( positions=[self._joint_angle[n] for n in self._joint_names], time=0.0001) traj_client.add_point(positions=[ positions[n] for n in self._joint_names], time=max(dur)/self._speed_ratio) def genf(joint, angle): def joint_diff(): return abs(angle - self._joint_angle[joint]) return joint_diff diffs = [genf(j, a) for j, a in list(iteritems(positions)) if j in self._joint_angle] fail_msg = "{}: {} limb failed to reach commanded joint positions.".format( self.__class__.__name__, self.name.capitalize()) def test_collision(): if self.has_collided(): rospy.logerr(' '.join(["Collision detected.", fail_msg])) return True return False traj_client.start() # send the trajectory action request # traj_client.wait(timeout = timeout) franka_dataflow.wait_for( test=lambda: test_collision() or traj_client.result() is not None or (callable(test) and test() == True) or (all(diff() < threshold for diff in diffs)), timeout=timeout, timeout_msg=fail_msg, rate=100, raise_on_error=False ) res = traj_client.result() if res is not None and res.error_code: rospy.loginfo("Trajectory Server Message: {}".format(res)) rospy.sleep(0.5) self._ctrl_manager.set_motion_controller(curr_controller) rospy.loginfo("{}: Trajectory controlling complete".format( self.__class__.__name__))
[docs] def get_flange_pose(self, pos=None, ori=None): """ Get the pose of flange (panda_link8) given the pose of the end-effector frame. .. note:: In sim, this method does nothing. :param pos: position of the end-effector frame in the robot's base frame, defaults to current end-effector position :type pos: np.ndarray, optional :param ori: orientation of the end-effector frame, defaults to current end-effector orientation :type ori: quaternion.quaternion, optional :return: corresponding flange frame pose in the robot's base frame :rtype: np.ndarray, quaternion.quaternion """ if pos is None: pos = self._cartesian_pose['position'] if ori is None: ori = self._cartesian_pose['orientation'] if self._params._in_sim: return pos, ori # get corresponding flange frame pose using transformation matrix F_T_EE = np.asarray(self._F_T_EE).reshape(4, 4, order="F") mat = quaternion.as_rotation_matrix(ori) new_ori = mat.dot(F_T_EE[:3,:3].T) new_pos = pos - new_ori.dot(F_T_EE[:3, 3]) return new_pos, quaternion.from_rotation_matrix(new_ori)
[docs] def move_to_cartesian_pose(self, pos, ori=None, use_moveit=True): """ Move robot end-effector to specified cartesian pose using MoveIt! (also avoids obstacles if they are defined using :py:class:`franka_moveit.ExtendedPlanningSceneInterface`) :param pos: target end-effector position :type pos: [float] or np.ndarray :param ori: target orientation quaternion for end-effector, defaults to current ori :type ori: quaternion.quaternion or [float] (quaternion in w,x,y,z order), optional :param use_moveit: Flag for using MoveIt (redundant for now; only works if set to True), defaults to True :type use_moveit: bool, optional """ if not use_moveit or self._movegroup_interface is None: rospy.logerr("{}: MoveGroupInterface was not found! Aborting cartesian planning.".format( self.__class__.__name__)) return if ori is None: ori = self._cartesian_pose['orientation'] self.get_flange_pose(pos, ori) curr_controller = self._ctrl_manager.set_motion_controller( self._ctrl_manager.joint_trajectory_controller) ## == Plan avoids defined scene obstacles == self._movegroup_interface.go_to_cartesian_pose( create_pose_msg(*self.get_flange_pose(pos, ori))) ## ========================================= ## == does not avoid scene obstacles == # plan, _ = self._movegroup_interface.plan_cartesian_path( # [create_pose_msg(pos, ori)]) # self._movegroup_interface.execute_plan(plan) ## ==================================== rospy.sleep(0.5) self._ctrl_manager.set_motion_controller(curr_controller) rospy.loginfo("{}: Trajectory controlling complete".format( self.__class__.__name__))
[docs] def pause_controllers_and_do(self, func, *args, **kwargs): """ Temporarily stops all active controllers and calls the provided function before restarting the previously active controllers. :param func: the function to be called :type func: callable """ assert callable( func), "{}: Invalid argument provided to ArmInterface->pause_controllers_and_do. Argument 1 should be callable.".format(self.__class__.__name__) active_controllers = self._ctrl_manager.list_active_controllers( only_motion_controllers=True) rospy.loginfo("{}: Stopping motion controllers temporarily...".format( self.__class__.__name__)) for ctrlr in active_controllers: self._ctrl_manager.stop_controller(ctrlr.name) rospy.sleep(1.) retval = func(*args, **kwargs) rospy.sleep(1.) rospy.loginfo("{}: Restarting previously active motion controllers.".format( self.__class__.__name__)) for ctrlr in active_controllers: self._ctrl_manager.start_controller(ctrlr.name) rospy.sleep(1.) rospy.loginfo("{}: Controllers restarted.".format( self.__class__.__name__)) return retval
[docs] def reset_EE_frame(self): """ Reset EE frame to default. (defined by FrankaFramesInterface.DEFAULT_TRANSFORMATIONS.EE_FRAME global variable defined in :py:class:`franka_tools.FrankaFramesInterface` source code). By default, this resets to align with the nominal-end effector frame (F_T_NE) in the flange frame. :rtype: [bool, str] :return: [success status of service request, error msg if any] """ if self._frames_interface: if self._frames_interface.EE_frame_is_reset(): rospy.loginfo("{}: EE Frame already reset".format( self.__class__.__name__)) return True return self.pause_controllers_and_do(self._frames_interface.reset_EE_frame) else: rospy.logwarn("{}: Frames changing not available in simulated environment".format( self.__class__.__name__)) return False
[docs] def set_EE_frame(self, frame): """ Set new EE frame based on the transformation given by 'frame', which is the transformation matrix defining the new desired EE frame with respect to the nominal end-effector frame (NE_T_EE). Motion controllers are stopped and restarted for switching. :type frame: [float (16,)] / np.ndarray (4x4) :param frame: transformation matrix of new EE frame wrt nominal end-effector frame (column major) :rtype: [bool, str] :return: [success status of service request, error msg if any] """ if self._frames_interface: if self._frames_interface.frames_are_same(self._frames_interface.get_EE_frame(as_mat=True), frame): rospy.loginfo("{}: EE Frame already at the target frame.".format( self.__class__.__name__)) return True return self.pause_controllers_and_do(self._frames_interface.set_EE_frame, frame) else: rospy.logwarn("{}: Frames changing not available in simulated environment".format( self.__class__.__name__)) return False
[docs] def set_EE_at_frame(self, frame_name, timeout=5.0): """ Set new EE frame to the same frame as the link frame given by 'frame_name'. Motion controllers are stopped and restarted for switching :type frame_name: str :param frame_name: desired tf frame name in the tf tree :rtype: [bool, str] :return: [success status of service request, error msg if any] """ if self._frames_interface: if not self._frames_interface.EE_frame_already_set(self._frames_interface.get_link_tf(frame_name)): return self.pause_controllers_and_do(self._frames_interface.set_EE_at_frame, frame_name=frame_name, timeout=timeout) else: rospy.logwarn("{}: Frames changing not available in simulated environment".format( self.__class__.__name__)) return False
[docs] def set_collision_threshold(self, cartesian_forces=None, joint_torques=None): """ Set Force Torque thresholds for deciding robot has collided. :return: True if service call successful, False otherwise :rtype: bool :param cartesian_forces: Cartesian force threshold for collision detection [x,y,z,R,P,Y] (robot motion stops if violated) :type cartesian_forces: [float] size 6 :param joint_torques: Joint torque threshold for collision (robot motion stops if violated) :type joint_torques: [float] size 7 """ if self._collision_behaviour_interface: return self._collision_behaviour_interface.set_collision_threshold(joint_torques=joint_torques, cartesian_forces=cartesian_forces) else: rospy.logwarn("No CollisionBehaviourInterface object found!")
[docs] def get_controller_manager(self): """ :return: the FrankaControllerManagerInterface instance associated with the robot. :rtype: franka_tools.FrankaControllerManagerInterface """ return self._ctrl_manager
[docs] def get_frames_interface(self): """ :return: the FrankaFramesInterface instance associated with the robot. :rtype: franka_tools.FrankaFramesInterface """ return self._frames_interface
if __name__ == '__main__': rospy.init_node('test_fri') r = Arm() # rate = rospy.Rate(10) # while not rospy.is_shutdown(): # rate.sleep()