大型输入数据

如果您正在处理大型时间序列数据,您可能会面临多个问题。其中最重要的两个是

  • 特征提取执行时间长

  • 内存消耗大,甚至超出了单台机器的处理能力

要解决第一个问题,您可以并行化计算,如并行化中所述。请注意,在您的本地计算机上,并行化默认已经开启。

然而,对于更大的数据集,您需要同时处理这两个问题。您有多种选择可以做到这一点,我们将在接下来的段落中进行讨论。

Dask - 简单方法

tsfresh 接受 dask dataframe 作为 tsfresh.extract_features() 函数的输入,而不是 pandas dataframe。Dask dataframes 允许您将计算扩展到本地内存之外(通过内部数据分区),甚至扩展到大型机器集群。其 dataframe API 与 pandas dataframes 非常相似,甚至可以是直接替代。

数据格式 中讨论的所有参数对于 dask dataframes 也有效。输入数据将使用 dask 方法转换为 tsfresh 的正确格式,特征提取将作为附加计算添加到计算图中。然后您可以向结果添加附加计算,或照常使用 .compute() 触发计算。

注意

特征提取的最后一步是将所有特征转换为表格格式。特别是对于非常大的数据样本,此计算可能是一个巨大的性能瓶颈。因此,如果您确实不需要透视,我们建议将其关闭,并尽可能使用非透视的数据。

例如,从 parquet 读取数据并进行特征提取

import dask.dataframe as dd
from tsfresh import extract_features

df = dd.read_parquet(...)

X = extract_features(df,
                     column_id="id",
                     column_sort="time",
                     pivot=False)

result = X.compute()

Dask - 更多控制

特征提取方法在调用实际特征计算器之前需要执行一些数据转换。如果您想优化数据流,您可能希望更精确地控制如何将特征计算添加到您的 dask 计算图中。

因此,也可以直接添加特征提取

from tsfresh.convenience.bindings import dask_feature_extraction_on_chunk
features = dask_feature_extraction_on_chunk(df_grouped,
                                            column_id="id",
                                            column_kind="kind",
                                            column_sort="time",
                                            column_value="value")

但在这种情况下,df_grouped 必须已经处于正确的格式。请查看 tsfresh.convenience.bindings.dask_feature_extraction_on_chunk() 的文档了解更多信息。在这种情况下不会执行透视。

PySpark

与 dask 类似,也可以将特征提取传递到 Spark 计算图中。您可以在 tsfresh.convenience.bindings.spark_feature_extraction_on_chunk() 的文档中找到更多信息。