Query Performance Tuning
This is a loose collection of considerations for querying Rerun datasets. Baseline performance will improve over time, which may render some of these approaches unnecessary. Because Rerun builds on DataFusion, some of these tips are general DataFusion observations from our own usage rather than Rerun-specific features.
First, generate a DataFrame for comparison:
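from pathlib import Path

import pyarrow as pa
import rerun as rr
from datafusion import col, functions as F

sample_video_path = Path(__file__).parents[4] / "tests" / "assets" / "rrd" / "video_sample"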
server = rr.server.Server(datasets={"video_dataset": sample_video_path})
# Using OSS server for demonstration but in practice replace with
# the URL of your cloud instance
CATALOG_URL = server.url()
client = rr.catalog.CatalogClient(CATALOG_URL)
dataset = client.get_dataset(name="video_dataset")
df = dataset.filter_contents(["/compressed_images/**", "/raw_images/**"]).reader(index="log_time")

Extract Python types from a DataFrame
DataFusion is a streaming query engine, which allows it to process arbitrarily large amounts of data. When working with smaller or filtered-down datasets that fit into memory, you can extract data into Python variables for further post-processing. In these examples, we convert DataFrames to PyArrow tables to materialize them in memory. Similar patterns using Polars or Pandas also apply.
Prefer to_numpy
Strictly speaking, this is a PyArrow and general Python detail rather than anything specific to Rerun.
When extracting data from a PyArrow table, to_pylist can be multiple orders of magnitude slower than to_numpy, even when to_numpy has to copy the data (zero_copy_only=False).
table = pa.table(df)
table["log_time"].to_numpy()
# vs.
table["log_time"].to_pylist()

Fine-tune data collection
Instead of collecting a DataFusion DataFrame into a PyArrow table as described above, you can materialize the results in memory and keep them as a DataFrame.
Subsequent operations on this in-memory (cached) DataFrame are then typically very fast.
df.count() # has to pull some data
df.count() # has to pull same data again
# vs.
cache_df = df.cache() # materializes table in memory
cache_df.count() # basically free
cache_df.count() # basically free

Leverage sparsity to minimize scans
In a write-once, read-many paradigm, adding a sparse column can enable cheap access to data of interest via filtering. The Rerun Data Platform can "push down" filters, which greatly reduces the amount of data returned and improves query performance. In this example we take advantage of this by filtering on a sparse marker that we have intentionally inserted into the recording.
# Create a new sparse layer identifying interesting events
segment_id = dataset.segment_ids()[0]
second_to_last_timestamp = pa.table(df)["log_time"].to_numpy()[-2]
# RRD_PATH is the output path for the new layer's .rrd file (defined elsewhere)
with rr.RecordingStream("rerun_example_layer", recording_id=segment_id) as rec:
    rec.save(RRD_PATH)
    rec.set_time("log_time", timestamp=second_to_last_timestamp)
    rec.log("/events", rr.AnyValues(flag=True))
dataset.register(Path(RRD_PATH).as_uri(), layer_name="event_layer")
# Read dataframe including new sparse layer
df_with_flag = dataset.filter_contents(["/compressed_images/**", "/raw_images/**", "/events/**"]).reader(
    index="log_time"
)
# This filter only looks at the single row in events
df_with_flag.filter(col("/events:flag").is_not_null())
# vs. using row_number which requires scanning all rows
df_with_row_number = df.with_column(
    "row_num",
    F.row_number(order_by="log_time"),
)
df_with_row_number.filter(col("row_num") == df_with_row_number.count() - 1)