Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rosbag_trans): add support to convert multiple aimrt bags into a single ROS bag #90

Merged
merged 5 commits into from
Nov 8, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor(rosbag_trans): add DatabaseManager class to unify database o…
…perations
yglsaltfish committed Nov 8, 2024
commit d4d03b016832cd3f497749570200e248541b7582
254 changes: 138 additions & 116 deletions src/tools/aimrt_cli/aimrt_cli/trans/rosbag_trans.py
Original file line number Diff line number Diff line change
@@ -15,6 +15,67 @@ def increase_indent(self, flow=False, indentless=False):
return super(IndentDumper, self).increase_indent(flow, False)


class DatabaseManager:
def __init__(self, db_path: str):
self.db_path = db_path
self.conn = None
self.cursor = None

def connect(self):
self.conn = sqlite3.connect(self.db_path)
self.cursor = self.conn.cursor()
return self.conn, self.cursor

def create_tables(self):
try:
# create messages table
self.cursor.execute("""
CREATE TABLE messages(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
topic_id INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
data BLOB NOT NULL)
""")

# create topics table
self.cursor.execute("""
CREATE TABLE topics(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
serialization_format TEXT NOT NULL,
offered_qos_profiles TEXT NOT NULL)
""")

# create schema table
self.cursor.execute("""
CREATE TABLE "schema" (
"schema_version" INTEGER,
"ros_distro" TEXT NOT NULL,
PRIMARY KEY("schema_version")
);
""")
self.cursor.execute("""
INSERT INTO schema (schema_version, ros_distro)
VALUES (?, ?)
""", (3, "humble"))

# create metadata table
self.cursor.execute("""
CREATE TABLE metadata(id INTEGER PRIMARY KEY,metadata_version INTEGER NOT NULL,metadata TEXT NOT NULL)
""")
self.conn.commit()
except sqlite3.Error as e:
self.conn.rollback()
raise e

def close(self):
if self.cursor:
self.cursor.close()
if self.conn:
self.conn.close()


class SingleDbProcess:
def __init__(self, topic_info_dict: dict, db_path: Path):
self.message_count = 0
@@ -29,21 +90,25 @@ def get_bag_info(self, conn, cursor):
try:
cursor.execute("SELECT topic_id, timestamp FROM messages")
rows = sorted(cursor.fetchall())

self.starting_time_nanoseconds = self.starting_time_nanoseconds
self.end_time_nanoseconds = rows[-1][1]
self.message_count = len(rows)
for row in rows:
self.topic_with_message_count[self.topic_info_dict[row[0]].topic_name] = self.topic_with_message_count.get(
self.topic_info_dict[row[0]].topic_name, 0) + 1
if rows:
self.starting_time_nanoseconds = rows[0][1]
self.end_time_nanoseconds = rows[-1][1]
self.message_count = len(rows)
for row in rows:
topic_name = self.topic_info_dict[row[0]].topic_name
self.topic_with_message_count[topic_name] = \
self.topic_with_message_count.get(topic_name, 0) + 1
except Exception as e:
print(f"Error getting single bag info: {e}")
conn.rollback()

def get_info(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
self.get_bag_info(conn, cursor)
db_manager = DatabaseManager(str(self.db_path))
conn, cursor = db_manager.connect()
try:
self.get_bag_info(conn, cursor)
finally:
db_manager.close()


@dataclass
@@ -91,7 +156,7 @@ def trans_single_db(self, source_path: Path, topic_map: dict):
self.end_time_nanoseconds = max(self.end_time_nanoseconds, single_bag_info.end_time_nanoseconds)

conn = sqlite3.connect(source_path)
print(f"source_path: {source_path}")
print(f" processing db file: {source_path}")
cursor = conn.cursor()

try:
@@ -105,19 +170,20 @@ def trans_single_db(self, source_path: Path, topic_map: dict):
for row in rows:
topic_map[self.topic_info_dict[row[1]].topic_name].message_count += 1
self.conn.commit()
print(f"length of rows: {len(rows)} done")
print(f" size of data inserted: {len(rows)} done")
except Exception as e:
print(f"Error updating messages table: {e}")
print(f" Error updating messages table: {e}")
self.conn.rollback()
self.id += len(rows)

def trans_single_bag(self, topic_map: dict):
self.parse_yaml()
print(f"there are {len(self.files_list)} db files")
print(f"there are {len(self.files_list)} db files in {self.input_dir_}")
for db_path in self.files_list:
trans_path = Path(self.output_dir_) / db_path['path']
self.trans_single_db(Path(self.input_dir_) / db_path['path'], topic_map)
print(f"trans_path: {trans_path} done")
print(f" trans_path: {trans_path} done")
print(f"all db files in {self.input_dir_} done\n")


class AimrtbagToRos2:
@@ -130,38 +196,36 @@ def __init__(self, input_dir: list, output_dir: str):
self.starting_time_nanoseconds = 100000000000000000000
self.end_time_nanoseconds = 0
self.topics_list = []
self.db_manager = None
self.conn = None
self.cursor = None

def insert_metadata_table(self, conn, cursor):
try:
cursor.execute("""
CREATE TABLE "metadata" (
"id" INTEGER,
"metadata_version" INTEGER NOT NULL,
"metadata" TEXT NOT NULL,
PRIMARY KEY("id")
);
""")
except Exception as e:
print(f"Error create metadata table, error: {e}")
conn.rollback()
def create_output_dir(self):
if os.path.exists(self.output_dir_):
shutil.rmtree(self.output_dir_)
os.makedirs(self.output_dir_)

# initialize database
db_path = os.path.join(self.output_dir_, "rosbag.db3")
self.db_manager = DatabaseManager(db_path)
self.conn, self.cursor = self.db_manager.connect()
self.db_manager.create_tables()

def insert_schema_version(self, conn, cursor):
try:
cursor.execute("""
CREATE TABLE "schema" (
"schema_version" INTEGER,
"ros_distro" TEXT NOT NULL,
PRIMARY KEY("schema_version")
);
""")
cursor.execute("""
INSERT INTO schema (schema_version, ros_distro)
VALUES (?, ?)
""", (3, "humble"))
conn.commit()
except Exception as e:
print(f"Error create schema version, error: {e}")
conn.rollback()
def parse_yaml(self, input_dir: str):
with open(os.path.join(input_dir, "metadata.yaml"), "r") as f:
data = yaml.load(f, Loader=yaml.FullLoader)
if data["aimrt_bagfile_information"] is None or data["aimrt_bagfile_information"]["topics"] is None:
raise Exception("No topics information found in metadata.yaml")

topics_list = data["aimrt_bagfile_information"]["topics"]

for topic in topics_list:
if topic["topic_name"] not in self.topic_map:
self.id += 1
self.topic_map[topic["topic_name"]] = TopicInfo(
self.id, topic["topic_name"], topic["msg_type"], topic["serialization_type"], 0)
else:
print(f"warning: topic {topic['topic_name']} already exists")

def format_qos_profiles(self):
qos_dict = {
@@ -182,65 +246,9 @@ def format_qos_profiles(self):
'False',
'false')


return qos_string

def create_output_dir(self):
if os.path.exists(self.output_dir_):
shutil.rmtree(self.output_dir_)
try:
os.makedirs(self.output_dir_)
print(f"directory created successfully {self.output_dir_}")

# create sqlite database file
db_path = os.path.join(self.output_dir_, "rosbag.db3")
self.conn = sqlite3.connect(db_path)
self.cursor = self.conn.cursor()

# create messages table
self.cursor.execute("""
CREATE TABLE messages(
id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
topic_id INTEGER NOT NULL,
timestamp INTEGER NOT NULL,
data BLOB NOT NULL)
""")

# create topics table
self.cursor.execute("""
CREATE TABLE topics(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
type TEXT NOT NULL,
serialization_format TEXT NOT NULL,
offered_qos_profiles TEXT NOT NULL)
""")

# create schema table
self.insert_schema_version(self.conn, self.cursor)

print(f"SQLite database created successfully: {db_path}")

except OSError as e:
print(f"system error: {e}")
except sqlite3.Error as e:
print(f"database error: {e}")

def parse_yaml(self, input_dir: str):
with open(os.path.join(input_dir, "metadata.yaml"), "r") as f:
data = yaml.load(f, Loader=yaml.FullLoader)
if data["aimrt_bagfile_information"] is None or data["aimrt_bagfile_information"]["topics"] is None:
raise Exception("No topics information found in metadata.yaml")

topics_list = data["aimrt_bagfile_information"]["topics"]

for topic in topics_list:
if topic["topic_name"] not in self.topic_map:
self.id += 1
self.topic_map[topic["topic_name"]] = TopicInfo(
self.id, topic["topic_name"], topic["msg_type"], topic["serialization_type"], 0)
else:
print(f"warning: topic {topic['topic_name']} already exists")

def insert_topics_table(self):
try:
qos_dict = [{
@@ -339,6 +347,7 @@ def update_rosbag_yaml_data(self):
f.write(yaml_str)

def sort_db_data(self):
print("start sorting messages table by timestamp")
try:
self.cursor.execute("""
CREATE TABLE messages_temp(
@@ -360,23 +369,36 @@ def sort_db_data(self):
self.cursor.execute("ALTER TABLE messages_temp RENAME TO messages")

self.conn.commit()
print("Messages table has been sorted by timestamp")

except Exception as e:
print(f"Error sorting messages table: {e}")
self.conn.rollback()

def trans(self):
self.create_output_dir()
for input_dir in self.input_dir_:
self.parse_yaml(input_dir)
self.insert_topics_table()
for input_dir in self.input_dir_:
single_bag_trans = SingleBagTrans(input_dir, self.output_dir_, self.conn, self.cursor, self.message_count)
single_bag_trans.trans_single_bag(self.topic_map)
self.message_count = single_bag_trans.id
self.starting_time_nanoseconds = single_bag_trans.starting_time_nanoseconds
self.end_time_nanoseconds = single_bag_trans.end_time_nanoseconds

self.sort_db_data()
self.update_rosbag_yaml_data()
print(f"transing {self.input_dir_} to {self.output_dir_} \n")
try:
self.create_output_dir()
for input_dir in self.input_dir_:
self.parse_yaml(input_dir)
self.insert_topics_table()

for input_dir in self.input_dir_:
single_bag_trans = SingleBagTrans(
input_dir,
self.output_dir_,
self.conn,
self.cursor,
self.message_count
)
single_bag_trans.trans_single_bag(self.topic_map)
self.message_count = single_bag_trans.id
self.starting_time_nanoseconds = single_bag_trans.starting_time_nanoseconds
self.end_time_nanoseconds = single_bag_trans.end_time_nanoseconds

self.sort_db_data()
self.update_rosbag_yaml_data()
finally:
if self.db_manager:
self.db_manager.close()

print(f"transing {self.input_dir_} to {self.output_dir_} done\n")