# Apache Kafka And Confluent Cloud Integration

## なぜTreasure Data CDPとKafkaが重要なのか?

Kafkaはリアルタイムデータストリーミングプラットフォームとして機能し、IT部門が様々なソースから膨大な量のデータを効率的に取り込み、処理、管理することを可能にします。TD CDPもリアルタイム機能を提供しています。しかし、KafkaとTD CDPを組み合わせることで、マーケターは既存の中央データストリーミングプラットフォームを活用し、リアルタイムの顧客行動に基づいてカスタマージャーニーを設計・実行できます。さらに、ITチームは異なるシステム、アプリケーション、データベース間でシームレスなデータ統合と同期を確保できます。これによりデータ管理プロセスが簡素化され、ITは顧客データの単一の信頼できる情報源を維持できるようになります。

# KafkaとTDの統合方法

![](/assets/image2023-7-29_8-15-42.acd9b8da093d311ef0e8933ec8282a07738217c246e7b8c3bf4ec12c1d103a2d.baa96da0.png)

ConfluentはApache Kafkaを実行するためにConfluent Cloud(マネージドサービス)とConfluent Platform(オンプレミスサービス)を提供しています。これらのサービスはHTTPS経由でAPIとKafkaを統合するためのHTTP Sink Connectorを正式にサポートしています。例えば、このコネクタはKafkaから下流のREST APIへデータをプルするSink設定で利用できます。

Confluentサービスを使用していない場合でも、Apache KafkaはAivenのOSS HTTP Sink Connectorでこのアプローチをサポートできます。これらのコネクタを使用することで、Ingestion API経由でTreasure Dataにほぼリアルタイムでレコードをプッシュできます。

# HTTP Sink ConnectorとIngestion APIの統合方法

HTTP Sink connectorはKafkaトピックからレコードを消費し、各レコード値をAvro、JSONなどから*request.body.format=json*でJSONに変換してから、設定された*http.api.url*へのリクエストボディで送信します。このURLはオプションでレコードキーやトピック名を参照できます。

Ingestion APIは変換されたJSONレコードを取得するためのPOSTリクエストをサポートしています。コネクタは設定された*batch.max.size*までレコードをバッチ処理してからAPIへバッチリクエストを送信します。各レコードはrequest.body.format=jsonでJSON表現に変換され、batch.separatorで区切られます。

以下は、Ingestion APIへPOSTリクエストを送信するための設定ファイルにおけるHTTP Sink Connectorの一般的なパラメータです。詳細については、[Importing Table Records Using the Data Ingestion API](/products/customer-data-platform/integration-hub/streaming/importing-table-records-using-the-data-ingestion-api)を参照してください。

*単一*のJSONリクエストのためにIngestion API用にレコードを変換するには、以下のパラメータを使用します。

- **http.api.url**: https://us01.records.in.treasuredata.com/DB/table
- **request.method**: POST
- **http.authorization.type**: none
- **http.headers.content.type**: application/vnd.treasuredata.v1.single+json
- **http.headers.additional**: "Authorization: TD1 TD WRITE API KEY"
- **batch.max.size**: 1
- **request.body.format**: json
- **[batch.json.as](http://batch.json.as).array**: false


高メッセージスループット環境の場合、リクエストごとに1つのJSONレコードに複数のレコードを取り込むことをお勧めします。このシナリオでは、Ingestion APIで複数レコードの取り込みをサポートするようにパラメータを変更します。

- **http.api.url**: https://us01.records.in.treasuredata.com/DB/table
- **request.method**: POST
- **http.authorization.type**: none
- **http.headers.content.type**: application/vnd.treasuredata.v1+json
- **http.headers.additional**: "Authorization: TD1 TD WRITE API KEY"
- **batch.max.size**: 2 (最大500)
- **request.body.format**: json
- **[batch.json.as](http://batch.json.as).array**: true
- **Batch prefix**: {"records":
- **Batch suffix**: }
- **Batch separator**: ,


KafkaとTreasure Data CDPを統合するための段階的なガイダンスについては、以下のチュートリアルを参照してください。

# Confluent CloudとTreasure Data CDPのチュートリアル

Confluent CloudはApache Kafkaをベースにしたマネージド型Kafkaサービスです。HTTP Sink Connectorを使用すると、Kafkaトピックからデータを取得し、外部HTTPエンドポイントとしてTreasure Dataに送信できます。このチュートリアルでは、Confluent CloudでHTTP Sink Connectorを設定する手順について説明します。

以下の手順は、最新のConfluence Cloudユーザーインターフェースとは異なる場合があります。

## 前提条件

- Confluent Cloud有料アカウントとKafka Cluster
- Confluent Cloudの基本的な理解
- Kafkaクラスタ内のトピックにメッセージがあること


本番データがない場合は、[このページ](https://developer.confluent.io/tutorials/kafka-connect-datagen/confluent.html)でダミーデータをテストしてください。

## HTTP Sink Connectorの設定

1. **Connectors**セクションに移動します。
2. **New Connector**を選択して新しいコネクタを作成します。
3. 利用可能なコネクタから**HTTP Sink**を選択します。


HTTP SinkとHTTP Sink Connectorの2つのHTTP Sinkコネクタが見つかる場合があります。Confluent Cloudを使用する場合は、**HTTP Sink**を選択してください。

1. 要件に基づいて**Credentials and Access**権限を選択します。


## Authentication

### コネクタのAuthenticationページで、必要な設定を入力します:

- **HTTP URL**: 例 `https://us01.records.in.treasuredata.com/support/kafka\_sample`
- フォーマットは`https://us01.records.in.treasuredata.com/DatabaseName/TableName`です
- 詳細については、[Importing Table Records Using the Data Ingestion API](/products/customer-data-platform/integration-hub/streaming/importing-table-records-using-the-data-ingestion-api)を参照してください。
- **SSL Protocol**: TLSv1.2
- **Enable Host verification**: false
- **Endpoint Authentication Type**: NONE


Master APIKEYを使用する場合、ユーザーIP ホワイトリストにConfluent clusterの静的IPアドレスを使用できます。

![](/assets/image2023-8-7_14-59-8.f4d331fc77c4daaab640937f11ed69f92d1b13f26473dcb8285f43257054a387.baa96da0.png)

## Configuration

以下の情報は、Treasure Data Ingestion APIの単一レコード用です。さらに、Treasure Dataには複数レコード用のパラメータも含まれています。続行する前に、どのユースケースが適切かを判断してください。

### 単一レコード用のパラメータ

- **Input Kafka record value format**:
  - 現在のトピック設定に基づいて適切な形式を選択してください
- **HTTP Request Method**: POST
- **HTTP Headers: Authorization:** ***TD1 XXXX/xxxxx|Content-Type:application/vnd.treasuredata.v1.single+json***
  - TD1の後にTreasure API Keyを記述し、その後にスペースとTreasure API Key |Content-Type:application/vnd.treasuredata.v1.single+jsonを追加してください
- **HTTP Headers Separator**: *|*
- **Behavior for null valued records**: ignore
- **Behavior on errors**: fail
  - 要件に応じて設定してください
- **Report errors as**: error_string
- **Request Body Format**: json
- **Batch json as array**: false
- **Batch max size**: 1


### 複数レコード用のパラメータ

- **Input Kafka record value format**:
  - 現在のトピック設定に基づいて適切な形式を選択してください
- **HTTP Request Method**: POST
- **HTTP Headers: Authorization:** ***TD1 XXXX/xxxxx|Content-Type:application/vnd.treasuredata.v1+json***
  - TD1の後にTreasure API Keyを記述し、その後にスペースとTreasure API Key |Content-Type:application/vnd.treasuredata.v1+jsonを追加してください
- **HTTP Headers Separator**: *|*
- **Behavior for null valued records**: ignore
- **Behavior on errors**: fail
  - 要件に応じて設定してください
- **Report errors as**: error_string
- **Request Body Format**: json
- **Batch json as array**: true
- **Batch max size**: 500
- **Batch prefix**: {"records":
- **Batch suffix**: }


![](/assets/image2023-8-7_15-15-15.d04cb24b67385114d0a42635e85e385910b1aab0c01a356ee8fe7b5cbafdcba3.baa96da0.png)

**Continue**を選択します。

## データの検証

設定が完了すると、コネクタが自動的に起動します。トピックページで、success_nameで始まるレコードの数を確認してください。

![](/assets/image2023-8-7_15-22-17.3bb1f180cb0f702370135ccbd5677c06b2cedf2cf8c0b0d9ea2d0080aa024b0a.baa96da0.png)

## Treasure Dataでのデータ可用性の確認

![](/assets/image2023-8-7_15-25-1.0977c0dec67aedb46750cf5807323b75dd2670dd23726466b6cb40a4cf49d2e1.baa96da0.png)

# Confluent PlatformとTreasure Data CDPのチュートリアル

Apache Kafkaはオープンソースの分散イベントストリーミングプラットフォームであり、Confluent PlatformはKafkaの商用ディストリビューションで、企業向けにデプロイメントと管理を簡素化する追加機能とツールを提供しています。自社の環境に適したプラットフォームを使用してください。両方のプラットフォームはHTTP Sink Connectorをサポートしており、Kafkaトピックからデータを取得し、外部のHTTPエンドポイントとしてTreasure Dataに送信することができます。このチュートリアルでは、Confluent PlatformおよびApache KafkaでHTTP Sink Connectorをセットアップする手順について説明します。

## 前提条件

- Confluent Platformの有料ライセンスまたはApache Kafkaクラスタ
- Confluent PlatformではConfluentがサポートするHTTP Sink Connectorが必要です（[リンク](https://confluent.cloud/environments/env-oqdr7y/clusters/lkc-gkjmv1/connectors/configure/kafka-connect-http)）
- Apache KafkaではKafkaコミュニティが保守するAivenのHTTP Sink Connectorが必要です（[リンク](https://github.com/Aiven-Open/http-connector-for-apache-kafka)）


Treasure Dataはこれらのコネクタに対する商用サポートを提供していません。

- Apache Kafkaの基本的な理解


## Kafka Connectフレームワークのインストール

Kafka Connectがインストールされていることを確認してください。このフレームワークは外部システムをKafkaに接続します。Confluent Platformを使用している場合、Kafka Connectはプリインストールされています。それ以外の場合は、別途ダウンロードしてインストールする必要があります。

- Apache Kafka Connect [https://kafka.apache.org/documentation/#connect](https://kafka.apache.org/documentation/#connect)
- Confluent Kafka Connect: [https://docs.confluent.io/platform/current/connect/index.html](https://docs.confluent.io/platform/current/connect/index.html)


## HTTP Sink Connectorプラグインのインストール

HTTPエンドポイントにデータを送信するには、HTTP Sink Connectorプラグインが必要です。お使いのKafka Connectバージョンと互換性のある特定のプラグインバージョンについては、Confluent Hubまたは他のソースを確認してください。その後、ダウンロードしたHTTP Sink Connectorプラグイン（JARファイル）を適切なKafka Connectプラグインディレクトリに配置します。

- Confluent Platform用にConfluent Hub経由でダウンロード: [https://docs.confluent.io/kafka-connectors/http/current/overview.html](https://docs.confluent.io/kafka-connectors/http/current/overview.html)
- Apache Kafka用にGitHub経由でダウンロード: [https://github.com/Aiven-Open/http-connector-for-apache-kafka](https://github.com/Aiven-Open/http-connector-for-apache-kafka)


## HTTP Sink Connectorの設定

HTTP Sink Connectorの設定を定義するための設定ファイル（JSONまたはproperties形式）を作成します。この設定ファイルには通常、以下のプロパティが含まれます：

- **name**: コネクタの一意の名前
- **connector.class**: HTTP Sink Connectorのクラス名（例：io.confluent.connect.http.HttpSinkConnector）
- **tasks.max**: コネクタに対して作成されるタスクの数（通常はKafkaパーティション数と同じ）
- **topics**: データが消費され、HTTPエンドポイントに送信されるKafkaトピック
- **http.api.url**: データが送信されるHTTPエンドポイントのURL
- **http.api.method**: リクエストを行う際に使用するHTTPメソッド（例：POST、PUTなど）
- **http.headers**: リクエストに含める追加のHTTPヘッダー
- **http.timeout.ms**: HTTPリクエストのタイムアウト


その他、使用ケースや要件に応じたオプションプロパティ。

これらの設定プロパティは、使用するKafka ConnectとHTTP Sink Connectorプラグインの特定のバージョンによって異なる場合があります。最新の情報と手順については、常に公式ドキュメントとプラグインのドキュメントを参照してください。

- Confluent Platform: [https://docs.confluent.io/kafka-connectors/http/current/connector_config.html](https://docs.confluent.io/kafka-connectors/http/current/connector_config.html)
- Apache Kafka [https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst](https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst)


### Confluent Platform

HTTP Sink Connectorの設定を定義するための設定JSONファイルを作成します。[https://docs.confluent.io/kafka-connectors/http/current/connector_config.html#http-sink-connector-configuration-properties](https://docs.confluent.io/kafka-connectors/http/current/connector_config.html#http-sink-connector-configuration-properties)のコネクタガイドラインに従ってください。

#### 単一行インジェストのJSON例


```json
{
    "name": "tdsinglejsonHttpSink",
    "config":
    {
      "topics": "rest-messages5",
      "connector.class": "io.confluent.connect.http.HttpSinkConnector",
      "http.api.url": "https://us01.records.in.treasuredata.com/database_name/table_name",
      "request.method": "POST",
      "headers": "Authorization:TD1 XXXXX/xxxxx|Content-Type:application/vnd.treasuredata.v1.single+json",
      "header.separator": "|",
      "report.errors.as": "error_string",
      "request.body.format": "json",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "batch.prefix":"",
      "batch.suffix":"",
      "batch.max.size": "1",
      "batch.json.as.array": "false",
      "tasks.max": "1",
      "confluent.topic.bootstrap.servers": "localhost:9092",
      "confluent.topic.replication.factor": "1",
      "reporter.bootstrap.servers": "localhost:9092",
      "reporter.result.topic.name": "success-responses",
      "reporter.result.topic.replication.factor": "1",
      "reporter.error.topic.name": "error-responses",
      "reporter.error.topic.replication.factor": "1",
      "ssl.enabled":"true",
      "https.ssl.protocol":"TLSv1.2"
    }
}
```

#### 複数行取り込みJSONの例


```json
{
    "name": "tdBatchHttpSink",
    "config":
    {
      "topics": "rest-messages5",
      "connector.class": "io.confluent.connect.http.HttpSinkConnector",
      "http.api.url": "https://us01.records.in.treasuredata.com/DB/TBL",
      "request.method": "POST",
      "headers": "Authorization:TD1 XXXX/xxxx|Content-Type:application/vnd.treasuredata.v1+json",
      "header.separator": "|",
      "report.errors.as": "error_string",
      "request.body.format": "string",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "batch.prefix":"{\"records\":",
      "batch.suffix":"}",
      "batch.max.size": "500",
      "batch.json.as.array": "true",
      "tasks.max": "1",
      "confluent.topic.bootstrap.servers": "localhost:9092",
      "confluent.topic.replication.factor": "1",
      "reporter.bootstrap.servers": "localhost:9092",
      "reporter.result.topic.name": "success-responses",
      "reporter.result.topic.replication.factor": "1",
      "reporter.error.topic.name": "error-responses",
      "reporter.error.topic.replication.factor": "1",
      "ssl.enabled":"true",
      "https.ssl.protocol":"TLSv1.2"
    }
}
```

# Apache KafkaとTreasure Data CDPのチュートリアル

HTTP Sink Connectorの設定を定義するための設定JSONファイルを作成します。コネクタのガイドラインについては、[https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst](https://github.com/Aiven-Open/http-connector-for-apache-kafka/blob/main/docs/sink-connector-config-options.rst)を参照してください。Confluent HTTP Sink Connectorがサポートしていたものと同様のパラメータをサポートしています。


```json
{
    "name": "tdCommunityHttpSink",
    "config":
    {
      "topics.regex": "td-rest-messages1",
      "connector.class": "io.aiven.kafka.connect.http.HttpSinkConnector",
      "http.url": "https://us01.records.in.treasuredata.com/database_name/table_name",
      "http.authorization.type": "none",
      "http.headers.content.type": "application/vnd.treasuredata.v1.single+json",
      "http.headers.additional": "Authorization: TD1 XXXX/xxxx",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "batching.enabled": "false",
      "batch.max.size": "1",
      "batch.prefix":"{\"records\":[",
      "batch.suffix":"]}",
      "batch.separator": "n",
      "tasks.max": "1"
    }
}
```

## HTTP Sink Connectorの起動

設定をKafka Connectに送信してHTTP Sink Connectorを起動します。confluentコマンドラインツールを使用するか、Kafka Connect REST APIへREST API呼び出しを行うことができます。詳細については、[KAFKA CONNECT 101](https://developer.confluent.io/courses/kafka-connect/rest-api/)を参照してください。