并行化

特征提取、特征选择以及滚动计算都提供了并行化的可能性。默认情况下,所有这些任务都由 tsfresh 并行化处理。本文档讨论了控制并行化的各种设置。为了在您的用例中获得最佳结果,您应该试验这些参数。

注意

本文档描述了如何通过并行化来加速处理时间。如果您正在处理大量数据(可能无法放入内存),请查看 大规模输入数据

请将您调整以下参数的结果告知我们!这将有助于改进文档和默认设置。

特征选择的并行化

我们使用一个 multiprocessing.Pool 来并行计算每个特征的 p 值。实例化时,我们将 Pool 的工作进程数设置为 n_jobs。该字段默认为当前系统的处理器数量。我们建议将其设置为可用(且当前空闲)的最大处理器数量。

Pool 的 map 函数的 chunksize 是另一个需要考虑的重要参数。它可以通过 chunksize 字段设置。默认情况下,它取决于 multiprocessing.Pool 的并行化参数。一个数据块被定义为一个 id 和一个类型的单个时间序列。块大小是作为一项任务提交给一个工作进程的块的数量。如果您将块大小设置为 10,则意味着一个工作任务对应于计算 10 个 id/类型时间序列组合的所有特征。如果设置为 None,则根据分发器使用启发式算法来寻找最优块大小。块大小对集群的最佳性能有至关重要的影响,应针对具体问题进行基准测试优化。

特征提取的并行化

对于特征提取,tsfresh 暴露了参数 n_jobschunksize。两者行为方式与特征选择的参数类似。

为了进行性能研究和剖析,有时关闭并行化会很有用。这可以通过将参数 n_jobs 设置为 0 来实现。

超越单机的并行化

大量时间序列数据可能需要大规模分析。因此,时间序列需要在计算单元组上处理,而不是在单台机器上处理。

因此,可能需要将时间序列特征提取分发到集群。可以使用 tsfresh 以分布式方式提取特征。在以下段落中,我们将讨论如何设置分布式 tsfresh

为了分发特征计算,我们使用一个特定对象,即 Distributor 类(位于 tsfresh.utilities.distribution 模块中)。

本质上,一个 Distributor 组织特征计算器应用于数据块。它将特征计算器映射到数据块,然后对其进行归约(reduce),这意味着它将单个映射的结果组合成一个对象,即特征矩阵。

因此,Distributor 将按以下顺序执行:

  1. 根据时间序列数据的特征计算最优 chunk_size(通过 calculate_best_chunk_size()

  2. 将时间序列数据分割成块(通过 partition()

  3. 将特征计算器应用于数据块(通过 distribute()

  4. 将结果组合成特征矩阵(通过 map_reduce()

  5. 关闭所有连接,关闭所有资源并清理一切(通过 close()

那么,如何使用 Distributor 来使用 tsfresh 提取特征呢?您需要将 distributor 作为参数传递给 extract_features() 方法。

以下示例展示了如何定义 MultiprocessingDistributor,它将把计算分发到本地线程池。

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import MultiprocessingDistributor

# download and load some time series data
download_robot_execution_failures()
df, y = load_robot_execution_failures()

# We construct a Distributor that will spawn the calculations
# over four threads on the local machine
Distributor = MultiprocessingDistributor(n_workers=4,
                                         disable_progressbar=False,
                                         progressbar_title="Feature Extraction")

# just to pass the Distributor object to
# the feature extraction, along with the other parameters
X = extract_features(timeseries_container=df,
                     column_id='id',
                     column_sort='time',
                     distributor=Distributor)

以下示例对应于现有的多进程 tsfresh API,您只需指定作业数量,而无需构造 Distributor。

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features

download_robot_execution_failures()
df, y = load_robot_execution_failures()

X = extract_features(timeseries_container=df,
                     column_id='id',
                     column_sort='time',
                     n_jobs=4)

使用 dask 分布计算

我们为 dask 框架 提供了一个 Distributor,其中 “Dask 是一个灵活的并行计算库,用于分析计算。”

注意

文档的这一部分仅处理使用 dask 集群并行化计算。输入和输出仍然是 pandas 对象。如果您想利用 dask 的能力扩展到超出本地内存的数据,请查看 大规模输入数据

Dask 是一个将分析计算分发到集群的优秀框架。它可以向上和向下扩展,这意味着您也可以在单台机器上使用它。在 Dask 集群上运行 tsfresh 唯一需要的是 dask-scheduler 的 IP 地址和端口号。

假设您的 dask scheduler 正在 192.168.0.1:8786 上运行,那么我们可以构造一个 ClusterDaskDistributor,它连接到 scheduler 并将时间序列数据和计算分发到集群。

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import ClusterDaskDistributor

download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = ClusterDaskDistributor(address="192.168.0.1:8786")

X = extract_features(timeseries_container=df,
                     column_id='id',
                     column_sort='time',
                     distributor=Distributor)

与上面的 MultiprocessingDistributor 示例相比,我们只需更改一行代码即可从单台机器切换到整个集群。就是这么简单。通过更改 Distributor,您可以轻松地将应用程序部署到集群上运行,而不是在工作站上。

您也可以在本地机器上使用本地 DaskCluster 来模拟 Dask 网络。以下示例展示了如何在由 3 个 worker 组成的本地集群上设置一个 LocalDaskDistributor

from tsfresh.examples.robot_execution_failures import \
    download_robot_execution_failures, \
    load_robot_execution_failures
from tsfresh.feature_extraction import extract_features
from tsfresh.utilities.distribution import LocalDaskDistributor

download_robot_execution_failures()
df, y = load_robot_execution_failures()

Distributor = LocalDaskDistributor(n_workers=3)

X = extract_features(timeseries_container=df,
                     column_id='id',
                     column_sort='time',
                     distributor=Distributor)

编写自己的分发器

如果您想使用 Dask 以外的其他框架,则必须编写自己的 Distributor。要构建自定义 Distributor,您需要定义一个继承自抽象基类 tsfresh.utilities.distribution.DistributorBaseClass 的对象。tsfresh.utilities.distribution 模块包含有关您需要实现的更多信息。

有效并行化注意事项

默认情况下,tsfresh 使用并行化将单线程 Python 代码分发到主机上的多个核心。

然而,这可能会产生一个称为“过度分配”(over-provisioning)的问题。特征计算器中使用的许多底层 Python 库(例如 numpy)具有用于其低级处理的 C 代码实现。它们尝试将工作负载分散到尽可能多的可用核心之间——这与 tsfresh 完成的并行化冲突。

过度分配由于重复上下文切换的开销而效率低下。

通过使用以下环境变量将 C 库限制为单线程,可以解决此问题:

import os
os.environ['OMP_NUM_THREADS'] = "1"
os.environ['MKL_NUM_THREADS'] = "1"
os.environ['OPENBLAS_NUM_THREADS'] = "1"

将这些行放在您的 notebook/Python 脚本的开头——在调用任何 tsfresh 代码或导入任何其他模块之前。

主机计算机的核心越多,通过实施这些环境变量更改获得的处理速度提升就越大。根据主机类型,观察到速度提升在 6 倍到 26 倍之间。

注意

如果您打算在特征提取后运行机器学习流水线,强烈建议恢复这些更改。设置已在 GPU 加速的 XGBoost 分类器训练任务上进行了测试,速度降低了 9 倍。许多流行的机器学习库,如 scikit-learn 和 Tensorflow/PyTorch,都依赖于 CPU 核心的并行化来加速 CPU 依赖的低级计算。通过强制这些库仅使用单线程,即使您使用 GPU 加速它们,也会为其他下游任务造成瓶颈。例如,每次 PyTorch 神经网络正向传播后的损失函数 CPU 计算将仅在一个线程上运行,而 GPU 处于空闲状态,等待其完成才能开始其反向传播任务。