foundryts.NodeCollection

class foundryts.NodeCollection(*nodes, **kwargs)

A collection of FunctionNode or SummarizerNode objects.

A NodeCollection is an iterable that can be passed to functions expecting multiple input time series, or used to map each node in the collection through a function.

For raw series and point sets, the dataframe of a NodeCollection contains an extra column called series denoting the series that each point belongs to. The series value is either the series ID set in the time series sync or the alias given to foundryts.functions.points().

Note

Do not pass a NodeCollection directly to a function expecting a single input time series; the operation will fail. To apply the same function to every element of the collection, use NodeCollection.map(), which maps the transformed points or summary to the corresponding series in the final dataframe. Refer to the FoundryTS functions documentation to review the number of inputs each function expects.

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> nc.to_pandas()
     series                     timestamp  value
0  series-1 1970-01-01 00:00:00.000000001  100.0
1  series-1 1970-01-01 00:00:00.000000002  200.0
2  series-1 1970-01-01 00:00:00.000000003  300.0
3  series-2 1970-01-01 00:00:00.000000001  200.0
4  series-2 1970-01-01 00:00:00.000000002  400.0
5  series-2 1970-01-01 00:00:00.000000003  600.0
>>> scatter_plt = F.scatter()(nc)  # scatter() uses exactly 2 input time series
>>> scatter_plt.to_pandas()
   is_truncated  points.first_value  points.second_value              points.timestamp
0         False               100.0                200.0  1970-01-01 00:00:00.000000001
1         False               200.0                400.0  1970-01-01 00:00:00.000000002
2         False               300.0                600.0  1970-01-01 00:00:00.000000003
>>> scaled_nc = F.scale(10)(nc)  # error - scale() works on a single input series
>>> scaled_nc = nc.map(F.scale(10))  # ok - we're mapping each series in the collection to the result of scale()
>>> scaled_nc.to_pandas()
     series                     timestamp   value
0  series-1 1970-01-01 00:00:00.000000001  1000.0
1  series-1 1970-01-01 00:00:00.000000002  2000.0
2  series-1 1970-01-01 00:00:00.000000003  3000.0
3  series-2 1970-01-01 00:00:00.000000001  2000.0
4  series-2 1970-01-01 00:00:00.000000002  4000.0
5  series-2 1970-01-01 00:00:00.000000003  6000.0
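Because a NodeCollection is iterable, the nodes it contains can also be inspected or evaluated individually. A minimal sketch (the loop body and its printed output are illustrative):

>>> for node in nc:  # each element is a FunctionNode or SummarizerNode
...     print(node.to_pandas().shape)  # 3 points, timestamp and value columns
(3, 2)
(3, 2)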

columns()

Returns a tuple of strings representing the column names of the pandas.DataFrame that would be produced by evaluating this collection to a dataframe.

Note

Keys of nested objects are flattened, with nested keys joined by a period (.), for example distribution_values.count.

Non-uniform collections will contain the union of all columns of each element in the collection.

  • Returns: Tuple containing the names of the columns in the dataframe the collection evaluates to.
  • Return type: Tuple[str]

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> nc = NodeCollection(series_1, series_2)
>>> nc.columns()  # note the additional series column
('series', 'timestamp', 'value')
>>> dist = F.distribution()(nc)
>>> dist.columns()
('delta', 'distribution_values.count', 'distribution_values.end', 'distribution_values.start',
 'end', 'end_timestamp', 'start', 'start_timestamp')
>>> mixed_nc = NodeCollection([F.distribution()(series_1), F.statistics()(series_2)])
>>> mixed_nc.columns()
('series', 'delta', 'distribution_values.count', 'distribution_values.end', 'distribution_values.start',
 'end', 'end_timestamp', 'start', 'start_timestamp', 'count', 'earliest_point.timestamp',
 'earliest_point.value', 'largest_point.timestamp', 'largest_point.value', 'latest_point.timestamp',
 'latest_point.value', 'mean', 'smallest_point.timestamp', 'smallest_point.value')

map(func)

Maps each time series in the collection to the output of the given function.

This is useful for applying the same set of queries to every time series in a collection at once.

  • Parameters: func (() -> FunctionNode | SummarizerNode) – FoundryTS-supported function to apply to each time series in the collection.
  • Returns: The updated NodeCollection with each item mapped to the output of applying func.
  • Return type: Iterable[NodeCollection]

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> scaled_nc = nc.map(F.scale(10))
>>> scaled_nc.to_pandas()
     series                     timestamp   value
0  series-1 1970-01-01 00:00:00.000000001  1000.0
1  series-1 1970-01-01 00:00:00.000000002  2000.0
2  series-1 1970-01-01 00:00:00.000000003  3000.0
3  series-2 1970-01-01 00:00:00.000000001  2000.0
4  series-2 1970-01-01 00:00:00.000000002  4000.0
5  series-2 1970-01-01 00:00:00.000000003  6000.0
>>> mapped_summary_nc = nc.map(F.distribution())
>>> mapped_summary_nc.to_pandas()
     series  delta  distribution_values.count  distribution_values.end  distribution_values.start    end end_timestamp  start                start_timestamp
0  series-1   20.0                          1                    120.0                      100.0  300.0    2262-01-01  100.0  1677-09-21 00:12:43.145225216
1  series-1   20.0                          1                    200.0                      180.0  300.0    2262-01-01  100.0  1677-09-21 00:12:43.145225216
2  series-1   20.0                          1                    300.0                      280.0  300.0    2262-01-01  100.0  1677-09-21 00:12:43.145225216
3  series-2   40.0                          1                    240.0                      200.0  600.0    2262-01-01  200.0  1677-09-21 00:12:43.145225216
4  series-2   40.0                          1                    400.0                      360.0  600.0    2262-01-01  200.0  1677-09-21 00:12:43.145225216
5  series-2   40.0                          1                    600.0                      560.0  600.0    2262-01-01  200.0  1677-09-21 00:12:43.145225216

map_intervals(intervals, interval_name=None)

Creates time ranges for all time series in the collection using the given intervals.

Each interval is used to create a foundryts.functions.time_range() on the input time series, which can then be used for further transformations and analysis. Intervals can be created manually or by converting the result of foundryts.functions.time_range() to an Interval.

The resulting dataframe has additional columns for interval.start and interval.end.

  • Parameters:
    • intervals (Interval | List[Interval]) – One or more intervals used to create time ranges for all time series in the collection.
    • interval_name (str, optional) – Optional alias for the interval columns in the dataframe.
  • Returns: The updated NodeCollection with each time series mapped to a time range for each of the given intervals.
  • Return type: Iterable[NodeCollection]

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> from foundryts.core.interval import Interval
>>> intervals = [Interval(1, 2), Interval(2, 3), Interval(3, 4), Interval(1, 3)]
>>> nc = nc.map_intervals(intervals, interval_name="interval")
>>> nc.to_pandas()
     series  interval.start  interval.end                     timestamp  value
0  series-1               1             2 1970-01-01 00:00:00.000000001  100.0
1  series-1               2             3 1970-01-01 00:00:00.000000002  200.0
2  series-1               3             4 1970-01-01 00:00:00.000000003  300.0
3  series-1               1             3 1970-01-01 00:00:00.000000001  100.0
4  series-1               1             3 1970-01-01 00:00:00.000000002  200.0
5  series-2               1             2 1970-01-01 00:00:00.000000001  200.0
6  series-2               2             3 1970-01-01 00:00:00.000000002  400.0
7  series-2               3             4 1970-01-01 00:00:00.000000003  600.0
8  series-2               1             3 1970-01-01 00:00:00.000000001  200.0
9  series-2               1             3 1970-01-01 00:00:00.000000002  400.0
>>> scaled_nc = nc.map(F.scale(1000))  # applying the scale() function on each time range created from the intervals
>>> scaled_nc.to_pandas()
     series  interval.start  interval.end                     timestamp     value
0  series-1               1             2 1970-01-01 00:00:00.000000001  100000.0
1  series-1               2             3 1970-01-01 00:00:00.000000002  200000.0
2  series-1               3             4 1970-01-01 00:00:00.000000003  300000.0
3  series-1               1             3 1970-01-01 00:00:00.000000001  100000.0
4  series-1               1             3 1970-01-01 00:00:00.000000002  200000.0
5  series-2               1             2 1970-01-01 00:00:00.000000001  200000.0
6  series-2               2             3 1970-01-01 00:00:00.000000002  400000.0
7  series-2               3             4 1970-01-01 00:00:00.000000003  600000.0
8  series-2               1             3 1970-01-01 00:00:00.000000001  200000.0
9  series-2               1             3 1970-01-01 00:00:00.000000002  400000.0
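A summarizer can be mapped over the intervals in the same way. The sketch below assumes F.statistics() (shown earlier) is applied per time range; the resulting output is omitted here, but it would include the interval columns alongside the flattened summary columns:

>>> interval_stats = nc.map(F.statistics())
>>> interval_stats.columns()  # inspect the flattened summary columns before evaluating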

to_dataframe(numPartitions=16)

Evaluates all time series in the collection and concatenates the results into a pyspark.sql.DataFrame.

PySpark DataFrames enable distributed data processing and parallelized transformations. They are useful when working with dataframes with a large number of rows, for example when loading all the points of a raw series or the result of a FunctionNode, or when evaluating the results of multiple SummarizerNode or FunctionNode objects together.

An additional series column denotes which series each result belongs to, using the series ID or the foundryts.functions.points() alias.

  • Parameters: numPartitions (int, optional) – Specifies the number of partitions for distributing the time series data in the collection across Spark executors, optimizing parallel data processing. Higher values can improve performance when the executor count is high, whereas lower values may be more efficient with fewer executors. Adjust this based on the size of the time series collection and your Spark configuration (e.g., number of executors and executor memory) (default is 16).
  • Returns: Output of all time series and operations in the collection evaluated to a PySpark dataframe.
  • Return type: pyspark.sql.DataFrame
Note

For large collections, tune numPartitions based on your Spark configuration.

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection(series_1, series_2)
>>> nc.to_dataframe()
+---------+-----------------------------+-----+
| series  | timestamp                   |value|
+---------+-----------------------------+-----+
|series-1 |1970-01-01 00:00:00.000000001|100.0|
|series-1 |1970-01-01 00:00:00.000000002|200.0|
|series-1 |1970-01-01 00:00:00.000000003|300.0|
|series-2 |1970-01-01 00:00:00.000000001|200.0|
|series-2 |1970-01-01 00:00:00.000000002|400.0|
|series-2 |1970-01-01 00:00:00.000000003|600.0|
+---------+-----------------------------+-----+
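Because the result is a regular PySpark DataFrame, standard Spark transformations can be applied afterwards. A minimal sketch, assuming a Spark session is already available in your environment (output formatting is illustrative):

>>> from pyspark.sql import functions as sf
>>> df = nc.to_dataframe(numPartitions=32)  # raise numPartitions for larger collections
>>> df.groupBy("series").agg(sf.avg("value").alias("avg_value")).show()
+--------+---------+
|  series|avg_value|
+--------+---------+
|series-1|    200.0|
|series-2|    400.0|
+--------+---------+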

to_pandas(parallel=4, mode='thread')

Evaluates the time series queries in this collection and concatenates the results to a pandas.DataFrame.

Refer to FunctionNode.to_pandas() and SummarizerNode.to_pandas() for details on the shape of the dataframe.

An additional series column denotes which series each result belongs to, using the series ID or the foundryts.functions.points() alias.

  • Parameters:
    • parallel (int, optional) – Number of parallel threads or processes used to evaluate the time series queries. If set to 1, no multiprocessing is used (default is 4).
    • mode (str, optional) – Either process or thread, controlling whether a process pool or a thread pool is used for parallel evaluation (default is thread).
  • Returns: Output of the time series queries in the collection evaluated to a Pandas dataframe.
  • Return type: pd.DataFrame
Note

The result of each time series query in the collection is stored and concatenated to the final dataframe locally, so results must fit in memory.

Parallel execution increases memory usage and resource consumption, process mode adds extra overhead for starting and communicating with worker processes, and excessive parallelism can degrade performance.

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection(series_1, series_2)
>>> nc.to_pandas()
     series                     timestamp  value
0  series-1 1970-01-01 00:00:00.000000001  100.0
1  series-1 1970-01-01 00:00:00.000000002  200.0
2  series-1 1970-01-01 00:00:00.000000003  300.0
3  series-2 1970-01-01 00:00:00.000000001  200.0
4  series-2 1970-01-01 00:00:00.000000002  400.0
5  series-2 1970-01-01 00:00:00.000000003  600.0
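The parallel and mode parameters only change how the queries are evaluated, not the shape of the result. A sketch of evaluating a larger collection with a process pool:

>>> nc.to_pandas(parallel=8, mode="process")  # same dataframe as above, evaluated in parallel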

to_rdd(numPartitions=16)

Deprecated

Deprecated since version 1.0.0.

Returns an RDD of (key, object) pairs by evaluating the nodes in this collection. If the nodes in this NodeCollection resulted from grouping or windowing operations, the key will contain the given metadata keys or intervals; otherwise it will be the node's series identifier.

  • Parameters: numPartitions (int) – Number of partitions to use for Spark execution.
  • Return type: pyspark.RDD
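
Since to_rdd() is deprecated, prefer to_dataframe() for new code. A minimal sketch of the legacy usage, where the keys are the series identifiers (output illustrative):

>>> rdd = nc.to_rdd(numPartitions=8)
>>> rdd.keys().collect()  # standard pyspark.RDD methods
['series-1', 'series-2']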

types()

Returns a tuple of types for the columns of the pandas.DataFrame that would be produced by evaluating the collection to a dataframe.

  • Returns: Tuple containing the types of the columns in the dataframe the collection evaluates to.
  • Return type: Tuple[Type]
Note

Non-uniform collections will contain the union of all types of each element in the collection.

Examples

>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> nc = NodeCollection(series_1, series_2)
>>> nc.types()
(<class 'str'>, <class 'int'>, <class 'float'>)
>>> dist = F.distribution()(nc)
>>> dist.types()
(<class 'float'>, <class 'int'>, <class 'float'>, <class 'float'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>)
>>> mixed_nc = NodeCollection([F.distribution()(series_1), F.statistics()(series_2)])
>>> print(mixed_nc.types())
(<class 'str'>, <class 'float'>, <class 'int'>, <class 'float'>, <class 'float'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'int'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>)
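
Pairing types() with columns() gives a quick view of the schema the collection evaluates to. A small sketch using the collection from the first example:

>>> dict(zip(nc.columns(), nc.types()))
{'series': <class 'str'>, 'timestamp': <class 'int'>, 'value': <class 'float'>}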