basalt/python/basalt/metric.py
Nikolaus Demmel 24325f2a06 ICCV'21 square root marginalization paper code release
Major changes:

- New square-root implementation for optimization and
  marginalization, giving faster optimization and numerically
  more stable marginalization. The square root solver is the new
  default, but the Schur complement based implementation is still
  available. (Implements the ICCV'21 paper.)

- The odometry estimator is now fully templatized and you can run
  in float or double. Default is float, which works well with the
  new square-root implementation and gives best runtimes.

- Batch evaluation scripts and documentation to reproduce the
  ICCV'21 experiments.

Additional changes:

- New options in VIO to marginalize lost landmarks right away and
  not only when the frame is marginalized (enabled by default).

- small bugfix for keypoint patch extraction bounds

- basalt_vio: more logging for batch evaluation

- basalt_vio: better handling of closing the GUI while estimator is still running

- basalt_vio: new command line argument to limit the number of frames processed

- basalt_vio: new command line argument to save ground truth trajectory

- added unit tests for square root marginalization

- update basalt-headers

- new submodules: gmt, nlohmann/json, magic_enum
2021-10-15 15:09:15 +02:00

172 lines
6.5 KiB
Python

#
# BSD 3-Clause License
#
# This file is part of the Basalt project.
# https://gitlab.com/VladyslavUsenko/basalt.git
#
# Copyright (c) 2019-2021, Vladyslav Usenko and Nikolaus Demmel.
# All rights reserved.
#
import copy
import numpy as np
class ExperimentSpec:
    """Reference to an experiment, optionally pinned to one iteration.

    The spec string is either a plain experiment name, or a name followed by
    ``@it`` and an integer (for example ``"name@it3"``). ``it`` is ``-1`` when
    no iteration suffix is given.
    """

    def __init__(self, string):
        """Parse ``string`` into ``self.name`` and ``self.it``.

        Raises:
            ValueError: if ``@it`` occurs more than once, or the text after
                ``@it`` is not a valid integer.
        """
        if "@it" not in string:
            # no iteration suffix: -1 marks "no specific iteration"
            self.name = string
            self.it = -1
            return

        name_part, iteration_part = string.split("@it")
        self.name = name_part
        self.it = int(iteration_part)

    def display_name(self, exp):
        """Return the display name of ``exp``, with the iteration appended if set.

        ``exp`` only needs a ``display_name`` attribute.
        """
        if self.it != -1:
            return "{} @ it{}".format(exp.display_name, self.it)
        return exp.display_name
class Metric:
    """One evaluation metric: how to read its value and how to display it.

    ``accessor`` is a callable invoked as ``accessor(log, it)`` that returns
    the raw value. The other attributes are display settings, which this
    class only stores, plus the optional "relative to" mode used by
    ``get_value``.
    """

    def __init__(self,
                 display_name,
                 accessor,
                 decimals,
                 format_string="{:.{prec}f}",
                 highlight_top=True,
                 geometric_mean=False,
                 larger_is_better=False):
        """Store the constructor arguments and reset all optional settings."""
        # settings supplied by the caller
        self.display_name = display_name
        self.accessor = accessor
        self.decimals = decimals
        self.format_string = format_string
        self.highlight_top = highlight_top
        self.geometric_mean = geometric_mean
        self.larger_is_better = larger_is_better

        # settings that can only be changed afterwards (see set_config)
        self.display_decimals = None
        self.relative_to_column = None
        self.relative_to_experiment = None
        self.relative_to_metric = None
        self.ratio = True
        self.exclude_columns_highlight = []
        self.failed_threshold = None

    def set_config(self, spec):
        """Override settings from ``spec``.

        ``spec`` must support ``key in spec`` and attribute access
        (``spec.key``). Only keys present in ``spec`` are applied.
        """
        # "relative_to_..." display mode switches the defaults first; explicit
        # "decimals" / "format_string" / ... entries below may override them
        relative_mode = ("relative_to_column" in spec
                         or "relative_to_experiment" in spec
                         or "relative_to_metric" in spec)
        if relative_mode:
            self.decimals = 3
            self.display_decimals = 3
            self.format_string = "{:.3f}"
            self.geometric_mean = True

        for key in ("display_name",
                    "decimals",
                    "display_decimals",
                    "relative_to_column",
                    "relative_to_experiment",
                    "relative_to_metric",
                    "ratio",
                    "format_string",
                    "highlight_top",
                    "larger_is_better",
                    "exclude_columns_highlight",
                    "geometric_mean",
                    "failed_threshold"):
            if key not in spec:
                continue
            setting = getattr(spec, key)
            if key == "relative_to_experiment":
                # given as a string; parsed into name and iteration
                setting = ExperimentSpec(setting)
            setattr(self, key, setting)

    def effective_display_decimals(self):
        """Return ``display_decimals`` if set, otherwise ``decimals``."""
        explicit = self.display_decimals
        return self.decimals if explicit is None else explicit

    def get_value(self, exps, e, s, it):
        """Return the metric value for run ``s`` of experiment ``e``.

        The raw value is ``accessor(e.runs[s].log, it)``. If
        ``relative_to_metric`` and/or ``relative_to_experiment`` is set, a
        base value is computed and the result is ``value / base_value`` when
        ``ratio`` is true, or ``base_value - value`` otherwise.

        Args:
            exps: mapping from experiment name to experiment; only used to
                look up ``relative_to_experiment``.
            e: the experiment being evaluated (needs ``runs[s].log``).
            s: key into ``runs`` selecting the run.
            it: iteration argument passed through to the accessor.
        """
        value = self.accessor(e.runs[s].log, it)

        other_metric = self.relative_to_metric
        other_experiment = self.relative_to_experiment
        if other_metric is None and other_experiment is None:
            return value

        # base accessor: the other metric's, or this metric's own
        if other_metric is None:
            base_accessor = self.accessor
        else:
            base_accessor = other_metric.accessor

        # base log / iteration: the other experiment's, or the current ones
        if other_experiment is None:
            base_log = e.runs[s].log
            base_it = it
        else:
            base_log = exps[other_experiment.name].runs[s].log
            base_it = other_experiment.it

        base_value = base_accessor(base_log, base_it)
        if self.ratio:
            return value / base_value
        return base_value - value
def peak_memory_opt(l, it):
    """Return ``l.all.resident_memory_peak`` at iteration ``it``, over 1024**2.

    For ``it == -1`` the last entry is used. Otherwise the entry index is the
    sum of ``l.all.num_it[:it + 1]`` minus one.
    """
    peaks = l.all.resident_memory_peak
    if it == -1:
        return peaks[-1] / 1024**2

    # cumulative count over num_it up to and including position `it`
    entries_up_to_it = int(l.all.num_it[:it + 1].sum())
    return peaks[entries_up_to_it - 1] / 1024**2
# yapf: disable
# Registry of the predefined metrics, keyed by the name that
# metrics_from_config() uses to look them up. Each entry is
# Metric(display_name, accessor, decimals, ...). Accessors are called as
# accessor(l, it) with a run log `l` and an iteration argument `it`; all
# lambdas below ignore `it`, only peak_memory_opt uses it.
metric_desc = dict(
ev_min=Metric("min ev", lambda l, it: min(min(x) for x in l.sums.marg_ev), 1),
avg_num_it=Metric("avg #it", lambda l, it: np.mean(l.sums.num_it), 1),
avg_num_it_failed=Metric("avg #it-fail", lambda l, it: np.mean(l.sums.num_it_rejected), 1),
duration=Metric("duration (s)", lambda l, it: l.duration(), 1),
time_marg=Metric("t marg", lambda l, it: np.sum(l.sums.marginalize), 2),
time_opt=Metric("t opt", lambda l, it: np.sum(l.sums.optimize), 2),
# NOTE(review): same display name as time_opt above although this sums
# optimize + marginalize -- confirm whether the label is intentional.
time_optmarg=Metric("t opt", lambda l, it: np.sum(l.sums.optimize) + np.sum(l.sums.marginalize), 2),
time_exec=Metric("t exec", lambda l, it: l.vio.exec_time_s[0], 1),
# "rtf" entries: l.duration() divided by the respective time
time_exec_realtimefactor=Metric("t exec (rtf)", lambda l, it: l.duration() / l.vio.exec_time_s[0], 1, larger_is_better=True),
time_measure=Metric("t meas", lambda l, it: np.sum(l.sums.measure), 1),
time_measure_realtimefactor=Metric("t meas (rtf)", lambda l, it: l.duration() / np.sum(l.sums.measure), 1, larger_is_better=True),
time_exec_minus_measure=Metric("t exec - meas", lambda l, it: l.vio.exec_time_s[0] - np.sum(l.sums.measure), 1),
# NOTE(review): the label says "t exec" but the value is computed from
# l.sums.measure (not l.vio.exec_time_s) -- confirm the intended label.
time_measure_minus_optmarg=Metric("t exec - (opt + marg)", lambda l, it: np.sum(l.sums.measure) - (np.sum(l.sums.optimize) + np.sum(l.sums.marginalize)), 1),
ate_num_kfs=Metric("ATE #kf", lambda l, it: l.vio.ate_num_kfs[0], 0),
ate_rmse=Metric("ATE", lambda l, it: l.vio.ate_rmse[0], 3),
# memory values are divided by 1024**2 and labelled "MB"
peak_memory=Metric("mem peak (MB)", lambda l, it: l.vio.resident_memory_peak[0] / 1024**2, 1),
# Disabled inline variant, kept for reference; the active entry below uses
# the peak_memory_opt function, which has a separate branch for it == -1.
#peak_memory_opt=Metric("mem peak opt (MB)", lambda l, it: l.all.resident_memory_peak[l.all.num_it[:it].sum()-1] / 1024**2, 1),
peak_memory_opt=Metric("mem peak opt (MB)", peak_memory_opt, 1),
)
# yapf: enable
def metrics_from_config(spec):
    """Build a list of ``Metric`` objects from the config entries in ``spec``.

    Each entry is either a string (a key of ``metric_desc``) or an object
    with a ``name`` attribute plus optional settings understood by
    ``Metric.set_config``. Every result is a shallow copy of the registry
    entry, so the entries in ``metric_desc`` are not reassigned. A configured
    ``relative_to_metric`` is itself resolved into a ``Metric`` the same way.

    Raises:
        KeyError: if a metric name is not present in ``metric_desc``.
    """

    def resolve(entry):
        has_config = not isinstance(entry, str)
        key = entry.name if has_config else entry

        metric = copy.copy(metric_desc[key])
        if has_config:
            metric.set_config(entry)

        # the referenced metric is given as a spec too; resolve recursively
        nested = metric.relative_to_metric
        if nested is not None:
            metric.relative_to_metric = resolve(nested)
        return metric

    return list(map(resolve, spec))