update recorder

This commit is contained in:
Vladyslav Usenko 2019-10-01 14:06:49 +02:00
parent b822f14224
commit ea4f9a63cf
1 changed files with 89 additions and 61 deletions

View File

@ -72,7 +72,8 @@ pangolin::Var<int> webp_quality("ui.webp_quality", 90, 0, 101);
pangolin::Var<int> skip_frames("ui.skip_frames", 1, 1, 10); pangolin::Var<int> skip_frames("ui.skip_frames", 1, 1, 10);
pangolin::Var<float> exposure("ui.exposure", 5.0, 1, 20); pangolin::Var<float> exposure("ui.exposure", 5.0, 1, 20);
tbb::concurrent_bounded_queue<basalt::OpticalFlowInput::Ptr> image_data_queue; tbb::concurrent_bounded_queue<basalt::OpticalFlowInput::Ptr> image_data_queue,
image_data_queue2;
tbb::concurrent_bounded_queue<basalt::ImuData::Ptr> imu_data_queue; tbb::concurrent_bounded_queue<basalt::ImuData::Ptr> imu_data_queue;
tbb::concurrent_bounded_queue<basalt::RsPoseData> pose_data_queue; tbb::concurrent_bounded_queue<basalt::RsPoseData> pose_data_queue;
@ -85,35 +86,45 @@ static constexpr int NUM_CAMS = basalt::RsT265Device::NUM_CAMS;
static constexpr int NUM_WORKERS = 8; static constexpr int NUM_WORKERS = 8;
std::ofstream cam_data[NUM_CAMS], exposure_data[NUM_CAMS], imu0_data, pose_data; std::ofstream cam_data[NUM_CAMS], exposure_data[NUM_CAMS], imu0_data, pose_data;
std::mutex cam_mutex[NUM_CAMS];
std::vector<std::thread> worker_threads; std::vector<std::thread> worker_threads;
std::thread imu_worker_thread, pose_worker_thread; std::thread imu_worker_thread, pose_worker_thread, exposure_save_thread,
stop_recording_thread;
#if CV_MAJOR_VERSION >= 3
std::string file_extension = ".webp";
#else
std::string file_extension = ".jpg";
#endif
// manual exposure mode, if not enabled will also record pose data // manual exposure mode, if not enabled will also record pose data
bool manual_exposure; bool manual_exposure;
void image_save_worker() { void exposure_save_worker() {
basalt::OpticalFlowInput::Ptr img; basalt::OpticalFlowInput::Ptr img;
while (!stop_workers) { while (!stop_workers) {
if (image_data_queue.try_pop(img)) { if (image_data_queue.try_pop(img)) {
if (recording) {
for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) { for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) {
std::scoped_lock<std::mutex> lock(cam_mutex[cam_id]); cam_data[cam_id] << img->t_ns << "," << img->t_ns << file_extension
#if CV_MAJOR_VERSION >= 3
cam_data[cam_id] << img->t_ns << "," << img->t_ns << ".webp"
<< std::endl; << std::endl;
#else
cam_data[cam_id] << img->t_ns << "," << img->t_ns << ".jpg"
<< std::endl;
#endif
exposure_data[cam_id] << img->t_ns << "," exposure_data[cam_id] << img->t_ns << ","
<< int64_t(img->img_data[cam_id].exposure * 1e9) << int64_t(img->img_data[cam_id].exposure * 1e9)
<< std::endl; << std::endl;
} }
image_data_queue2.push(img);
} else {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
}
void image_save_worker() {
basalt::OpticalFlowInput::Ptr img;
while (!stop_workers) {
if (image_data_queue2.try_pop(img)) {
for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) { for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) {
basalt::ManagedImage<uint16_t>::Ptr image_raw = basalt::ManagedImage<uint16_t>::Ptr image_raw =
img->img_data[cam_id].img; img->img_data[cam_id].img;
@ -147,7 +158,7 @@ void image_save_worker() {
cv::imwrite(filename, image, compression_params); cv::imwrite(filename, image, compression_params);
#endif #endif
} }
}
} else { } else {
std::this_thread::sleep_for(std::chrono::milliseconds(10)); std::this_thread::sleep_for(std::chrono::milliseconds(10));
} }
@ -224,6 +235,8 @@ inline std::string get_date() {
void startRecording(const std::string &dir_path) { void startRecording(const std::string &dir_path) {
if (!recording) { if (!recording) {
if (stop_recording_thread.joinable()) stop_recording_thread.join();
dataset_dir = dir_path + "dataset_" + get_date() + "/"; dataset_dir = dir_path + "dataset_" + get_date() + "/";
fs::create_directory(dataset_dir); fs::create_directory(dataset_dir);
@ -256,8 +269,9 @@ void startRecording(const std::string &dir_path) {
"[m s^-2],a_RS_S_z [m s^-2]\n"; "[m s^-2],a_RS_S_z [m s^-2]\n";
save_calibration(t265_device); save_calibration(t265_device);
t265_device->image_data_queue->clear(); t265_device->image_data_queue = &image_data_queue;
t265_device->imu_data_queue->clear(); t265_device->imu_data_queue = &imu_data_queue;
t265_device->pose_data_queue = &pose_data_queue;
std::cout << "Started recording dataset in " << dataset_dir << std::endl; std::cout << "Started recording dataset in " << dataset_dir << std::endl;
@ -269,6 +283,19 @@ void startRecording(const std::string &dir_path) {
void stopRecording() { void stopRecording() {
if (recording) { if (recording) {
auto stop_recording_func = [&]() {
t265_device->imu_data_queue = nullptr;
t265_device->pose_data_queue = nullptr;
t265_device->image_data_queue = nullptr;
while (!image_data_queue.empty() || !image_data_queue2.empty() ||
!imu_data_queue.empty() || !pose_data_queue.empty()) {
std::cout << "Waiting until the data from the queues is written to the "
"hard drive."
<< std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
recording = false; recording = false;
cam_data[0].close(); cam_data[0].close();
cam_data[1].close(); cam_data[1].close();
@ -278,6 +305,9 @@ void stopRecording() {
pose_data.close(); pose_data.close();
std::cout << "Stopped recording dataset in " << dataset_dir << std::endl; std::cout << "Stopped recording dataset in " << dataset_dir << std::endl;
};
stop_recording_thread = std::thread(stop_recording_func);
} }
} }
@ -304,7 +334,7 @@ int main(int argc, char *argv[]) {
return app.exit(e); return app.exit(e);
} }
if (dataset_path[dataset_path.length() - 1] != '/') { if (!dataset_path.empty() && dataset_path[dataset_path.length() - 1] != '/') {
dataset_path += '/'; dataset_path += '/';
} }
@ -317,10 +347,12 @@ int main(int argc, char *argv[]) {
} }
} }
exposure_save_thread = std::thread(exposure_save_worker);
imu_worker_thread = std::thread(imu_save_worker); imu_worker_thread = std::thread(imu_save_worker);
pose_worker_thread = std::thread(pose_save_worker); pose_worker_thread = std::thread(pose_save_worker);
image_data_queue.set_capacity(1000); image_data_queue.set_capacity(1000);
image_data_queue2.set_capacity(1000);
imu_data_queue.set_capacity(10000); imu_data_queue.set_capacity(10000);
pose_data_queue.set_capacity(10000); pose_data_queue.set_capacity(10000);
@ -328,10 +360,6 @@ int main(int argc, char *argv[]) {
t265_device.reset(new basalt::RsT265Device(manual_exposure, skip_frames, t265_device.reset(new basalt::RsT265Device(manual_exposure, skip_frames,
webp_quality, exposure)); webp_quality, exposure));
t265_device->image_data_queue = &image_data_queue;
t265_device->imu_data_queue = &imu_data_queue;
t265_device->pose_data_queue = &pose_data_queue;
t265_device->start(); t265_device->start();
imu_log.reset(new pangolin::DataLog); imu_log.reset(new pangolin::DataLog);
@ -380,7 +408,7 @@ int main(int argc, char *argv[]) {
if (idx == 0) { if (idx == 0) {
pangolin::GlFont::I() pangolin::GlFont::I()
.Text("Queue: %d.", image_data_queue.size()) .Text("Queue: %d.", image_data_queue2.size())
.Draw(30, 60); .Draw(30, 60);
} }