Dataset Resampling

This snippet demonstrates how to resample a dataset based on the time index of one component within your data. This is particularly helpful when you have data that is produced at very different frequencies.

First, load a dataset to use for evaluation:

sample_dataset_path = Path(__file__).parents[4] / "tests" / "assets" / "rrd" / "dataset"
server = rr.server.Server(datasets={"dataset": sample_dataset_path})
# Using OSS server for demonstration but in practice replace with
# the URL of your cloud instance
CATALOG_URL = server.url()
client = rr.catalog.CatalogClient(CATALOG_URL)
dataset = client.get_dataset(name="dataset")

Investigate time ranges investigate-time-ranges

Before we do the resampling, we can examine the dataset's time ranges using the function get_index_ranges(). This is not strictly necessary for the resampling work to follow, but it can be helpful during investigation of your data. This will show you the start and end values for all indexes in your dataset, one per segment.

(
    dataset.get_index_ranges()
    .select(
        "rerun_segment_id", "time_1:start", "time_1:end", "time_2:start", "time_2:end", "time_3:start", "time_3:end"
    )
    .sort("rerun_segment_id")
    .show()
)

Prior to resampling prior-to-resampling

The sample data we have loaded is very basic, but it demonstrates having components from three different entities at different times in the dataset. The code below demonstrates what the data looks like before resampling. In order to do data analysis on this DataFrame you would likely need to do some aggregation or window across the time index.

time_index = "time_3"
columns_of_interest = [
    "rerun_segment_id",
    time_index,
    "/obj1:Points3D:positions",
    "/obj2:Points3D:positions",
    "/obj3:Points3D:positions",
]
(dataset.reader(index=time_index).select(*columns_of_interest).sort("rerun_segment_id", time_index).show())

# +----------------------------------+--------+--------------------------+--------------------------+--------------------------+
# | rerun_segment_id                 | time_3 | /obj1:Points3D:positions | /obj2:Points3D:positions | /obj3:Points3D:positions |
# +----------------------------------+--------+--------------------------+--------------------------+--------------------------+
# | 141a866deb2d49f69eb3215e8a404ffc | 1      | [[49.0, 0.0, 0.0]]       | [[44.0, 1.0, 0.0]]       | [[1.0, 2.0, 0.0]]        |
# | 141a866deb2d49f69eb3215e8a404ffc | 2      | [[27.0, 0.0, 0.0]]       | [[42.0, 1.0, 0.0]]       |                          |
# | 141a866deb2d49f69eb3215e8a404ffc | 3      | [[25.0, 0.0, 0.0]]       | [[30.0, 1.0, 0.0]]       | [[3.0, 2.0, 0.0]]        |
# | 141a866deb2d49f69eb3215e8a404ffc | 4      | [[38.0, 0.0, 0.0]]       | [[19.0, 1.0, 0.0]]       |                          |
# | 141a866deb2d49f69eb3215e8a404ffc | 5      | [[17.0, 0.0, 0.0]]       | [[5.0, 1.0, 0.0]]        | [[5.0, 2.0, 0.0]]        |
# | 141a866deb2d49f69eb3215e8a404ffc | 6      | [[2.0, 0.0, 0.0]]        | [[35.0, 1.0, 0.0]]       |                          |
# | 141a866deb2d49f69eb3215e8a404ffc | 7      | [[44.0, 0.0, 0.0]]       | [[4.0, 1.0, 0.0]]        | [[7.0, 2.0, 0.0]]        |

Resampled data resampled-data

The snippet below demonstrates resampling using two lines. First we create a new DataFrame which contains the index values we care about per segment. It is very important in doing this that you do not set fill_latest_at=True. Otherwise it would negate the effect we are trying to produce where we only have rows for which we have data in our component of interest. The required output of this DataFrame is only the segment ID and the index value.

Once we have a DataFrame with these index values, we can now query the dataset using that DataFrame. You can see from the output below that we generate one row per time index for which the component of interest is not null.

resample_column = "/obj3:Points3D:positions"
times_of_interest = (
    dataset.reader(index=time_index).filter(col(resample_column).is_not_null()).select("rerun_segment_id", time_index)
)

(
    dataset.reader(index=time_index, using_index_values=times_of_interest, fill_latest_at=True)
    .select(*columns_of_interest)
    .sort("rerun_segment_id", time_index)
    .show()
)

# +----------------------------------+--------+--------------------------+--------------------------+--------------------------+
# | rerun_segment_id                 | time_3 | /obj1:Points3D:positions | /obj2:Points3D:positions | /obj3:Points3D:positions |
# +----------------------------------+--------+--------------------------+--------------------------+--------------------------+
# | 141a866deb2d49f69eb3215e8a404ffc | 1      | [[49.0, 0.0, 0.0]]       | [[44.0, 1.0, 0.0]]       | [[1.0, 2.0, 0.0]]        |
# | 141a866deb2d49f69eb3215e8a404ffc | 3      | [[25.0, 0.0, 0.0]]       | [[30.0, 1.0, 0.0]]       | [[3.0, 2.0, 0.0]]        |
# | 141a866deb2d49f69eb3215e8a404ffc | 5      | [[17.0, 0.0, 0.0]]       | [[5.0, 1.0, 0.0]]        | [[5.0, 2.0, 0.0]]        |
# | 141a866deb2d49f69eb3215e8a404ffc | 7      | [[44.0, 0.0, 0.0]]       | [[4.0, 1.0, 0.0]]        | [[7.0, 2.0, 0.0]]        |
# | 141a866deb2d49f69eb3215e8a404ffc | 10     | [[12.0, 0.0, 0.0]]       | [[6.0, 1.0, 0.0]]        | [[10.0, 2.0, 0.0]]       |
# | 141a866deb2d49f69eb3215e8a404ffc | 12     | [[13.0, 0.0, 0.0]]       | [[17.0, 1.0, 0.0]]       | [[12.0, 2.0, 0.0]]       |
# | 141a866deb2d49f69eb3215e8a404ffc | 13     | [[20.0, 0.0, 0.0]]       | [[32.0, 1.0, 0.0]]       | [[13.0, 2.0, 0.0]]       |