跳到主要内容
版本:2.15.0

SQL结果到kafka

通过SQL语句,用户可以将查询的结果作为事件导出到kafka。

前提条件

  1. Kafka的服务器地址必须是炎凰数据平台可达的,并且需要导出到的topic在kafka中事先创建好。
  2. 需要提前使用SQL语句创建一张映射到kafka配置的外部表。
  3. 暂时不支持kafka的用户名密码验证,因此导出到的kafka实例需要关闭用户名密码认证。

创建外部表映射到kafka配置

为了使用SQL将查询结果导出到某个kafka实例,我们需要先在平台中创建一张外部表,这张外部表配置了目标kafka实例必需的参数,可以理解为对目标kafka实例的一种映射。 用户成功创建了一张kafka类型的外部表后,再往这张外部表中插入查询结果,即等同于往目标kafka实例导出查询结果。 平台中创建一张映射kafka实例的外部表,其语法如下:

CREATE [OR REPLACE] TABLE <table_name> ENGINE=kafka WITH (server_url=<server_ip>,server_port=<server_port>,topic=<topic_name>)

这里table_name是外部表的唯一标识,它不能与平台中的数据集、视图、物化视图等重名;ENGINE=kafka表示这是一张映射到kafka的外部表; WITH关键字后面跟着的是逗号分隔的配置参数,每一组参数以key=value的格式赋值,kafka类型的外部表必需的参数列表说明如下: 参数说明:

参数说明
server_urlkafka服务的IP地址或者有效URL
server_portkafka服务使用的连接端口
topickafka服务上存在的topic名称

例如,我们先创建一张名为kafka_0的外部表,映射到一个地址为1.1.1.1:9999的kafka实例,这个实例上已经有一个名为new-events的topic用于接收数据导入:

CREATE OR REPLACE TABLE kafka_0 ENGINE=kafka WITH (server_url='1.1.1.1',server_port='9999',topic='new-events')

创建外部表成功后,我们可以通过查看对应的SHOW TABLES语法来查看外部表是否创建成功,语法如下: SHOW [FULL] TABLES [WHERE identity={table_name}]

例如,我们可以直接列出所有外部表的名称,查看是否有我们刚刚创建的表:

SHOW TABLES

我们还可以查看我们创建的外部表的详细配置是否正确,通过下列语句列出外部表的具体参数:

SHOW FULL TABLES WHERE identity='kafka_0'

当我们确认外部表配置无误后,我们就可以通过SQL将查询结果导出到kafka,其语法与将查询结果导入到某个数据集相同: INSERT INTO <table_name> <query_expr>

例如,通过以下SQL语句,将表函数ip_location查询的结果导入出到kafka_0映射的kafka实例,查询结果将被导出到对应kafka的new-events这个topic中:

INSERT INTO kafka_0 SELECT * FROM ip_location('43.228.180.166')

如果我们不再需要这个映射到kafka的外部表,我们也可以通过SQL将其删除,语法如下。注意删除外部表只是删除该kafka实例在平台上的映射配置,不会删除已经导出到对应kafka实例的数据。 DROP TABLE <table_name>

例如,删除掉名为kafka_0的外部表:

DROP TABLE kafka_0

{% include note.html content="目标topic必须存在" %}

导出查询流程

  1. 在查询页面中,执行CREATE TABLE类型的查询,将目标kafka必需的配置存成一张kafka类型的外部表。
  2. 在查询页面中,执行一个INSERT INTO <table_name> <query_expr>类型的查询。该类型的查询包含了查询结果以及将结果导出到kafka两部分。
  3. 查询部分完成后,会返回给用户状态信息,提示用户导出任务是否已经被触发。将查询结果导出到kafka服务的工作将会在后台继续运行。
  4. 用户可前往查询任务页面,查看本次查询结果导出的状态,待查询结果以及结果导出两部分工作都完成后,该任务状态将显示为已完成