你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

Python 插件

适用于:✅Microsoft Fabric✅Azure 数据资源管理器

Python 插件使用 Python 脚本运行用户定义函数 (UDF)。 此 Python 脚本获取表格数据作为其输入,并生成表格输出。 插件的运行时托管在沙盒中,运行在群集的节点上。

语法

T|evaluate [ (singleper_node | )] [hint.remote==hint.distributionauto | local)] output_schema,脚本 [,script_parameters] python( [external_artifacts] [,,spill_to_disk])

详细了解语法约定

参数

客户 类型​​ 必需 说明
output_schema string ✔️ type 文本,定义由 Python 代码返回的表格数据的输出模式。 格式为:typeof(ColumnName ColumnType[, ...]:)例如,typeof(col1:string, col2:long)。 若要扩展输入架构,请使用以下语法:typeof(*, col1:string, col2:long)
脚本 string ✔️ 要执行的有效 Python 脚本。 若要生成多行字符串,请参阅用法提示
script_parameters dynamic 一个名称/值对的属性包,将作为保留的 kargs 字典传递给 Python 脚本。 有关详细信息,请参阅保留 Python 变量
hint.distribution string 一个提示,用于在多个群集节点上分布插件的执行。 默认值为 singlesingle 意味着脚本的单个实例将针对整个查询数据运行。 per_node 意味着,如果在分发 Python 块之前执行查询,则脚本的一个实例将在每个节点上对其包含的数据运行。
hint.remote string 此提示仅适用于跨群集查询。 默认值为 autoauto 意味着服务器会自动决定执行 Python 代码的群集。 将值设置为 local 会强制在本地群集上执行 Python 代码。 在远程群集上禁用了 Python 插件时使用它。
external_artifacts dynamic 名称和 URL 对的属性包,用于可从云存储访问的项目。 有关详细信息,请参阅使用外部项目
spill_to_disk bool 指定一个用于将输入表序列化为 Python 沙盒的替代方法。 若要序列化大表,请将其设置为 true 以加速序列化,并大大减少沙盒内存消耗。 默认值为 true

保留 Python 变量

以下变量是为 Kusto 查询语言和 Python 代码之间的交互而保留的。

  • df:作为 T 数据帧的输入表格数据(上述 pandas 的值)。
  • kargs:作为 Python 字典的 script_parameters 参数的值。
  • result:由 Python 脚本创建的 pandas 数据帧,其值成为发送到插件后面的 Kusto 查询操作符的表格数据。

启用插件

默认禁用该插件。 在开始之前,请查看先决条件列表。 若要启用插件并选择 Python 映像的版本,请参阅在群集上启用语言扩展

Python 沙盒映像

要将 Python 映像的版本更改为不同的托管映像或自定义映像,请参阅更改群集上的 Python 语言扩展映像

若要查看不同 Python 映像的包列表,请参阅 Python 包参考

注意

  • 默认情况下,插件将 numpy 作为 np 导入,将 pandas 作为 pd 导入。 (可选)可以根据需要导入其他模块。
  • 有一些包可能与运行插件的沙盒所实施的限制不兼容。

使用来自查询和更新策略的引入

  • 在以下查询中使用插件:
  • 不能在定义为更新策略一部分的查询中使用插件,该策略的源表是使用流式处理引入引入的。

示例

range x from 1 to 360 step 1
| evaluate python(
//
typeof(*, fx:double),               //  Output schema: append a new fx column to original table 
```
result = df
n = df.shape[0]
g = kargs["gain"]
f = kargs["cycles"]
result["fx"] = g * np.sin(df["x"]/n*2*np.pi*f)
```
, bag_pack('gain', 100, 'cycles', 4)    //  dictionary of parameters
)
| render linechart 

正弦演示的屏幕截图显示查询结果。

性能提示

  • 将插件的输入数据集减少到所需的最小量(列/行)。
    • 在可能的情况下,通过 Kusto 的查询语言对源数据集使用筛选器。
    • 若要对源列的子集进行计算,请在调用插件之前仅投影这些列。
  • 如果脚本中的逻辑可分发,请使用 hint.distribution = per_node
  • 尽可能使用 Kusto 的查询语言来实现 Python 脚本的逻辑。

使用提示

  • 若要在查询编辑器中生成包含 Python 脚本的多行字符串,请从常用的 Python 编辑器(JupyterVisual Studio CodePyCharm 等)复制 Python 脚本,将其粘贴到查询编辑器中,然后在包含三个连续反引号的行之间括起完整的脚本。 例如:

    ```
    python code
    ```

  • 使用 externaldata 运算符 获取存储在外部位置(例如 Azure Blob 存储)中的脚本的内容。

读取 Python 脚本外部数据的示例

    let script = 
        externaldata(script:string)
        [h'https://kustoscriptsamples.blob.core.windows.net/samples/python/sample_script.py']
        with(format = raw);
    range x from 1 to 360 step 1
    | evaluate python(
        typeof(*, fx:double),
        toscalar(script), 
        bag_pack('gain', 100, 'cycles', 4))
    | render linechart 

使用外部项目

云存储中的外部项目可用于脚本并在运行时使用。

外部项目属性引用的 URL 必须是:

注意

使用托管标识对外部项目进行身份验证时,必须在群集级别SandboxArtifacts上定义 用法。

项目可用于从本地临时目录 .\Temp读取的脚本。 属性包中提供的名称用作本地文件名。 请参阅示例

有关引用外部包的信息,请参阅安装 Python 插件的包

刷新外部项目缓存

查询中使用的外部项目文件缓存在群集上。 如果对云存储中的文件进行更新,并要求立即与群集同步,可以使用 .clear cluster cache external-artifacts 命令。 此命令会清除缓存的文件,并确保后续查询使用最新版本的项目运行。

安装 Python 插件的包

在大多数用例下,你可能更愿意创建自定义映像

出于以下原因,你可能想自己安装包:

  • 你没有创建自定义映像的权限。
  • 包是专用的。
  • 你更想要创建用于测试的临时包安装,而不想创建自定义映像的开销。

安装包,如下所示:

先决条件

  1. 创建 blob 容器来承载包,最好与群集位于同一个位置。 例如, https://artifactswestus.blob.core.windows.net/python假设群集位于美国西部。

  2. 更改群集的标注策略以允许访问该位置。

    • 此更改需要 AllDatabasesAdmin 权限。

    • 例如,若要启用对位于 https://artifactswestus.blob.core.windows.net/python 中的 blob 的访问,请运行以下命令:

    .alter-merge cluster policy callout @'[ { "CalloutType": "sandbox_artifacts", "CalloutUriRegex": "artifactswestus\\.blob\\.core\\.windows\\.net/python/","CanCall": true } ]'
    

安装包

  1. 对于 PyPi 或其他通道中的公共包,请下载包及其依赖项。

    • 从本地 Windows Python 环境中的 cmd 窗口运行:
    pip wheel [-w download-dir] package-name.
    
  2. 创建包含所需包及其依赖项的 zip 文件。

    • 对于专用包:压缩包的文件夹及其依赖项的文件夹。
    • 对于公共包,对在上一步中下载的文件进行压缩。

    注意

    • 请确保下载与 Python 引擎和沙盒运行时平台兼容的包(当前为 Windows 上的 3.10.8 或 3.11.7)
    • 请确保压缩 .whl 文件本身,而不是其父文件夹。
    • 对于基本沙箱映像中已经存在的具有相同版本的包,你可以跳过 .whl 文件。
  3. 将 zip 文件上传到项目位置中的 blob(从先决条件的步骤 1 开始)。

  4. 调用 python 插件。

    • 使用 external_artifacts zip 文件的属性包和 blob URL(包括 SAS 令牌)指定参数。
    • 在内联 python 代码中,使用 ZIP 文件的本地名称导入Zipackagesandbox_utils并调用其install()方法。

使用外部项目的示例

安装生成伪数据的 Faker 包。

range ID from 1 to 3 step 1 
| extend Name=''
| evaluate python(typeof(*), ```if 1:
    from sandbox_utils import Zipackage
    Zipackage.install("Faker.zip")
    from faker import Faker
    fake = Faker()
    result = df
    for i in range(df.shape[0]):
        result.loc[i, "Name"] = fake.name()
    ```,
    external_artifacts=bag_pack('faker.zip', 'https://artifacts.blob.core.windows.net/Faker.zip;impersonate'))
身份证件 客户
1 加里·塔皮亚
2 艾玛·埃文斯
3 阿什利·鲍文

有关使用 Python 插件的 UDF 函数的更多示例,请参阅函数库

Python 插件使用 Python 脚本运行用户定义函数 (UDF)。 此 Python 脚本获取表格数据作为其输入,并生成表格输出。

语法

T|evaluate [ (singleper_node | )] [hint.remote==hint.distributionauto | local)] output_schema,脚本 [,script_parameters] python( [external_artifacts] [,,spill_to_disk])

详细了解语法约定

参数

客户 类型​​ 必需 说明
output_schema string ✔️ type 文本,定义由 Python 代码返回的表格数据的输出模式。 格式为:typeof(ColumnName ColumnType[, ...]:)例如,typeof(col1:string, col2:long)。 若要扩展输入架构,请使用以下语法:typeof(*, col1:string, col2:long)
脚本 string ✔️ 要执行的有效 Python 脚本。 若要生成多行字符串,请参阅用法提示
script_parameters dynamic 一个名称/值对的属性包,将作为保留的 kargs 字典传递给 Python 脚本。 有关详细信息,请参阅保留 Python 变量
hint.distribution string 插件执行要分布在多个沙盒中的提示。 默认值为 singlesingle 表示脚本的单个实例将针对单个沙盒中的整个查询数据运行。 per_node 表示如果 Python 块分发到分区之前的查询,则每个分区将并行在其自己的沙盒中运行。
hint.remote string 此提示仅适用于跨群集查询。 默认值为 autoauto 意味着服务器会自动决定执行 Python 代码的群集。 将值设置为 local 会强制在本地群集上执行 Python 代码。 在远程群集上禁用了 Python 插件时使用它。
external_artifacts dynamic 可从 OneLake 存储访问的项目的属性包名称和 URL 对。 有关详细信息,请参阅使用外部项目
spill_to_disk bool 指定一个用于将输入表序列化为 Python 沙盒的替代方法。 若要序列化大表,请将其设置为 true 以加速序列化,并大大减少沙盒内存消耗。 默认值为 true

保留 Python 变量

以下变量是为 Kusto 查询语言和 Python 代码之间的交互而保留的。

  • df:作为 T 数据帧的输入表格数据(上述 pandas 的值)。
  • kargs:作为 Python 字典的 script_parameters 参数的值。
  • result:由 Python 脚本创建的 pandas 数据帧,其值成为发送到插件后面的 Kusto 查询操作符的表格数据。

启用插件

默认禁用该插件。 在开始之前,请在 KQL 数据库中启用 Python 插件

Python 沙盒映像

若要查看不同 Python 映像的包列表,请参阅 Python 包参考

注意

  • 默认情况下,插件将 numpy 作为 np 导入,将 pandas 作为 pd 导入。 (可选)可以根据需要导入其他模块。
  • 有一些包可能与运行插件的沙盒所实施的限制不兼容。

使用来自查询和更新策略的引入

  • 在以下查询中使用插件:
  • 不能在定义为更新策略一部分的查询中使用插件,该策略的源表是使用流式处理引入引入的。

示例

range x from 1 to 360 step 1
| evaluate python(
//
typeof(*, fx:double),               //  Output schema: append a new fx column to original table 
```
result = df
n = df.shape[0]
g = kargs["gain"]
f = kargs["cycles"]
result["fx"] = g * np.sin(df["x"]/n*2*np.pi*f)
```
, bag_pack('gain', 100, 'cycles', 4)    //  dictionary of parameters
)
| render linechart 

正弦演示的屏幕截图显示查询结果。

性能提示

  • 将插件的输入数据集减少到所需的最小量(列/行)。
    • 在可能的情况下,通过 Kusto 的查询语言对源数据集使用筛选器。
    • 若要对源列的子集进行计算,请在调用插件之前仅投影这些列。
  • 如果脚本中的逻辑可分发,请使用 hint.distribution = per_node
  • 尽可能使用 Kusto 的查询语言来实现 Python 脚本的逻辑。

使用提示

  • 若要在查询编辑器中生成包含 Python 脚本的多行字符串,请从常用的 Python 编辑器(JupyterVisual Studio CodePyCharm 等)复制 Python 脚本,将其粘贴到查询编辑器中,然后在包含三个连续反引号的行之间括起完整的脚本。 例如:

    ```
    python code
    ```

  • 使用 externaldata 运算符 获取存储在外部位置(例如 Azure Blob 存储)中的脚本的内容。

读取 Python 脚本外部数据的示例

    let script = 
        externaldata(script:string)
        [h'https://kustoscriptsamples.blob.core.windows.net/samples/python/sample_script.py']
        with(format = raw);
    range x from 1 to 360 step 1
    | evaluate python(
        typeof(*, fx:double),
        toscalar(script), 
        bag_pack('gain', 100, 'cycles', 4))
    | render linechart 

使用外部项目

OneLake 存储中的外部项目可用于脚本并在运行时使用。

项目可用于从本地临时目录 .\Temp读取的脚本。 属性包中提供的名称用作本地文件名。 请参阅示例

有关引用外部包的信息,请参阅安装 Python 插件的包

刷新外部项目缓存

查询中使用的外部项目文件缓存在群集上。 如果对云存储中的文件进行更新,并要求立即与群集同步,可以使用 .clear cluster cache external-artifacts 命令。 此命令会清除缓存的文件,并确保后续查询使用最新版本的项目运行。

安装 Python 插件的包

安装包,如下所示:

先决条件

  • 创建 lakehouse 来托管包,最好与 eventhouse 位于同一工作区中。

安装包

  1. 对于 PyPi 或其他通道中的公共包,请下载包及其依赖项。

    • 从本地 Windows Python 环境中的 cmd 窗口运行:
    pip wheel [-w download-dir] package-name.
    
  2. 创建包含所需包及其依赖项的 zip 文件。

    • 对于专用包:压缩包的文件夹及其依赖项的文件夹。
    • 对于公共包,对在上一步中下载的文件进行压缩。

    注意

    • 请确保下载与 Python 引擎和沙盒运行时平台兼容的包(当前为 Windows 上的 3.10.8 或 3.11.7)
    • 请确保压缩 .whl 文件本身,而不是其父文件夹。
    • 对于基本沙箱映像中已经存在的具有相同版本的包,你可以跳过 .whl 文件。
  3. 将 zip 文件上传到 Lakehouse。

  4. 复制 OneLake URL(从压缩文件的属性)

  5. 调用 python 插件。

    • 使用 external_artifacts zip 文件的属性包和 OneLake URL 指定参数。
    • 在内联 python 代码中,从 Zipackage 导入 sandbox_utils,并使用 zip 文件的名称调用其 install() 方法。

使用外部项目的示例

安装生成伪数据的 Faker 包。

range ID from 1 to 3 step 1 
| extend Name=''
| evaluate python(typeof(*), ```if 1:
    from sandbox_utils import Zipackage
    Zipackage.install("Faker.zip")
    from faker import Faker
    fake = Faker()
    result = df
    for i in range(df.shape[0]):
        result.loc[i, "Name"] = fake.name()
    ```,
    external_artifacts=bag_pack('faker.zip', 'https://msit-onelake.dfs.fabric.microsoft.com/MSIT_DEMO_WS/MSIT_DEMO_LH.Lakehouse/Files/Faker.zip;impersonate'))
身份证件 客户
1 加里·塔皮亚
2 艾玛·埃文斯
3 阿什利·鲍文

有关使用 Python 插件的 UDF 函数的更多示例,请参阅函数库