# Kafka インポート連携とFluentd Consumer

Kafka用のFluentd consumerを使用して、KafkaからTreasure Dataにデータを送信できます。consumer自体に加えて、現在のセットアップによっては、いくつかの追加要件がある場合があります。

![](/assets/2020-kafka-import-integration.4fbf2e5d70d6e900644e1ca016b7bb1c74df7f0af9dbb3192095f881859d7d3f.0af5a2b9.png)

このドキュメントは、Ubuntu Precise Debianを想定しています。それ以外の場合は、オペレーティングシステムに応じてコマンドを選択する必要があります。

この機能はベータ版です。詳細については、[support@treasuredata.com](mailto:support@treasuredata.com)までお問い合わせください。

# 前提条件

- Treasure Dataの基本知識
- [Apache Kafka](https://kafka.apache.org/41/getting-started/quickstart/)のセットアップ、設定、およびプロデューサーとコンシューマーを含むアーキテクチャに関する基本知識
- Gradleでビルドする能力
- Linuxの実用的な知識
- [Fluentd](http://www.fluentd.org/)の実用的な知識
- 動作しているKafkaシングルノードまたはマルチブローカーのインストール


# Fluentdのインストールと設定

## プレインストール

[Fluentdのインストール](https://docs.fluentd.org/installation)の事前セットアップおよびインストール手順を確認してください。fluentdをセットアップする前に、以下を確認してください。

- ntpサーバーをインストールし、/etc/ntp.confに適切なrestrict値を設定
- ローカルクロックをNTPバックアップとして追加し、ログパラメータも設定
- NTPサービスデーモンを再起動
- /etc/security/limits.confで正しいファイルディスクリプタ数を設定


## Fluentdのインストール

Trustyの場合:


```
$ curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-trusty-td-agent3.sh | sh
```

他のディストリビューションについては、Treasure Agent [(td-agent)](/products/customer-data-platform/integration-hub/streaming/td-agent/about-treasure-data-s-server-side-agent)のインストールを参照してください。

## Fluentd.confの編集


```
$ sudo nano /etc/td-agent/td-agent.conf
```

Kafka用のFluentd Consumerを使用してイベントを転送します(fluentdはデフォルトポート24224からイベントを受信します)。そのため、設定ファイルは次のようになります。


```
source
  type forward
</source>

<match td.*.*>
  type tdlog
  apikey "#{ENV['TD_API_KEY']}"

  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  flush_interval 5s

  secondary
    type file
    path /var/log/td-agent/failed_records
  </secondary>
</match>
```

Treasure Data Write-Only APIキーをマシンの環境変数として設定してください。その後、Fluentdを再起動することをお勧めします。


```
$ /etc/init.d/td-agent restart
```

# Fluentd用Kafka Consumerのインストールと設定

## JARファイルのダウンロード

GitHubのリリースページから、プリコンパイルされたJARファイルをダウンロードします。

[https://github.com/treasure-data/kafka-fluentd-consumer/releases](https://github.com/treasure-data/kafka-fluentd-consumer/releases)

JARファイル名は`kafka-fluentd-consumer-x.y.z-all.jar`です。

## 設定ファイルのダウンロード

GitHubから2つの設定ファイルもダウンロードする必要があります。


```
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/fluentd-consumer.properties -o fluentd-consumer.properties
$ curl -L https://raw.githubusercontent.com/treasure-data/kafka-fluentd-consumer/master/config/log4j.properties -o log4j.properties
```

- fluentd-consumer.properties: Kafkaコンシューマーとログ転送パラメータを設定するために使用
- log4j.properties: コンシューマー自体のログパラメータを設定するために使用


以下は、kafka-fluentd-consumerのファイルリストの例です。


```
$ ls
kafka-fluentd-consumer-0.2.4-all.jar    fluentd-consumer.properties    log4j.properties
```

## ホストでのJavaのアップグレード

少なくともJavaバージョン1.7を実行していることを確認してください。


```
$ java -version
```

そうでない場合は、次の手順を実行してください。


```
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
$ sudo apt-get install oracle-java7-set-default
```

# Kafka Fluentd Consumerの実行

## Zookeeperの起動


```
$ bin/zookeeper-server-start.sh config/zookeeper.properties
```

## テストトピックの作成


```
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
```

## JSON形式での複数メッセージの送信

Kafka Fluentd Consumerでは、メッセージをJSON形式で送信する必要があります。


```
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
{"a": 1}
{"a": 1, "b": 2}
```

## コンシューマーを起動してメッセージを確認


```
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
```

次のように表示されるはずです。


```
{"a": 1}
{"a": 1, "b": 2}
```

## Kafka Fluentd Consumerの設定と実行

適切な設定でfluentd-consumer.propertiesを変更してください。以下のトピック名を、意図したTreasure Dataのテーブル名に変更してください。fluentd.consumer.topics=test。

次のコマンドを実行してコンシューマーを起動します。関連する.jarファイルと.propertiesファイルへのパスを調整する必要がある場合があります。


```
$ java -Dlog4j.configuration=file:///path/to/log4j.properties -jar kafka-fluentd-consumer-0.2.1-all.jar fluentd-consumer.properties
```

プロデューサーから追加のイベントを送信できます。ただし、それらは有効なJSON形式である必要があります。

## Treasure Dataでデータをクエリ

Treasure コンソールで次のクエリを実行します。


```
select * from test
```

「Query Result」で実行が成功したかどうかを確認できます。