esc
Start typing to search the docs
Navigate Open

Chunk Processing API

The Chunk Processing API is a flexible, chunk-centric API for data ingestion, transformation, and conversion pipelines. It covers I/O from common robotics file formats, powerful declarative data wrangling primitives, and a multithreaded, native engine for pipeline execution. The API is designed to support distributed execution in the future.

Building blocks

The Chunk Processing API is built from three kinds of primitives — readers, stores, and lazy streams — that compose into a pipeline executed by a terminal call:

ReaderStoreLazyChunkStreamTerminalcall .store().stream().stream()filter/lenses/map/…

Readers

Readers produce Chunks from external sources such as files, or datasets hosted on a catalog server.

In some cases, readers are classes provided by the Chunk Processing API, such as RrdReader and McapReader. The reader functionality can also be provided by classes from other parts of the Rerun SDK. For example, DatasetEntry has a segment_store method which returns a LazyStore for the corresponding segment (see the catalog object model for more information on datasets). UrdfTree is another example of a class that offers reader functionality in addition to a larger feature set.

There are two ways in which a reader may provide chunks. All readers can sequentially stream all their source's chunks, typically via the stream() method. Internally, such readers typically parse the source file, convert data to chunks as it is extracted, and yield those chunks as they are produced.

Some readers, called IndexedReader, can also provide indexed, random access to chunks via a LazyStore. This is typically implemented on top of an existing chunk index, and is currently available for the following readers:

  • RrdReader (relies on the RRD footer index)
  • DatasetEntry.segment_store() (relies on the chunk index maintained by the catalog server)

Processing chunks through a LazyStore is beneficial for pipelines where only a subset of chunks is needed, avoiding the I/O cost of loading unnecessary ones.

In all cases, readers typically act as the root of a processing pipeline and provide a LazyChunkStream object to refine and execute it — see Lazy stream below.

Stores

A store is a collection of chunks and comes in two complementary flavors:

  • LazyStore — index-based, on-demand. Returned by indexed loaders such as RrdReader(path).store() and DatasetEntry.segment_store().
  • ChunkStore — fully materialized, all chunks held in memory. Build one with ChunkStore.from_chunks([...]), or materialize a stream via stream.collect().

The previous section already hinted at the perks of LazyStore. Being index-based, it is cheap to create and takes limited amounts of memory. Also, it unlocks performance speed-ups by only loading chunks that are relevant to the given processing pipeline. On the other hand, ChunkStore is fully materialized: its memory footprint scales with the recording size. This is a major exception in the chunk processing API, which generally leans on lazy loading and streaming execution to allow processing large datasets with bounded memory.

One common reason to materialize a ChunkStore is to run chunk optimization; see Optimize chunk count for details.

Both kinds of stores share a common API surface, including:

  • extracting the underlying Schema of the store;
  • turning the store back into a pipeline with .stream();
  • exposing various statistics and content summaries.

Lazy stream

The LazyChunkStream is the central abstraction: a deferred, single-pass iterator of chunks with operators for filtering (filter / drop), branching (split), fan-in (merge), reshaping (lenses), and arbitrary per-chunk manipulation (map / flat_map).

The key design is that a lazy stream is not a materialized collection or actual streaming process. A LazyChunkStream instance can be thought of as a leaf node in a pipeline-description DAG. By composition, it allows building up the DAG to represent the intended pipeline.

For example, this creates a basic pipeline that does nothing but read an MCAP file:

from rerun.experimental import McapReader

stream = McapReader(mcap_path).stream()

This pipeline can be extended using the lazy stream's methods. For example, we can add a filter operation:

stream = stream.filter(content="/robot_left/**")

Up to this point, no data has actually been read or processed. This happens when a terminal operation is called, for example:

stream.write_rrd(
    output_path,
    application_id="rerun_example_chunk_processing_intro",
    recording_id="run1",
)

This exact call triggers the pipeline execution, including reading the source MCAP, performing the filter operation, and writing the output RRD.

Pipeline execution

To recap:

  • A pipeline is a DAG rooted at one or more readers or stores and ending at a leaf node represented by a lazy stream.
  • Composition is cheap: building the DAG is metadata only, regardless of input size. This is done through LazyChunkStream's APIs.
  • The actual execution of the pipeline is triggered by calling a terminal method of the lazy stream, for example .write_rrd(). Terminal calls are blocking, but execution is multithreaded and essentially GIL-free.
  • Memory cost is bounded by what flows through a chunk at a time, not by the total recording size.

Move semantics

To better express the DAG composition process, LazyChunkStream instances exhibit Rust-like move semantics to avoid accidental reuse:

  • stream.filter(...) moves stream into the new pipeline. Reusing stream afterwards raises ValueError: already been consumed.
  • stream.split(...) returns two branches and consumes the parent. Each branch is itself a stream that can only be consumed once.
  • LazyChunkStream.merge(a, b, ...) consumes every input.

Terminal calls, however, do not consume the stream — a lazy stream can be executed multiple times against different destinations:

chunk_list = stream.to_chunks()
stream.write_rrd(path=..., application_id=..., recording_id=...)

Note that doing so executes the entire pipeline twice, which may not be desirable for complex pipelines. In that case, collect the stream to an intermediate ChunkStore to trade memory for re-computation.

Complete example

The rest of this page walks through a single end-to-end pipeline that reads a robot-arm MCAP recording, fans the protobuf joint-state column out into per-joint Scalars series in degrees, tags the result with a static /metadata chunk built from scratch, and writes a new .rrd.

Full source: Python.

Setup

from __future__ import annotations

import math
import uuid
from collections.abc import Callable
from pathlib import Path

import pyarrow as pa
import pyarrow.compute as pc

import rerun as rr
from rerun.experimental import Chunk, DeriveLens, LazyChunkStream, McapReader, Selector

MCAP = Path(__file__).resolve().parents[4] / "tests" / "assets" / "mcap" / "trossen_transfer_cube.mcap"
OUT = Path("chunk_processing.rrd")
  • Imports the experimental entry points: readers (McapReader), chunk and stream types (Chunk, LazyChunkStream), lens primitives (DeriveLens, Selector).
  • Locates the input MCAP relative to the repo root and picks a CWD-relative output path. Nothing here touches Rerun yet.

Reading

stream = McapReader(MCAP).stream()
  • McapReader(MCAP).stream() is the only line that touches the source — and even that is lazy: no MCAP bytes are decoded yet.
  • The returned LazyChunkStream is the root of the DAG.

Processing

JOINTS = ["waist", "shoulder", "elbow", "forearm_roll", "wrist_angle", "wrist_rotate"]


def pick_joint(i: int) -> Callable[[pa.Array], pa.Array]:
    """Extract joint `i` from a list<float64> column and convert rad → deg."""
    return lambda arr: pc.multiply(pc.list_element(arr, i), 180.0 / math.pi)


def fan(side: str) -> list[DeriveLens]:
    return [
        DeriveLens(
            "schemas.proto.JointState:message",
            output_entity=f"/joints_deg/{side}/{name}",
        ).to_component(
            rr.Scalars.descriptor_scalars(),
            Selector(".joint_positions").pipe(pick_joint(i)),
        )
        for i, name in enumerate(JOINTS)
    ]


processed = (
    stream
    .drop(content="/video_raw/**")
    .lenses(fan("left"), content="/robot_left/**", output_mode="forward_unmatched")
    .lenses(fan("right"), content="/robot_right/**", output_mode="forward_unmatched")
)
  • drop(content="/video_raw/**") is a no-op against this MCAP (the path does not exist) but illustrates content-based pruning.
  • fan(side) builds six DeriveLens instances, one per joint, each extracting .joint_positions[i] (via Selector(...).pipe(...)), converting radians to degrees with pyarrow.compute, and routing the result to /joints_deg/<side>/<joint> as a Scalars column.
  • Two scoped .lenses(...) calls apply the per-side fan only to chunks under /robot_left/** and /robot_right/** respectively. The same component name (schemas.proto.JointState:message) lives on both sides; scoping by content= is what disambiguates them. With forward_unmatched, every chunk outside the scope passes through untouched.

Merging

metadata = Chunk.from_columns(
    "/metadata",
    indexes=[],
    columns=rr.AnyValues.columns(processing_type="ingestion", processing_version="v1"),
)
merged = LazyChunkStream.merge(processed, LazyChunkStream.from_iter([metadata]))
  • Chunk.from_columns("/metadata", indexes=[], columns=rr.AnyValues.columns(…)) builds a single static chunk from scratch — indexes=[] makes it static. Any archetype's .columns(…) helper works here.
  • LazyChunkStream.from_iter([metadata]) lifts that one chunk into a one-element stream so it can participate in the pipeline.
  • LazyChunkStream.merge(processed, ...) is fan-in: the two inputs become one stream. Order is preserved per-input, not globally.

Writing

merged.write_rrd(OUT, application_id="rerun_example_chunk_processing", recording_id=str(uuid.uuid4()))
  • write_rrd(...) is the terminal: this is where the DAG actually executes. The whole pipeline runs in a single streaming pass.
  • application_id and recording_id identify the resulting recording; a fresh uuid.uuid4() makes each invocation produce a distinct recording.

Relationship to the logging APIs

Both the logging APIs (rr.log, rr.send_columns, RecordingStream) and the Chunk Processing API target the same underlying data model, but they differ in several ways:

Logging APIChunk processing API
Directionlogging call → sinkchunk source → transform → chunk sink
Granularitysingle rows or columns of datawhole chunks
Execution modelcontinuous, as logging calls are emittedlazy, upon stream execution
Where chunks come frombuilt by the logging API's batcheralready exist (from a reader) or built explicitly with Chunk.from_columns / Chunk.from_record_batch
Typical userealtime data loggingingestion, conversion, post-processing pipelines

The two are interoperable:

  • Logging → chunk processing: save a RecordingStream to an .rrd, then re-open it with RrdReader to get a LazyChunkStream.

  • Chunk processing → logging: rerun.experimental.send_chunks(chunks, recording=...) feeds chunks into an active RecordingStream (useful for streaming to a viewer, for example).

  • Building chunks by hand: Chunk.from_columns mirrors rr.send_columns and accepts the same rr.<Archetype>.columns(...) helpers, so any data that can be logged with rr.send_columns can also be packaged as a Chunk and injected into a processing pipeline.

See also

  • Chunks: the underlying data model.
  • Lenses: the reshaping primitives used here.