|
|
| ubuntu 24.04 Desktop ROS2 jazzy |
| ROS2 dynmixel_workbench_operator를 사용하여 차량을 조종 |
| ROS2 패키지를 사용해서 twist 형식인 /cmd_vel을 carla 형식에 맞게 /carla/ego_vehicle/vehicle_control_cmd으로 변환하여 조종하려 했지만 carla에서 vehicle 토픽을 subscribe하지 않은 것을 확인 아직 구현이 안되있는것 같다. <<---수정됨 |
https://github.com/wijekim/CARLA-simulator
*** 수정 ***
기존 문제였던 carla에서 vehicle 토픽을 subscribe하지 않았던 문제를 해결
id를 ego_vehicle로 지정시 vehicle_control_cmd를 subscribe 하는 토픽이 나오지 않는 것을 확인 하였지만
id를 hero로 지정하였을 때 vehicle_control_cmd를 subscribe하는 토픽이 나오는것을 확인
stack.json 설정 파일에서 hero로 id 변경
| # 1. 패키지 디렉토리 생성 cd ~/ros2_ws/src mkdir -p twist_to_carla_control/{src,launch} # 3. CARLA ROS Bridge 설치 (carla_msgs 필요) git clone https://github.com/carla-simulator/ros-bridge.git # 4. 빌드 cd ~/ros2_ws colcon build --packages-select twist_to_carla_control # 5. 실행 source install/setup.bash ros2 launch twist_to_carla_control twist_to_carla.launch.py |
/home/airlab/ros2_ws/src/twist_to_carla_control/launch/twist_to_carla.launch.py
| from launch import LaunchDescription from launch_ros.actions import Node from launch.actions import DeclareLaunchArgument from launch.substitutions import LaunchConfiguration from ament_index_python.packages import get_package_share_directory import os def generate_launch_description(): pkg_share = get_package_share_directory('twist_to_carla_control') # Launch arguments max_steering_arg = DeclareLaunchArgument( 'max_steering_angle', default_value='0.7', description='Maximum steering angle in radians' ) max_speed_arg = DeclareLaunchArgument( 'max_speed', default_value='10.0', description='Maximum speed in m/s' ) wheelbase_arg = DeclareLaunchArgument( 'wheelbase', default_value='2.87', description='Vehicle wheelbase in meters' ) # Node twist_to_carla_node = Node( package='twist_to_carla_control', executable='twist_to_carla_control_node', name='twist_to_carla_control', output='screen', parameters=[{ 'max_steering_angle': LaunchConfiguration('max_steering_angle'), 'max_throttle': 1.0, 'max_brake': 1.0, 'wheelbase': LaunchConfiguration('wheelbase'), 'max_speed': LaunchConfiguration('max_speed'), }], remappings=[ ('/cmd_vel', '/cmd_vel'), ('/carla/hero/vehicle_control_cmd', '/carla/hero/vehicle_control_cmd'), ] ) return LaunchDescription([ max_steering_arg, max_speed_arg, wheelbase_arg, twist_to_carla_node, ]) |
/home/airlab/ros2_ws/src/twist_to_carla_control/src/twist_to_carla_control.cpp
| #include <rclcpp/rclcpp.hpp> #include <geometry_msgs/msg/twist.hpp> #include <carla_msgs/msg/carla_ego_vehicle_control.hpp> class TwistToCarlaControl : public rclcpp::Node { public: TwistToCarlaControl() : Node("twist_to_carla_control") { // 파라미터 선언 this->declare_parameter("max_steering_angle", 0.7); // 최대 조향각 (라디안) this->declare_parameter("max_throttle", 1.0); this->declare_parameter("max_brake", 1.0); this->declare_parameter("wheelbase", 2.87); // 차량 축거 (m) this->declare_parameter("max_speed", 10.0); // 최대 속도 (m/s) // 파라미터 가져오기 max_steering_ = this->get_parameter("max_steering_angle").as_double(); max_throttle_ = this->get_parameter("max_throttle").as_double(); max_brake_ = this->get_parameter("max_brake").as_double(); wheelbase_ = this->get_parameter("wheelbase").as_double(); max_speed_ = this->get_parameter("max_speed").as_double(); // Subscriber 생성 twist_sub_ = this->create_subscription<geometry_msgs::msg::Twist>( "/cmd_vel", 10, std::bind(&TwistToCarlaControl::twistCallback, this, std::placeholders::_1)); // Publisher 생성 control_pub_ = this->create_publisher<carla_msgs::msg::CarlaEgoVehicleControl>( "/carla/hero/vehicle_control_cmd", 10); RCLCPP_INFO(this->get_logger(), "Twist to CARLA Control Node Started"); RCLCPP_INFO(this->get_logger(), "Subscribing to: /cmd_vel"); RCLCPP_INFO(this->get_logger(), "Publishing to: /carla/hero/vehicle_control_cmd"); } private: void twistCallback(const geometry_msgs::msg::Twist::SharedPtr msg) { auto control_msg = carla_msgs::msg::CarlaEgoVehicleControl(); // Linear velocity (x) -> Throttle/Brake double desired_speed = msg->linear.x; if (desired_speed > 0.0) { // 전진: Throttle 계산 control_msg.throttle = std::min(desired_speed / max_speed_, max_throttle_); control_msg.brake = 0.0; } else if (desired_speed < 0.0) { // 후진: Reverse와 Throttle 사용 control_msg.throttle = std::min(std::abs(desired_speed) / max_speed_, max_throttle_); control_msg.brake = 0.0; control_msg.reverse = true; } else { // 정지: Brake 적용 control_msg.throttle = 0.0; control_msg.brake = 0.5; control_msg.reverse = false; } // Angular velocity (z) -> Steering // Ackermann 조향 모델 사용 double angular_vel = msg->angular.z; if (std::abs(desired_speed) > 0.01) { // 회전 반경 계산: R = v / ω double turning_radius = desired_speed / angular_vel; // Ackermann 조향각 계산: δ = atan(L / R) double steering_angle = std::atan(wheelbase_ / turning_radius); // 조향각 정규화 [-1, 1] control_msg.steer = std::clamp(steering_angle / max_steering_, -1.0, 1.0); } else { // 제자리 회전 control_msg.steer = (angular_vel > 0) ? 1.0 : -1.0; } // Hand brake는 기본적으로 사용 안 함 control_msg.hand_brake = false; control_msg.manual_gear_shift = false; control_msg.gear = 0; // 메시지 발행 control_pub_->publish(control_msg); // 디버그 정보 출력 (주기적으로) static int count = 0; if (++count % 20 == 0) { RCLCPP_DEBUG(this->get_logger(), "Twist: linear=%.2f, angular=%.2f -> Control: throttle=%.2f, brake=%.2f, steer=%.2f", msg->linear.x, msg->angular.z, control_msg.throttle, control_msg.brake, control_msg.steer); } } // Subscriber & Publisher rclcpp::Subscription<geometry_msgs::msg::Twist>::SharedPtr twist_sub_; rclcpp::Publisher<carla_msgs::msg::CarlaEgoVehicleControl>::SharedPtr control_pub_; // 파라미터 double max_steering_; double max_throttle_; double max_brake_; double wheelbase_; double max_speed_; }; int main(int argc, char** argv) { rclcpp::init(argc, argv); auto node = std::make_shared<TwistToCarlaControl>(); rclcpp::spin(node); rclcpp::shutdown(); return 0; } |
/home/airlab/ros2_ws/src/twist_to_carla_control/CMakeLists.txt
| cmake_minimum_required(VERSION 3.8) project(twist_to_carla_control) if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") add_compile_options(-Wall -Wextra -Wpedantic) endif() # find dependencies find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) find_package(geometry_msgs REQUIRED) find_package(carla_msgs REQUIRED) # Add executable add_executable(twist_to_carla_control_node src/twist_to_carla_control.cpp) ament_target_dependencies(twist_to_carla_control_node rclcpp geometry_msgs carla_msgs ) # Install targets install(TARGETS twist_to_carla_control_node DESTINATION lib/${PROJECT_NAME} ) # Install launch files install(DIRECTORY launch DESTINATION share/${PROJECT_NAME} ) if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() endif() ament_package() |
/home/airlab/ros2_ws/src/twist_to_carla_control/package.xml
| <?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <name>twist_to_carla_control</name> <version>1.0.0</version> <description>Convert geometry_msgs/Twist to CARLA EgoVehicleControl</description> <maintainer email="your.email@example.com">Your Name</maintainer> <license>MIT</license> <buildtool_depend>ament_cmake</buildtool_depend> <depend>rclcpp</depend> <depend>geometry_msgs</depend> <depend>carla_msgs</depend> <test_depend>ament_lint_auto</test_depend> <test_depend>ament_lint_common</test_depend> <export> <build_type>ament_cmake</build_type> </export> </package> |
사용방법
| dynamixel_workbench_operator를 실행후 또 다른 터미널을 열어 ros2 launch twist_to_carla_control twist_to_carla.launch.py 실행 |
operator로 2.5까지 올렸을때 앞으로 가기 시작함
wist_to_carla_control에서 수정해주면 될것 같다.
carla 라이브를 사용할때
pkg 작성
ros-bridg를 사용하지 않을것이기 때문에
carla library를 사용하여 ROS2 패키지를 작성함
패키지 만들기
| # 1. 패키지 구조 생성 cd ~/ros2_ws/src mkdir -p twist_carla_direct_control/{twist_carla_direct_control,launch,config,resource} # 2. 파일들 생성 (위 내용 복사) # 3. CARLA Python 설치 pip3 install carla # 4. 빌드 cd ~/ros2_ws colcon build --packages-select twist_carla_direct_control source install/setup.bash # 5. 실행 # Terminal 1: CARLA ./CarlaUnreal.sh --ross # Terminal 2: 컨트롤러 ros2 launch twist_carla_direct_control twist_carla_control.launch.py |
패키지 구조
| twist_carla_direct_control/ ├── twist_carla_direct_control/ │ ├── __init__.py │ └── twist_carla_controller.py ├── launch/ │ └── twist_carla_control.launch.py ├── config/ │ └── params.yaml ├── package.xml ├── setup.py ├── setup.cfg |
twist_carla_direct_control/twist_carla_controller.py
| #!/usr/bin/env python3 """ Twist to CARLA Direct Controller ROS2 Twist 메시지를 받아서 CARLA Python API로 직접 차량 제어 """ import rclpy from rclpy.node import Node from geometry_msgs.msg import Twist import carla import sys import math class TwistCarlaController(Node): def __init__(self): super().__init__('twist_carla_controller') # 파라미터 선언 self.declare_parameter('carla_host', 'localhost') self.declare_parameter('carla_port', 2000) self.declare_parameter('role_name', 'ego_vehicle') self.declare_parameter('timeout', 10.0) self.declare_parameter('max_steering_angle', 0.7) # 라디안 self.declare_parameter('max_speed', 10.0) # m/s self.declare_parameter('wheelbase', 2.87) # m self.declare_parameter('update_rate', 50.0) # Hz # 파라미터 가져오기 host = self.get_parameter('carla_host').value port = self.get_parameter('carla_port').value self.role_name = self.get_parameter('role_name').value timeout = self.get_parameter('timeout').value self.max_steering = self.get_parameter('max_steering_angle').value self.max_speed = self.get_parameter('max_speed').value self.wheelbase = self.get_parameter('wheelbase').value update_rate = self.get_parameter('update_rate').value # CARLA 연결 self.get_logger().info(f'Connecting to CARLA at {host}:{port}...') try: self.client = carla.Client(host, port) self.client.set_timeout(timeout) self.world = self.client.get_world() self.get_logger().info(f'✓ Connected to CARLA: {self.world.get_map().name}') except Exception as e: self.get_logger().error(f'✗ Failed to connect to CARLA: {e}') self.get_logger().error('Make sure CARLA is running!') sys.exit(1) # Ego vehicle 찾기 self.vehicle = None self.find_ego_vehicle() if self.vehicle is None: self.get_logger().error(f'✗ No vehicle with role_name "{self.role_name}" found!') self.get_logger().info('Spawning new ego_vehicle...') self.spawn_ego_vehicle() if self.vehicle is None: self.get_logger().error('Failed to spawn vehicle. Exiting.') sys.exit(1) # Vehicle 설정 self.vehicle.set_autopilot(False) self.vehicle.set_simulate_physics(True) self.get_logger().info(f'✓ Found ego_vehicle: {self.vehicle.id}') self.get_logger().info(f' Type: {self.vehicle.type_id}') self.get_logger().info(f' Location: {self.vehicle.get_location()}') # 마지막 제어 명령 저장 self.last_twist = Twist() self.msg_count = 0 # ROS2 Subscriber 생성 self.subscription = self.create_subscription( Twist, '/cmd_vel', self.twist_callback, 10 ) # 타이머로 주기적으로 제어 명령 전송 (제어 안정성 향상) self.timer = self.create_timer(1.0 / update_rate, self.control_timer_callback) self.get_logger().info('='*60) self.get_logger().info('Twist to CARLA Direct Controller Started') self.get_logger().info('='*60) self.get_logger().info('Subscribing to: /cmd_vel') self.get_logger().info('Ready to receive Twist commands!') self.get_logger().info('='*60) def find_ego_vehicle(self): """role_name이 ego_vehicle인 차량 찾기""" for actor in self.world.get_actors().filter('vehicle.*'): if 'role_name' in actor.attributes: if actor.attributes['role_name'] == self.role_name: self.vehicle = actor return def spawn_ego_vehicle(self): """Ego vehicle 자동 스폰""" try: blueprint_library = self.world.get_blueprint_library() vehicle_bp = blueprint_library.filter('vehicle.tesla.model3')[0] vehicle_bp.set_attribute('role_name', self.role_name) spawn_points = self.world.get_map().get_spawn_points() if len(spawn_points) == 0: self.get_logger().error('No spawn points available!') return spawn_point = spawn_points[0] self.vehicle = self.world.spawn_actor(vehicle_bp, spawn_point) self.get_logger().info(f'✓ Spawned ego_vehicle at {spawn_point.location}') except Exception as e: self.get_logger().error(f'Failed to spawn vehicle: {e}') def twist_callback(self, msg): """Twist 메시지 수신 콜백""" self.last_twist = msg def control_timer_callback(self): """주기적으로 제어 명령 적용""" if self.vehicle is None: self.get_logger().warn('Vehicle not found, searching again...') self.find_ego_vehicle() return msg = self.last_twist try: # CARLA VehicleControl 생성 control = carla.VehicleControl() # Linear velocity (x) -> Throttle/Brake/Reverse desired_speed = msg.linear.x * 100 if desired_speed > 0.01: # 전진 # Throttle 계산 (0.0 ~ 1.0) control.throttle = min(abs(desired_speed) / self.max_speed, 1.0) control.brake = 0.0 control.reverse = False elif desired_speed < -0.01: # 후진 control.throttle = max(abs(desired_speed) / self.max_speed, 1.0) control.brake = 0.0 control.reverse = True else: # 정지 control.throttle = 0.0 control.brake = 0.5 control.reverse = False # Angular velocity (z) -> Steering angular_vel = -(msg.angular.z)*10 if abs(desired_speed) > 0.01: # Ackermann 조향 모델 # 회전 반경: R = v / ω if abs(angular_vel) > 0.001: turning_radius = desired_speed / angular_vel # 조향각: δ = atan(L / R) steering_angle = math.atan(self.wheelbase / turning_radius) # 정규화 [-1, 1] control.steer = max(-1.0, min(1.0, steering_angle / self.max_steering)) else: control.steer = 0.0 else: # 제자리 회전 if abs(angular_vel) > 0.01: control.steer = 1.0 if angular_vel > 0 else -1.0 else: control.steer = 0.0 # Hand brake 및 기타 설정 control.hand_brake = False control.manual_gear_shift = False # 차량에 제어 적용 self.vehicle.apply_control(control) # 주기적 로그 출력 self.msg_count += 1 if self.msg_count % 100 == 0: # 2초마다 (50Hz 기준) vel = self.vehicle.get_velocity() speed = math.sqrt(vel.x**2 + vel.y**2 + vel.z**2) self.get_logger().info( f'Twist: linear={msg.linear.x:.2f}, angular={msg.angular.z:.2f} | ' f'Control: throttle={control.throttle:.2f}, steer={control.steer:.2f}, ' f'brake={control.brake:.2f}, reverse={control.reverse} | ' f'Speed: {speed:.2f} m/s' ) except Exception as e: self.get_logger().error(f'Error applying control: {e}') self.vehicle = None def destroy(self): """종료 시 정리""" if self.vehicle is not None: # 차량 정지 control = carla.VehicleControl() control.throttle = 0.0 control.brake = 1.0 control.hand_brake = True self.vehicle.apply_control(control) self.get_logger().info('Vehicle stopped') def main(args=None): rclpy.init(args=args) node = None try: node = TwistCarlaController() rclpy.spin(node) except KeyboardInterrupt: pass except Exception as e: print(f'Error: {e}') finally: if node is not None: node.destroy() node.destroy_node() rclpy.shutdown() if __name__ == '__main__': main() |
package.xml
| <?xml version="1.0"?> <?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?> <package format="3"> <name>twist_carla_direct_control</name> <version>1.0.0</version> <description>Direct CARLA vehicle control from ROS2 Twist messages using CARLA Python API</description> <maintainer email="your.email@example.com">Your Name</maintainer> <license>MIT</license> <exec_depend>rclpy</exec_depend> <exec_depend>geometry_msgs</exec_depend> <exec_depend>python3-carla</exec_depend> <test_depend>ament_copyright</test_depend> <test_depend>ament_flake8</test_depend> <test_depend>ament_pep257</test_depend> <test_depend>python3-pytest</test_depend> <export> <build_type>ament_python</build_type> </export> </package> |
setup.py
| from setuptools import setup import os from glob import glob package_name = 'twist_carla_direct_control' setup( name=package_name, version='1.0.0', packages=[package_name], data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), (os.path.join('share', package_name, 'launch'), glob('launch/*.py')), (os.path.join('share', package_name, 'config'), glob('config/*.yaml')), ], install_requires=['setuptools'], zip_safe=True, maintainer='Your Name', maintainer_email='your.email@example.com', description='Direct CARLA vehicle control from ROS2 Twist messages', license='MIT', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'twist_carla_controller = twist_carla_direct_control.twist_carla_controller:main', ], }, ) |
setup.cfg
| [develop] script-dir=$base/lib/twist_carla_direct_control [install] install-scripts=$base/lib/twist_carla_direct_control |
launch/twist_carla_control.launch.py
| from launch import LaunchDescription from launch_ros.actions import Node from launch.actions import DeclareLaunchArgument from launch.substitutions import LaunchConfiguration def generate_launch_description(): # Launch arguments carla_host_arg = DeclareLaunchArgument( 'carla_host', default_value='localhost', description='CARLA server host' ) carla_port_arg = DeclareLaunchArgument( 'carla_port', default_value='2000', description='CARLA server port' ) role_name_arg = DeclareLaunchArgument( 'role_name', default_value='ego_vehicle', description='Vehicle role name' ) max_speed_arg = DeclareLaunchArgument( 'max_speed', default_value='10.0', description='Maximum speed in m/s' ) wheelbase_arg = DeclareLaunchArgument( 'wheelbase', default_value='2.87', description='Vehicle wheelbase in meters' ) # Node twist_carla_node = Node( package='twist_carla_direct_control', executable='twist_carla_controller', name='twist_carla_controller', output='screen', parameters=[{ 'carla_host': LaunchConfiguration('carla_host'), 'carla_port': LaunchConfiguration('carla_port'), 'role_name': LaunchConfiguration('role_name'), 'timeout': 10.0, 'max_steering_angle': 0.7, 'max_speed': LaunchConfiguration('max_speed'), 'wheelbase': LaunchConfiguration('wheelbase'), 'update_rate': 50.0, }], remappings=[ ('/cmd_vel', '/cmd_vel'), ] ) return LaunchDescription([ carla_host_arg, carla_port_arg, role_name_arg, max_speed_arg, wheelbase_arg, twist_carla_node, ]) |
config/params.yaml
| twist_carla_controller: ros__parameters: carla_host: "localhost" carla_port: 2000 role_name: "ego_vehicle" timeout: 10.0 max_steering_angle: 0.7 # radians max_speed: 10.0 # m/s wheelbase: 2.87 # meters update_rate: 50.0 # Hz |
resource/twist_carla_direct_control
| # Empty marker file for package resource indexing |
twist_carla_direct_control/init.py
| 비어있음 |
빌드 할때 주의
ros2 jazzy에서 실행하려면
carla 라이브러리를 install 해야하는데
pip3 install carla
carla 예제를 사용하던 가상환경을 사용하면 라이브러리를 install 하지 않아도 된다.
carla 경로를 찾을 수 없는 문제가 생겼을때
| # CARLA 경로를 PYTHONPATH에 추가 export PYTHONPATH=$PYTHONPATH:~/carla/carla_venv/lib/python3.12/site-packages 이러한 형태로 경로를 지정해주면된다. export PYTHONPATH=$PYTHONPATH:<가상환경 경로>/lib/python3.12/site-packages |
실행 영상
launch 파일 생성
dynamixel_workbench_operators
twist_carla_direct_control
두개의 패키지를 동시에 실행하는 론치파일 생성
| cd ~/ros2_ws/src ros2 pkg create carla_twist_launch --build-type ament_python --dependencies dynamixel_workbench_operators twist_carla_direct_control mkdir -p ~/ros2_ws/src/carla_twist_launch/launch xterm을 사용할 것이기 때문에 sudo apt update && sudo apt install xterm colcon build --packages-select carla_twist_launch source install/setup.bash ros2 launch carla_twist_launch combined_launch.py |
carla_twist_launch/launch/combined_launch.py
| import os from launch import LaunchDescription from launch_ros.actions import Node def generate_launch_description(): # 1. Dynamixel Workbench 노드 (C++) # 'xterm'을 사용하여 별도의 터미널 창에서 실행되도록 설정합니다. dynamixel_node = Node( package='dynamixel_workbench_operators', executable='dynamixel_workbench_operators', # 실제 실행 파일 이름으로 확인 필요 name='dynamixel_manager', output='screen', prefix='xterm -hold -e', # 별도 창 생성 및 종료 후 창 유지 parameters=[{ # 'usb_port': '/dev/ttyUSB0', # 'baud_rate': 57600, }] ) # 2. Twist Carla Direct Control 노드 (Python) carla_control_node = Node( package='twist_carla_direct_control', executable='twist_carla_controller', # 실제 실행 파일 이름으로 확인 필요 name='carla_twist_controller', output='screen' ) return LaunchDescription([ dynamixel_node, carla_control_node ]) |
setup.py
| import os from glob import glob from setuptools import find_packages, setup package_name = 'carla_twist_launch' setup( name=package_name, version='0.0.0', packages=find_packages(exclude=['test']), data_files=[ ('share/ament_index/resource_index/packages', ['resource/' + package_name]), ('share/' + package_name, ['package.xml']), (os.path.join('share', package_name, 'launch'), glob(os.path.join('launch', '*launch.[pxy][yma]*'))) ], install_requires=['setuptools'], zip_safe=True, maintainer='airlab', maintainer_email='airlab@todo.todo', description='TODO: Package description', license='TODO: License declaration', extras_require={ 'test': [ 'pytest', ], }, entry_points={ 'console_scripts': [ ], }, ) |
setup.cfg
| [develop] script_dir=$base/lib/carla_twist_launch [install] install_scripts=$base/lib/carla_twist_launch |
