Source code for franka_tools.controller_param_config_client

# /***************************************************************************

# 
# @package: franka_tools
# @metapackage: franka_ros_interface
# @author: Saif Sidhik <sxs1412@bham.ac.uk>
# 

# **************************************************************************/

# /***************************************************************************
# Copyright (c) 2019-2021, Saif Sidhik
 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#     http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# **************************************************************************/

import rospy

import dynamic_reconfigure.client


K_GAINS_KW = ['j1_k','j2_k','j3_k','j4_k','j5_k','j6_k','j7_k']
D_GAINS_KW = ['j1_d','j2_d','j3_d','j4_d','j5_d','j6_d','j7_d']


[docs]class ControllerParamConfigClient: """ Interface class for updating dynamically configurable paramters of a controller. :param controller_name: The name of the controller. :type controller_name: str """ def __init__(self, controller_name): """ Initialisation: Client is not started yet. """ self._controller_name = controller_name if controller_name[0] != '/' else controller_name[1:] self._is_running = False @property def is_running(self): """ :return: True if client is running / server is unavailable; False otherwise :rtype: bool """ return self._is_running
[docs] def start(self, timeout = 5): """ Start the dynamic_reconfigure client :param timeout: time to wait before giving up on service request :type timeout: float """ service_name = "/{}/arm/controller_parameters_config".format(self._controller_name) try: self._client = dynamic_reconfigure.client.Client(service_name, timeout=timeout, config_callback=self._log_update) except rospy.ROSException: rospy.logdebug("ControllerParamConfigClient: Could not find configuration server at {}".format(service_name)) self._is_running = True
def _log_update(self, config): """ Optional callback to log parameter changes after each update request. """ k_gains = [config[n] for n in K_GAINS_KW] d_gains = [config[n] for n in D_GAINS_KW] rospy.logdebug("ControllerParamConfigClient: {controller_name} config set to \n\tSmoothing Param: {smoothing_param} \n\tJoint Stiffness: {K_gains} \n\tJoint Damping: {D_gains}".format(controller_name=self._controller_name, smoothing_param = config['position_joint_delta_filter'], K_gains = k_gains, D_gains = d_gains))
[docs] def update_config(self, **kwargs): """ Update the config in the server using the provided keyword arguments. :param kwargs: These are keyword arguments matching the parameter names in config file: franka_ros_controllers/cfg/joint_controller_params.cfg """ self._client.update_configuration(kwargs)
[docs] def set_controller_gains(self, k_gains, d_gains = None): """ Update the stiffness and damping parameters of the joints for the current controller. :param k_gains: joint stiffness parameters (should be within limits specified in franka documentation; same is also set in franka_ros_controllers/cfg/joint_controller_params.cfg) :type k_gains: [float] :param d_gains: joint damping parameters (should be within limits specified in franka documentation; same is also set in franka_ros_controllers/cfg/joint_controller_params.cfg) :type d_gains: [float] """ assert len(k_gains) == 7, "ControllerParamConfigClient: k_gains argument should be of length 7!" config = {} for i, k_val in enumerate(k_gains): config[K_GAINS_KW[i]] = k_val if d_gains: assert len(k_gains) == 7, "ControllerParamConfigClient: d_gains argument should be of length 7!" for j, d_val in enumerate(d_gains): config[D_GAINS_KW[j]] = d_val rospy.logdebug("Config: {}".format(config)) self.update_config(**config)
[docs] def set_joint_motion_smoothing_parameter(self, value): """ Update the joint motion smoothing parameter (only valid for position_joint_position_controller). :param value: smoothing factor (should be within limit set in franka_ros_controllers/cfg/joint_controller_params.cfg) :type value: [float] """ self.update_config(position_joint_delta_filter = value)
[docs] def get_joint_motion_smoothing_parameter(self, timeout = 5): """ :return: the currently set value for the joint position smoothing parameter from the server. :rtype: float :param timeout: time to wait before giving up on service request :type timeout: float """ return self.get_config(timeout = timeout)['position_joint_delta_filter']
[docs] def get_config(self, timeout = 5): """ :return: the currently set values for all paramters from the server :rtype: dict {str : float} :param timeout: time to wait before giving up on service request :type timeout: float """ return self._client.get_configuration(timeout = timeout)
[docs] def get_controller_gains(self, timeout = 5): """ :return: the currently set values for controller gains from the server :rtype: ( [float], [float] ) :param timeout: time to wait before giving up on service request :type timeout: float """ config = self.get_config(timeout = timeout) k_gains = [] for k_val_ in K_GAINS_KW: if k_val_ in config: k_gains.append(config[k_val_]) else: rospy.logwarn("ControllerParamConfigClient: Could not find K gain {} in server".format(k_val_)) return False, False d_gains = [] for d_val_ in D_GAINS_KW: if d_val_ in config: d_gains.append(config[d_val_]) else: rospy.logwarn("ControllerParamConfigClient: Could not find D gain {} in server".format(d_val_)) return k_gains, False return k_gains, d_gains
[docs] def get_parameter_descriptions(self, timeout = 5): """ :return: the description of each parameter as defined in the cfg file from the server. :rtype: dict {str : str} :param timeout: time to wait before giving up on service request :type timeout: float """ return self._client.get_parameter_descriptions(timeout = timeout)
if __name__ == "__main__": rospy.init_node("dynamic_client") # client = dynamic_reconfigure.client.Client("dynamic_tutorials", timeout=30, config_callback=callback)