import argparse import json import logging import carla import numpy as np import signal import sys
# Configuration: port passed to client.get_trafficmanager() in main().
TM_PORT = 8000
# --- Vehicle setup ---
def _setup_vehicle(world, config):
    """Spawn the vehicle described by ``config`` and return the actor.

    Args:
        world: Object exposing ``get_blueprint_library()``, ``get_map()``
            and ``spawn_actor()`` (the CARLA world in normal use).
        config: Mapping loaded from the JSON file. Keys read here:
            ``id`` (role name, default ``"ego_vehicle"``), ``type``
            (blueprint filter pattern, or ``"random"`` to pick any
            ``vehicle.*`` blueprint) and ``spawn_point`` (index into the
            map's spawn points, default ``0``).

    Returns:
        The actor returned by ``world.spawn_actor``.

    Raises:
        ValueError: ``config`` has no ``type`` entry.
        IndexError: no blueprint matches ``type``, or the map offers no
            spawn points.
    """
    vehicle_id = config.get("id", "ego_vehicle")
    logging.info(f"차량 생성 시작 [설정 ID: {vehicle_id}]")

    vehicle_type = config.get("type")
    if not vehicle_type:
        raise ValueError(
            f"vehicle config {vehicle_id!r} has no 'type' entry"
        )

    bp_library = world.get_blueprint_library()
    map_obj = world.get_map()

    if vehicle_type == "random":
        candidates = bp_library.filter('vehicle.*')
    else:
        candidates = bp_library.filter(vehicle_type)

    if len(candidates) == 0:
        raise IndexError(
            f"no blueprint matches vehicle type {vehicle_type!r}"
        )

    if vehicle_type == "random":
        # Pick by index so the blueprint container is never coerced into
        # a NumPy array; int() turns the NumPy scalar into a plain int.
        bp = candidates[int(np.random.randint(len(candidates)))]
    else:
        bp = candidates[0]

    bp.set_attribute("role_name", vehicle_id)
    if bp.has_attribute("ros_name"):
        bp.set_attribute("ros_name", vehicle_id)

    spawn_points = map_obj.get_spawn_points()
    if len(spawn_points) == 0:
        raise IndexError("the map provides no spawn points")

    # Fall back to the first spawn point for any out-of-range index.
    # Negative values are rejected too, so they cannot silently wrap
    # around to the end of the list.
    spawn_point_index = config.get("spawn_point", 0)
    if not 0 <= spawn_point_index < len(spawn_points):
        logging.warning(
            "spawn_point %s is outside 0..%d; using 0 instead",
            spawn_point_index,
            len(spawn_points) - 1,
        )
        spawn_point_index = 0

    actor = world.spawn_actor(
        bp, spawn_points[spawn_point_index], attach_to=None
    )
    logging.info(
        f"차량 생성 완료: {actor.attributes.get('role_name')} ({bp.id})"
    )
    return actor
# --- Sensor setup ---
def _setup_sensors(world, vehicle, sensors_config):
    """Spawn every configured sensor attached to ``vehicle``.

    Args:
        world: Object exposing ``get_blueprint_library()`` and
            ``spawn_actor()`` (the CARLA world in normal use).
        vehicle: Actor the sensors are attached to.
        sensors_config: Iterable of mappings. Keys read per sensor:
            ``id`` (role name, default ``"sensor"``), ``type`` (blueprint
            filter pattern), ``attributes`` (blueprint attributes, each
            key and value is passed through ``str``) and ``spawn_point``
            (``x``/``y``/``z``/``roll``/``pitch``/``yaw``, each default 0).

    Returns:
        List of spawned sensor actors, in configuration order.

    Raises:
        ValueError: a sensor entry has no ``type``.
        IndexError: no blueprint matches a sensor's ``type``.
        Any exception raised while spawning is re-raised after the sensors
        spawned so far have been destroyed.
    """
    bp_library = world.get_blueprint_library()
    sensors = []

    try:
        for sensor in sensors_config:
            sensor_id = sensor.get("id", "sensor")
            sensor_type = sensor.get("type")
            if not sensor_type:
                raise ValueError(
                    f"sensor config {sensor_id!r} has no 'type' entry"
                )

            matches = bp_library.filter(sensor_type)
            if len(matches) == 0:
                raise IndexError(
                    f"no blueprint matches sensor type {sensor_type!r} "
                    f"(sensor id {sensor_id!r})"
                )
            bp = matches[0]

            bp.set_attribute("role_name", sensor_id)
            if bp.has_attribute("ros_name"):
                bp.set_attribute("ros_name", sensor_id)

            for key, value in sensor.get("attributes", {}).items():
                bp.set_attribute(str(key), str(value))

            # y, pitch and yaw are negated before being handed to CARLA.
            # NOTE(review): presumably this converts a right-handed
            # (ROS-style) pose into CARLA's left-handed frame - confirm.
            sp = sensor.get("spawn_point", {})
            transform = carla.Transform(
                location=carla.Location(
                    x=sp.get("x", 0),
                    y=-sp.get("y", 0),
                    z=sp.get("z", 0),
                ),
                rotation=carla.Rotation(
                    roll=sp.get("roll", 0),
                    pitch=-sp.get("pitch", 0),
                    yaw=-sp.get("yaw", 0),
                ),
            )

            spawned = world.spawn_actor(bp, transform, attach_to=vehicle)

            # Not every actor exposes enable_for_ros; call it only when
            # it is available.
            if hasattr(spawned, 'enable_for_ros'):
                spawned.enable_for_ros()

            sensors.append(spawned)
    except BaseException:
        # The caller never receives the partial list when setup fails, so
        # release what was already spawned here before propagating
        # (BaseException so that Ctrl+C during setup is covered as well).
        for spawned in sensors:
            if spawned.is_alive:
                spawned.destroy()
        raise

    return sensors
# --- Main execution loop ---
def main(args):
    """Connect to CARLA, spawn the configured vehicle and loop its route.

    The world and the Traffic Manager are switched to synchronous mode,
    the vehicle and its sensors are spawned from the JSON file at
    ``args.file``, and the world is ticked until interrupted with Ctrl+C.
    Each time the vehicle comes within 5 m of the next route point the
    tracker advances; when it wraps to the first point a lap is counted
    and the same path is handed to the Traffic Manager again.

    Args:
        args: Namespace with ``host``, ``port`` and ``file`` attributes.

    On exit (normal or not) the Traffic Manager is taken out of
    synchronous mode, the original world settings are restored and every
    spawned actor that is still alive is destroyed.
    """
    world = None
    vehicle = None
    sensors = []
    original_settings = None
    traffic_manager = None

    try:
        client = carla.Client(args.host, args.port)
        client.set_timeout(60.0)
        world = client.get_world()
        map_obj = world.get_map()

        # Server settings: synchronous mode with rendering switched off.
        # NOTE(review): CARLA documents that camera-type sensors return no
        # image data in no-rendering mode - confirm this is intended when
        # the config declares cameras.
        original_settings = world.get_settings()
        settings = world.get_settings()
        settings.synchronous_mode = True
        settings.fixed_delta_seconds = 0.05
        settings.no_rendering_mode = True  # skip server-side rendering
        world.apply_settings(settings)

        traffic_manager = client.get_trafficmanager(TM_PORT)
        traffic_manager.set_synchronous_mode(True)

        with open(args.file, encoding="utf-8") as f:
            config = json.load(f)

        # Place the vehicle and its sensors.
        vehicle = _setup_vehicle(world, config)
        sensors = _setup_sensors(world, vehicle, config.get("sensors", []))

        # Route setup  #pillar
        route_indices = config.get(
            "route", [103, 111, 95, 141, 109, 13, 11, 80, 91, 0]
        )
        spawn_points = map_obj.get_spawn_points()
        # Negative indices are rejected as well so they cannot silently
        # wrap around to the end of the spawn-point list.
        valid_indices = [
            i for i in route_indices if 0 <= i < len(spawn_points)
        ]
        if len(valid_indices) != len(route_indices):
            logging.warning(
                "Ignoring %d route index(es) outside the %d available "
                "spawn points",
                len(route_indices) - len(valid_indices),
                len(spawn_points),
            )
        route = [spawn_points[i].location for i in valid_indices]

        if route:
            traffic_manager.set_path(vehicle, route)
        traffic_manager.set_desired_speed(
            vehicle, config.get("target_speed_kmh", 15.0)
        )
        vehicle.set_autopilot(True, traffic_manager.get_port())

        logging.info("시뮬레이션 시작 (중단: Ctrl+C)")

        current_waypoint_index = 0
        lap_count = 0
        # Loop-invariant label used in the per-tick status line.
        v_id = config.get("id", "ego")

        while True:
            world.tick()

            if route:
                current_loc = vehicle.get_location()
                next_wp_idx = (current_waypoint_index + 1) % len(route)
                distance = current_loc.distance(route[next_wp_idx])

                if distance < 5.0:
                    current_waypoint_index = next_wp_idx

                    # Wrapping back to index 0 means a full lap was done.
                    if current_waypoint_index == 0:
                        lap_count += 1
                        print(f"\n[알림] {lap_count} 바퀴 완료! 경로를 다시 설정합니다.")

                        # Hand the same path to the Traffic Manager again
                        # so the vehicle keeps looping.
                        traffic_manager.set_path(vehicle, route)

                print(f"\r[{v_id}] Lap: {lap_count} | Next WP: {current_waypoint_index+1}/{len(route)} | Dist: {distance:.1f}m", end="", flush=True)

    except KeyboardInterrupt:
        print("\n사용자에 의해 종료되었습니다.")
    finally:
        logging.info("정리 중...")
        # Take the Traffic Manager out of synchronous mode as well, so it
        # is not left waiting for ticks after this client has gone.
        if traffic_manager is not None:
            traffic_manager.set_synchronous_mode(False)
        if original_settings is not None:
            world.apply_settings(original_settings)

        for s in sensors:
            if s.is_alive:
                s.destroy()
        if vehicle and vehicle.is_alive:
            vehicle.destroy()
        logging.info("정리 완료.")
if __name__ == '__main__':
    # Command-line interface: CARLA server address plus the mandatory
    # path to the JSON configuration file.
    parser = argparse.ArgumentParser()
    parser.add_argument('--host', default='localhost')
    parser.add_argument('--port', type=int, default=2000)
    parser.add_argument(
        '-f',
        '--file',
        required=True,
        help='JSON 설정 파일 경로',
    )
    args = parser.parse_args()

    # INFO-level logging in "LEVEL: message" form, then run the simulation.
    logging.basicConfig(
        level=logging.INFO,
        format='%(levelname)s: %(message)s',
    )
    main(args)
|