Python UDTF Development
The Yanhuang Data (炎凰数据) platform provides the Python tool Eggfarm to assist in developing Python user-defined table functions (UDTFs). Let's first look at the Python table function APIs.
Development Environment
- Python >=3.9, <4.0
- Poetry >=1.0 (installation docs)
Python UDTF APIs
Table Function API
@udtf(is_parser=False)
class BaseFunction():
    def get_name(self):
        """
        :return: the name of the table function
        """
        return "base"
    def initialize(self, table_writer):
        """
        This method is called once for every batch
        in the input table the function is applied to.
        :param table_writer: table writer for writing produced results
        :return: None
        """
        pass
    def process(self, params, table_writer, context):
        """
        This method is the main body of the function and uses the input parameters.
        When the table function is applied,
        it is called once for every row of the input table.
        :param params: a list containing all of the input parameters
        :param context: context carrying additional information,
                reserved for future use
        :param table_writer: writes the produced rows and columns
                     into the result table.
        `table_writer` has three writing modes:
            - row oriented
            - column oriented
            - batch oriented
        Within a single function, only one writing mode may be used.
        The related APIs are:
            - write_row(kv_pairs) (all values are saved as string type)
            - write_column(column_name, column_type, [column_values])
            - table_batch_iterator
        :return: None
        """
The sample code shows all of the table function APIs:
- get_name: returns the name of the table function
- initialize: initializes table_writer or any state kept during table function execution
- process: the main body of the table function
  - params: the list of input parameters to the table function
  - table_writer: holds the produced results
  - context: reserved parameter
 
The platform's interface for returning data is expressed through table_writer, which currently supports three writing modes:
- write_row(dict): writes results row by row. It takes a single argument, a dictionary of key-value pairs. Note that all values are converted to string type in the result.
- write_column(column_name, column_type, [column_values]): writes results column by column.
  - column_name: the column name;
  - column_type: the concrete column type, expressed as a pyarrow data type. Within a single process call, do not assign different data types to the same column name. Commonly used types include pyarrow.utf8(), pyarrow.int64(), etc.;
  - column_values: the list of column values to write; the value types must match the column type;

- table_batch_iterator: organizes the results as a Python generator assigned to table_writer; each next call on the generator must yield a pyarrow batch.
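To make the write_row contract concrete, here is a minimal runnable sketch; MockTableWriter is a hypothetical stand-in for the platform's table_writer, used only to illustrate that every written value ends up as a string:

```python
# MockTableWriter is a hypothetical stand-in for the platform's table_writer;
# it only illustrates the documented write_row contract.
class MockTableWriter:
    def __init__(self):
        self.rows = []

    def write_row(self, kv_pairs):
        # mirror the documented behavior: all values are saved as strings
        self.rows.append({k: str(v) for k, v in kv_pairs.items()})

writer = MockTableWriter()
writer.write_row({"add_result": 1 + 2})
print(writer.rows)  # → [{'add_result': '3'}]
```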
In addition, the udtf decorator can be applied to explicitly declare the table function's type for better performance; note that this does not add any checks on the function's processing logic.
def udtf(is_lookup=False, is_parser=False, not_producing_multi_rows=False):
    """
    @param is_parser: `is_parser=True` indicates this table function parses its argument and produces key value pairs as a new row. It should NOT write more than 1 row for each input if specified. Specifying it to be True helps performance.
    @param is_lookup: `is_lookup=True` indicates this table function uses its argument as lookup key and produces key value pairs as a new row. It should NOT write more than 1 row for each input if specified. Specifying it to be True helps performance.
    @param not_producing_multi_rows: `not_producing_multi_rows=True` indicates this table function produces no more than one row. It should NOT write more than 1 row for each input if specified.
    """
    def decorate_with_udtf(func_class):
        func_class.is_parser = is_parser
        func_class.is_lookup = is_lookup
        func_class.not_producing_multi_rows = is_lookup or is_parser or not_producing_multi_rows
        return func_class
    return decorate_with_udtf
The udtf decorator provides three parameters to describe the type of the table function:
- is_lookup
- is_parser
- not_producing_multi_rows
These parameters declare that, for any list of input parameters, process produces no more than one row of results. When a table function is marked with such a decorator, the platform can skip reprocessing the original table during an Apply operation, assuming the original table is returned unchanged, which improves performance.
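Since udtf only attaches class attributes, its effect is easy to verify directly; this sketch inlines the decorator definition shown above so it runs standalone:

```python
def udtf(is_lookup=False, is_parser=False, not_producing_multi_rows=False):
    # identical to the definition above: attach type flags to the class
    def decorate_with_udtf(func_class):
        func_class.is_parser = is_parser
        func_class.is_lookup = is_lookup
        func_class.not_producing_multi_rows = (
            is_lookup or is_parser or not_producing_multi_rows
        )
        return func_class
    return decorate_with_udtf

@udtf(is_parser=True)
class ParserLike:
    pass

# is_parser implies not_producing_multi_rows
print(ParserLike.is_parser, ParserLike.not_producing_multi_rows)  # → True True
```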
Eggfarm
This section introduces Eggfarm, a tool used to assist the development of Python table functions.
Basic Usage
- Install: pip install eggfarm
- eggfarm has been published to the public package index
- installing eggfarm also installs the eggfarm command

- Create a function: eggfarm new add_two
- here add_two is the name of the table function; note that this name must match the name under which the function is later registered on the platform.
The generated table function structure is shown below:
add_two/
├── add_two
│   ├── __init__.py
│   ├── info.toml
│   └── version.py
├── pyproject.toml
├── tests
│   ├── func_test.py
│   └── supported_signature_list.py
├── Makefile
└── README.md
Enter the add_two directory and run make install && make test && make package to obtain a minimal packaged add_two function, which can be uploaded directly to the platform's file management page and then registered for use.
So what exactly do these files represent?
pyproject.toml
The directory generated by Eggfarm is based on a Poetry development environment and comes with a pyproject.toml file:
[tool.poetry]
name = "add_two"
version = "0.1.0"
description = "add_two table function"
authors = []
[tool.poetry.dependencies]
python = "^3.9"
[tool.poetry.dev-dependencies]
pytest = "^6.0.1"
# depends on the udtfs package 
stonewave-sql-udtfs = "^0.6.0"
toml = "^0.10.2"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
You need to add all dependencies used by your table function to pyproject.toml. By default, the stonewave-sql-udtfs and toml dependencies are already listed under tool.poetry.dev-dependencies: stonewave-sql-udtfs contains all of the basic table function APIs, and toml is used to read the function signature. Libraries placed under tool.poetry.dev-dependencies are not bundled into the final package, so test-only libraries can go there. Dependencies that the function actually needs at runtime must go under tool.poetry.dependencies, since they need to be bundled into the final package.
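For example, if the function imports pygrok at runtime (as the parse_grok example later in this document does), it should be declared under tool.poetry.dependencies; the version constraint below is illustrative:

```toml
[tool.poetry.dependencies]
python = "^3.9"
# runtime dependency: gets bundled into the final package
pygrok = "^1.0.0"
```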
__init__.py
Eggfarm generates a table function skeleton, named after the function created, that inherits from BaseFunction:
from stonewave.sql.udtfs.base_function import BaseFunction, udtf
from stonewave.sql.udtfs.logger import logger
import pyarrow
# TODO: change `is_parser` to True if your table function parses an event and extracts fields from the event.
# Each event parsing SHOULD produce exactly one event.
# by specifying `is_parser` property, the performance will be better
@udtf(is_parser=False)
class AddTwoFunction(BaseFunction):
    def get_name(self):
        """
        :return: the name of the table function
        """
        return "add_two"
    # ...
    
    def process(self, params, table_writer, context):
        # method 1:
        # use kv pairs to append a row; kv pairs are column names and values
        # all results are appended as the string datatype
        # num1 = params[0]
        # num2 = params[1]
        # table_writer.write_row({"add_result": num1 + num2})
        # ===>  | add_result |
        #       |  num1+num2 |

        # method 2 (advanced):
        # use write_column to add a column with a pyarrow datatype
        num1 = params[0]
        num2 = params[1]
        table_writer.write_column("add_result", pyarrow.int64(), [int(num1) + int(num2)])
The generated process contains simple example code demonstrating the use of table_writer. For all logging, use from stonewave.sql.udtfs.logger import logger as the unified log collection interface.
info.toml
# TODO: change the function signature according to your needs
# to support multiple signatures, you can add more signatures to this list
# This file is for stonewave table function registration.
# it is not supported to have multiple signatures with the same number of parameters
# e.g. signature_list = [ [ "INT", "STRING", "INT",], [ "INT", "STRING", "INT", "BOOL",], [ "INT", "STRING", "INT", "BOOL", "FLOAT",],]
# current valid data types are: [STRING | INT | BOOL | FLOAT | TABLE]
signature_list = [ ["INT", "INT"] ]
When registering a function package on the Yanhuang Data platform, you must specify the function's parameter list, and info.toml carries this information. Functions often have parameters with default values, which SQL cannot express directly; info.toml therefore provides signature_list, a list of parameter lists, to express defaults. signature_list must enumerate every possible parameter list. When the function has no default values, signature_list is simply a list containing a single parameter list.
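For example, a function whose last parameter has a default value could accept both call shapes by listing two signatures (the parameter types here are illustrative):

```toml
# both call shapes are accepted; the platform picks the signature
# matching the number of arguments in the query
signature_list = [
    [ "STRING", "INT" ],
    [ "STRING", "INT", "BOOL" ],
]
```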
tests
The tests directory contains corresponding simple tests.
- supported_signature_list.py does not need to be changed; it is mainly used to check parameters in the tests
- func_test.py contains simple tests
Based on the above, developing a simple table function only requires changing the process method and filling in the function signature signature_list. Note, however, that the dependencies packaged for upload must be installable on Linux, so make package downloads the Linux builds of the corresponding dependencies, which may differ from the packages your development environment depends on.
For Python UDTF development targeting arm64 environments, the latest version of eggfarm supports make package_arm64 to download arm64 dependency packages.
Registration
- First, upload the function package produced by Eggfarm to file management
- the uploaded file should be named <function_name>_table_func_<creation_datetime>.tar.gz
- decompression must be disabled when uploading the file
- On the query page, create or drop a custom Python table function with the following supported syntax: CREATE [OR REPLACE] FUNCTION <function_name>
 LANGUAGE PYTHON
 PACKAGE '<package_path>'
 DROP FUNCTION <function_name>
 LANGUAGE PYTHON
- package_path is the path of the file relative to file management; currently this is just the file name
- ⚠️ Registering a Python table function restarts the Python table function server. Registration fails while another Python table function is executing; wait for the other Python table functions to finish before registering again.
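Putting the registration steps together, the statements might look like the following; the package file name is hypothetical, so substitute the actual file name shown in file management:

```sql
-- the package name below is a hypothetical example
CREATE OR REPLACE FUNCTION add_two
LANGUAGE PYTHON
PACKAGE 'add_two_table_func_20240101T000000.tar.gz'

-- remove the function when it is no longer needed
DROP FUNCTION add_two
LANGUAGE PYTHON
```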
Table function process examples
Extraction function (write_row)
from pygrok import Grok
from stonewave.sql.udtfs.base_function import BaseFunction, udtf
@udtf(is_parser=True)
class ParseGrokFunction(BaseFunction):
    def get_name(self):
        return "parse_grok"
    def process(self, params, table_writer, context):
        text = params[0]
        grok_pattern = params[1]
        grok = Grok(grok_pattern)
        extracted_values = grok.match(text)
        if extracted_values:
           table_writer.write_row(extracted_values)
Usage example
SELECT ip FROM parse_grok('ip is 127.0.0.1', '%{IPV4:ip}')
| ip | 
|---|
| 127.0.0.1 | 
Data generation function (write_column)
from faker import Faker
import pyarrow
from stonewave.sql.udtfs.base_function import BaseFunction
from stonewave.sql.udtfs.logger import logger
fake = Faker()
# add faker providers here if needed
class FakerFunction(BaseFunction):
    def get_name(self):
        return "faker"
    def _safe_cast(self, value, to_type, default=None):
        try:
            return to_type(value)
        except (ValueError, TypeError):
            return default
    def process(self, params, table_writer, context):
        row_count = max(0, self._safe_cast(params[0], int, 0))
        if params[1] is None:
            logger.debug("faker column name is not provided, no row is produced")
            return
        column_names = [c.strip() for c in params[1].split(",")]
        for column in column_names:
            field_faker = getattr(fake, column, None)
            field_type = self._get_data_type(field_faker)
            array = [field_faker() if field_faker else None for i in range(row_count)]
            table_writer.write_column(column, field_type, array)
        logger.debug(
            "rows generated via faker",
            row_count=row_count,
        )
        
    # we need to use pyarrow datatypes
    def _get_data_type(self, field_faker):
        if not field_faker:
            return pyarrow.utf8()
        else:
            value = field_faker()
            # bool needs to be placed before int because isinstance(True, int) == True
            if isinstance(value, bool):
                return pyarrow.bool_()
            elif isinstance(value, int):
                return pyarrow.int64()
            elif isinstance(value, float):
                return pyarrow.float64()
            else:
                return pyarrow.utf8()
Usage example
select * from faker(10, 'name, email')
| name | email |
|---|---|
| Steven Salazar | ldavis@gmail.com | 
| Lydia Sanchez | john86@hotmail.com | 
| Brian Tucker | opatterson@hatfield-kelly.com | 
| Donna Williams | nolansamuel@gilmore-sandoval.com | 
| Christopher Solomon | ashleyjordan@green.com | 
| Jonathan Anderson | sarah09@yahoo.com | 
| Tina Berry | gregoryharris@brown.info | 
| Jacob Lopez | harrismichael@yahoo.com | 
| Amanda Richardson | nmarshall@sanchez-lee.com | 
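The comment in _get_data_type about checking bool before int matters because bool is a subclass of int in Python. This stdlib-only sketch (with type-name strings standing in for the pyarrow types) shows the same inference order:

```python
def infer_type_name(value):
    # bool must be tested before int: isinstance(True, int) is True
    if isinstance(value, bool):
        return "bool"
    elif isinstance(value, int):
        return "int64"
    elif isinstance(value, float):
        return "float64"
    else:
        return "utf8"

print(isinstance(True, int))   # → True
print(infer_type_name(True))   # → bool
print(infer_type_name(3))      # → int64
```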
File reading function (table_batch_iterator)
import os
import pyarrow as pa
import pandas as pd
from stonewave.sql.udtfs.logger import logger
from stonewave.sql.udtfs.base_function import BaseFunction
from stonewave.sql.udtfs.constants import STONEWAVE_HOME
class LoadExcelFunction(BaseFunction):
    def __init__(self):
        # uploaded files will be placed in
        # ${STONEWAVE_HOME}/var/external_data
        self.external_data_dir = os.path.join(STONEWAVE_HOME, "var", "external_data")
    def get_name(self):
        return "load_excel"
    def process(self, params, table_writer, context):
        excel_file = params[0]
        logger.debug(
            "executing load_excel table function",
            excel_file=excel_file,
        )
        try:
            self.set_path(excel_file)
            excel_file_path = self.get_path()
            # sheet_name=None loads all sheets as a dict of DataFrames
            excel_data = pd.read_excel(
                excel_file_path,
                sheet_name=None,
                engine="openpyxl",
                dtype=str,
            )
            sheets = []
            for sheet_name, df in excel_data.items():
                # trim spaces in column names
                df.columns = df.columns.str.strip()
                table = pa.Table.from_pandas(df, preserve_index=False)
                batches = table.to_batches()
                if batches:
                    sheets.append(batches[0])
            table_writer.table_batch_iterator = iter(sheets)
        except Exception as e:
            logger.error("failed to load excel file", error=str(e))
            
    def set_path(self, path):
        logger.debug(
            "executing set_path in base dataloader",
            path=path,
            external_data_dir=self.external_data_dir,
        )
        # ... validate the path against external_data_dir and resolve data_path (elided)
        self.path = data_path
    def get_path(self):
        return self.path
Usage example
select * from load_excel('档案编号.xlsx')
| 户管档案编号 | 
|---|
| 201102 | 
| 201104 | 
| 201103 | 
| 201105 | 
| 201201 | 
| 201202 | 
| 201203 | 
| 201204 | 
| 201205 | 
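The table_batch_iterator mode hands the writer an iterator that the platform pulls from lazily. The pull model can be sketched without pyarrow; plain lists stand in for record batches, and MockTableWriter is a hypothetical stand-in for the platform writer:

```python
class MockTableWriter:
    # illustrative stand-in: the platform consumes table_batch_iterator lazily
    table_batch_iterator = None

def make_batches():
    # each yielded item stands in for one pyarrow record batch
    for sheet in (["row1", "row2"], ["row3"]):
        yield sheet

writer = MockTableWriter()
writer.table_batch_iterator = make_batches()
batches = list(writer.table_batch_iterator)
print(batches)  # → [['row1', 'row2'], ['row3']]
```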
Table-valued function
import pyarrow as pa
from stonewave.sql.udtfs.base_function import BaseFunction
class SummarizeFunction(BaseFunction):
    def __init__(self):
        self.tables = []
    def get_name(self):
        return "summarize"
    def process(self, params, table_writer, context):
        assert len(params) > 0
        # table valued parameter must be placed in first place
        # python process will process a streaming of table batches
        # summarize function needs to combine all batches to calculate results
        batch = params[0]
        if batch is not None:
            table = pa.Table.from_batches([batch])
            self.tables.append(table)
        # when first parameter is None, means batch streaming is ended
        else:
            # when get all batch of input table, calculate summarize stats
            self.summarize(table_writer)
    def summarize(self, table_writer):
        if self.tables:
            table = pa.concat_tables(self.tables, promote=True)
            df = table.to_pandas()
            desc_df = df.describe(datetime_is_numeric=True)
            # the df is transposed because it contains mixed type column, which is not allowed in arrow
            desc_df = desc_df.transpose()
            desc_df.insert(0, "fields", desc_df.index)
            desc_table = pa.Table.from_pandas(desc_df, preserve_index=False)
            batches = desc_table.to_batches()
            table_writer.table_batch_iterator = iter(batches)
        else:
            return
Usage example
with cte as (select * from generate_series(1, 10))
select * from summarize(cte)
| fields | count | mean | std | min | 25% | 50% | 75% | max | 
|---|---|---|---|---|---|---|---|---|
| generate_series | 10 | 5.5 | 3.0276503540974917 | 1 | 3.25 | 5.5 | 7.75 | 10 | 
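The batch-streaming protocol used for table-valued parameters (one batch per process call, then a final None to signal end of input) can be sketched without pyarrow; lists stand in for record batches in this hypothetical accumulator:

```python
class Accumulator:
    # mimics the summarize pattern: buffer batches, emit on end-of-stream
    def __init__(self):
        self.batches = []
        self.result = None

    def process(self, batch):
        if batch is not None:
            self.batches.append(batch)
        else:
            # None marks the end of the input table's batch stream
            self.result = sum(len(b) for b in self.batches)

acc = Accumulator()
for batch in ([1, 2, 3], [4, 5], None):
    acc.process(batch)
print(acc.result)  # → 5
```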
FAQ
1. make package fails
The packaged upload needs to include all direct dependencies, so that the platform can install a new table function without network access. However, the platform runs on Linux, so make package downloads the Linux builds of all dependencies; confirm that every dependency of your development environment, at the required version, also exists for the Linux platform.
2. Execution fails after a successful upload and registration
If the function tests pass but execution fails, restart the platform's backend query service (the Search Broker Pod) so that the new function package is loaded.