Query video streams
Video streams provide the best compression ratio for camera feeds, but require special handling when querying data back from the Data Platform. For more details about the different video types we support, see our video reference.
This guide focuses on querying VideoStream data from the Rerun Data Platform,
including how to decode individual frames and how to export entire streams to MP4 files.
This example requires rerun-sdk[all] and av (for video decoding).
Setup
This is a simplified setup that launches a local server for demonstration purposes; in practice you'll connect to your cloud instance.
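If you're connecting to a deployed instance instead, constructing the client might look like this (a sketch only; the endpoint URL is a placeholder):

```python
import rerun as rr

# Placeholder endpoint; substitute your actual Rerun Data Platform URL
client = rr.catalog.CatalogClient("rerun+http://your-instance.example.com:51234")
```

For this guide, the local demo setup is: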
import tempfile
from fractions import Fraction
from io import BytesIO
from pathlib import Path
import av
import numpy as np
import pyarrow as pa
import rerun as rr
from datafusion import col
from datafusion import functions as F
# Temporary directory for files written later in this guide
TMP_DIR = Path(tempfile.mkdtemp())
sample_video_path = Path(__file__).parents[4] / "tests" / "assets" / "rrd" / "video_sample"
server = rr.server.Server(datasets={"video_dataset": sample_video_path})
CATALOG_URL = server.url()
client = rr.catalog.CatalogClient(CATALOG_URL)
dataset = client.get_dataset(name="video_dataset")
df = dataset.filter_contents(["/video_stream/**"]).reader(index="log_time")
times = pa.table(df.select("log_time"))["log_time"].to_numpy()
Understanding video stream data
Video streams are logged using the VideoStream archetype,
which stores encoded video samples (frames) along with codec information.
Key columns you'll work with:
- VideoStream:codec - The video codec used (e.g., H.264)
- VideoStream:sample - The encoded video frame data (in Annex B format for H.264)
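For context, here is a minimal sketch of what logging such data might look like at recording time. The timeline name and the sample bytes are placeholders for illustration:

```python
import rerun as rr

rr.init("video_logging_example")

# Placeholder: an encoded H.264 sample in Annex B format, e.g. from an encoder or camera SDK
encoded_sample = b"..."

rr.set_time("log_time", timestamp=1_700_000_000)  # assumed timeline name, seconds since epoch
rr.log("/video_stream", rr.VideoStream(codec=rr.VideoCodec.H264, sample=encoded_sample))
```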
Checking the video codec
Before processing video data, verify the codec matches what you expect:
codec_column = "/video_stream:VideoStream:codec"
num_codec_matches = df.filter(col(codec_column)[0] == rr.VideoCodec.H264.value).count()
if num_codec_matches != df.select(codec_column).count():
    raise ValueError(f"Expected H.264 codec {rr.VideoCodec.H264.value}, got {df.select(codec_column).limit(1)}")
Decoding a specific frame
Unlike raw images, video frames are encoded using inter-frame compression.
To decode a specific frame, you must decode from the beginning of the stream (or from the most recent keyframe) and iterate forward.
av handles keyframe detection internally during decoding.
video_column = "/video_stream:VideoStream:sample"
selected_frame_index = 3 # Pick an arbitrary frame to decode
# Query all samples up to and including the target frame.
# We need to decode from the start (or a keyframe) to reach our target.
selected_time = times[selected_frame_index]
video_df = df.filter(col("log_time") <= selected_time).select("log_time", video_column)
pa_table = pa.table(video_df)
# Concatenate samples into a byte buffer
samples = pa_table[video_column].to_numpy()
sample_times = pa_table["log_time"].to_numpy()
sample_bytes = b""
for sample in samples:
    sample_bytes += sample[0].tobytes()
data_buffer = BytesIO(sample_bytes)
# Decode using PyAV
container = av.open(data_buffer, format="h264", mode="r")
video_stream: av.video.stream.VideoStream = container.streams.video[0]
start_time = sample_times[0]
# Decode all frames up to our target, keeping only the last one
frame = None
for packet, time in zip(container.demux(video_stream), sample_times, strict=False):
    packet.time_base = Fraction(1, 1_000_000_000)  # Timestamps in nanoseconds
    packet.pts = int(time - start_time)
    packet.dts = packet.pts  # No B-frames, so dts == pts
    for decoded_frame in packet.decode():
        frame = decoded_frame
if not isinstance(frame, av.VideoFrame):
    raise RuntimeError("Failed to decode frame.")
image = np.asarray(frame.to_image())
print(f"Decoded frame shape: {image.shape}")Efficient random access with keyframe information efficient-random-access-with-keyframe-information
The example above queries all samples from the start of the stream, which can be inefficient for long videos. For better performance with random access, you can add keyframe information as a layer.
Adding keyframe information as a layer
You can analyze your video data once to identify keyframes and register them as a separate layer:
# Preprocessing step: Add keyframe information to existing video data as a layer
# This is typically done once to make subsequent queries faster
# Query all video samples from the existing recording
video_samples_df = df.select("log_time", video_column)
video_table = pa.table(video_samples_df)
sample_times = video_table["log_time"].to_numpy()
samples = video_table[video_column].to_numpy()
# Concatenate all samples to analyze keyframes
sample_bytes = b""
for sample in samples:
    sample_bytes += sample[0].tobytes()
# Decode the video to detect keyframes
data_buffer = BytesIO(sample_bytes)
container = av.open(data_buffer, format="h264", mode="r")
video_stream = container.streams.video[0]
# Identify which samples are keyframes
keyframe_times = []
for packet, ts in zip(container.demux(video_stream), sample_times):
    if packet.is_keyframe:
        keyframe_times.append(ts)
container.close()
keyframe_values = [True] * len(keyframe_times)
print(f"Found {len(keyframe_times)} keyframes")
# Save keyframe data as a separate layer
# Get the segment ID to align with the original recording
segment_ids = dataset.segment_ids()
first_segment_id = segment_ids[0]
# Create time column and content using the columnar API
# Make sure the timeline matches the original video stream
timeline = "log_time"
time_column = rr.TimeColumn(timeline=timeline, timestamp=keyframe_times)
content = rr.DynamicArchetype.columns(archetype="KeyframeData", components={"is_keyframe": keyframe_values})
# Write to a new file as a layer
layer_path = TMP_DIR / "keyframe_layer.rrd"
with rr.RecordingStream(
    application_id="keyframes",
    recording_id=first_segment_id,  # Match original recording_id
) as rec:
    rec.save(layer_path)
    rec.send_columns("/video_stream", indexes=[time_column], columns=[*content])
# Register the layer with the dataset
dataset.register(layer_path.as_uri(), layer_name="keyframes")
print(f"Registered keyframe layer at {layer_path}")This preprocessing approach:
- Decodes the video once to detect which packets are keyframes using packet.is_keyframe
- Creates sparse data containing only keyframe timestamps
- Writes the keyframe data to a separate RRD file
- Registers it as a layer on the dataset
Once registered, the layer data appears as additional columns when querying the dataset (see catalog object model for details on datasets and layers).
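As a quick sanity check, you can inspect the reader's schema to confirm the keyframe column is present. This sketch assumes the reader behaves like a standard DataFusion DataFrame, whose schema() returns a pyarrow schema:

```python
reader = dataset.filter_contents(["/video_stream/**"]).reader(index="log_time")

# The keyframe layer should contribute a "/video_stream:is_keyframe" column
column_names = [field.name for field in reader.schema()]
assert "/video_stream:is_keyframe" in column_names
```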
Querying with keyframe information
With the keyframe layer registered, you can query only the samples between the nearest keyframe and your target frame, significantly reducing the amount of data to fetch and decode:
# Query using keyframe information for efficient random access
# Assume we've already added keyframe information via the preprocessing step above
target_frame_index = 42
target_time = times[target_frame_index]
# Create a reader that includes the keyframe layer data
# The column name follows the pattern: /{entity_path}:{component_name}
keyframe_column = "/video_stream:is_keyframe"
full_df = dataset.filter_contents(["/video_stream/**"]).reader(index="log_time")
# Query to find the most recent keyframe at or before the target time
# Since we only log when is_keyframe=True, any row with this column present is a keyframe
keyframe_slice = full_df.filter((col("log_time") <= target_time) & col(keyframe_column).is_not_null())
closest_keyframe_df = keyframe_slice.aggregate(
    [], [F.last_value(col("log_time"), order_by=[col("log_time")]).alias("latest_keyframe")]
)
keyframe_result = pa.table(closest_keyframe_df)
# Start decoding from the most recent keyframe
start_time = keyframe_result["latest_keyframe"].to_numpy()[0]
start_frame_idx = np.searchsorted(times, start_time)
frames_saved = target_frame_index - start_frame_idx
print(f"Found keyframe at frame {start_frame_idx}, saved decoding {frames_saved} frames")
# Query only the video samples from keyframe to target (much more efficient!)
efficient_video_df = df.filter(col("log_time").between(start_time, target_time)).select("log_time", video_column)
efficient_table = pa.table(efficient_video_df)
frames_to_decode = len(efficient_table)
print(f"Decoding {frames_to_decode} frames (vs {target_frame_index + 1} without keyframe info)")
# Now decode just this smaller range
samples = efficient_table[video_column].to_numpy()
sample_times = efficient_table["log_time"].to_numpy()
sample_bytes = b""
for sample in samples:
    sample_bytes += sample[0].tobytes()
data_buffer = BytesIO(sample_bytes)
container = av.open(data_buffer, format="h264", mode="r")
video_stream = container.streams.video[0]
# Decode to the target frame
frame = None
for packet, time in zip(container.demux(video_stream), sample_times, strict=False):
    packet.time_base = Fraction(1, 1_000_000_000)
    packet.pts = int(time - sample_times[0])
    packet.dts = packet.pts
    for decoded_frame in packet.decode():
        frame = decoded_frame
if isinstance(frame, av.VideoFrame):
    image = np.asarray(frame.to_image())
    print(f"Efficiently decoded frame {target_frame_index} with shape: {image.shape}")
This approach is especially beneficial for:
- Long video sequences where decoding from the start is expensive
- Random access patterns where you need to jump to arbitrary frames
- High-resolution video where bandwidth and decode time are significant
- Interactive applications that need to seek to specific timestamps
Exporting to MP4 (remuxing)
You can export video stream data to an MP4 file without re-encoding. This is called "remuxing": the encoded samples are simply repackaged into a container format.
# Query all video samples
video_df = df.select("log_time", "/video_stream:VideoStream:sample")
pa_table = pa.table(video_df)
all_times = pa_table["log_time"]
all_samples = pa_table["/video_stream:VideoStream:sample"]
# Concatenate samples into a single byte buffer
sample_bytes = np.concatenate([sample[0] for sample in all_samples.to_numpy()]).tobytes()
sample_bytes_io = BytesIO(sample_bytes)
# Setup input container (H.264 Annex B stream)
input_container = av.open(sample_bytes_io, mode="r", format="h264")
input_stream = input_container.streams.video[0]
# Setup output container (MP4)
output_path = TMP_DIR / "output.mp4"
output_container = av.open(output_path, mode="w")
output_stream = output_container.add_stream_from_template(input_stream)
# Remux packets with correct timestamps
start_time = all_times.chunk(0)[0]
for packet, time in zip(input_container.demux(input_stream), all_times, strict=False):
    packet.time_base = Fraction(1, 1_000_000_000)
    packet.pts = int(time.value - start_time.value)
    packet.dts = packet.pts
    packet.stream = output_stream
    output_container.mux(packet)
input_container.close()
output_container.close()
print(f"Exported video to {output_path}")Important considerations important-considerations
Keyframe handling
Video streams often use inter-frame compression where most frames only store the difference from previous frames.
av handles keyframe detection internally, but for efficient random access to specific frames,
you may want to log keyframe indicators separately at recording time.
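A recording-time sketch of that idea is shown below. The encoder loop and the is_keyframe component name are assumptions for illustration (the name is chosen to match the layer example above):

```python
import rerun as rr

rr.init("keyframe_logging_example")

# Placeholder encoder output: (encoded Annex B bytes, timestamp in ns, keyframe flag).
# In practice this comes from your encoder or camera SDK.
encoder_output: list[tuple[bytes, int, bool]] = []

for sample, timestamp_ns, is_keyframe in encoder_output:
    rr.set_time("log_time", timestamp=timestamp_ns * 1e-9)
    rr.log("/video_stream", rr.VideoStream(codec=rr.VideoCodec.H264, sample=sample))
    if is_keyframe:
        # Sparse indicator: only logged on keyframes, so presence alone marks them
        rr.log("/video_stream", rr.AnyValues(is_keyframe=True))
```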
Timestamp handling
Video timestamps in Rerun are typically stored in nanoseconds.
When using PyAV for decoding or muxing, ensure you set the correct time_base (typically Fraction(1, 1_000_000_000)).
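As a quick illustration: with that time base, pts values are plain nanosecond offsets, and multiplying by the time base recovers seconds:

```python
from fractions import Fraction

time_base = Fraction(1, 1_000_000_000)  # one tick per nanosecond
pts = 1_500_000_000                     # 1.5 s after stream start, in nanoseconds
print(float(pts * time_base))           # 1.5
```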
B-frames
Currently, Rerun's VideoStream does not support B-frames,
so dts (decode timestamp) equals pts (presentation timestamp).