|
|
https://epretx.etri.re.kr/dataDetail?lang=ko&id=456
2025 자율주행 인공지능 챌린지의 환경은 Docker 이미지로 제공되었다. 이전 대회인 2024 자율주행 인공지능 챌린지와 환경이 동일한 것을 확인하여, 윤지는 Docker로 진행하고 나머지 팀원들은 이전 대회의 환경을 그대로 사용하기로 결정하였다.
ETRI 사이트에 따르면 데이터 수집에는 64채널과 128채널의 두 가지 LiDAR가 사용되었다. 두 센서는 주파수는 동일하지만 채널 수가 다르기 때문에, 각 데이터셋에 어떤 센서가 사용되었는지 파악하기 위해 EDA를 수행하였다.
------------------------------------------------------------------------------------------------------------------------------
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from collections import Counter
from sklearn.cluster import KMeans
import matplotlib as mpl
# Plot styling: force the DejaVu Sans font family and turn off the Unicode
# minus glyph on axes.
mpl.rcParams.update({
    'font.family': 'DejaVu Sans',
    'axes.unicode_minus': False,
})

# Dataset layout: point clouds and label files live in sibling folders under
# the dataset root.
DATA_ROOT = '../data/custom_av'
POINTS_DIR, LABELS_DIR = (
    os.path.join(DATA_ROOT, folder) for folder in ('points', 'labels')
)
def load_split_lists(data_root):
    """Discover the dataset splits under ``data_root``.

    Three layouts are tried in order, and the first one that yields any
    sample id wins:

    1. Split list files ``train.txt`` / ``val.txt`` / ``test.txt`` inside
       ``ImageSets``, ``splits`` or ``Split``. Each non-blank line is one
       sample id; anything after the first ``.`` is dropped.
    2. Per-split sub-folders ``points/train``, ``points/val``, ``points/test``
       containing ``.npy`` files.
    3. A flat ``points`` folder; every ``.npy`` file is put in a single split
       named ``'all'``.

    Args:
        data_root: Dataset root directory.

    Returns:
        dict mapping split name to a set of sample ids (file stems).

    Raises:
        OSError: if layouts 1 and 2 find nothing and ``<data_root>/points``
            cannot be listed.
    """
    split_names = ('train', 'val', 'test')
    # Derive the points folder from the argument instead of a module global,
    # so the function honours whatever root the caller passes in.
    points_dir = os.path.join(data_root, 'points')

    # Layout 1: explicit split list files.
    for cand in ('ImageSets', 'splits', 'Split'):
        list_dir = os.path.join(data_root, cand)
        if not os.path.isdir(list_dir):
            continue
        split = {}
        for name in split_names:
            list_file = os.path.join(list_dir, f'{name}.txt')
            if not os.path.isfile(list_file):
                continue
            with open(list_file, encoding='utf-8') as f:
                ids = {line.strip().split('.')[0] for line in f if line.strip()}
            if ids:
                split[name] = ids
        if split:
            return split

    # Layout 2: one sub-folder of point files per split.
    split = {}
    for name in split_names:
        split_dir = os.path.join(points_dir, name)
        if not os.path.isdir(split_dir):
            continue
        ids = {
            os.path.splitext(fname)[0]
            for fname in os.listdir(split_dir)
            if fname.endswith('.npy')
        }
        if ids:
            split[name] = ids
    if split:
        return split

    # Layout 3: flat folder, no split information available.
    all_ids = {
        os.path.splitext(fname)[0]
        for fname in os.listdir(points_dir)
        if fname.endswith('.npy')
    }
    return {'all': all_ids}
# Resolve the dataset splits once and report how many samples each contains.
SPLITS = load_split_lists(DATA_ROOT)
split_sizes = {name: len(members) for name, members in SPLITS.items()}
print('발견된 splits:', split_sizes)
def point_path(sample_id, split_name=None):
    """Locate the ``.npy`` point file for ``sample_id``.

    The split sub-folder ``POINTS_DIR/<split_name>`` is searched first (only
    when ``split_name`` is truthy), then ``POINTS_DIR`` itself.

    Returns:
        The first existing path, or ``None`` when the file is in neither place.
    """
    filename = f'{sample_id}.npy'
    search_dirs = [os.path.join(POINTS_DIR, split_name)] if split_name else []
    search_dirs.append(POINTS_DIR)
    candidates = (os.path.join(folder, filename) for folder in search_dirs)
    return next((path for path in candidates if os.path.isfile(path)), None)
def label_path(sample_id, split_name=None):
    """Locate the ``.txt`` label file for ``sample_id``.

    ``LABELS_DIR/<split_name>/<sample_id>.txt`` is preferred when
    ``split_name`` is truthy; otherwise, or when that file is missing,
    ``LABELS_DIR/<sample_id>.txt`` is tried.

    Returns:
        The first existing path, or ``None`` when no label file exists.
    """
    label_file = f'{sample_id}.txt'
    if split_name:
        in_split = os.path.join(LABELS_DIR, split_name, label_file)
        if os.path.isfile(in_split):
            return in_split
    flat = os.path.join(LABELS_DIR, label_file)
    return flat if os.path.isfile(flat) else None
def estimate_layers(xyz, angle_resolution=0.4):
    """Estimate the number of vertical scan layers in a point cloud.

    Each point's elevation angle (degrees above the XY plane, seen from the
    origin) is quantized into bins of ``angle_resolution`` degrees, measured
    from the lowest angle present. The result is the number of occupied bins.

    Args:
        xyz: Array of shape (N, >=3); columns 0..2 are read as x, y, z.
        angle_resolution: Bin width in degrees.

    Returns:
        int: Number of occupied elevation bins. ``0`` when the cloud is empty
        or contains no finite point.
    """
    xyz = np.asarray(xyz)
    # An empty cloud has no minimum angle; report zero layers instead of
    # letting ``min()`` raise on a zero-size array.
    if xyz.size == 0:
        return 0

    # NaN/inf coordinates would turn every quantized bin into NaN.
    xyz = xyz[np.isfinite(xyz[:, :3]).all(axis=1)]
    if xyz.shape[0] == 0:
        return 0

    horizontal_range = np.linalg.norm(xyz[:, :2], axis=1)
    elevation_deg = np.degrees(np.arctan2(xyz[:, 2], horizontal_range))
    bins = np.floor((elevation_deg - elevation_deg.min()) / angle_resolution)
    return int(np.unique(bins).size)
def eda_for_split(split_name, ids):
    """Collect per-frame point statistics for one split.

    For every sample id whose point file can be found, the frame is loaded
    and its point count and estimated vertical layer count are recorded.
    Samples without a point file are skipped.

    Args:
        split_name: Split label stored in the ``split`` column. The special
            name ``'all'`` means "no split sub-folder".
        ids: Iterable of sample ids belonging to the split.

    Returns:
        pandas.DataFrame with columns ``id``, ``point_count``,
        ``layer_count`` and ``split``. The columns are present even when no
        frame was found, so callers can always select them.
    """
    columns = ['id', 'point_count', 'layer_count', 'split']
    lookup_split = None if split_name == 'all' else split_name

    rows = []
    # Sets iterate in arbitrary order; sorting keeps the row order (and thus
    # the saved CSV) reproducible across runs.
    for sid in sorted(ids):
        p = point_path(sid, lookup_split)
        if p is None:
            continue
        xyz = np.load(p)[:, :3]
        rows.append({
            'id': sid,
            'point_count': xyz.shape[0],
            'layer_count': estimate_layers(xyz, angle_resolution=0.4),
            'split': split_name,
        })
    # Explicit columns: a frame built from an empty row list would otherwise
    # have no columns at all and break column selection downstream.
    return pd.DataFrame(rows, columns=columns)
def label_stats_for_split(split_name, ids):
    """Count labeled objects and frames per class for one split.

    Each non-blank line of a label file is treated as one object whose class
    is the last whitespace-separated token on the line.

    Args:
        split_name: Split label; ``'all'`` means "no split sub-folder".
        ids: Iterable of sample ids belonging to the split.

    Returns:
        Tuple ``(obj_counter, frame_counter, total_labeled_frames)``:
        objects per class, number of frames containing each class at least
        once, and the number of samples that have a label file.
    """
    lookup_split = split_name if split_name != 'all' else None
    per_object = Counter()
    per_frame = Counter()
    labeled = 0

    for sample_id in ids:
        path = label_path(sample_id, lookup_split)
        if path is None:
            continue
        labeled += 1
        with open(path, 'r') as handle:
            frame_classes = [
                tokens[-1] for tokens in map(str.split, handle) if tokens
            ]
        per_object.update(frame_classes)
        # A class counts once per frame no matter how many instances it has.
        per_frame.update(set(frame_classes))

    return per_object, per_frame, labeled
# Run the per-frame EDA for every split and stack the results into one table.
split_order = list(SPLITS.keys())
split_frames = []
for split_name in split_order:
    print(f'\n[{split_name}] EDA 실행 중...')
    df = eda_for_split(split_name, SPLITS[split_name])
    # A split whose point files are all missing yields an empty frame (possibly
    # without columns); selecting the stat columns from it would raise.
    if df.empty or not {'point_count', 'layer_count'}.issubset(df.columns):
        print(f'[{split_name}] no point files found; skipping summary.')
    else:
        print(df[['point_count', 'layer_count']].describe())
    split_frames.append(df)
# Keep ``all_df`` a DataFrame throughout instead of rebinding a list.
all_df = pd.concat(split_frames, ignore_index=True)
def plot_hist_by_split(df, col, bins=40, title=''):
    """Draw overlaid histograms of ``df[col]``, one per value of ``df['split']``.

    Args:
        df: DataFrame holding a ``split`` column and the column ``col``.
        col: Name of the column to histogram.
        bins: Bin specification forwarded to ``plt.hist``.
        title: Figure title; when empty a default built from ``col`` is used.
    """
    plt.figure(figsize=(7, 4))
    split_column = df['split']
    for split_label in split_column.unique():
        values = df.loc[split_column == split_label, col]
        plt.hist(
            values,
            bins=bins,
            alpha=0.5,
            label=f'{split_label} (n={len(values)})',
        )
    plt.title(title or f'{col} distribution (by split)')
    plt.xlabel(col)
    plt.legend()
    plt.tight_layout()
    plt.show()
# Distribution plots for the two per-frame statistics.
for column, heading in (
    ('point_count', 'Point Count Distribution (by split)'),
    ('layer_count', 'Vertical Layer Count Distribution (by split)'),
):
    plot_hist_by_split(all_df, column, bins=40, title=heading)

# Two-cluster K-means over (point_count, layer_count).
features = all_df[['point_count', 'layer_count']].values
kmeans = KMeans(n_clusters=2, random_state=42, n_init=10)
kmeans.fit(features)
all_df['cluster'] = kmeans.labels_

# The cluster with the lower mean point count is labelled 64ch-like and the
# other one 128ch-like.
pc_means = all_df.groupby('cluster')['point_count'].mean()
mapping = {pc_means.idxmin(): '64ch_like', pc_means.idxmax(): '128ch_like'}
all_df['sensor_guess'] = all_df['cluster'].map(mapping)

print('\n[Overall] cluster stats (point_count-based mapping)')
cluster_stats = all_df.groupby(['cluster'])[['point_count', 'layer_count']].agg(
    ['mean', 'std', 'min', 'max']
)
print(cluster_stats)

print('\n[By split] estimated sensor ratio (point_count-based)')
sensor_ratio = pd.crosstab(all_df['split'], all_df['sensor_guess'], normalize='index')
print(sensor_ratio.round(3))
# Per-split class distribution taken from the label files.
for split_name in split_order:
    ids = SPLITS[split_name]
    obj_cnt, frm_cnt, labeled_frames = label_stats_for_split(split_name, ids)
    print(f'\n[{split_name}] labeled frames: {labeled_frames}')
    if labeled_frames == 0:
        print(' No labels → skip object/frame distribution.')
        continue

    print(' Object counts:', dict(obj_cnt))
    print(' Frame counts :', dict(frm_cnt))
    if not obj_cnt:
        # Label files exist but hold no object lines: nothing to plot.
        continue

    classes = list(obj_cnt)
    counts = list(obj_cnt.values())
    plt.figure(figsize=(6, 4))
    plt.bar(classes, counts)
    plt.title(f'[{split_name}] Object Count Distribution')
    plt.ylabel('Count')
    plt.tight_layout()
    plt.show()

# Persist the per-frame table (including the sensor guess) next to the dataset.
out_csv = os.path.join(DATA_ROOT, 'eda_point_layer_sensor.csv')
all_df.to_csv(out_csv, index=False)
print('\nSaved EDA summary CSV:', out_csv)
------------------------------------------------------------------------------------------------------------------------------
프레임별 포인트 수와 추정 수직 레이어 수를 특징으로 K-평균 클러스터링(k=2)을 적용하고, 평균 포인트 수가 적은 군집을 64ch, 많은 군집을 128ch로 간주하였다. 그 결과 test 데이터는 높은 비율로 64ch로 분류되었고, 훈련 데이터셋은 128ch과 64ch이 약 6:4 비율로 섞여 있는 것으로 확인되었다.
훈련 방향
레이블에서 추출한 클래스별 객체 수는 심각한 불균형을 보인다. 훈련 세트에는 차량(Vehicle) 객체가 317,872개, 보행자(Pedestrian) 객체가 122,305개인 반면 Cyclist 객체는 10,580개에 불과하다. 차량 클래스에 편향되는 것을 방지하기 위해 gt_sampling과 클래스별 가중치를 적용하여 클래스 불균형 문제를 완화할 것이다.
훈련 데이터셋과 테스트 데이터셋의 센서 환경이 다르므로, 128채널 포인트 클라우드를 다운샘플링하여 포인트 밀도 분포를 64채널 프레임과 비슷하게 만들 예정이다. 이렇게 훈련 데이터를 테스트 환경에 가깝게 맞춰, 테스트 데이터셋에서 좋은 성능을 낼 수 있도록 훈련 방향을 잡는다.
모델은 VoxelNeXt, PV-RCNN++, CenterPoint-Pillar를 후보로 하여 위 과정을 적용해 학습시키고, 테스트 데이터셋에 대한 예측 결과를 시각화하여 분석한 뒤 가장 좋은 모델을 리더보드에 제출할 예정이다.
test 데이터셋 시각화 및 추론결과
------------------------------------------------------------------------------------------------------------------------------
import open3d as o3d
import numpy as np
import os
import time
import pickle
# ======================= Configuration =======================
DATA_PATH = "./data/custom_av"
FRAME_LIST_FILE = os.path.join(DATA_PATH, "ImageSets", "test.txt")
POINTS_FOLDER = os.path.join(DATA_PATH, "points")
RESULT_PKL = "./result.pkl"
# Saved camera viewpoint; written by the viewer's "F" key and re-applied on
# every frame change when the file exists.
VIEW_FILE = os.path.join(DATA_PATH, "view.json")

# Box colour (RGB in 0..1) looked up by predicted class name.
CLASS_COLOR = {
    "Vehicle": [1.0, 0.0, 0.0],     # red
    "Pedestrian": [0.0, 1.0, 0.0],  # green
    "Cyclist": [0.0, 0.0, 1.0],     # blue
}
# Same colours keyed by the 1-based predicted label id; used when class names
# are not available in the prediction.
LABEL_COLOR = {
    1: [1.0, 0.0, 0.0],  # Vehicle
    2: [0.0, 1.0, 0.0],  # Pedestrian
    3: [0.0, 0.0, 1.0],  # Cyclist
}

# One frame id per line. Blank lines (e.g. a trailing newline at end of file)
# are skipped so they do not turn into empty frame ids.
with open(FRAME_LIST_FILE, 'r') as f:
    frame_ids = [line.strip() for line in f if line.strip()]
# --------- Point cloud loader ---------
def load_npy_pointcloud(file_path):
    """Load a ``.npy`` point array and wrap it in an Open3D ``PointCloud``.

    Only the first three columns of the array are used as coordinates; any
    further columns are ignored.
    """
    coords = np.load(file_path)[:, :3]
    cloud = o3d.geometry.PointCloud()
    cloud.points = o3d.utility.Vector3dVector(coords)
    return cloud
# --------- Box / arrow builders ---------
def create_bbox(center, size, yaw, color):
    """Build a coloured ``OrientedBoundingBox`` from centre, size and yaw.

    Args:
        center: Box centre ``[x, y, z]``.
        size: Box extent ``[dx, dy, dz]``.
        yaw: Rotation about the Z axis, passed as the axis-angle vector
            ``[0, 0, yaw]``.
        color: RGB colour assigned to the box.
    """
    rotation = o3d.geometry.get_rotation_matrix_from_axis_angle([0, 0, yaw])
    oriented_box = o3d.geometry.OrientedBoundingBox(center, rotation, size)
    oriented_box.color = color
    return oriented_box
def create_heading_arrow(center, yaw, length=2.0):
    """Build a red arrow mesh at ``center`` indicating the heading ``yaw``.

    Args:
        center: Arrow base position ``[x, y, z]``.
        yaw: Heading angle about the Z axis.
        length: Total arrow length; 80% is the shaft and 20% the cone.
    """
    shaft_length = length * 0.8
    head_length = length * 0.2
    arrow = o3d.geometry.TriangleMesh.create_arrow(
        cylinder_radius=0.05,
        cone_radius=0.1,
        cylinder_height=shaft_length,
        cone_height=head_length,
    )
    arrow.paint_uniform_color([1, 0, 0])  # red

    # The mesh is created pointing along +Z. An Euler rotation of -90 degrees
    # about X lays it down along +Y; the extra -pi/2 in the Z rotation then
    # compensates so that yaw == 0 ends up pointing along +X.
    lay_down = o3d.geometry.get_rotation_matrix_from_xyz([-np.pi / 2, 0, 0])
    heading = o3d.geometry.get_rotation_matrix_from_axis_angle(
        [0, 0, yaw - np.pi/2]
    )
    for rotation in (lay_down, heading):
        # Rotate about the origin, i.e. before the arrow is moved into place.
        arrow.rotate(rotation, center=(0, 0, 0))

    arrow.translate(center)
    return arrow
# --------- Prediction loader ---------
def load_predictions(pkl_path):
    """Read a pickled list of per-frame annotations and index it by frame id.

    Each annotation is a dict; its ``'frame_id'`` value is converted with
    ``str()`` and used as the key. When a frame id occurs more than once the
    last annotation wins.

    Example annotation keys:
        'frame_id', 'boxes_lidar' (Nx7), 'pred_labels' (N,), 'name' (N,),
        'score' (N,)

    Returns:
        dict mapping frame id (str) to its annotation dict.
    """
    with open(pkl_path, 'rb') as handle:
        annotations = pickle.load(handle)  # list of dict

    by_frame = {}
    for anno in annotations:
        by_frame[str(anno['frame_id'])] = anno
    return by_frame
def visualize_frames(score_thr=0.3):
    """Browse the test frames and their predicted boxes in an Open3D window.

    Key bindings: ``D`` next frame, ``A`` previous frame, ``F`` save the
    current viewpoint to ``VIEW_FILE``, ``Q`` quit.

    Args:
        score_thr: Predictions with a score below this value are not drawn.
            The original code referenced an undefined threshold constant; the
            default here is a tunable choice, not a value taken from the
            dataset or the model.
    """
    if not frame_ids:
        print(f"[ERROR] No frame ids listed in {FRAME_LIST_FILE}.")
        return

    # Load predictions before opening the window so that an unreadable result
    # file does not leave an orphaned window behind.
    try:
        preds_by_frame = load_predictions(RESULT_PKL)
    except Exception as e:
        print(f"[ERROR] Failed to read {RESULT_PKL}: {e}")
        return

    vis = o3d.visualization.VisualizerWithKeyCallback()
    if not vis.create_window(window_name='LiDAR Viewer'):
        print("[ERROR] Failed to create Open3D window.")
        return

    frame_idx = 0
    last_key_time = 0.0
    key_delay = 0.1  # seconds
    coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(
        size=1.0, origin=[0, 0, 0]
    )

    def debounce():
        """Return True when at least ``key_delay`` seconds passed since the last accepted key."""
        nonlocal last_key_time
        # Monotonic clock: unaffected by wall-clock adjustments.
        now = time.monotonic()
        if now - last_key_time >= key_delay:
            last_key_time = now
            return True
        return False

    def find_annotation(frame_id):
        """Look up the prediction for a frame, retrying without leading zeros."""
        anno = preds_by_frame.get(str(frame_id))
        if anno is None:
            # The frame list may zero-pad ids that the result file does not.
            alt = str(frame_id).lstrip('0') or "0"
            anno = preds_by_frame.get(alt)
        return anno

    def add_prediction_boxes(anno):
        """Add one box and one heading arrow per sufficiently confident prediction."""
        boxes = np.asarray(anno.get('boxes_lidar', []))  # (N,7): x,y,z,dx,dy,dz,yaw
        names = np.asarray(anno.get('name', []))
        scores = np.asarray(anno.get('score', []))
        labels = np.asarray(anno.get('pred_labels', []))  # 1-based
        for i, box_row in enumerate(boxes):
            if scores.size and scores[i] < score_thr:
                continue
            # Only the first seven values describe the box; extra columns
            # (if any) are ignored instead of breaking the unpacking.
            x, y, z, dx, dy, dz, yaw = box_row[:7].tolist()
            if names.size:
                color = CLASS_COLOR.get(str(names[i]), [1, 0, 0])
            elif labels.size:
                color = LABEL_COLOR.get(int(labels[i]), [1, 0, 0])
            else:
                color = [1, 0, 0]
            vis.add_geometry(create_bbox([x, y, z], [dx, dy, dz], yaw, color))
            vis.add_geometry(create_heading_arrow([x, y, z], yaw))

    def restore_saved_view():
        """Re-apply the viewpoint stored in ``VIEW_FILE`` (best effort)."""
        if not os.path.exists(VIEW_FILE):
            return
        try:
            params = o3d.io.read_pinhole_camera_parameters(VIEW_FILE)
            vis.get_view_control().convert_from_pinhole_camera_parameters(params)
        except Exception as e:
            # Keep browsing with the default view, but say why it was not restored.
            print(f"[WARN] Could not restore viewpoint from {VIEW_FILE}: {e}")

    def update_scene():
        """Rebuild the scene for the current frame index."""
        vis.clear_geometries()
        frame_id = frame_ids[frame_idx]
        pc_path = os.path.join(POINTS_FOLDER, f"{frame_id}.npy")

        # Point cloud
        if not os.path.exists(pc_path):
            print(f"[WARN] point file not found: {pc_path}")
        else:
            vis.add_geometry(load_npy_pointcloud(pc_path))

        # Predicted boxes
        anno = find_annotation(frame_id)
        if anno is None:
            print(f"[WARN] No prediction found for frame_id={frame_id}")
        else:
            add_prediction_boxes(anno)

        vis.add_geometry(coordinate_frame)
        vis.poll_events()
        vis.update_renderer()
        restore_saved_view()

    def next_frame(_):
        nonlocal frame_idx
        if debounce():
            frame_idx = (frame_idx + 1) % len(frame_ids)
            update_scene()
        return False

    def prev_frame(_):
        nonlocal frame_idx
        if debounce():
            frame_idx = (frame_idx - 1) % len(frame_ids)
            update_scene()
        return False

    def save_view(_):
        params = vis.get_view_control().convert_to_pinhole_camera_parameters()
        o3d.io.write_pinhole_camera_parameters(VIEW_FILE, params)
        print(f"Viewpoint saved to {VIEW_FILE}")
        return False

    def quit_viewer(_):
        print("Quitting viewer.")
        vis.close()
        return False

    vis.register_key_callback(ord("D"), next_frame)
    vis.register_key_callback(ord("A"), prev_frame)
    vis.register_key_callback(ord("F"), save_view)
    vis.register_key_callback(ord("Q"), quit_viewer)

    try:
        update_scene()
        vis.run()
    finally:
        # Release the window even if rendering a frame raised.
        vis.destroy_window()


if __name__ == "__main__":
    visualize_frames()
|
|
