Streaming

The openPMD-api includes a streaming-aware API as well as streaming-enabled backends (currently: ADIOS2).

Unlike in file-based backends, the order in which data is written out becomes relevant in streaming-based backends. The streaming API publishes each iteration as one atomic step (compare the concept of steps in ADIOS2).

To process Iterations synchronously, one after another, the openPMD-api pairs each regular (i.e. random-access) mode with a linear access mode. The exception is READ_WRITE, which only supports random access.
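
The pairing of access modes can be summarized as a lookup table. The sketch below uses only the mode names that appear in this section, written as plain strings. The helper linear_counterpart() is a hypothetical name for illustration and not part of the openPMD-api.

```python
# Illustrative lookup table, NOT part of the openPMD-api.
# Keys are the random-access spellings named in this section,
# values are their linear counterparts.
LINEAR_COUNTERPART = {
    "READ_RANDOM_ACCESS": "READ_LINEAR",
    "READ_ONLY": "READ_LINEAR",
    "CREATE_RANDOM_ACCESS": "CREATE_LINEAR",
    "CREATE": "CREATE_LINEAR",
}


def linear_counterpart(mode):
    """Return the linear mode paired with a random-access mode name."""
    if mode == "READ_WRITE":
        # READ_WRITE is the one mode without a linear twin.
        raise ValueError("READ_WRITE only supports random access")
    return LINEAR_COUNTERPART[mode]


print(linear_counterpart("READ_ONLY"))
print(linear_counterpart("CREATE"))
```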

Reading

The reading end of the streaming API enforces further restrictions that the nature of streaming makes necessary. It can nevertheless be used to read any kind of openPMD-compatible dataset, stream-based and filesystem-based alike.

C++

The reading end of the streaming API is activated through use of Access::READ_LINEAR instead of Access::READ_RANDOM_ACCESS (or Access::READ_ONLY). Iterations must be accessed with Series::snapshots() instead of directly using the field Series::iterations.

In READ_LINEAR mode, Iterations are opened implicitly and Iteration::open() need not be called explicitly. Users are encouraged to explicitly .close() the iteration after reading from it. Closing the iteration will flush all pending operations on that iteration. If an iteration has not been closed by the time the next iteration begins, it will be closed automatically.

Note that a closed iteration cannot, in general, be reopened. Limited support for reopening closed Iterations in READ_LINEAR is available provided that the Series is not a stream and does not use ADIOS2 steps. In a stream, Iterations may be dropped by the writer once the reader has finished reading from them.

/* Copyright 2025 Franz Poeschel, Luca Fedeli
 *
 * This file is part of openPMD-api.
 *
 * openPMD-api is free software: you can redistribute it and/or modify
 * it under the terms of either the GNU General Public License or
 * the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * openPMD-api is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License and the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License
 * and the GNU Lesser General Public License along with openPMD-api.
 * If not, see <http://www.gnu.org/licenses/>.
 */
#include <openPMD/openPMD.hpp>

#include <algorithm>
#include <array>
#include <iostream>
#include <memory>
#include <string>
#include <variant>

using std::cout;
using namespace openPMD;

int main()
{
#if openPMD_HAVE_ADIOS2
    auto backends = openPMD::getFileExtensions();
    if (std::find(backends.begin(), backends.end(), "sst") == backends.end())
    {
        std::cout << "SST engine not available in ADIOS2." << std::endl;
        return 0;
    }

    // Access the Series linearly. This means that upon opening the Series, no
    // data is accessed yet. Instead, the single Iterations are processed
    // collectively, one after the other, and data access only happens upon
    // explicitly accessing an Iteration from `Series::snapshots()`. Note that
    // the Container API of `Series::snapshots()` will work in a restricted mode
    // compared to the `READ_RANDOM_ACCESS` access type, refer also to the
    // documentation of the `Snapshots` class in `snapshots/Snapshots.hpp`. This
    // restricted workflow enables performance optimizations in the backends,
    // and more importantly is compatible with streaming I/O.
    Series series = Series("electrons.sst", Access::READ_LINEAR, R"(
{
  "adios2": {
    "engine": {
      "parameters": {
        "DataTransport": "WAN"
      }
    }
  }
})");

    for (auto &[index, iteration] : series.snapshots())
    {
        std::cout << "Current iteration: " << index << std::endl;
        Record electronPositions = iteration.particles["e"]["position"];
        std::array<RecordComponent::shared_ptr_dataset_types, 3> loadedChunks;
        std::array<Extent, 3> extents;
        std::array<std::string, 3> const dimensions{{"x", "y", "z"}};

        for (size_t i = 0; i < 3; ++i)
        {
            std::string const &dim = dimensions[i];
            RecordComponent rc = electronPositions[dim];
            loadedChunks[i] = rc.loadChunkVariant(
                Offset(rc.getDimensionality(), 0), rc.getExtent());
            extents[i] = rc.getExtent();
        }

        // The iteration can be closed in order to help free up resources.
        // The iteration's content will be flushed automatically.
        iteration.close();

        for (size_t i = 0; i < 3; ++i)
        {
            std::string const &dim = dimensions[i];
            Extent const &extent = extents[i];
            std::cout << "\ndim: " << dim << "\n" << std::endl;
            auto chunk = loadedChunks[i];
            std::visit(
                [&extent](auto &shared_ptr) {
                    for (size_t j = 0; j < extent[0]; ++j)
                    {
                        std::cout << shared_ptr.get()[j] << ", ";
                    }
                },
                chunk);
            std::cout << "\n----------\n" << std::endl;
        }
    }

    /* The files in 'series' are still open until the object is destroyed, on
     * which it cleanly flushes and closes all open file handles.
     * When running out of scope on return, the 'Series' destructor is called.
     * Alternatively, one can call `series.close()` to the same effect as
     * calling the destructor, including the release of file handles.
     */
    series.close();

    return 0;
#else
    std::cout << "The streaming example requires that openPMD has been built "
                 "with ADIOS2."
              << std::endl;
    return 0;
#endif
}

Python

The reading end of the streaming API is activated through use of Access.read_linear instead of Access.read_random_access (or Access.read_only). Iterations must be accessed with Series.snapshots() instead of directly using the field Series.iterations.

In read_linear mode, Iterations are opened implicitly and Iteration.open() need not be called explicitly. Users are encouraged to explicitly .close() the iteration after reading from it. Closing the iteration will flush all pending operations on that iteration. If an iteration has not been closed by the time the next iteration begins, it will be closed automatically.

Note that a closed iteration cannot, in general, be reopened. Limited support for reopening closed Iterations in read_linear is available provided that the Series is not a stream and does not use ADIOS2 steps. In a stream, Iterations may be dropped by the writer once the reader has finished reading from them.

#!/usr/bin/env python
import sys

import openpmd_api as io

# pass-through for ADIOS2 engine parameters
# https://adios2.readthedocs.io/en/latest/engines/engines.html
config = {'adios2': {'engine': {}, 'dataset': {}}}
config['adios2']['engine'] = {'parameters':
                              {'Threads': '4', 'DataTransport': 'WAN'}}
config['adios2']['dataset'] = {'operators': [{'type': 'bzip2'}]}

if __name__ == "__main__":
    # this block is for our CI, SST engine is not present on all systems
    backends = io.file_extensions
    if "sst" not in backends:
        print("SST engine not available in ADIOS2.")
        sys.exit(0)

    series = io.Series("simData.sst", io.Access_Type.read_linear, config)

    # Read all available iterations and print electron position data.
    for index, iteration in series.snapshots().items():
        print("Current iteration {}".format(index))
        electronPositions = iteration.particles["e"]["position"]
        loadedChunks = []
        shapes = []
        dimensions = ["x", "y", "z"]

        for i in range(3):
            dim = dimensions[i]
            rc = electronPositions[dim]
            loadedChunks.append(rc.load_chunk([0], rc.shape))
            shapes.append(rc.shape)

        # Closing the iteration loads all data and releases the current
        # streaming step.
        # If the iteration is not closed, it will be implicitly closed upon
        # opening the next iteration.
        iteration.close()

        # data is now available for printing
        for i in range(3):
            dim = dimensions[i]
            shape = shapes[i]
            print("dim: {}".format(dim))
            chunk = loadedChunks[i]
            print(chunk)

    # The files in 'series' are still open until the object is destroyed, on
    # which it cleanly flushes and closes all open file handles.
    # When running out of scope on return, the 'Series' destructor is called.
    # Alternatively, one can call `series.close()` to the same effect as
    # calling the destructor, including the release of file handles.
    series.close()
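
The closing rules for linear reading can be condensed into a small toy model. The class LinearReader below is a hypothetical stand-in written in plain Python. It does not use the openPMD-api and only mimics the bookkeeping: at most one iteration is open at a time, and moving on to the next iteration closes the previous one if the caller did not. As an assumption of the model, the last iteration is closed when the loop ends.

```python
class LinearReader:
    """Toy stand-in for a linearly read Series (NOT the openPMD-api)."""

    def __init__(self, indices):
        self._indices = list(indices)
        self.open_index = None  # at most one iteration is open at a time
        self.closed = []        # (index, how) pairs in closing order

    def close(self, index, how="explicit"):
        # Only the currently open iteration can be closed.
        if self.open_index == index:
            self.closed.append((index, how))
            self.open_index = None

    def snapshots(self):
        for index in self._indices:
            # Advancing implicitly closes an iteration left open.
            if self.open_index is not None:
                self.close(self.open_index, how="implicit")
            self.open_index = index  # implicit open, no open() call
            yield index
        # Model assumption: the last iteration is closed at loop end.
        if self.open_index is not None:
            self.close(self.open_index, how="implicit")


reader = LinearReader([0, 100, 200])
for index in reader.snapshots():
    if index == 100:
        reader.close(index)  # explicit close, as the text recommends

print(reader.closed)
# [(0, 'implicit'), (100, 'explicit'), (200, 'implicit')]
```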

Writing

The writing end of the streaming API enforces further restrictions that the nature of streaming makes necessary. It can nevertheless be used to write any kind of openPMD-compatible dataset, stream-based and filesystem-based alike.

C++

The writing end of the streaming API is activated through use of Access::CREATE_LINEAR instead of Access::CREATE_RANDOM_ACCESS (or Access::CREATE). Iterations must be accessed with Series::snapshots() instead of using the field Series::iterations directly. With linear create mode, Snapshots::operator[](uint64_t) will automatically open a streaming step for each new corresponding iteration.

Users are encouraged to explicitly .close() the iteration after writing to it. Closing the iteration will flush all pending operations on that iteration. If an iteration has not been closed by the time the next iteration is accessed via Snapshots::operator[](uint64_t), it will be closed automatically.

Note that a closed iteration cannot, in general, be reopened. Limited support for reopening closed Iterations in CREATE_LINEAR is available for non-streaming backends other than ADIOS2 (and in ADIOS2, if using file-based encoding with engine BP5 and engine option FlattenSteps=ON). In a stream, Iterations may not be modified after they have been sent to readers.

/* Copyright 2025 Axel Huebl, Franz Poeschel
 *
 * This file is part of openPMD-api.
 *
 * openPMD-api is free software: you can redistribute it and/or modify
 * it under the terms of either the GNU General Public License or
 * the GNU Lesser General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * openPMD-api is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License and the GNU Lesser General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License
 * and the GNU Lesser General Public License along with openPMD-api.
 * If not, see <http://www.gnu.org/licenses/>.
 */
#include "openPMD/Series.hpp"
#include "openPMD/snapshots/Snapshots.hpp"
#include <openPMD/openPMD.hpp>

#include <algorithm>
#include <iostream>
#include <memory>
#include <numeric> // std::iota

using std::cout;
using namespace openPMD;

int main()
{
#if openPMD_HAVE_ADIOS2
    using position_t = double;
    auto backends = openPMD::getFileExtensions();
    if (std::find(backends.begin(), backends.end(), "sst") == backends.end())
    {
        std::cout << "SST engine not available in ADIOS2." << std::endl;
        return 0;
    }

    // open file for writing
    // use QueueFullPolicy = Discard in order to create a situation where from
    // the reader's perspective steps are skipped. This tests the bug reported
    // in https://github.com/openPMD/openPMD-api/issues/1747.
    // Create the Series with linear write access, i.e. one Iteration after
    // the other. The alternative would be random-access where multiple
    // Iterations can be accessed independently from one another. This more
    // restricted mode enables performance optimizations in the backends, and
    // more importantly is compatible with streaming I/O.
    Series series = Series("electrons.sst", Access::CREATE_LINEAR, R"(
{
  "adios2": {
    "engine": {
      "parameters": {
        "DataTransport": "WAN",
        "QueueFullPolicy": "Discard"
      }
    }
  }
})");

    Datatype datatype = determineDatatype<position_t>();
    constexpr unsigned long length = 10ul;
    Extent global_extent = {length};
    Dataset dataset = Dataset(datatype, global_extent);
    std::shared_ptr<position_t> local_data(
        new position_t[length], [](position_t const *ptr) { delete[] ptr; });

    auto iterations = series.snapshots();
    for (size_t i = 0; i < 100; ++i)
    {
        Iteration iteration = iterations[i];
        Record electronPositions = iteration.particles["e"]["position"];

        std::iota(local_data.get(), local_data.get() + length, i * length);
        for (auto const &dim : {"x", "y", "z"})
        {
            RecordComponent pos = electronPositions[dim];
            pos.resetDataset(dataset);
            pos.storeChunk(local_data, Offset{0}, global_extent);
        }
        iteration.close();
    }

    /* The files in 'series' are still open until the object is destroyed, on
     * which it cleanly flushes and closes all open file handles.
     * When running out of scope on return, the 'Series' destructor is called.
     * Alternatively, one can call `series.close()` to the same effect as
     * calling the destructor, including the release of file handles.
     */
    series.close();

    return 0;
#else
    std::cout << "The streaming example requires that openPMD has been built "
                 "with ADIOS2."
              << std::endl;
    return 0;
#endif
}

Python

The writing end of the streaming API is activated through use of Access.create_linear instead of Access.create_random_access (or Access.create). Iterations must be accessed with Series.snapshots() instead of using the field Series.iterations directly. With linear create mode, Snapshots.__getitem__(index) will automatically open a streaming step for each new corresponding iteration.

Users are encouraged to explicitly .close() the iteration after writing to it. Closing the iteration will flush all pending operations on that iteration. If an iteration has not been closed by the time the next iteration is accessed via Snapshots.__getitem__(index), it will be closed automatically.

Note that a closed iteration cannot, in general, be reopened. Limited support for reopening closed Iterations in create_linear is available for non-streaming backends other than ADIOS2 (and in ADIOS2, if using file-based encoding with engine BP5 and engine option FlattenSteps=ON). In a stream, Iterations may not be modified after they have been sent to readers.

#!/usr/bin/env python
import sys

import numpy as np
import openpmd_api as io

# pass-through for ADIOS2 engine parameters
# https://adios2.readthedocs.io/en/latest/engines/engines.html
config = {'adios2': {'engine': {}, 'dataset': {}}}
config['adios2']['engine'] = {'parameters':
                              {'Threads': '4', 'DataTransport': 'WAN'}}
config['adios2']['dataset'] = {'operators': [{'type': 'bzip2'}]}

if __name__ == "__main__":
    # this block is for our CI, SST engine is not present on all systems
    backends = io.file_extensions
    if "sst" not in backends:
        print("SST engine not available in ADIOS2.")
        sys.exit(0)

    # create a series and specify some global metadata
    # change the file extension to .json, .h5 or .bp for regular file writing
    series = io.Series("simData.sst", io.Access_Type.create_linear, config)
    series.set_author("Franz Poeschel <f.poeschel@hzdr.de>")
    series.set_software("openPMD-api-python-examples")

    # now, write a number of iterations (or: snapshots, time steps)
    for i in range(10):
        iteration = series.snapshots()[i]

        #######################
        # write electron data #
        #######################

        electronPositions = iteration.particles["e"]["position"]

        # openPMD attribute
        # (this one would also be set automatically for positions)
        electronPositions.unit_dimension = {io.Unit_Dimension.L: 1.0}
        # custom attribute
        electronPositions.set_attribute("comment", "I'm a comment")

        length = 10
        local_data = np.arange(i * length, (i + 1) * length,
                               dtype=np.dtype("double"))
        for dim in ["x", "y", "z"]:
            pos = electronPositions[dim]
            pos.reset_dataset(io.Dataset(local_data.dtype, [length]))
            pos[()] = local_data

        # optionally: flush now to clear buffers
        iteration.series_flush()  # this is a shortcut for `series.flush()`

        ###############################
        # write some temperature data #
        ###############################

        temperature = iteration.meshes["temperature"]
        temperature.unit_dimension = {io.Unit_Dimension.theta: 1.0}
        temperature.axis_labels = ["x", "y"]
        temperature.grid_spacing = [1., 1.]
        # temperature has no x,y,z components, so skip the last layer:
        temperature_dataset = temperature
        # let's say we are in a 3x3 mesh
        temperature_dataset.reset_dataset(
            io.Dataset(np.dtype("double"), [3, 3]))
        # temperature is constant
        temperature_dataset.make_constant(273.15)

        # After closing the iteration, the readers can see the iteration.
        # It can no longer be modified.
        # If not closing an iteration explicitly, it will be implicitly closed
        # upon creating the next iteration.
        iteration.close()

    # The files in 'series' are still open until the object is destroyed, on
    # which it cleanly flushes and closes all open file handles.
    # When running out of scope on return, the 'Series' destructor is called.
    # Alternatively, one can call `series.close()` to the same effect as
    # calling the destructor, including the release of file handles.
    series.close()
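
Two properties of the writing end are easy to overlook: an iteration only becomes visible to readers as a whole once it is closed, and it cannot be modified afterwards. The sketch below models both in plain Python. ToyStream and its methods are hypothetical names for illustration and do not correspond to openPMD-api calls.

```python
class ToyStream:
    """Toy stand-in for a stream written in linear mode (NOT openPMD-api)."""

    def __init__(self):
        self.published = {}  # index -> payload, what readers can see
        self._open = None    # index of the iteration being written
        self._buffer = {}    # pending data of the open iteration

    def write(self, index, key, value):
        if index in self.published:
            # Sent iterations cannot be modified in a stream.
            raise RuntimeError(f"iteration {index} was already sent")
        if self._open is not None and self._open != index:
            self.close()     # accessing a new iteration closes the old one
        self._open = index
        self._buffer[key] = value

    def close(self):
        if self._open is not None:
            # The whole iteration becomes visible in one step.
            self.published[self._open] = dict(self._buffer)
            self._open, self._buffer = None, {}


stream = ToyStream()
stream.write(0, "x", [0.0, 1.0])
visible_before_close = sorted(stream.published)  # nothing published yet
stream.write(1, "x", [2.0, 3.0])                 # implicitly closes iteration 0
stream.close()

try:
    stream.write(0, "y", [9.0])
    reopened = True
except RuntimeError:
    reopened = False

print(visible_before_close, sorted(stream.published), reopened)
# [] [0, 1] False
```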

Chunk provenance tracking using a rank table

In a large parallel streaming setup, it is important to adhere to a certain concept of data locality when deciding which data to load from the producer. The openPMD-api has some mechanisms to help with this process:

The API call BaseRecordComponent::availableChunks()/Base_Record_Component.available_chunks() returns the data chunks within a specific dataset that are available for loading. Each chunk is annotated with the MPI rank of the data producer that wrote it, in WrittenChunkInfo::sourceID/WrittenChunkInfo::source_ID.

In order to correlate this information with the MPI ranks of the data consumer, a rank table can be used in order to transmit an additional tag for each of the producer’s MPI ranks. On the data producer side, the rank table can be set manually or automatically:

  • automatically: Using the JSON/TOML option rank_table. The suggested specification is {"rank_table": "hostname"}, although the explicit values "mpi_processor_name" and "posix_hostname" are also accepted. "hostname" resolves to the MPI processor name when the Series has been initialized with MPI, to the POSIX hostname otherwise (if that is available).

  • manually: Using the API call Series::setRankTable(std::string const &myRankInfo) that specifies the current rank’s tag. This can be used to set custom tags, identifying e.g. NUMA nodes or groups of compute nodes.
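
For the automatic variant, the option can sit next to the backend settings in the same configuration. The sketch below builds such a configuration as a Python dictionary and serializes it to JSON. The "adios2" block is copied from the streaming examples in this section and is only there for context. Passing the result as the third argument of the Series constructor, as those examples do, is assumed here.

```python
import json

# "rank_table" is the option described above. The "adios2" block is
# optional context taken from the streaming examples.
config = {
    "rank_table": "hostname",
    "adios2": {
        "engine": {
            "parameters": {"DataTransport": "WAN"}
        }
    },
}

# JSON string form, e.g. for the C++ API, which takes the options as text.
config_json = json.dumps(config, indent=2)
print(config_json)
```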

The rank table takes the form of a 2-dimensional dataset that lists the tags as null-terminated strings, one line per MPI rank in rank order. It can be loaded using Series::rankTable()/Series.get_rank_table().
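
To make the layout concrete, the sketch below decodes such a table by hand from fixed-width byte rows. This is for illustration only: the sample rows and the helper decode_rank_table() are hypothetical, and in practice the table would be loaded through the API calls named above rather than decoded manually.

```python
def decode_rank_table(rows):
    """Turn fixed-width, null-terminated byte rows into {rank: tag}."""
    return {
        rank: bytes(row).split(b"\0", 1)[0].decode("utf-8")
        for rank, row in enumerate(rows)
    }


# Hypothetical raw table: one row per producer rank, each row padded
# with null bytes to a common width of 8.
raw = [
    b"node01\0\0",
    b"node01\0\0",
    b"node2\0\0\0",
]

table = decode_rank_table(raw)
print(table)
# {0: 'node01', 1: 'node01', 2: 'node2'}
```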

Setting the rank table is collective, though the collective action is only performed upon flushing. Reading the rank table requires specifying whether the read operation should be done collectively (better for performance) or independently.

In order to retrieve the corresponding information on the consumer side, the function host_info::byMethod()/HostInfo.get() can be used for retrieving the local rank’s information, or alternatively host_info::byMethodCollective()/HostInfo.get_info() for retrieving the rank table for all consumer ranks.
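
Once both rank tables are known, chunks can be assigned to consumer ranks with a preference for matching tags. The following sketch shows one possible strategy in plain Python. It is an assumption for illustration, not the distribution logic of the openPMD-api, and the function assign_chunks() and the tuple layout of the chunks are hypothetical. Chunks whose producer tag has no matching consumer are spread round-robin over all consumer ranks.

```python
from collections import defaultdict
from itertools import cycle


def assign_chunks(chunks, producer_hosts, consumer_hosts):
    """Map consumer rank -> list of chunks, preferring same-tag pairs.

    chunks:          list of (source_rank, offset, extent) tuples
    producer_hosts:  {producer rank: tag}, i.e. the writer's rank table
    consumer_hosts:  {consumer rank: tag}, gathered on the reader side
    """
    by_host = defaultdict(list)
    for rank in sorted(consumer_hosts):
        by_host[consumer_hosts[rank]].append(rank)

    # Round-robin within each tag, plus a global round-robin fallback.
    local = {host: cycle(ranks) for host, ranks in by_host.items()}
    fallback = cycle(sorted(consumer_hosts))

    assignment = defaultdict(list)
    for chunk in chunks:
        host = producer_hosts[chunk[0]]
        picker = local.get(host, fallback)
        assignment[next(picker)].append(chunk)
    return dict(assignment)


producer_hosts = {0: "node01", 1: "node01", 2: "node02", 3: "node03"}
consumer_hosts = {0: "node01", 1: "node02"}
chunks = [(0, [0], [10]), (1, [10], [10]), (2, [20], [10]), (3, [30], [10])]

result = assign_chunks(chunks, producer_hosts, consumer_hosts)
for rank in sorted(result):
    print(rank, [chunk[0] for chunk in result[rank]])
# 0 [0, 1, 3]
# 1 [2]
```

Here consumer rank 0 receives the two chunks written on its own node plus the chunk from node03, which has no local consumer. Consumer rank 1 receives the chunk written on node02.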