tsfresh.utilities 包

子模块

tsfresh.utilities.dataframe_functions 模块

用于处理 DataFrame 转换为内部标准化格式(参见 normalize_input_to_internal_representation)或处理 DataFrame 中 NaNinf 的实用函数。

tsfresh.utilities.dataframe_functions.add_sub_time_series_index(df_or_dict, sub_length, column_id=None, column_sort=None, column_kind=None)[source]

添加一个包含“id”的列,其内容如下:

  • 如果 column_id 为 None:对于每种类型(如果 column_kind 也为 None 则对于整个数据帧),通过将数据“子打包”成长度为“sub_length”的包来构建一个新的索引。例如,如果您有长度为 11 的数据且 sub_length 为 2,您将获得 6 个新包:0, 0; 1, 1; 2, 2; 3, 3; 4, 4; 5。

  • 如果 column_id 不为 None:与之前相同,但分别针对每个 id。旧的 column_id 值在逗号后添加到新的“id”列中。

您可以使用此函数将一个长测量值转换为子包,您希望在这些子包上提取特征。

参数:
  • df_or_dict (pandas.DataFramedict) – pandas DataFrame 或字典。对象的所需形状/形式取决于其余传入的参数。

  • column_id (basestringNone) – 必须存在于 pandas DataFrame 或字典中的所有 DataFrame 中。此列不允许有 NaN 值。

  • column_sort (basestringNone) – 如果不为 None,则按此列对行进行排序。此列不允许有 NaN 值。

  • column_kind (basestringNone) – 仅在传入 pandas DataFrame 时可以使用(字典已被假定按类型分组)。它必须存在于 DataFrame 中且不允许有 NaN 值。如果未传入类型列,则假定 pandas DataFrame 中的每列(id 或排序列除外)都是一种可能的类型。

返回:

添加了“id”列的数据帧或数据帧字典

返回类型:

与 df_or_dict 相同

tsfresh.utilities.dataframe_functions.check_for_nans_in_columns(df, columns=None)[source]

辅助函数,用于检查数据帧中的 NaN,如果存在,则抛出 ValueError

参数:
  • df (pandas.DataFrame) – 要测试 NaN 的 pandas DataFrame

  • columns (list) – 要测试 NaN 的列列表。如果留空,将测试 DataFrame 的所有列。

返回:

返回类型:

抛出:

ValueError 如果在 DataFrame 中找到 NaNs

tsfresh.utilities.dataframe_functions.get_ids(df_or_dict, column_id)[source]

从时间序列容器聚合 column_id 中的所有 id

参数:
  • df_or_dict (pandas.DataFramedict) – pandas DataFrame 或字典。

  • column_id (basestring) – 必须存在于 pandas DataFrame 或字典中的所有 DataFrame 中。此列不允许有 NaN 值。

返回:

作为一个集合,包含 energy_ratio_by_chunks 中的所有现有 id

返回类型:

集合

抛出:

TypeError 如果 df_or_dict 不是 dict 或 pandas.DataFrame 类型

tsfresh.utilities.dataframe_functions.get_range_values_per_column(df)[source]

检索 DataFrame df 中每列的有限最大值、最小值和中位数,并将它们存储在三个字典中。这些字典 col_to_maxcol_to_mincol_to_median 将列名映射到该列的最大值、最小值或中位数。

如果一列完全不包含任何有限值,则存储 0。

参数:

df (pandas.DataFrame) – 要获取每列最大值、最小值和中位数的数据帧

返回:

将列名映射到最大值、最小值、平均值的字典

返回类型:

(dict, dict, dict)

tsfresh.utilities.dataframe_functions.impute(df_impute)[source]

按列将 DataFrame df_impute 中的所有 NaNsinfs 替换为同列的平均值/极端值。具体替换方式如下: df_impute 中出现的每个 infNaN 将被替换为:

  • -inf -> min

  • +inf -> max

  • NaN -> median

如果该列完全不包含有限值,则填充零。

此函数修改 df_impute 原位。之后,保证 df_impute 不包含任何非有限值。此外,保证所有列都是 np.float64 类型。

参数:

df_impute (pandas.DataFrame) – 要填充的 DataFrame

返回 df_impute:

填充后的 DataFrame

返回类型 df_impute:

pandas.DataFrame

tsfresh.utilities.dataframe_functions.impute_dataframe_range(df_impute, col_to_max, col_to_min, col_to_median)[source]

按列将 DataFrame df_impute 中的所有 NaNs-inf+inf 替换为提供的字典中的平均值/极端值。

具体替换方式如下: df_impute 中出现的每个 infNaN 将被替换为:

  • -inf -> 由 col_to_min 中的值替换

  • +inf -> 由 col_to_max 中的值替换

  • NaN -> 由 col_to_median 中的值替换

如果在其中一个字典中找不到 df_impute 的列,此方法将引发 ValueError。此外,如果要替换的值之一不是有限值,则返回 ValueError。

此函数修改 df_impute 原位。之后,保证 df_impute 不包含任何非有限值。此外,保证所有列都是 np.float64 类型。

参数:
  • df_impute (pandas.DataFrame) – 要填充的 DataFrame

  • col_to_max (dict) – 将列名映射到最大值的字典

  • col_to_min – 将列名映射到最小值的字典

  • col_to_median – 将列名映射到中位数的字典

返回 df_impute:

填充后的 DataFrame

返回类型 df_impute:

pandas.DataFrame

抛出:

ValueError – 如果 df_impute 的列在 col_to_max, col_to_min 或 col_to_median 中缺失,或者要替换的值是非有限值

tsfresh.utilities.dataframe_functions.impute_dataframe_zero(df_impute)[source]

将 DataFrame df_impute 中的所有 NaNs-infs+infs 替换为 0。 df_impute 将被原位修改。所有列都将转换为 np.float64 数据类型。

参数:

df_impute (pandas.DataFrame) – 要填充的 DataFrame

返回 df_impute:

填充后的 DataFrame

返回类型 df_impute:

pandas.DataFrame

tsfresh.utilities.dataframe_functions.make_forecasting_frame(x, kind, max_timeshift, rolling_direction)[source]

接受一个单一时间序列 x,并构建一个 DataFrame df 和目标向量 y,可用于时间序列预测任务。

返回的 df 将包含 x 中每个时间戳的最后 max_timeshift 个数据点,作为新的时间序列,这样就可以用于拟合时间序列预测模型。

请参阅 滚动/时间序列预测 以详细了解滚动过程以及特征矩阵和目标向量是如何得出的。

返回的时间序列容器 df 将包含展平的数据帧形式的滚动时间序列,即 数据格式 中的第一种格式。

当 x 是 pandas.Series 时,其索引将用作 id。

参数:
  • x (np.arraypd.Series) – 单一时间序列

  • kind (str) – 时间序列的类型

  • rolling_direction (int) – 符号决定是向后(如果符号为正)还是向前(如果在“时间”中)滚动

  • max_timeshift (int) – 如果不为 None,则仅移动至多 max_timeshift。如果为 None,则尽可能多地移动。

返回:

时间序列容器 df,目标向量 y

返回类型:

(pd.DataFrame, pd.Series)

tsfresh.utilities.dataframe_functions.restrict_input_to_index(df_or_dict, column_id, index)[source]

将 df_or_dict 限制为 index 中包含的那些 id。

参数:
  • df_or_dict (pandas.DataFramedict) – pandas DataFrame 或字典。

  • column_id (basestring) – 必须存在于 pandas DataFrame 或字典中的所有 DataFrame 中。此列不允许有 NaN 值。

  • index (Iterablepandas.Series) – 包含 id 的索引

返回 df_or_dict_restricted:

受限的 df_or_dict

返回类型 df_or_dict_restricted:

dict 或 pandas.DataFrame

抛出:

TypeError 如果 df_or_dict 不是 dict 或 pandas.DataFrame 类型

tsfresh.utilities.dataframe_functions.roll_time_series(df_or_dict, column_id, column_sort=None, column_kind=None, rolling_direction=1, max_timeshift=None, min_timeshift=0, chunksize=None, n_jobs=1, show_warnings=False, disable_progressbar=False, distributor=None)[source]

此方法创建时间序列的子窗口。它在“时间”域中(由 column_sort 给定的排序顺序表示)分别对每种类型和每个 id 的(已排序)数据帧进行滚动操作。

对于每个滚动步骤,通过 ({id}, {shift}) 的方案创建一个新的 id,其中 id 是列的旧 id,shift 是“时间”移动量。您可以将其视为一个固定长度(max_timeshift)的窗口,每次移动一个步骤遍历您的时间序列。窗口看到的每个剪切部分都是一个新的时间序列,具有新的标识符。

几点说明

  • 此方法将创建新的 ID!

  • rolling 的符号定义了时间滚动的方向,正值表示我们将剪切窗口向前移动。每个新的子时间序列的名称由最后一个时间点给出。这意味着,名为 ([id=]4,[timeshift=]5)max_timeshift 为 3 的时间序列包含时间点 3、4 和 5 的数据。负的 rolling 方向意味着,您在数据的负时间方向上移动。名为 ([id=]4,[timeshift=]5)max_timeshift 为 3 的时间序列将包含时间点 5、6 和 7 的数据。绝对值定义了在每个步骤移动多少时间。

  • 可以移动不同长度的时间序列,但是

  • 我们假设时间序列是均匀采样的

  • 欲了解更多信息,请参阅 滚动/时间序列预测

参数:
  • df_or_dict (pandas.DataFramedict) – pandas DataFrame 或字典。对象的所需形状/形式取决于其余传入的参数。

  • column_id (basestring) – 必须存在于 pandas DataFrame 或字典中的所有 DataFrame 中。此列不允许有 NaN 值。

  • column_sort (basestringNone) – 如果不为 None,则按此列对行进行排序。此列不允许有 NaN 值。如果未给出,将由一个递增的数字填充,表示传入数据帧的顺序用作时间序列的“时间”。

  • column_kind (basestringNone) – 仅在传入 pandas DataFrame 时可以使用(字典已被假定按类型分组)。它必须存在于 DataFrame 中且不允许有 NaN 值。如果未传入类型列,则假定 pandas DataFrame 中的每列(id 或排序列除外)都是一种可能的类型。

  • rolling_direction (int) – 符号决定是将我们的剪切窗口在“时间”中向后移动还是向前移动。绝对值决定每个步骤移动多少。

  • max_timeshift (int) – 如果不为 None,则剪切窗口最大为 max_timeshift 大小。如果为 None,则无限增长。

  • min_timeshift (int) – 丢弃所有小于或等于此值的提取预测窗口。必须大于或等于 0。

  • n_jobs (int) – 用于并行化的进程数。如果为零,则不使用并行化。

  • chunksize (Noneint) – 每个任务应该计算多少次移动。

  • show_warnings (bool) – 在特征提取过程中显示警告(用于计算器调试)。

  • disable_progressbar (bool) – 计算时不显示进度条。

  • distributor (class) – 高级参数:将其设置为您要用作分发器的类名。有关更多信息,请参阅 utilities/distribution.py。如果您希望 TSFresh 选择最佳分发器,则留空。

返回:

滚动的或数据帧字典

返回类型:

与 df_or_dict 相同

tsfresh.utilities.distribution 模块

此模块包含 Distributor 类,这些对象用于分配特征计算。本质上,Distributor 组织将特征计算器应用于数据块。

此模块由 Nils Braun 设计

class tsfresh.utilities.distribution.ApplyDistributor(meta)[source]

基类: DistributorBaseClass

map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)[source]

此方法包含 DistributorBaseClass 类的核心功能。

它将 map_function 映射到数据的每个元素,并将结果合并以返回一个展平的列表。

它需要在每个子类中实现。

参数:
  • map_function (callable) – 应用于每个数据项的函数。

  • data (iterable) – 计算中使用的数据

  • function_kwargs (dict of string to parameter) – map 函数的参数

  • chunk_size (int) – 如果给定,则根据此大小对数据进行分块。如果未给定,则使用经验值。

  • data_length (int) – 如果数据是生成器,则必须在此处设置长度。如果为 none,则长度从数据的 len 中推断出来。

返回:

计算结果

返回类型:

list

class tsfresh.utilities.distribution.ClusterDaskDistributor(address)[source]

基类: IterableDistributorBaseClass

使用 dask 集群的分发器,意味着计算分布在集群上

calculate_best_chunk_size(data_length)[source]

使用集群中的 dask worker 数量(在执行时,即启动提取时)来找到最佳 chunk_size。

参数:

data_length (int) – 定义需要进行多少计算的长度。

close()[source]

关闭与 Dask Scheduler 的连接

distribute(func, partitioned_chunks, kwargs)[source]

通过将 map 命令分发到集群上的 dask worker 来并行计算特征

参数:
  • func (callable) – 要发送到每个 worker 的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应该由一个 worker 处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果列表 - 每个项应该是将 func 应用于单个元素的结果。

class tsfresh.utilities.distribution.DistributorBaseClass[source]

基类: object

分发器抽象基类。

DistributorBaseClass 子类实例的主要目的是在数据项列表(称为 data)上评估一个函数(称为 map_function)。

根据 distribute 函数的实现,这可以在并行或使用节点集群完成。

map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)[source]

此方法包含 DistributorBaseClass 类的核心功能。

它将 map_function 映射到数据的每个元素,并将结果合并以返回一个展平的列表。

它需要在每个子类中实现。

参数:
  • map_function (callable) – 应用于每个数据项的函数。

  • data (iterable) – 计算中使用的数据

  • function_kwargs (dict of string to parameter) – map 函数的参数

  • chunk_size (int) – 如果给定,则根据此大小对数据进行分块。如果未给定,则使用经验值。

  • data_length (int) – 如果数据是生成器,则必须在此处设置长度。如果为 none,则长度从数据的 len 中推断出来。

返回:

计算结果

返回类型:

list

class tsfresh.utilities.distribution.IterableDistributorBaseClass[source]

基类: DistributorBaseClass

Distributor 基类,可以处理所有可迭代项,并分别计算每个项上的 map_function。

这是在数据块上完成的,这意味着 DistributorBaseClass 类将把数据分成块,分发数据并分别对项应用 map_function 函数。

根据 distribute 函数的实现,这可以在并行或使用节点集群完成。

calculate_best_chunk_size(data_length)[source]

计算长度为 data_length 的列表的最佳 chunk_size。当前实现的公式或多或少是单机多进程情况下的经验结果。

参数:

data_length (int) – 定义需要进行多少计算的长度。

返回:

计算出的 chunk size

返回类型:

int

TODO: 研究不同设置下的最佳 chunk size。

close()[source]

用于在使用后清理 DistributorBaseClass 的抽象基函数,例如关闭与 DaskScheduler 的连接

distribute(func, partitioned_chunks, kwargs)[source]

此抽象基函数将工作分配给 worker,worker 可以是线程或集群中的节点。必须在派生类中实现。

参数:
  • func (callable) – 要发送到每个 worker 的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应该由一个 worker 处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果列表 - 每个项应该是将 func 应用于单个元素的结果。

map_reduce(map_function, data, function_kwargs=None, chunk_size=None, data_length=None)[source]

此方法包含 DistributorBaseClass 类的核心功能。

它将 map_function 映射到数据的每个元素,并将结果合并以返回一个展平的列表。

如何计算任务,由类 tsfresh.utilities.distribution.DistributorBaseClass.distribute() 方法决定,该方法可以在多个线程或多个处理单元之间分配任务等。

为了不单独传输数据的每个元素,数据根据块大小(或在未给定时根据经验猜测)被分割成块。通过这种方式,worker 处理的是大小适当而非微小的数据部分。

参数:
  • map_function (callable) – 应用于每个数据项的函数。

  • data (iterable) – 计算中使用的数据

  • function_kwargs (dict of string to parameter) – map 函数的参数

  • chunk_size (int) – 如果给定,则根据此大小对数据进行分块。如果未给定,则使用经验值。

  • data_length (int) – 如果数据是生成器,则必须在此处设置长度。如果为 none,则长度从数据的 len 中推断出来。

返回:

计算结果

返回类型:

list

static partition(data, chunk_size)[source]

此生成器将可迭代对象分成长度为 chunk_size 的切片。如果块大小不能整除数据长度,则最后一个切片将更短。

取自 https://stackoverflow.com/questions/1915170/split-a-generator-iterable-every-n-items-in-python-splitevery

这里的重要部分是,可迭代对象只被遍历一次,并且块是一个接一个地生成的。这对于内存和速度都有好处。

参数:
  • data (Iterable) – 要分块的数据。

  • chunk_size (int) – 块大小。最后一个块可能更小。

返回:

一个生成数据块的生成器。

返回类型:

Generator[Iterable]

class tsfresh.utilities.distribution.LocalDaskDistributor(n_workers)[source]

基类: IterableDistributorBaseClass

使用本地 dask 集群和进程内通信的分发器。

close()[source]

关闭与本地 Dask Scheduler 的连接

distribute(func, partitioned_chunks, kwargs)[source]

通过将 map 命令分发到本地机器上的 dask worker 来并行计算特征

参数:
  • func (callable) – 要发送到每个 worker 的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应该由一个 worker 处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果列表 - 每个项应该是将 func 应用于单个元素的结果。

class tsfresh.utilities.distribution.MapDistributor(disable_progressbar=False, progressbar_title='Feature Extraction')[source]

基类: IterableDistributorBaseClass

使用 Python 内建的 map 分发器,它按顺序计算每个任务。

calculate_best_chunk_size(data_length)[source]

对于按顺序计算特征的 map 命令,将使用 chunk_size 为 1。

参数:

data_length (int) – 定义需要进行多少计算的长度。

distribute(func, partitioned_chunks, kwargs)[source]

通过 Python 的 map 命令按顺序计算特征

参数:
  • func (callable) – 要发送到每个 worker 的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应该由一个 worker 处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果列表 - 每个项应该是将 func 应用于单个元素的结果。

class tsfresh.utilities.distribution.MultiprocessingDistributor(n_workers, disable_progressbar=False, progressbar_title='Feature Extraction', show_warnings=True)[source]

基类: IterableDistributorBaseClass

使用 multiprocessing Pool 在本地机器上并行计算任务的分发器。

close()[source]

收集 worker 的结果并关闭线程池。

distribute(func, partitioned_chunks, kwargs)[source]

通过将 map 命令分发到线程池来并行计算特征

参数:
  • func (callable) – 要发送到每个 worker 的函数。

  • partitioned_chunks (iterable) – 数据块列表 - 每个元素又是一个块列表 - 应该由一个 worker 处理。

  • kwargs (dict of string to parameter) – map 函数的参数

返回:

计算结果列表 - 每个项应该是将 func 应用于单个元素的结果。

tsfresh.utilities.distribution.initialize_warnings_in_workers(show_warnings)[source]

用于在多进程 worker 中初始化警告模块的小型辅助函数。

在 Windows 上,Python 会生成新的进程,这些进程不会继承警告状态,因此必须在运行计算之前启用/禁用警告。

参数:

show_warnings (bool) – 是否显示警告。

tsfresh.utilities.profiling 模块

包含用于启动和停止分析器的方法,该分析器检查不同特征计算器的运行时

tsfresh.utilities.profiling.end_profiling(profiler, filename, sorting=None)[source]

辅助函数,用于停止分析过程并将分析数据写入给定文件名。在此之前,按传入的 sorting 对统计信息进行排序。

参数:
  • profiler (cProfile.Profile) – 已启动的分析器(可能由 start_profiling 启动)。

  • filename (basestring) – 保存配置文件的输出文件名。

  • sorting (basestring) – 传递给 sort_stats 函数的统计信息的排序方式。

返回:

返回类型:

使用以下方法启动和停止分析器

>>> profiler = start_profiling()
>>> # Do something you want to profile
>>> end_profiling(profiler, "out.txt", "cumulative")
tsfresh.utilities.profiling.get_n_jobs()[source]

获取用于并行处理的任务数量。

返回:

用于并行处理的任务数量。

返回类型:

int

tsfresh.utilities.profiling.set_n_jobs(n_jobs)[source]

设置用于并行处理的任务数量。

参数:

n_jobs (int) – 用于并行处理的任务数量。

返回:

返回类型:

tsfresh.utilities.profiling.start_profiling()[source]

辅助函数,用于启动分析过程并返回分析器(以便稍后关闭)。

返回:

一个已启动的分析器。

返回类型:

cProfile.Profile

使用以下方法启动和停止分析器

>>> profiler = start_profiling()
>>> # Do something you want to profile
>>> end_profiling(profiler, "cumulative", "out.txt")

tsfresh.utilities.string_manipulation 模块

tsfresh.utilities.string_manipulation.convert_to_output_format(param)[source]

辅助函数,用于将参数转换为可用于列名的有效字符串。它执行与 from_columns 函数中使用的相反操作。

参数按其名称排序并按以下形式输出

<参数名>_<参数值>__<参数名>_<参数值>__ …

如果 <参数值> 是字符串,此方法将用括号 “” 包裹它,例如 “<参数值>”

参数:

param (dict) – 要输出的参数字典

返回:

解析后参数的字符串

返回类型:

str

tsfresh.utilities.string_manipulation.get_config_from_string(parts)[source]

辅助函数,用于从列名中提取某个函数的配置。列名部分(按“__”分隔)应传入此函数。它将跳过类型名和函数名,仅使用参数部分。这些部分将按“_”分成参数名和参数值。该值将转换为 Python 对象(例如,“(1, 2, 3)”转换为由整数 1, 2 和 3 组成的元组)。

如果列名中没有参数,则返回 None。

参数:

parts (list) – 按“__”分隔的列名部分

返回:

一个字典,包含列名中编码的所有参数。

返回类型:

dict

模块内容

utilities 子模块包含多个实用函数。这些函数仅应在 tsfresh 内部使用。