DiTer++ is under development of dataset player, further updates will be uploaded!
#!/usr/bin/env python3
"""
Multi-threaded ROS data saver for PointCloud2, IMU, Image, and CameraInfo topics.
This node subscribes to four ROS topics:
- /pointcloud (sensor_msgs/PointCloud2)
- /imu/data (sensor_msgs/Imu)
- /camera/image_raw (sensor_msgs/Image)
- /camera/camera_info (sensor_msgs/CameraInfo)
Each subscriber callback enqueues incoming messages into a thread-safe queue.
Dedicated worker threads consume from each queue and save the data to disk:
• PointCloud2 → PCD files (ASCII)
• IMU → CSV file (one line per message)
• Image → PNG files
• CameraInfo → JSON files (intrinsics/extrinsics)
Usage:
1. Make sure your ROS master is running.
2. Adjust topic names below if needed.
3. `chmod +x data_saver_node.py`
4. `rosrun <your_package> data_saver_node.py`
"""
import os
import threading
import queue
import csv
import json
import rospy
import cv2
import numpy as np
from sensor_msgs.msg import PointCloud2, Imu, Image, CameraInfo
import sensor_msgs.point_cloud2 as pc2
from cv_bridge import CvBridge
# ------------------------------------------------------------------------------
# CONFIGURATION
# ------------------------------------------------------------------------------
# Topic names (adjust as needed)
POINTCLOUD_TOPIC = "/dog/points"
IMU_TOPIC = "/dog/imu/data"
IMAGE_TOPIC = "/dog/image_raw"
CAMINFO_TOPIC = "/dog/camera_info"
# Output directories
OUTPUT_BASE_DIR = os.path.expanduser("~/ros_data") # base folder for all saved data
PCD_DIR = os.path.join(OUTPUT_BASE_DIR, "pointclouds")
IMU_DIR = os.path.join(OUTPUT_BASE_DIR, "imu")
IMAGE_DIR = os.path.join(OUTPUT_BASE_DIR, "images")
CAMERA_INFO_DIR = os.path.join(OUTPUT_BASE_DIR, "camera_info")
# IMU CSV file name
IMU_CSV_FILENAME = os.path.join(IMU_DIR, "imu_data.csv")
# ------------------------------------------------------------------------------
# UTILITY: Ensure directories exist
# ------------------------------------------------------------------------------
for d in (PCD_DIR, IMU_DIR, IMAGE_DIR, CAMERA_INFO_DIR):
os.makedirs(d, exist_ok=True)
# ------------------------------------------------------------------------------
# THREAD-SAFE QUEUES FOR EACH TOPIC
# ------------------------------------------------------------------------------
pc_queue = queue.Queue() # holds incoming PointCloud2 messages
imu_queue = queue.Queue() # holds incoming Imu messages
image_queue = queue.Queue() # holds incoming Image messages
caminfo_queue = queue.Queue() # holds incoming CameraInfo messages
# Bridge for converting ROS Image → OpenCV
bridge = CvBridge()
# ------------------------------------------------------------------------------
# WORKER THREADS
# ------------------------------------------------------------------------------
def pointcloud_saver():
"""
Worker thread that consumes PointCloud2 messages from pc_queue,
converts them to XYZ points, and writes to an ASCII PCD file.
"""
while not rospy.is_shutdown():
try:
msg = pc_queue.get(timeout=0.5)
except queue.Empty:
continue
header = msg.header
stamp = header.stamp
# Format timestamp as "<secs>_<nsecs>"
ts_str = f"{stamp.secs:010d}_{stamp.nsecs:09d}"
filename = os.path.join(PCD_DIR, f"pointcloud_{ts_str}.pcd")
# Read XYZ points from the PointCloud2 message
# We only extract x, y, z fields here
points = []
for point in pc2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True):
points.append([point[0], point[1], point[2]])
points = np.array(points, dtype=np.float32)
num_points = points.shape[0]
# Write ASCII PCD
try:
with open(filename, "w") as f:
# PCD header
f.write("# .PCD v0.7 - Point Cloud Data file format\n")
f.write("VERSION 0.7\n")
f.write("FIELDS x y z\n")
f.write("SIZE 4 4 4\n")
f.write("TYPE F F F\n")
f.write("COUNT 1 1 1\n")
f.write(f"WIDTH {num_points}\n")
f.write("HEIGHT 1\n")
f.write("VIEWPOINT 0 0 0 1 0 0 0\n")
f.write(f"POINTS {num_points}\n")
f.write("DATA ascii\n")
# Write each point as "x y z"
for pt in points:
f.write(f"{pt[0]:.6f} {pt[1]:.6f} {pt[2]:.6f}\n")
except Exception as e:
rospy.logerr(f"[PointCloudSaver] Failed to write {filename}: {e}")
pc_queue.task_done()
def imu_saver():
"""
Worker thread that consumes Imu messages from imu_queue and appends
data to a CSV file (IMU_CSV_FILENAME). One line per message:
timestamp, orientation (x,y,z,w), angular_velocity (x,y,z), linear_acceleration (x,y,z)
"""
# If CSV does not exist, write header
if not os.path.isfile(IMU_CSV_FILENAME):
try:
with open(IMU_CSV_FILENAME, mode="w", newline="") as csv_file:
writer = csv.writer(csv_file)
header = [
"timestamp",
"ori_x", "ori_y", "ori_z", "ori_w",
"ang_vel_x", "ang_vel_y", "ang_vel_z",
"lin_acc_x", "lin_acc_y", "lin_acc_z",
]
writer.writerow(header)
except Exception as e:
rospy.logerr(f"[IMUSaver] Failed to create CSV {IMU_CSV_FILENAME}: {e}")
while not rospy.is_shutdown():
try:
msg = imu_queue.get(timeout=0.5)
except queue.Empty:
continue
header = msg.header
stamp = header.stamp
# Timestamp as float seconds
ts = stamp.to_sec()
ori = msg.orientation
ang_vel = msg.angular_velocity
lin_acc = msg.linear_acceleration
row = [
f"{ts:.9f}",
f"{ori.x:.6f}", f"{ori.y:.6f}", f"{ori.z:.6f}", f"{ori.w:.6f}",
f"{ang_vel.x:.6f}", f"{ang_vel.y:.6f}", f"{ang_vel.z:.6f}",
f"{lin_acc.x:.6f}", f"{lin_acc.y:.6f}", f"{lin_acc.z:.6f}",
]
try:
with open(IMU_CSV_FILENAME, mode="a", newline="") as csv_file:
writer = csv.writer(csv_file)
writer.writerow(row)
except Exception as e:
rospy.logerr(f"[IMUSaver] Failed to append to {IMU_CSV_FILENAME}: {e}")
imu_queue.task_done()
def image_saver():
"""
Worker thread that consumes Image messages from image_queue, converts
them to OpenCV images via CvBridge, and writes each as a PNG file.
"""
while not rospy.is_shutdown():
try:
msg = image_queue.get(timeout=0.5)
except queue.Empty:
continue
header = msg.header
stamp = header.stamp
ts_str = f"{stamp.secs:010d}_{stamp.nsecs:09d}"
filename = os.path.join(IMAGE_DIR, f"image_{ts_str}.png")
try:
# Convert ROS Image → OpenCV (assume mono8 or rgb8)
if msg.encoding == "mono8" or msg.encoding == "mono16":
cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding="mono16")
else:
cv_image = bridge.imgmsg_to_cv2(msg, desired_encoding="bgr8")
except Exception as e:
rospy.logerr(f"[ImageSaver] CvBridge conversion failed: {e}")
image_queue.task_done()
continue
try:
cv2.imwrite(filename, cv_image)
except Exception as e:
rospy.logerr(f"[ImageSaver] Failed to write {filename}: {e}")
image_queue.task_done()
def caminfo_saver():
"""
Worker thread that consumes CameraInfo messages from caminfo_queue and
writes each message's intrinsic/extrinsic parameters to a JSON file.
"""
while not rospy.is_shutdown():
try:
msg = caminfo_queue.get(timeout=0.5)
except queue.Empty:
continue
header = msg.header
stamp = header.stamp
ts_str = f"{stamp.secs:010d}_{stamp.nsecs:09d}"
filename = os.path.join(CAMERA_INFO_DIR, f"camera_info_{ts_str}.json")
cam_info_dict = {
"header": {
"seq": msg.header.seq,
"stamp_secs": msg.header.stamp.secs,
"stamp_nsecs": msg.header.stamp.nsecs,
"frame_id": msg.header.frame_id,
},
"height": msg.height,
"width": msg.width,
"distortion_model": msg.distortion_model,
"D": list(msg.D),
"K": list(msg.K), # 3x3 row-major
"R": list(msg.R), # 3x3 row-major
"P": list(msg.P), # 3x4 row-major
"binning_x": msg.binning_x,
"binning_y": msg.binning_y,
"roi": {
"x_offset": msg.roi.x_offset,
"y_offset": msg.roi.y_offset,
"height": msg.roi.height,
"width": msg.roi.width,
"do_rectify": msg.roi.do_rectify,
}
}
try:
with open(filename, "w") as f:
json.dump(cam_info_dict, f, indent=2)
except Exception as e:
rospy.logerr(f"[CamInfoSaver] Failed to write {filename}: {e}")
caminfo_queue.task_done()
# ------------------------------------------------------------------------------
# SUBSCRIBER CALLBACKS
# ------------------------------------------------------------------------------
def pc_callback(msg: PointCloud2):
"""
Enqueue incoming PointCloud2 message.
"""
try:
pc_queue.put_nowait(msg)
except queue.Full:
rospy.logwarn("[pc_callback] pc_queue is full; dropping message")
def imu_callback(msg: Imu):
"""
Enqueue incoming Imu message.
"""
try:
imu_queue.put_nowait(msg)
except queue.Full:
rospy.logwarn("[imu_callback] imu_queue is full; dropping message")
def image_callback(msg: Image):
"""
Enqueue incoming Image message.
"""
try:
image_queue.put_nowait(msg)
except queue.Full:
rospy.logwarn("[image_callback] image_queue is full; dropping message")
def caminfo_callback(msg: CameraInfo):
"""
Enqueue incoming CameraInfo message.
"""
try:
caminfo_queue.put_nowait(msg)
except queue.Full:
rospy.logwarn("[caminfo_callback] caminfo_queue is full; dropping message")
# ------------------------------------------------------------------------------
# MAIN ENTRY POINT
# ------------------------------------------------------------------------------
def main():
rospy.init_node("multi_topic_saver_node", anonymous=False)
rospy.loginfo("Starting multi-topic data saver node...")
# Launch worker threads
threads = []
t_pc = threading.Thread(target=pointcloud_saver, name="PointCloudSaverThread", daemon=True)
t_imu = threading.Thread(target=imu_saver, name="IMUSaverThread", daemon=True)
t_img = threading.Thread(target=image_saver, name="ImageSaverThread", daemon=True)
t_cam = threading.Thread(target=caminfo_saver, name="CamInfoSaverThread", daemon=True)
threads.extend([t_pc, t_imu, t_img, t_cam])
for t in threads:
t.start()
rospy.loginfo(f"Started thread: {t.name}")
# Set up subscribers (using default queue_size; callbacks enqueue into our queues)
rospy.Subscriber(POINTCLOUD_TOPIC, PointCloud2, pc_callback, queue_size=1)
rospy.Subscriber(IMU_TOPIC, Imu, imu_callback, queue_size=10)
rospy.Subscriber(IMAGE_TOPIC, Image, image_callback, queue_size=5)
rospy.Subscriber(CAMINFO_TOPIC, CameraInfo, caminfo_callback, queue_size=1)
rospy.loginfo("Subscribers created. Waiting for messages...")
# Keep python from exiting until node is stopped
rospy.spin()
# Optional: join threads if needed (they are daemons, so they will exit when main thread exits)
for t in threads:
t.join(timeout=1.0)
rospy.loginfo("Node shutting down.")
if __name__ == "__main__":
main()
Please use the Python script example provided from the above to extract data from our ROS-1 bag data.
Further updates for dataset player will be uploaded later with GUI