The Chunk Processing API is a flexible, chunk-centric API for data ingestion, transformation, and conversion pipelines. It covers I/O from common robotics file formats, powerful declarative data wrangling primitives, and a multithreaded, native engine for pipeline execution. The API is designed to support distributed execution in the future.
Building blocks
The Chunk Processing API is built from three kinds of primitives — readers, stores, and lazy streams — that compose into a pipeline executed by a terminal call:
Readers
Readers produce Chunks from external sources such as files, or datasets hosted on a catalog server.
In some cases, readers are classes provided by the Chunk Processing API, such as RrdReader and McapReader.
The reader functionality can also be provided by classes from other parts of the Rerun SDK.
For example, DatasetEntry has a segment_store method which returns a LazyStore for the corresponding segment (see the catalog object model for more information on datasets).
UrdfTree is another example of a class that offers reader functionality in addition to a larger feature set.
There are two ways in which a reader may provide chunks.
All readers can sequentially stream all their source's chunks, typically via the stream() method.
Internally, such readers typically parse the source file, convert data to chunks as it is extracted, and yield those chunks as they are produced.
Some readers, called IndexedReader, can also provide indexed, random access to chunks via a LazyStore.
This is typically implemented on top of an existing chunk index, and is currently available for the following readers:
- `RrdReader` (relies on the RRD footer index)
- `DatasetEntry.segment_store()` (relies on the chunk index maintained by the catalog server)
Processing chunks through a LazyStore is beneficial for pipelines where only a subset of chunks is needed, avoiding the I/O cost of loading unnecessary ones.
In all cases, readers typically act as the root of a processing pipeline and provide a LazyChunkStream object to refine and execute it — see Lazy stream below.
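For example, an RRD file can feed a pipeline either way; here is a minimal sketch (the file path is illustrative):

```python
from rerun.experimental import RrdReader

# Streaming access: every chunk in the file flows through the pipeline.
stream = RrdReader("recording.rrd").stream()

# Indexed access: the LazyStore is backed by the RRD footer index, so only
# the chunks the pipeline actually needs are loaded from disk.
store = RrdReader("recording.rrd").store()
stream = store.stream()
```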
Stores
A store is a collection of chunks and comes in two complementary flavors:
- `LazyStore` — index-based, on-demand. Returned by indexed loaders such as `RrdReader(path).store()` and `DatasetEntry.segment_store()`.
- `ChunkStore` — fully materialized, all chunks held in memory. Build one with `ChunkStore.from_chunks([...])`, or materialize a stream via `stream.collect()`.
The previous section already hinted at the perks of LazyStore: being index-based, it is cheap to create and uses little memory. It also speeds up execution by loading only the chunks that are relevant to the given processing pipeline.
On the other hand, ChunkStore is fully materialized: its memory footprint scales with the recording size. This makes it a notable exception in the Chunk Processing API, which otherwise leans on lazy loading and streaming execution to process large datasets with bounded memory.
One common reason to materialize a ChunkStore is to run chunk optimization; see Optimize chunk count for details.
Both kinds of stores share a common API surface, including:
- extracting the underlying `Schema` of the store;
- turning the store back into a pipeline with `.stream()` (sketched below);
- exposing various statistics and content summaries.
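As a minimal sketch of this shared surface (assuming `ChunkStore` is importable from `rerun.experimental` like the other primitives; the output path and identifiers are illustrative):

```python
import rerun as rr
from rerun.experimental import Chunk, ChunkStore  # ChunkStore location assumed

# A single static chunk built from scratch (see "Merging" below for details).
chunk = Chunk.from_columns(
    "/metadata",
    indexes=[],
    columns=rr.AnyValues.columns(processing_type="ingestion"),
)

# Materialize a store from explicit chunks; `stream.collect()` does the same
# for the output of a pipeline.
store = ChunkStore.from_chunks([chunk])

# Turn the store back into a pipeline and execute it against a destination.
store.stream().write_rrd(
    "from_store.rrd",
    application_id="rerun_example_store_roundtrip",
    recording_id="run1",
)
```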
Lazy stream
The LazyChunkStream is the central abstraction: a deferred, single-pass iterator of chunks with operators for filtering (filter / drop), branching (split), fan-in (merge), reshaping (lenses), and arbitrary per-chunk manipulation (map / flat_map).
The key design point is that a lazy stream is neither a materialized collection nor an actual streaming process.
A LazyChunkStream instance can be thought of as a leaf node in a pipeline-description DAG.
By composition, it allows building up the DAG to represent the intended pipeline.
For example, this creates a basic pipeline that does nothing but read an MCAP file:
```python
from rerun.experimental import McapReader

stream = McapReader(mcap_path).stream()
```

This pipeline can be extended using the lazy stream's methods. For example, we can add a filter operation:

```python
stream = stream.filter(content="/robot_left/**")
```

Up to this point, no data has actually been read or processed. This happens when a terminal operation is called, for example:
```python
stream.write_rrd(
    output_path,
    application_id="rerun_example_chunk_processing_intro",
    recording_id="run1",
)
```

This exact call triggers the pipeline execution, including reading the source MCAP, performing the filter operation, and writing the output RRD.
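The same DAG can of course hold several operators. A slightly larger, still fully deferred, pipeline might prune a subtree with `drop` before filtering (the entity paths here are illustrative):

```python
stream = (
    McapReader(mcap_path).stream()
    .drop(content="/video_raw/**")      # discard all chunks under /video_raw
    .filter(content="/robot_left/**")   # keep only chunks under /robot_left
)
```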
Pipeline execution
To recap:
- A pipeline is a DAG rooted at one or more readers or stores (see the sketch after this list) and ending at a leaf node represented by a lazy stream.
- Composition is cheap: building the DAG is metadata only, regardless of input size. This is done through `LazyChunkStream`'s APIs.
- The actual execution of the pipeline is triggered by calling a terminal method of the lazy stream, for example `.write_rrd()`. Terminal calls are blocking, but execution is multithreaded and essentially GIL-free.
- Memory cost is bounded by what flows through the pipeline one chunk at a time, not by the total recording size.
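As a minimal sketch of a DAG rooted at more than one reader (file names and identifiers are illustrative), two sources can be merged and written out in a single pass:

```python
from rerun.experimental import LazyChunkStream, McapReader

left = McapReader("left_arm.mcap").stream()
right = McapReader("right_arm.mcap").stream()

# Fan-in: both readers become roots of the same DAG; execution is still a
# single streaming pass triggered by the terminal call.
LazyChunkStream.merge(left, right).write_rrd(
    "combined.rrd",
    application_id="rerun_example_merge",
    recording_id="run1",
)
```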
Move semantics
To better express the DAG composition process, LazyChunkStream instances exhibit Rust-like move semantics to avoid accidental reuse:
- `stream.filter(...)` moves `stream` into the new pipeline. Reusing `stream` afterwards raises `ValueError: already been consumed`.
- `stream.split(...)` returns two branches and consumes the parent. Each branch is itself a stream that can only be consumed once.
- `LazyChunkStream.merge(a, b, ...)` consumes every input.
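A minimal sketch of these semantics in practice (the filter expressions are illustrative):

```python
stream = McapReader(mcap_path).stream()
filtered = stream.filter(content="/robot_left/**")  # moves `stream` into the new pipeline

# `stream` has been consumed; reusing it raises a ValueError.
try:
    stream.filter(content="/robot_right/**")
except ValueError:
    pass  # "already been consumed"
```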
Terminal calls, however, do not consume the stream — a lazy stream can be executed multiple times against different destinations:
```python
chunk_list = stream.to_chunks()
stream.write_rrd(path=..., application_id=..., recording_id=...)
```

Note that doing so executes the entire pipeline twice, which may not be desirable for complex pipelines. In that case, collect the stream into an intermediate `ChunkStore` to trade memory for re-computation.
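A minimal sketch of that trade-off (the destination path and identifiers are illustrative):

```python
# Executes the pipeline once and materializes every resulting chunk in memory.
store = stream.collect()

# Subsequent consumers replay from the materialized store instead of
# re-running the whole pipeline.
chunk_list = store.stream().to_chunks()
store.stream().write_rrd(
    "output.rrd",
    application_id="rerun_example_collect",
    recording_id="run1",
)
```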
Complete example
The rest of this page walks through a single end-to-end pipeline that reads a robot-arm MCAP recording, fans the protobuf joint-state column out into per-joint Scalars series in degrees, tags the result with a static /metadata chunk built from scratch, and writes a new .rrd.
Full source: Python.
Setup
```python
from __future__ import annotations

import math
import uuid
from collections.abc import Callable
from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc

import rerun as rr
from rerun.experimental import Chunk, DeriveLens, LazyChunkStream, McapReader, Selector

MCAP = Path(__file__).resolve().parents[4] / "tests" / "assets" / "mcap" / "trossen_transfer_cube.mcap"
OUT = Path("chunk_processing.rrd")
```

- Imports the experimental entry points: readers (`McapReader`), chunk and stream types (`Chunk`, `LazyChunkStream`), lens primitives (`DeriveLens`, `Selector`).
- Locates the input MCAP relative to the repo root and picks a CWD-relative output path. Nothing here touches Rerun yet.
Reading
```python
stream = McapReader(MCAP).stream()
```

- `McapReader(MCAP).stream()` is the only line that touches the source — and even that is lazy: no MCAP bytes are decoded yet.
- The returned `LazyChunkStream` is the root of the DAG.
Processing
```python
JOINTS = ["waist", "shoulder", "elbow", "forearm_roll", "wrist_angle", "wrist_rotate"]


def pick_joint(i: int) -> Callable[[pa.Array], pa.Array]:
    """Extract joint `i` from a list<float64> column and convert rad → deg."""
    return lambda arr: pc.multiply(pc.list_element(arr, i), 180.0 / math.pi)


def fan(side: str) -> list[DeriveLens]:
    return [
        DeriveLens(
            "schemas.proto.JointState:message",
            output_entity=f"/joints_deg/{side}/{name}",
        ).to_component(
            rr.Scalars.descriptor_scalars(),
            Selector(".joint_positions").pipe(pick_joint(i)),
        )
        for i, name in enumerate(JOINTS)
    ]


processed = (
    stream
    .drop(content="/video_raw/**")
    .lenses(fan("left"), content="/robot_left/**", output_mode="forward_unmatched")
    .lenses(fan("right"), content="/robot_right/**", output_mode="forward_unmatched")
)
```

- `drop(content="/video_raw/**")` is a no-op against this MCAP (the path does not exist) but illustrates content-based pruning.
- `fan(side)` builds six `DeriveLens` instances, one per joint, each extracting `.joint_positions[i]` (via `Selector(...).pipe(...)`), converting radians to degrees with `pyarrow.compute`, and routing the result to `/joints_deg/<side>/<joint>` as a `Scalars` column.
- Two scoped `.lenses(...)` calls apply the per-side fan only to chunks under `/robot_left/**` and `/robot_right/**` respectively. The same component name (`schemas.proto.JointState:message`) lives on both sides; scoping by `content=` is what disambiguates them. With `forward_unmatched`, every chunk outside the scope passes through untouched.
Merging
```python
metadata = Chunk.from_columns(
    "/metadata",
    indexes=[],
    columns=rr.AnyValues.columns(processing_type="ingestion", processing_version="v1"),
)

merged = LazyChunkStream.merge(processed, LazyChunkStream.from_iter([metadata]))
```

- `Chunk.from_columns("/metadata", indexes=[], columns=rr.AnyValues.columns(…))` builds a single static chunk from scratch — `indexes=[]` makes it static. Any archetype's `.columns(…)` helper works here.
- `LazyChunkStream.from_iter([metadata])` lifts that one chunk into a one-element stream so it can participate in the pipeline.
- `LazyChunkStream.merge(processed, ...)` is fan-in: the two inputs become one stream. Order is preserved per-input, not globally.
Writing
```python
merged.write_rrd(OUT, application_id="rerun_example_chunk_processing", recording_id=str(uuid.uuid4()))
```

- `write_rrd(...)` is the terminal: this is where the DAG actually executes. The whole pipeline runs in a single streaming pass.
- `application_id` and `recording_id` identify the resulting recording; a fresh `uuid.uuid4()` makes each invocation produce a distinct recording.
Relationship to the logging APIs
Both the logging APIs (rr.log, rr.send_columns, RecordingStream) and the Chunk Processing API target the same underlying data model, but they differ in several ways:
| | Logging API | Chunk processing API |
|---|---|---|
| Direction | logging call → sink | chunk source → transform → chunk sink |
| Granularity | single rows or columns of data | whole chunks |
| Execution model | continuous, as logging calls are emitted | lazy, upon stream execution |
| Where chunks come from | built by the logging API's batcher | already exist (from a reader) or built explicitly with Chunk.from_columns / Chunk.from_record_batch |
| Typical use | realtime data logging | ingestion, conversion, post-processing pipelines |
The two are interoperable:
- Logging → chunk processing: save a `RecordingStream` to an `.rrd`, then re-open it with `RrdReader` to get a `LazyChunkStream`.
- Chunk processing → logging: `rerun.experimental.send_chunks(chunks, recording=...)` feeds chunks into an active `RecordingStream` (useful for streaming to a viewer, for example); see the sketch after this list.
- Building chunks by hand: `Chunk.from_columns` mirrors `rr.send_columns` and accepts the same `rr.<Archetype>.columns(...)` helpers, so any data that can be logged with `rr.send_columns` can also be packaged as a `Chunk` and injected into a processing pipeline.
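As a minimal sketch of the chunk processing → logging direction (the input path and application id are illustrative, and the `RecordingStream` is assumed to already be connected to a sink such as a viewer or a file):

```python
import rerun as rr
from rerun.experimental import RrdReader, send_chunks

# A RecordingStream configured elsewhere (viewer connection, file sink, ...).
rec = rr.RecordingStream("rerun_example_chunk_interop")

# Run a small pipeline and feed the resulting chunks into the live recording.
chunks = RrdReader("input.rrd").stream().filter(content="/robot_left/**").to_chunks()
send_chunks(chunks, recording=rec)
```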