diff --git a/src/rs_t265_record.cpp b/src/rs_t265_record.cpp index aa897d9..f3f3913 100644 --- a/src/rs_t265_record.cpp +++ b/src/rs_t265_record.cpp @@ -72,7 +72,8 @@ pangolin::Var webp_quality("ui.webp_quality", 90, 0, 101); pangolin::Var skip_frames("ui.skip_frames", 1, 1, 10); pangolin::Var exposure("ui.exposure", 5.0, 1, 20); -tbb::concurrent_bounded_queue image_data_queue; +tbb::concurrent_bounded_queue image_data_queue, + image_data_queue2; tbb::concurrent_bounded_queue imu_data_queue; tbb::concurrent_bounded_queue pose_data_queue; @@ -85,69 +86,79 @@ static constexpr int NUM_CAMS = basalt::RsT265Device::NUM_CAMS; static constexpr int NUM_WORKERS = 8; std::ofstream cam_data[NUM_CAMS], exposure_data[NUM_CAMS], imu0_data, pose_data; -std::mutex cam_mutex[NUM_CAMS]; std::vector worker_threads; -std::thread imu_worker_thread, pose_worker_thread; +std::thread imu_worker_thread, pose_worker_thread, exposure_save_thread, + stop_recording_thread; + +#if CV_MAJOR_VERSION >= 3 +std::string file_extension = ".webp"; +#else +std::string file_extension = ".jpg"; +#endif // manual exposure mode, if not enabled will also record pose data bool manual_exposure; +void exposure_save_worker() { + basalt::OpticalFlowInput::Ptr img; + while (!stop_workers) { + if (image_data_queue.try_pop(img)) { + for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) { + cam_data[cam_id] << img->t_ns << "," << img->t_ns << file_extension + << std::endl; + + exposure_data[cam_id] << img->t_ns << "," + << int64_t(img->img_data[cam_id].exposure * 1e9) + << std::endl; + } + + image_data_queue2.push(img); + } else { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } +} + void image_save_worker() { basalt::OpticalFlowInput::Ptr img; while (!stop_workers) { - if (image_data_queue.try_pop(img)) { - if (recording) { - for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) { - std::scoped_lock lock(cam_mutex[cam_id]); -#if CV_MAJOR_VERSION >= 3 - cam_data[cam_id] << img->t_ns << "," << img->t_ns << ".webp" - << std::endl; -#else - cam_data[cam_id] << img->t_ns << "," << img->t_ns << ".jpg" - << std::endl; -#endif + if (image_data_queue2.try_pop(img)) { + for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) { + basalt::ManagedImage::Ptr image_raw = + img->img_data[cam_id].img; - exposure_data[cam_id] << img->t_ns << "," - << int64_t(img->img_data[cam_id].exposure * 1e9) - << std::endl; + if (!image_raw.get()) continue; + + cv::Mat image(image_raw->h, image_raw->w, CV_8U); + + uint8_t *dst = image.ptr(); + const uint16_t *src = image_raw->ptr; + + for (size_t i = 0; i < image_raw->size(); i++) { + dst[i] = (src[i] >> 8); } - for (size_t cam_id = 0; cam_id < NUM_CAMS; ++cam_id) { - basalt::ManagedImage::Ptr image_raw = - img->img_data[cam_id].img; - - if (!image_raw.get()) continue; - - cv::Mat image(image_raw->h, image_raw->w, CV_8U); - - uint8_t *dst = image.ptr(); - const uint16_t *src = image_raw->ptr; - - for (size_t i = 0; i < image_raw->size(); i++) { - dst[i] = (src[i] >> 8); - } - #if CV_MAJOR_VERSION >= 3 - std::string filename = dataset_dir + "mav0/cam" + - std::to_string(cam_id) + "/data/" + - std::to_string(img->t_ns) + ".webp"; + std::string filename = dataset_dir + "mav0/cam" + + std::to_string(cam_id) + "/data/" + + std::to_string(img->t_ns) + ".webp"; - std::vector compression_params = {cv::IMWRITE_WEBP_QUALITY, - webp_quality}; - cv::imwrite(filename, image, compression_params); + std::vector compression_params = {cv::IMWRITE_WEBP_QUALITY, + webp_quality}; + cv::imwrite(filename, image, compression_params); #else - std::string filename = dataset_dir + "mav0/cam" + - std::to_string(cam_id) + "/data/" + - std::to_string(img->t_ns) + ".jpg"; + std::string filename = dataset_dir + "mav0/cam" + + std::to_string(cam_id) + "/data/" + + std::to_string(img->t_ns) + ".jpg"; - std::vector compression_params = {cv::IMWRITE_JPEG_QUALITY, - webp_quality}; - cv::imwrite(filename, image, compression_params); + std::vector compression_params = {cv::IMWRITE_JPEG_QUALITY, + webp_quality}; + cv::imwrite(filename, image, compression_params); #endif - } } + } else { std::this_thread::sleep_for(std::chrono::milliseconds(10)); } @@ -224,6 +235,8 @@ inline std::string get_date() { void startRecording(const std::string &dir_path) { if (!recording) { + if (stop_recording_thread.joinable()) stop_recording_thread.join(); + dataset_dir = dir_path + "dataset_" + get_date() + "/"; fs::create_directory(dataset_dir); @@ -256,8 +269,9 @@ void startRecording(const std::string &dir_path) { "[m s^-2],a_RS_S_z [m s^-2]\n"; save_calibration(t265_device); - t265_device->image_data_queue->clear(); - t265_device->imu_data_queue->clear(); + t265_device->image_data_queue = &image_data_queue; + t265_device->imu_data_queue = &imu_data_queue; + t265_device->pose_data_queue = &pose_data_queue; std::cout << "Started recording dataset in " << dataset_dir << std::endl; @@ -269,15 +283,31 @@ void startRecording(const std::string &dir_path) { void stopRecording() { if (recording) { - recording = false; - cam_data[0].close(); - cam_data[1].close(); - exposure_data[0].close(); - exposure_data[1].close(); - imu0_data.close(); - pose_data.close(); + auto stop_recording_func = [&]() { + t265_device->imu_data_queue = nullptr; + t265_device->pose_data_queue = nullptr; + t265_device->image_data_queue = nullptr; - std::cout << "Stopped recording dataset in " << dataset_dir << std::endl; + while (!image_data_queue.empty() || !image_data_queue2.empty() || + !imu_data_queue.empty() || !pose_data_queue.empty()) { + std::cout << "Waiting until the data from the queues is written to the " + "hard drive." + << std::endl; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + + recording = false; + cam_data[0].close(); + cam_data[1].close(); + exposure_data[0].close(); + exposure_data[1].close(); + imu0_data.close(); + pose_data.close(); + + std::cout << "Stopped recording dataset in " << dataset_dir << std::endl; + }; + + stop_recording_thread = std::thread(stop_recording_func); } } @@ -304,7 +334,7 @@ int main(int argc, char *argv[]) { return app.exit(e); } - if (dataset_path[dataset_path.length() - 1] != '/') { + if (!dataset_path.empty() && dataset_path[dataset_path.length() - 1] != '/') { dataset_path += '/'; } @@ -317,10 +347,12 @@ int main(int argc, char *argv[]) { } } + exposure_save_thread = std::thread(exposure_save_worker); imu_worker_thread = std::thread(imu_save_worker); pose_worker_thread = std::thread(pose_save_worker); image_data_queue.set_capacity(1000); + image_data_queue2.set_capacity(1000); imu_data_queue.set_capacity(10000); pose_data_queue.set_capacity(10000); @@ -328,10 +360,6 @@ int main(int argc, char *argv[]) { t265_device.reset(new basalt::RsT265Device(manual_exposure, skip_frames, webp_quality, exposure)); - t265_device->image_data_queue = &image_data_queue; - t265_device->imu_data_queue = &imu_data_queue; - t265_device->pose_data_queue = &pose_data_queue; - t265_device->start(); imu_log.reset(new pangolin::DataLog); @@ -380,7 +408,7 @@ int main(int argc, char *argv[]) { if (idx == 0) { pangolin::GlFont::I() - .Text("Queue: %d.", image_data_queue.size()) + .Text("Queue: %d.", image_data_queue2.size()) .Draw(30, 60); }