跳到主要内容

Connector 连接器

Connector

连接器(Connector)的编程接口,用于管理连接器的生命周期、配置、状态、任务等。 连接器是一个独立的进程,用于连接数据源和数据目的地,实现数据的抽取、转换和加载。 连接器的生命周期包括:创建、启动、暂停、恢复、重启、删除。

继承自:

示例 1:
const connEntity = new Connector();
connEntity.init(this.connectionService, 'test_connector_1');
// 获取连接器的详细配置信息
connEntity.fetch().then((result) => {
});
// 获取连接器的配置信息
connEntity.status().then((result) => {
});

成员函数:

pause() → {Promise}

暂停连接器

resume() → {Promise}

恢复连接器

restart() → {Promise}

重启连接器

config() → {Promise}

获取连接器的配置信息

updateConfig(props: object) → {Promise}

更新连接器的配置信息

参数:
参数名称参数类型描述
propsobject

新的连接器配置

status() → {Promise}

获取连接器状态

tasks() → {Promise}

获取连接器下的子任务

stopWaitStatusChange()

停止由waitUntilStatusChange方法发起的状态变化监听

waitUntilStatusChange(curStatus: string, callback: function)

等待直到连接器状态发生变化。此方法会定时获取连接器的状态,直到连接器的状态发生变化,或者超过100次获取尝试。

参数:
参数名称参数类型描述
curStatusstring

当前状态

callbackfunction

回调函数,当连接器的状态发生变化时,会调用该函数

示例:
const connEntity = new Connector();
connEntity.init(this.connectionService, name);
connEntity.waitUntilStatusChange('unknown', () => {
    // load connector status
});

Connectors

连接器列表(Connectors)是连接器的集合,用于获取连接器列表;创建连接器等操作。以下例子是创建一个Kafka连接器。 我们推荐从页面中去创建连接器,而不是通过SDK创建连接器。因为:

  1. 有许多默认值在页面中已经设置好,而通过SDK创建连接器时,需要用户自己去设置这些默认值。
  2. 有许多预定义的字段在页面中都已经做了校验,而通过SDK创建连接器时,需要用户自己去校验这些字段。

继承自:

示例 1:
// 使用SDK创建一个Karfka连接器
const connectorCol = connectorService.connectors()
connectorCol.create({
     name: 'test-connector',
     config: {
         "connector.class": "com.comcast.kafka.connect.kafka.KafkaSourceConnector",
         "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
         "value.converter": "org.apache.kafka.connect.json.JsonConverter",
         "value.converter.schemas.enable": "false",
         "transforms": "HoistField,setupYHP",
         "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
         "transforms.setupYHP.type": "com.yanhuangdata.parana.connect.smt.SetYhpAttribute",
         "transforms.HoistField.field": "_message",
         "connector.description": "test kafka connector",
         "source.bootstrap.servers": "<kafka-broker-ip>:<kafka-broker-port>",
         "source.topic.whitelist": "",
         "source.auto.offset.reset": "earliest",
         "source.group.id": "groupId-kZamceUS",
         "transforms.setupYHP.datatype": "json",
         "transforms.setupYHP.eventset": "energy",
         "tasks.max": 1
     }
}).then((newConnector) => {
    // do something with newConnector
})

成员函数:

init(service: Service, namespace: object)

连接器列表的初始化方法,一般用户不需要调用该方法,而是通过ConnectionService.connectors()获取该对象,在该方法中会初始化连接器列表的路径、服务等信息。

参数:
参数名称参数类型描述
serviceService
namespaceobject