A collection of FunctionNode or SummarizerNode objects.
A NodeCollection is an iterable that can be passed to functions expecting multiple time series, or used to map each node in the collection through a function.
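For example, because the collection is iterable, each node can also be evaluated on its own. This is a minimal sketch, assuming nc = NodeCollection([series_1, series_2]) as built in the examples below:

>>> for node in nc:
...     df = node.to_pandas()  # evaluate each FunctionNode individually
...     print(df.shape)        # each example series has 3 points and 2 columns
(3, 2)
(3, 2)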
For raw series and point sets, the dataframe of a NodeCollection contains an extra column called series, denoting the series that the points belong to. The series value will be the series ID, set either in the time series sync or via an alias for foundryts.functions.points().
Do not pass a NodeCollection directly to functions expecting a single input time series, as this will cause the operation to fail. To apply the same function to every element of the collection, use NodeCollection.map(), which maps the transformed points or summary to the corresponding series in the final dataframe. Refer to the FoundryTS functions documentation to review the number of inputs each function expects.
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> nc.to_pandas()
     series                     timestamp  value
0  series-1 1970-01-01 00:00:00.000000001  100.0
1  series-1 1970-01-01 00:00:00.000000002  200.0
2  series-1 1970-01-01 00:00:00.000000003  300.0
3  series-2 1970-01-01 00:00:00.000000001  200.0
4  series-2 1970-01-01 00:00:00.000000002  400.0
5  series-2 1970-01-01 00:00:00.000000003  600.0
>>> scatter_plt = F.scatter()(nc)  # scatter() uses exactly 2 input time series
>>> scatter_plt.to_pandas()
   is_truncated  points.first_value  points.second_value              points.timestamp
0         False               100.0                200.0 1970-01-01 00:00:00.000000001
1         False               200.0                400.0 1970-01-01 00:00:00.000000002
2         False               300.0                600.0 1970-01-01 00:00:00.000000003
>>> scaled_nc = F.scale(10)(nc)  # error - scale() works on a single input series
>>> scaled_nc = nc.map(F.scale(10))  # ok - each series in the collection is mapped through scale()
>>> scaled_nc.to_pandas()
     series                     timestamp   value
0  series-1 1970-01-01 00:00:00.000000001  1000.0
1  series-1 1970-01-01 00:00:00.000000002  2000.0
2  series-1 1970-01-01 00:00:00.000000003  3000.0
3  series-2 1970-01-01 00:00:00.000000001  2000.0
4  series-2 1970-01-01 00:00:00.000000002  4000.0
5  series-2 1970-01-01 00:00:00.000000003  6000.0
Returns a tuple of strings representing the column names of the pandas.DataFrame that would be produced by evaluating this collection to a dataframe. Keys of nested objects are flattened, with nested keys joined by a period (.). Non-uniform collections will contain the union of all columns of each element in the collection.
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> nc = NodeCollection(series_1, series_2)
>>> nc.columns()  # note the additional series column
('series', 'timestamp', 'value')
>>> dist = F.distribution()(nc)
>>> dist.columns()
('delta', 'distribution_values.count', 'distribution_values.end', 'distribution_values.start',
 'end', 'end_timestamp', 'start', 'start_timestamp')
>>> mixed_nc = NodeCollection([F.distribution()(series_1), F.statistics()(series_2)])
>>> mixed_nc.columns()
('series', 'delta', 'distribution_values.count', 'distribution_values.end', 'distribution_values.start',
 'end', 'end_timestamp', 'start', 'start_timestamp', 'count', 'earliest_point.timestamp',
 'earliest_point.value', 'largest_point.timestamp', 'largest_point.value', 'latest_point.timestamp',
 'latest_point.value', 'mean', 'smallest_point.timestamp', 'smallest_point.value')
Maps each time series in the collection to the output of the given function. This is useful for applying the same set of queries to every time series in a collection.
Parameters: func, the function to apply to each time series in the collection.
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> scaled_nc = nc.map(F.scale(10))
>>> scaled_nc.to_pandas()
     series                     timestamp   value
0  series-1 1970-01-01 00:00:00.000000001  1000.0
1  series-1 1970-01-01 00:00:00.000000002  2000.0
2  series-1 1970-01-01 00:00:00.000000003  3000.0
3  series-2 1970-01-01 00:00:00.000000001  2000.0
4  series-2 1970-01-01 00:00:00.000000002  4000.0
5  series-2 1970-01-01 00:00:00.000000003  6000.0
>>> mapped_summary_nc = nc.map(F.distribution())
>>> mapped_summary_nc.to_pandas()
     series  delta  distribution_values.count  distribution_values.end  distribution_values.start    end end_timestamp  start               start_timestamp
0  series-1   20.0                          1                    120.0                      100.0  300.0    2262-01-01  100.0 1677-09-21 00:12:43.145225216
1  series-1   20.0                          1                    200.0                      180.0  300.0    2262-01-01  100.0 1677-09-21 00:12:43.145225216
2  series-1   20.0                          1                    300.0                      280.0  300.0    2262-01-01  100.0 1677-09-21 00:12:43.145225216
3  series-2   40.0                          1                    240.0                      200.0  600.0    2262-01-01  200.0 1677-09-21 00:12:43.145225216
4  series-2   40.0                          1                    400.0                      360.0  600.0    2262-01-01  200.0 1677-09-21 00:12:43.145225216
5  series-2   40.0                          1                    600.0                      560.0  600.0    2262-01-01  200.0 1677-09-21 00:12:43.145225216
Creates time ranges for all time series in the collection using the given intervals. Each interval is used to create a foundryts.functions.time_range() on the input time series, which can then be used for further transformations and analysis. This works best with Interval objects, created either manually or by converting the result of foundryts.functions.time_range() to an Interval. The resulting dataframe has additional interval.start and interval.end columns.
Parameters: intervals, the intervals used to create a time range on each time series; interval_name, an optional name for the interval columns in the resulting dataframe.
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection([series_1, series_2])
>>> from foundryts.core.interval import Interval
>>> intervals = [Interval(1, 2), Interval(2, 3), Interval(3, 4), Interval(1, 3)]
>>> nc = nc.map_intervals(intervals, interval_name="interval")
>>> nc.to_pandas()
     series  interval.start  interval.end                     timestamp  value
0  series-1               1             2 1970-01-01 00:00:00.000000001  100.0
1  series-1               2             3 1970-01-01 00:00:00.000000002  200.0
2  series-1               3             4 1970-01-01 00:00:00.000000003  300.0
3  series-1               1             3 1970-01-01 00:00:00.000000001  100.0
4  series-1               1             3 1970-01-01 00:00:00.000000002  200.0
5  series-2               1             2 1970-01-01 00:00:00.000000001  200.0
6  series-2               2             3 1970-01-01 00:00:00.000000002  400.0
7  series-2               3             4 1970-01-01 00:00:00.000000003  600.0
8  series-2               1             3 1970-01-01 00:00:00.000000001  200.0
9  series-2               1             3 1970-01-01 00:00:00.000000002  400.0
>>> scaled_nc = nc.map(F.scale(1000))
>>> scaled_nc.to_pandas()  # applies scale() to each time range created from the intervals
     series  interval.start  interval.end                     timestamp     value
0  series-1               1             2 1970-01-01 00:00:00.000000001  100000.0
1  series-1               2             3 1970-01-01 00:00:00.000000002  200000.0
2  series-1               3             4 1970-01-01 00:00:00.000000003  300000.0
3  series-1               1             3 1970-01-01 00:00:00.000000001  100000.0
4  series-1               1             3 1970-01-01 00:00:00.000000002  200000.0
5  series-2               1             2 1970-01-01 00:00:00.000000001  200000.0
6  series-2               2             3 1970-01-01 00:00:00.000000002  400000.0
7  series-2               3             4 1970-01-01 00:00:00.000000003  600000.0
8  series-2               1             3 1970-01-01 00:00:00.000000001  200000.0
9  series-2               1             3 1970-01-01 00:00:00.000000002  400000.0
Evaluates all time series in the collection and concatenates the results into a pyspark.sql.DataFrame. PySpark DataFrames enable distributed data processing and parallelized transformations. They are useful when working with dataframes with a large number of rows, for example when loading all the points in a raw series or the result of a FunctionNode, or when evaluating the results of multiple SummarizerNode or FunctionNode objects together.
An additional series column will note which series the results belong to, using the series ID or the foundryts.functions.points() alias. Set numPartitions to an appropriate value for a large collection.
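For example, a minimal sketch for the nc defined in the example below, assuming numPartitions is passed as a keyword argument to to_dataframe():

>>> df = nc.to_dataframe(numPartitions=8)  # assumption: numPartitions as a keyword argument
>>> df.rdd.getNumPartitions()              # inspect the partition count on the underlying RDD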
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection(series_1, series_2)
>>> nc.to_dataframe()
+---------+-----------------------------+-----+
| series  | timestamp                   |value|
+---------+-----------------------------+-----+
|series-1 |1970-01-01 00:00:00.000000001|100.0|
|series-1 |1970-01-01 00:00:00.000000002|200.0|
|series-1 |1970-01-01 00:00:00.000000003|300.0|
|series-2 |1970-01-01 00:00:00.000000001|200.0|
|series-2 |1970-01-01 00:00:00.000000002|400.0|
|series-2 |1970-01-01 00:00:00.000000003|600.0|
+---------+-----------------------------+-----+
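The result is a regular PySpark DataFrame, so standard distributed operations such as grouping on the series column work as usual; for example (plain PySpark, not FoundryTS-specific):

>>> from pyspark.sql import functions as sf
>>> nc.to_dataframe().groupBy("series").agg(sf.avg("value").alias("avg_value")).show()
+--------+---------+
|  series|avg_value|
+--------+---------+
|series-1|    200.0|
|series-2|    400.0|
+--------+---------+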
Evaluates the time series queries in this collection and concatenates the results into a pandas.DataFrame. Refer to FunctionNode.to_pandas() and SummarizerNode.to_pandas() for details on the shape of the dataframe. An additional series column will note which series the results belong to, using the series ID or the foundryts.functions.points() alias.
The result of each time series query in the collection is stored and concatenated to the final dataframe locally, so results must fit in memory. Parallel execution will increase memory usage and resource consumption, using process mode may introduce overhead, and excessive parallelism can degrade performance.
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_1.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  100.0
1 1970-01-01 00:00:00.000000002  200.0
2 1970-01-01 00:00:00.000000003  300.0
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> series_2.to_pandas()
                      timestamp  value
0 1970-01-01 00:00:00.000000001  200.0
1 1970-01-01 00:00:00.000000002  400.0
2 1970-01-01 00:00:00.000000003  600.0
>>> nc = NodeCollection(series_1, series_2)
>>> nc.to_pandas()
     series                     timestamp  value
0  series-1 1970-01-01 00:00:00.000000001  100.0
1  series-1 1970-01-01 00:00:00.000000002  200.0
2  series-1 1970-01-01 00:00:00.000000003  300.0
3  series-2 1970-01-01 00:00:00.000000001  200.0
4  series-2 1970-01-01 00:00:00.000000002  400.0
5  series-2 1970-01-01 00:00:00.000000003  600.0
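Since every row is tagged with its series, the combined frame works directly with ordinary pandas operations, for example a per-series aggregate:

>>> nc.to_pandas().groupby("series")["value"].mean()
series
series-1    200.0
series-2    400.0
Name: value, dtype: float64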
Deprecated since version 1.0.0.
Returns an RDD of (key, object) pairs by evaluating the nodes in this collection. If the nodes in this NodeCollection resulted from grouping or windowing operations, the respective key will contain the given metadata keys or intervals. Otherwise it will be the node's series identifier.
Returns: pyspark.RDD
Returns a tuple of types for the columns of the pandas.DataFrame that would be produced by evaluating the collection to a dataframe. Non-uniform collections will contain the union of all types of each element in the collection.
>>> series_1 = F.points((1, 100.0), (2, 200.0), (3, 300.0), name="series-1")
>>> series_2 = F.points((1, 200.0), (2, 400.0), (3, 600.0), name="series-2")
>>> nc = NodeCollection(series_1, series_2)
>>> nc.types()
(<class 'str'>, <class 'int'>, <class 'float'>)
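Since types() is aligned with columns(), the two can be zipped into a simple schema view before evaluating the collection; for example:

>>> dict(zip(nc.columns(), nc.types()))
{'series': <class 'str'>, 'timestamp': <class 'int'>, 'value': <class 'float'>}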
>>> dist = F.distribution()(nc)
>>> dist.types()
(<class 'float'>, <class 'int'>, <class 'float'>, <class 'float'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>)
>>> mixed_nc = NodeCollection([F.distribution()(series_1), F.statistics()(series_2)])
>>> print(mixed_nc.types())
(<class 'str'>, <class 'float'>, <class 'int'>, <class 'float'>, <class 'float'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'int'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>, <class 'float'>,
 <class 'pandas._libs.tslibs.timestamps.Timestamp'>, <class 'float'>)