|
|
carla에서 라이다 데이터를 저장 할 때 .ply 형식으로 저장하였다
label형식은 아래와 같이 Location,Rotation, BBox Center ,BBox Extent 형식으로 저장하였다.
-----------------------------------------------------------------------------------------------------------------
[LiDAR]
Location: -103.041 89.016 2.327
Rotation: -0.126 -102.091 -1.096
[Ego Vehicle]
Location: -102.995 89.012 -0.073
Rotation: -0.126 -102.091 -1.096
BBox Center: -103.005 89.047 0.816
BBox Extent: 2.276 1.048 0.886
[Vehicle ID 26]
Location: -107.678 78.042 -0.051
Rotation: -0.335 -92.904 0.184
BBox Center: -107.673 78.071 0.838
BBox Extent: 2.276 1.048 0.886
-----------------------------------------------------------------------------------------------------------------
label의 데이터는 carla의 world 좌표계 기준의 좌표계 이기때문에 lidar 기준의 좌표계로 변환하여야 한다.
carla->custom 형식으로 변환 코드
-----------------------------------------------------------------------------------------------------------------
import os, glob, math, numpy as np
# Z축 회전
def Rz(rad: float) -> np.ndarray:
c, s = math.cos(rad), math.sin(rad)
return np.array([[ c, -s, 0],
[ s, c, 0],
[ 0, 0, 1]], dtype=float)
def parse_label_file(label_path):
lidar_loc = lidar_rot = None
ego = {"center": None, "extent": None, "rot": None}
vehicles = []
cur = None
tmp_c = tmp_e = tmp_r = None
with open(label_path, 'r') as f:
for raw in f:
line = raw.strip()
if not line:
continue
if line.startswith('['):
if cur and cur.startswith('Vehicle ID') and tmp_c is not None:
vehicles.append({'center': tmp_c, 'extent': tmp_e, 'rot': tmp_r})
tmp_c = tmp_e = tmp_r = None
cur = line.strip('[]')
continue
if cur == 'LiDAR':
if line.startswith('Location:'):
lidar_loc = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
elif line.startswith('Rotation:'):
# pitch, yaw, roll CARLA
lidar_rot = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
elif cur == 'Ego Vehicle':
if line.startswith('Rotation:'):
ego["rot"] = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
elif line.startswith('BBox Center:'):
ego["center"] = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
elif line.startswith('BBox Extent:'):
ego["extent"] = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
elif cur and cur.startswith('Vehicle ID'):
if line.startswith('Rotation:'):
tmp_r = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
elif line.startswith('BBox Center:'):
tmp_c = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
elif line.startswith('BBox Extent:'):
tmp_e = np.array(list(map(float, line.split(':')[1].split())), dtype=float)
if cur and cur.startswith('Vehicle ID') and tmp_c is not None:
vehicles.append({'center': tmp_c, 'extent': tmp_e, 'rot': tmp_r})
return lidar_loc, lidar_rot, ego, vehicles
def worldRH_to_lidarRH(point_world_xyz, lidar_loc_world, lidar_yaw_deg):
# 1) world -> world y 반전
Pw_rh = np.array([point_world_xyz[0], -point_world_xyz[1], point_world_xyz[2]], dtype=float)
t_rh = np.array([lidar_loc_world[0], -lidar_loc_world[1], lidar_loc_world[2]], dtype=float)
# 2) 역변환: Rz(+yawL) @ (Pw_rh - t_rh)
R_inv = Rz(math.radians(lidar_yaw_deg)) #(역변환은 +yaw)
Pl = R_inv @ (Pw_rh - t_rh)
return Pl # LiDAR 로컬 (RH)
def relative_heading_lidar(obj_yaw_deg, lidar_yaw_deg):
"""
네 코드의 월드(RH) yaw:
yaw_w = -deg2rad(yaw_carla)
라벨 heading(센서 좌표계) = yaw_obj_w - yaw_lidar_w
= -(obj_yaw - lidar_yaw) [rad]
"""
h = -math.radians(obj_yaw_deg - lidar_yaw_deg)
# [-pi, pi] 정규화
return math.atan2(math.sin(h), math.cos(h))
# 라벨 파일 변환
def convert_label_file_to_custom(input_label_path, output_label_path,
include_ego=True, category='Vehicle',
use_center=True):
lidar_loc, lidar_rot, ego, vehicles = parse_label_file(input_label_path)
lidar_yaw_deg = float(lidar_rot[1]) # CARLA yaw
lines = []
def one_line(center_w, extent_half, rot_pyr):
xyz_lidar = worldRH_to_lidarRH(center_w, lidar_loc, lidar_yaw_deg)
dims = (np.asarray(extent_half, float) * 2.0) # dx,dy,dz
if not use_center:
xyz_lidar[2] -= dims[2] / 2.0
heading = relative_heading_lidar(rot_pyr[1], lidar_yaw_deg)
return f"{xyz_lidar[0]:.3f} {xyz_lidar[1]:.3f} {xyz_lidar[2]:.3f} " \
f"{dims[0]:.3f} {dims[1]:.3f} {dims[2]:.3f} {heading:.3f} {category}"
if include_ego and all(v is not None for v in ego.values()):
lines.append(one_line(ego["center"], ego["extent"], ego["rot"]))
for o in vehicles:
lines.append(one_line(o["center"], o["extent"], o["rot"]))
os.makedirs(os.path.dirname(output_label_path), exist_ok=True)
with open(output_label_path, 'w') as f:
f.write("\n".join(lines))
# ply -> npy
def ply_to_npy(ply_path, npy_path, flip_y=False):
with open(ply_path, 'rb') as f:
fmt = 'ascii'; nverts=None; props=[]
while True:
line = f.readline().decode('utf-8').strip()
if line.startswith('format') and 'binary' in line: fmt='binary'
if line.startswith('element vertex'): nverts=int(line.split()[-1])
if line.startswith('property'): props.append(line.split()[2])
if line == 'end_header': break
xi, yi, zi = props.index('x'), props.index('y'), props.index('z')
ii = props.index('intensity') if 'intensity' in props else None
if fmt == 'ascii':
rows=[]
for _ in range(nverts):
p = f.readline().decode('utf-8').split()
x=float(p[xi]); y=float(p[yi]); z=float(p[zi])
inten=float(p[ii]) if ii is not None else 0.0
if flip_y: y = -y
rows.append([x,y,z,inten])
arr = np.asarray(rows, dtype=np.float32)
else:
nump=len(props)
buf=np.fromfile(f, dtype=np.float32, count=nverts*nump).reshape(-1, nump)
x=buf[:,xi]; y=buf[:,yi]; z=buf[:,zi]
inten=buf[:,ii] if ii is not None else np.zeros((nverts,), np.float32)
if flip_y: y = -y
arr = np.column_stack([x,y,z,inten]).astype(np.float32)
os.makedirs(os.path.dirname(npy_path), exist_ok=True)
np.save(npy_path, arr)
def batch_convert_custom_dataset(label_dir, ply_dir, out_root,
category_name='Vehicle',
include_ego=True,
use_center=True,
flip_y_ply=False,
require_both=False,
prefer_raw_ply=True):
os.makedirs(out_root, exist_ok=True)
out_labels = os.path.join(out_root, 'labels')
out_points = os.path.join(out_root, 'points')
os.makedirs(out_labels, exist_ok=True)
os.makedirs(out_points, exist_ok=True)
# 라벨
label_paths = sorted(glob.glob(os.path.join(label_dir, 'label_*.txt')))
label_frames = {os.path.basename(p).split('_')[1].split('.')[0]: p for p in label_paths}
# ply
ply_frames = {}
pc_candidates = glob.glob(os.path.join(ply_dir, 'pc_*.ply'))
raw_candidates = glob.glob(os.path.join(ply_dir, '*.ply'))
if prefer_raw_ply:
for p in pc_candidates:
stem = os.path.basename(p).replace('pc_','').split('.')[0]
if stem.isdigit(): ply_frames.setdefault(stem, p)
for p in raw_candidates:
stem = os.path.basename(p).split('.')[0]
if stem.isdigit(): ply_frames[stem] = p
else:
for p in pc_candidates + raw_candidates:
stem = os.path.basename(p).replace('pc_','').split('.')[0]
if stem.isdigit(): ply_frames[stem] = p
frames = sorted(set(label_frames) | set(ply_frames)) if not require_both \
else sorted(set(label_frames) & set(ply_frames))
n_lab = n_ply = 0
for frame in frames:
if frame in label_frames:
in_lb = label_frames[frame]
out_lb = os.path.join(out_labels, f'{frame}.txt')
convert_label_file_to_custom(in_lb, out_lb,
include_ego=include_ego,
category=category_name,
use_center=use_center)
n_lab += 1
elif require_both:
continue
if frame in ply_frames:
in_ply = ply_frames[frame]
out_npy = os.path.join(out_points, f'{frame}.npy')
ply_to_npy(in_ply, out_npy, flip_y=flip_y_ply)
n_ply += 1
print(f'[done] labels: {n_lab} files, points: {n_ply} files, total frames considered: {len(frames)}')
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser()
ap.add_argument("--label_dir", required=True)
ap.add_argument("--ply_dir", required=True)
ap.add_argument("--out_root", required=True)
ap.add_argument("--include_ego", action="store_true")
ap.add_argument("--flip_y_ply", action="store_true")
args = ap.parse_args()
batch_convert_custom_dataset(args.label_dir, args.ply_dir, args.out_root,
include_ego=args.include_ego,
flip_y_ply=args.flip_y_ply)
-----------------------------------------------------------------------------------------------------------------
변환된 label.txt 예시
-0.034 -0.000 -1.512 4.552 2.096 1.772 -0.000 Vehicle (EGO)
18.079 -13.151 -1.512 4.552 2.096 1.772 -0.000 Vehicle (다른차량)
.npy로 변환된 포인트 클라우드에 custom데이터셋 형식으로 변환한 라벨로 3D 바운딩 박스를 그리는 코드
-----------------------------------------------------------------------------------------------------------------
import argparse, math
import numpy as np
import open3d as o3d
def load_points_npy(path):
arr = np.load(path)
if arr.ndim != 2 or arr.shape[1] < 3:
raise ValueError("points .npy must be (N,>=3) [x,y,z,(intensity)]")
pts = arr[:, :3].astype(np.float64, copy=False)
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(pts)
pcd.colors = o3d.utility.Vector3dVector(np.ones((pts.shape[0], 3), np.float64))
return pcd
def parse_custom_labels(path):
labels = []
with open(path, "r") as f:
for line in f:
s = line.strip()
if not s or s.startswith("#"): continue
parts = s.split()
if len(parts) < 8:
raise ValueError(f"Invalid label line: {s}")
*nums, cls = parts
x, y, z, dx, dy, dz, heading = map(float, nums)
labels.append({
"center": np.array([x, y, z], np.float64),
"dims": np.array([dx, dy, dz], np.float64),
"heading": heading,
"cls": cls
})
return labels
def Rz(yaw):
c, s = math.cos(yaw), math.sin(yaw)
return np.array([[ c, -s, 0],
[ s, c, 0],
[ 0, 0, 1]], np.float64)
def color_for_class(name: str):
n = name.lower()
if "veh" in n or "car" in n: return (1.0, 0.0, 0.0)
if "ped" in n: return (0.0, 1.0, 0.0)
if "cycl" in n or "bike" in n:return (0.0, 0.7, 1.0)
return (1.0, 1.0, 0.0)
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--points", required=True, help="(N,4) .npy path [x,y,z,intensity]")
ap.add_argument("--labels", required=True, help="custom label .txt path")
ap.add_argument("--point_size", type=float, default=2.0)
ap.add_argument("--line_width", type=float, default=2.0)
ap.add_argument("--draw_center_axes", action="store_true")
ap.add_argument("--global_axes", action="store_true")
ap.add_argument("--axes_size", type=float, default=1.0)
ap.add_argument("--flip_y_labels", action="store_true",
help="라벨만 y *= -1 (heading도 부호 반전). 포인트는 그대로.")
args = ap.parse_args()
# 포인트 로드
pcd = load_points_npy(args.points)
# 라벨 로드
labels = parse_custom_labels(args.labels)
geoms = [pcd]
for lbl in labels:
c = lbl["center"].copy()
h = float(lbl["heading"])
if args.flip_y_labels:
c[1] *= -1.0
h = -h
obb = o3d.geometry.OrientedBoundingBox(c, Rz(h), lbl["dims"])
ls = o3d.geometry.LineSet.create_from_oriented_bounding_box(obb)
ls.paint_uniform_color(color_for_class(lbl["cls"]))
geoms.append(ls)
if args.draw_center_axes:
ax = o3d.geometry.TriangleMesh.create_coordinate_frame(size=args.axes_size, origin=c)
ax.rotate(Rz(h), center=c)
geoms.append(ax)
if args.global_axes:
geoms.append(o3d.geometry.TriangleMesh.create_coordinate_frame(size=args.axes_size))
vis = o3d.visualization.Visualizer()
vis.create_window("CustomDataset Viewer", 1280, 800, visible=True)
opt = vis.get_render_option()
opt.background_color = np.array([0.0, 0.0, 0.0], np.float64)
opt.point_size = float(args.point_size)
try: opt.line_width = float(args.line_width)
except: pass
for g in geoms: vis.add_geometry(g)
vis.run()
vis.destroy_window()
if __name__ == "__main__":
main()
-----------------------------------------------------------------------------------------------------------------
변환이 제대로 적용된 것을 확인
학습이 가능한지 확인하기 위해 OpenPCDet의 형식으로 데이터 정보파일을 생성
일단 학습이 가능한지 확인하기 위해서 약 500개의 데이터로 진행
-----------------------------------------------------------------------------------------------------------------
import pickle, numpy as np
from pathlib import Path
import open3d as o3d
# 경로 설정
RESULT_PKL = Path("result.pkl")
DATA_ROOT = Path("custom_dataset")
TARGET_ID = "003024"
SCORE_THR = 0.3
#points
pts_file = DATA_ROOT / "points" / f"{TARGET_ID}.npy"
pts = np.load(pts_file)[:, :3]
#result.pkl
preds = pickle.load(open(RESULT_PKL, "rb"))
def _norm(s: str):
s = str(s)
return s.zfill(6) if s.isdigit() else s
def _get_id(d: dict):
if 'frame_id' in d:
return _norm(d['frame_id'])
meta = d.get('metadata', {})
for k in ('sample_idx', 'image_idx', 'frame_id'):
if k in meta:
return _norm(meta[k])
return None
match = None
for d in preds:
if _get_id(d) == TARGET_ID:
match = d
break
def pick(d, *keys):
for k in keys:
if k in d and d[k] is not None:
return d[k]
return None
boxes = pick(match, 'boxes_lidar', 'pred_boxes')
scores = pick(match, 'score', 'scores', 'pred_scores')
labels = pick(match, 'name', 'label_preds', 'pred_labels')
boxes = np.asarray(boxes)
scores = np.asarray(scores)
mask = scores >= SCORE_THR
boxes = boxes[mask]
pcd = o3d.geometry.PointCloud(o3d.utility.Vector3dVector(pts))
pcd.paint_uniform_color([1.0, 1.0, 1.0])
def obb_from_box7(b):
# box = x, y, z, dx, dy, dz, yaw
x, y, z, dx, dy, dz, yaw = b.tolist()
center = np.array([x, y, z], dtype=np.float64)
R = o3d.geometry.OrientedBoundingBox.get_rotation_matrix_from_axis_angle([0, 0, yaw])
extent = np.array([dx, dy, dz], dtype=np.float64)
return o3d.geometry.OrientedBoundingBox(center, R, extent)
geoms = [pcd]
for b in boxes:
obb = obb_from_box7(b)
obb.color = (0.0, 1.0, 0.0)
geoms.append(obb)
vis = o3d.visualization.Visualizer()
vis.create_window(window_name=f"Viz {TARGET_ID}", width=1280, height=720, visible=True)
for g in geoms:
vis.add_geometry(g)
# 배경/포인트 크기 설정
opt = vis.get_render_option()
opt.background_color = np.asarray([0.0, 0.0, 0.0])
opt.point_size = 1.0
vis.run()
vis.destroy_window()
-----------------------------------------------------------------------------------------------------------------
예측 점수가 99점(ego)와 65점(차량)이 나오며 학습이 가능한 것을 확인함 이제 차량을 여러대 추가하여 데이터셋을 만들어 보고 변환 및 시각화를 통해 확인하고 차량이 최대한 많이 찍히도록 carla코드를 수정중에 있음
