Running Temporian on Apache Beam¶
This guide shows how to run Temporian programs on large datasets with Beam.
WARNING: Temporian with Apache Beam is experimental. The API might change, some optimizations are not implemented, and some operators are not available.
The reader is assumed to be familiar with Temporian's in-process execution. Please read the Getting Started guide or the User Guide first.
Introduction¶
The Temporian User Guide teaches how to run a Temporian program in-process using Python. This approach is ideal for quick experimentation and for datasets that are small enough to fit in a single computer. However, for large datasets with billions or even trillions of events, this approach will run out of memory. Instead, you can execute Temporian programs on large datasets using Apache Beam.
Info: Apache Beam is a library for large-scale distributed computation.
Executing a Temporian program in-process and executing it with Beam are very similar. There are only two things to watch out for:
- The Temporian program needs to be defined in graph execution mode. Eager execution mode is not compatible with Beam.
- Dataset I/O needs to be Beam-compatible. To enable this, replace the normal dataset I/O functions with their counterparts in temporian.beam (for example, temporian.from_csv becomes temporian.beam.from_csv). Alternatively, you can implement your own I/O code using Beam IO connectors. Dataset I/O with Beam is explained in more detail in the "Input and output data" section.
Install dependencies¶
We need to install Temporian and Apache Beam.
%pip install temporian[beam] -q
A simple example¶
In this example, we run a Temporian program on a csv file with both the in-process and the Beam approaches.
First, we create a small csv file containing our input data. This dataset contains 5 events with two features (a and b). Individual events are stored as separate rows.
%%writefile input.csv
timestamp,a,b
1,x,1
2,y,2
13,z,3
14,x,2
15,y,1
Note: In practice, large datasets should be divided into multiple files called "shards" to facilitate the distribution of work among machines. In this case, dataset paths can be a glob expression, e.g., input-*.csv. For more information, see the "Speed-up pipeline" section.
Note 2: In this file, each event is presented in a separate row. For dataset formats that support it, it is more efficient to group examples of the same index key together. For more information, see the "Speed-up pipeline" section.
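To illustrate the first note, the following sketch writes a dataset split into multiple shards and matches them with a glob pattern. It uses only the Python standard library; the input-NNNNN-of-NNNNN naming is a common Beam convention, not something Temporian requires.

```python
import csv
import glob
import os
import tempfile

# Write 5 events split across 2 csv "shards".
tmp_dir = tempfile.mkdtemp()
events = [(1, "x", 1), (2, "y", 2), (13, "z", 3), (14, "x", 2), (15, "y", 1)]
num_shards = 2
for shard in range(num_shards):
    path = os.path.join(tmp_dir, f"input-{shard:05d}-of-{num_shards:05d}.csv")
    with open(path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["timestamp", "a", "b"])
        writer.writerows(events[shard::num_shards])  # Round-robin split.

# A glob pattern such as "input-*.csv" matches all the shards at once.
shard_paths = sorted(glob.glob(os.path.join(tmp_dir, "input-*.csv")))
assert len(shard_paths) == num_shards
```

A pattern like this can then be passed to the dataset path argument in place of a single file name.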
The following code processes our csv file and outputs the result in the output_in_process.csv file using the in-process eager execution mode.
import temporian as tp
# Our processing function
def my_processing(data):
# Re-index the events according to "a",
# and apply a 4 time-unit moving average.
return data.add_index("a").moving_sum(4)
# Load the csv in memory into an EventSet
input = tp.from_csv("input.csv")
# Apply the processing
output = my_processing(input)
# Save the results to a csv file
tp.to_csv(output, path="output_in_process.csv")
The next code applies the same processing and outputs the result in the output_beam.csv file using the Beam execution mode.
import temporian as tp
import temporian.beam as tpb # Import Temporian's Beam capabilities
import apache_beam as beam
# Same processing as before
def my_processing(data):
return data.add_index("a").moving_sum(4)
# Create the input node i.e. the schema of our input
input_node = tp.input_node([("a", tp.str_), ("b", tp.float32)])
# Create a Temporian graph. No computation is applied so far.
output_node = my_processing(input_node)
# Define a Beam pipeline
with beam.Pipeline() as pipeline:
    output = (
        pipeline
        # Read events from the csv file.
        | tpb.from_csv("input.csv", input_node.schema)
        # Apply the processing.
        | tpb.run(input=input_node, output=output_node)
        # Save the results to a csv file.
        # Note: shard_name_template="" outputs the results in a single csv file.
        | tpb.to_csv("output_beam.csv", output_node.schema, shard_name_template="")
    )
    # The pipeline is executed automatically when exiting the "with" block.
Finally, we check the content of the output csv files.
%cat output_in_process.csv
%cat output_beam.csv
Both files contain the same events. However, notice that the events are not stored in the same order: in Temporian, event order is not constrained, and different implementations and backends are free to change it.
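Because the order can differ, a byte-for-byte comparison of the two output files would be misleading. Here is a minimal, Temporian-independent sketch of an order-insensitive comparison (the canonical_rows helper is hypothetical, not part of the library):

```python
import csv
import io

def canonical_rows(csv_text):
    """Returns the header and the sorted data rows of a csv string, so two
    csv outputs can be compared regardless of the order of their rows."""
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)
    return header, sorted(tuple(row) for row in reader)

# Two csv outputs with the same events in different orders compare equal.
a = "timestamp,a,b\n1,x,1\n2,y,2\n"
b = "timestamp,a,b\n2,y,2\n1,x,1\n"
assert canonical_rows(a) == canonical_rows(b)
```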
Input and output data¶
Info: Apache Beam represents data using PCollections. A PCollection is essentially a distributed and homogeneous collection of Python values. For example, a PCollection can contain strings, lists of strings, or more complex objects.
You can import and export data from files using the temporian.beam.to_<format> and temporian.beam.from_<format> functions, or you can write your own exporter on top of the Beam IO connectors.
Here are some examples of data IO functions:
- temporian.beam.{from|to}_csv: Import and export data from CSV files.
- temporian.beam.{from|to}_tensorflow_record: Import and export data from TensorFlow Record files.
The temporian.beam.to_event_set and temporian.beam.to_dict methods respectively convert dictionaries of values to and from Temporian Beam EventSets. For example:
| ... # Generate dictionaries of values.
| tpb.to_event_set(input_node.schema)
| tpb.run(input=input_node, output=output_node)
| tpb.to_dict(output_node.schema)
| ... # Do something with the results
Some IO functions, such as to_event_set, to_dict, and temporian.beam.{from|to}_tensorflow_record, support different formats to represent events, controlled by the format attribute.
- format="single_events": Each event is a dictionary of feature/index keys to values. Each event is a different PCollection item.
- format="grouped_by_index" (default): Events with the same index are grouped into the same PCollection item. Each item of this PCollection is a dictionary of feature/index keys to values. Index values are represented as Python primitives (e.g., int) and features are represented as NumPy arrays.
The next code shows how three events, with two features "a" and "b" and one index "c", are fed into Temporian with both the format="single_events" and the format="grouped_by_index" formats:
# With format="single_events"
| pipeline
| beam.Create([
# Index and features always have one value
{"timestamp": 1., "a": 4., "b": b"X", "c": 10},
{"timestamp": 2., "a": 5., "b": b"Y", "c": 10},
{"timestamp": 3., "a": 6., "b": b"Z", "c": 11},
])
| tpb.to_event_set(input_node.schema, format="single_events")
| tpb.run(...)
| ...
# With format="grouped_by_index" (recommended)
| pipeline
| beam.Create([
{"timestamp": np.array([1., 2.], dtype=np.float64),
# "a" is a feature, so there is one value per timestamp.
"a": np.array([4., 5.], dtype=np.float32),
"b": np.array([b"X", b"Y"], dtype=np.bytes_),
"c": 10}, # "c" is an index, so there is only one value.
{"timestamp": np.array([3.], dtype=np.float64),
"a": np.array([6.,], dtype=np.float32),
"b": np.array([b"Z"], dtype=np.bytes_),
"c": 11}
])
| tpb.to_event_set(input_node.schema, format="grouped_by_index")
| tpb.run(...)
| ...
Warning: Temporian dtypes are strict. For example, a tp.float32 feature can only consume a NumPy array of np.float32. Feeding a Python list or a NumPy array of np.float64 will fail.
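For example, Python floats are 64-bit, so arrays built from them must be cast explicitly before being fed to a tp.float32 feature. A minimal NumPy sketch (the variable names are illustrative):

```python
import numpy as np

raw_values = [4.0, 5.0]  # Python floats default to 64-bit.
assert np.array(raw_values).dtype == np.float64  # Would be rejected by tp.float32.

# Cast explicitly to the dtype declared in the schema.
feature = np.array(raw_values, dtype=np.float32)
assert feature.dtype == np.float32

# Timestamps are represented as np.float64.
timestamps = np.array([1, 2], dtype=np.float64)
assert timestamps.dtype == np.float64
```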
Note: The "grouped_by_index" format is significantly faster than the "single_events" format. When possible, use the "grouped_by_index" format.
Using tpb.to_event_set and tpb.to_dict, you can import and export data from other Beam IO connectors. The next example shows how to import events stored in TF.Records into Temporian.
Note: This is just an example; TF.Records are natively supported by Temporian with tpb.from_tensorflow_record.
from typing import Dict, Union

from tensorflow.core.example import example_pb2

def _parse_single_event_tf_example(example: example_pb2.Example) -> Dict[str, Union[float, int, bytes]]:
    """Converts a TF.Example proto into a dictionary of values."""
    dict_example = {}
    for k, v in example.features.feature.items():
        if v.HasField("int64_list"):
            dict_example[k] = v.int64_list.value[0]
        elif v.HasField("float_list"):
            dict_example[k] = v.float_list.value[0]
        elif v.HasField("bytes_list"):
            dict_example[k] = v.bytes_list.value[0]
        else:
            raise ValueError("Bad feature")
    return dict_example
| beam.io.tfrecordio.ReadFromTFRecord(
    file_pattern=input,
    coder=beam.coders.ProtoCoder(example_pb2.Example),
    compression_type=beam.io.filesystem.CompressionTypes.GZIP)
| beam.Map(_parse_single_event_tf_example)
| tpb.to_event_set(input_node.schema, format="single_events")
| tpb.run(...)
Speed-up pipeline¶
Following are some considerations to create fast Beam+Temporian pipelines on large datasets.
Consider in-process execution mode¶
The in-process execution mode runs on a single machine. If your dataset fits on a single machine, it is likely that in-process execution with @tp.compiled or the graph mode will be faster than the Beam execution mode.
You can see the memory usage of an EventSet by printing it (e.g., print(evset)) or with the evset.memory_usage() method.
Divide dataset into multiple files¶
Ensure that any large dataset is divided into multiple files. As a rule of thumb, (rule 1) files should not be smaller than 10MB and (rule 2) there should be at least 10 files for each worker (unless rule 1 prevents it).
For instance, if your dataset contains 100GB of data and you have 100 workers, the input dataset should be divided into 1000 files of size 100MB.
Most Beam sharded output operations will automatically determine the number of output shards.
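The two rules of thumb above can be sketched as a small helper that picks a shard count; the function name and exact policy are illustrative, not part of Temporian or Beam:

```python
def pick_num_shards(dataset_bytes, num_workers,
                    min_shard_bytes=10 * 1024**2, shards_per_worker=10):
    """Picks a shard count: at least `shards_per_worker` shards per worker
    (rule 2), unless that would make shards smaller than `min_shard_bytes`
    (rule 1)."""
    target = num_workers * shards_per_worker  # Rule 2.
    # Rule 1: cap the count so each shard stays at least min_shard_bytes.
    max_shards = max(1, dataset_bytes // min_shard_bytes)
    return min(target, max_shards)

# 100 GB of data on 100 workers -> 1000 shards of ~100 MB each.
assert pick_num_shards(100 * 1024**3, 100) == 1000
```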
Import and export events using the grouped_by_index format¶
Prefer the format="grouped_by_index" event format to the format="single_events" format. The format is specified with the format argument of the import (tpb.to_event_set) and export (tpb.to_dict) functions.
Avoid single index values with a large number of events¶
Note: Temporian indexes are currently implemented using Beam keys. This means that all the values of a given feature and a given index key must fit into the memory of a single worker.
When intermediate values of a computation depend on index values with a large number of events, consider the following recommendations:
- Add indexes before removing them. For instance, prefer x.add_index("a").drop_index("b") over x.drop_index("b").add_index("a").
- Apply aggregations before removing indexes. For instance, prefer x.moving_sum(5).drop_index("a").moving_sum(tp.duration.shortest) over x.drop_index("a").moving_sum(5).
Import indexed EventSet¶
Note: This rule is a special case of the "Avoid single index values with a large number of events" and "Import and export events using the grouped_by_index format" rules described above.
When possible, your pipeline should consume and return indexed EventSets instead of creating those indexes during the computation.
The following two examples return the same result. However, the first example can be significantly faster.
Example 1: Load an indexed EventSet and then apply some transformations.
input_node = tp.input_node(
    features=[("f", tp.int32)],
    indexes=[("x", tp.str_)])
output_node = input_node.moving_sum(5)
...
| tpb.from_csv("input.csv", input_node.schema)
| tpb.run(input=input_node, output=output_node)
...
Example 2: Load a non-indexed EventSet, add an index with the add_index operator, and apply the same transformations.
input_node = tp.input_node(features=[("f", tp.int32), ("x", tp.str_)])
output_node = input_node.add_index("x").moving_sum(5)
...
| tpb.from_csv("input.csv", input_node.schema)
| tpb.run(input=input_node, output=output_node)
...