# Delta Sharing Import Integration

This feature is in BETA version. For more information, contact your Customer Success Representative.

Delta sharingは、社内の顧客データを整理・統合し、マーケティング目的に最適なデータに変換することを支援します。このインテグレーションにより、Delta sharingデータをTreasure Dataにインポートできます。

## 前提条件

- Treasure Dataの基本知識
- [Delta Sharing](https://delta.io/sharing/) serverの基本知識


## 要件と制限事項

- Delta Sharing server上の情報にアクセスするための十分な権限を持つユーザーIDが必要です。
- Delta Sharing metastoresのAPIエンドポイントが必要です。
- Delta Sharing serverへのAPI呼び出しを認証するためのbearer tokenが必要です。


## Treasure DataのStatic IPアドレス

Treasure DataのStatic IPアドレスは、このインテグレーションのアクセスポイントおよび連携元となります。Static IPアドレスを確認するには、Customer Successの担当者またはテクニカルサポートにお問い合わせください。

## DatabricksからEndpointと認証Tokenを取得

このインテグレーションは、recipientsを使用したDelta Sharingオープン共有を使用してDatabricksのデータを読み取ることができます。

主要なエンドポイントは以下のとおりです:

- 共有のための新しいrecipientを作成する — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing)
- recipientにshareへのアクセス権を付与する — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient.html#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing)
- エンドポイントとtokenを含むアクセス情報ファイルをダウンロードする — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient.html#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing)
- tokenの有効期限を延長する — [https://docs.databricks.com/en/delta-sharing/create-recipient.html](https://docs.databricks.com/en/delta-sharing/create-recipient.html#create-a-recipient-object-for-users-who-have-access-to-databricks-databricks-to-databricks-sharing)


## Treasure コンソールからDelta Sharing Serverをインポート

### 認証の作成

最初のステップは、認証情報のセットを使用して新しい認証を作成することです。

1. **Integrations Hub**を選択します。
2. **Catalog**を選択します。


![](/assets/integrationshub-catalog2.e33c0a4c7d81c40cc83dd056c2143b97b1406220e213cab14ef349d69412ffef.7705d4c2.png)
3. CatalogでDelta Sharingを検索し、アイコンの上にマウスを置いて**Create Authentication**を選択します。

![](/assets/deltasharing.98f59c95c397048249583f946418382764540b40fbccdd143b278988d420afc7.94e6bd79.png)
4. **Credentials**タブが選択されていることを確認し、インテグレーションの認証情報を入力します。

![](/assets/newauthdeltasharing.f72c2d2e18026ab8d3b79405c32e0b912c29c96511009ec92d588b59f5512283.7705d4c2.png)

#### **新規認証のフィールド**

| パラメータ | 説明 |
|  --- | --- |
| Endpoint | delta-sharing serverへのAPIエンドポイント |
| Bearer Token | delta-sharing server APIにアクセスするためのtoken |


1. **Continue**を選択します。
2. 認証の名前を入力し、**Done**を選択します。


### Sourceの作成

1. Treasure コンソールを開きます。
2. **Integrations Hub** > **Authentications**に移動します。
3. 新しく作成した認証を見つけて、**New Source**を選択します。


Create Sourceページが表示され、Source Tableタブが選択された状態になります。

#### 接続の作成

1. Data Transfer Nameフィールドにソース名を入力します。
2. データ転送に使用する認証の名前を入力します。
3. **Next**を選択します。


![](/assets/deltasharing_connection.1532b8ae5d891d8187702b1f33c9e2872aa8b930bcb4009b576888d9e81c9c44.7705d4c2.png)

Create Sourceページが表示され、**Source Table**タブが選択された状態になります。

#### Sourceテーブルの指定

1. パラメータを編集します


![](/assets/deltasharing_sourcetable.b8907baf314bcfb81f5c247c0d55474171df1d9f2da57de79b2b48b85aca8f7e.7705d4c2.png)

| パラメータ | 必須 | 説明 |
|  --- | --- | --- |
| Share | はい | Share名 |
| Schema | はい | Schema名 |
| Table | はい | Table名 |
| Default Timezone | はい | タイムスタンプ形式のデフォルトのタイムゾーン |


1. **Next**を選択します。


**Data Settings**タブが選択された状態でCreate Sourceページが表示されます。

#### Data Settingsの指定

1. パラメータを編集します。


![](/assets/deltasharing_datasettings.c7efd5bb9e32e027d1d2f15a6bac15c29d300893ff9fb45d1917306d6c6645f3.7705d4c2.png)

| パラメータ | 必須 | 説明 |
|  --- | --- | --- |
| Retry Limit | いいえ | リトライ回数 |
| Initial Retry Interval In Millis | いいえ | 初期リトライ間隔（ミリ秒） |
| Max Retry Wait In Millis | いいえ | 最大リトライ待機時間（ミリ秒） |
| HTTP Connect Timeout In Millis | いいえ | HTTPタイムアウト（ミリ秒） |
| HTTP Read Timeout In Millis | いいえ | HTTP読み取りタイムアウト（ミリ秒） |
| HTTP Write Timeout In Millis | いいえ | HTTP書き込みタイムアウト（ミリ秒） |


1. **Next**を選択します。


**Data Preview**タブが選択された状態でCreate Sourceページが表示されます。

### Data Preview

インポートを実行する前に、Generate Preview を選択してデータの[プレビュー](/products/customer-data-platform/integration-hub/batch/import/previewing-your-source-data)を表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。

1. **Next** を選択します。Data Preview ページが開きます。
2. データをプレビューする場合は、**Generate Preview** を選択します。
3. データを確認します。


### Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

1. **Next** を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。
2. **Database** を選択 > **Select an existing** または **Create New Database** を選択します。
3. オプションで、database 名を入力します。
4. **Table** を選択 > **Select an existing** または **Create New Table** を選択します。
5. オプションで、table 名を入力します。
6. データをインポートする方法を選択します。
  - **Append** (デフォルト) - データインポートの結果は table に追加されます。
table が存在しない場合は作成されます。
  - **Always Replace** - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
  - **Replace on New Data** - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
7. **Timestamp-based Partition Key** 列を選択します。
デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。
8. データストレージの **Timezone** を選択します。
9. **Schedule** の下で、このクエリを実行するタイミングと頻度を選択できます。


#### 一度だけ実行

1. **Off** を選択します。
2. **Scheduling Timezone** を選択します。
3. **Create & Run Now** を選択します。


#### 定期的に繰り返す

1. **On** を選択します。
2. **Schedule** を選択します。UI では、*@hourly*、*@daily*、*@monthly*、またはカスタム *cron* の 4 つのオプションが提供されます。
3. **Delay Transfer** を選択して、実行時間の遅延を追加することもできます。
4. **Scheduling Timezone** を選択します。
5. **Create & Run Now** を選択します。


転送が実行された後、**Data Workbench** > **Databases** で転送の結果を確認できます。

## Workflowを使用してDelta Sharing Serverからインポート

Workflowのtd_load>:オペレータを使用して、Delta Sharing serverからデータをインポートできます。すでにSourceを作成している場合は実行できます。Sourceを作成したくない場合は、.ymlファイルを使用してインポートできます。

### Sourceの実行

1. Sourceを特定します。
2. 一意のIDを取得するには、Sourceリストを開き、**Delta Sharing**でフィルタリングします。
3. メニューを開き、**Copy Unique ID**を選択します。


![](/assets/image2021-10-12_12-26-58.09d9b84b0f1f752c7c95b0bc1c2d8e8b7302e5b91c6a3cb5f01309dadf53a604.5f7e1b55.png)
4. td_load>:オペレータを使用してworkflowタスクを定義します。


```yaml
+load:
   td_load>: unique_id_of_your_source
   database: ${td.dest_db}
   table: ${td.dest_table}
```

1. Workflowを実行します。


**パラメータリファレンス**

| Name | Description | Value | Default Value | Required |
|  --- | --- | --- | --- | --- |
| endpoint | Delta Sharing serverのエンドポイント |  |  | true |
| token | API連携を認証するためのBearerトークン |  |  | true |
| share | 接続するShare名 |  |  | true |
| schema | 接続するSchema名 |  |  | true |
| table | 接続するTable名 |  |  | true |
| default_timezone | タイムスタンプ形式のデフォルトのタイムゾーン |  | UTC | true |
| retry_limit | 最大リトライ回数 |  | 6 | false |
| retry_initial_wait_msecs | リトライの初期待機時間（ミリ秒） |  | 30000 | false |
| max_retry_wait_msecs | 最大リトライ待機時間（ミリ秒） |  | 120000 | false |
| connection_timeout_msecs | HTTP接続タイムアウト（ミリ秒） |  | 1800000 | false |
| write_timeout_msecs | HTTP読み取り接続タイムアウト（ミリ秒） |  | 1800000 | false |
| read_timeout_msecs | HTTP書き込み接続タイムアウト（ミリ秒） |  | 1800000 | false |


#### Workflowコードのサンプル

Workflowコードのサンプルについては、[Treasure Boxes](https://github.com/treasure-data/treasure-boxes/tree/master/td_load/s3)をご覧ください。

## CLI（Toolbelt）を使用してDelta Sharing serverからインポート

統合を設定する前に、最新の[TD Toolbelt](https://toolbelt.treasuredata.com/)をインストールしてください。

### シード設定ファイル (seed.yml) の作成


```yaml
in:
  type: delta_sharing
  endpoint: http://endpoint.com
  token: ***
  schema: schema
  table: table
  share: share
  retry_limit: 7
  retry_initial_wait_msecs: 30000
  max_retry_wait_msecs: 120000
  connection_timeout_msecs: 1800000
  write_timeout_msecs: 1800000
  read_timeout_msecs: 1800000
out:
  mode: append
```

**パラメータリファレンス**

| 名前 | 説明 | 値 | デフォルト値 | 必須 |
|  --- | --- | --- | --- | --- |
| endpoint | Delta Sharingエンドポイント |  |  | true |
| token | API連携の認証に使用するBearer token |  |  | true |
| share | 接続するshare名 |  |  | true |
| schema | 接続するschema名 |  |  | true |
| table | 接続するtable名 |  |  | true |
| default_timezone | タイムスタンプ形式のデフォルトタイムゾーン。短縮形とフルゾーンIDの両方をサポート |  | UTC | true |
| retry_limit | 最大リトライ回数 |  | 6 | false |
| retry_initial_wait_msecs | リトライの初期待機時間(ミリ秒) |  | 30000 | false |
| max_retry_wait_msecs | 最大リトライ待機時間(ミリ秒) |  | 120000 | false |
| connection_timeout_msecs | HTTP接続タイムアウト(ミリ秒) |  | 1800000 | false |
| write_timeout_msecs | HTTP読み取り接続タイムアウト(ミリ秒) |  | 1800000 | false |
| read_timeout_msecs | HTTP書き込み接続タイムアウト(ミリ秒) |  | 1800000 | false |


Delta Sharing連携は、指定されたprefixに一致するすべてのファイルをインポートします。

**例**

path_prefix: path/to/sample_ –> path/to/sample_201501.csv.gz, path/to/sample_201502.csv.gz, …, path/to/sample_201505.csv.gz

### load.yml の生成

connector:guessを使用します。このコマンドは、ソースファイルを自動的に読み取り、ロジックを使用してファイル形式とそのフィールドおよびカラムを推測します。


```
$ td connector:guess seed.yml -o load.yml
```

load.ymlを開いて、ファイル形式、エンコーディング、カラム名、型などのファイル形式定義を確認できます。

**例**


```yaml
in:
  type: delta_sharing
  endpoint: http://endpoint.com
  token: ***
  schema: schema
  table: table
  share: share
  retry_limit: 7
  retry_initial_wait_msecs: 30000
  max_retry_wait_msecs: 120000
  connection_timeout_msecs: 1800000
  write_timeout_msecs: 1800000
  read_timeout_msecs: 1800000
out:
  mode: append
```

データをプレビューするには、td connector:previewコマンドを使用します。


```
$ td connector:preview load.yml
+-------+---------+----------+---------------------+
| id | company | customer | created_at |
+-------+---------+----------+---------------------+
| 11200 | AA Inc. | David | 2015-03-31 06:12:37 |
| 20313 | BB Imc. |Tom | 2015-04-01 01:00:07 |
| 32132 | CC Inc. | Fernando | 2015-04-01 10:33:41 |
| 40133 | DD Inc. | Cesar | 2015-04-02 05:12:32 |
| 93133 | EE Inc. |  Jake | 2015-04-02 14:11:13 |
+-------+---------+----------+---------------------+
```

guessコマンドは、ソースデータファイルに3行以上、2列以上のデータが必要です。これは、コマンドがソースデータのサンプル行を使用して列定義にアクセスするためです。

システムが予期しない列名または列タイプを検出した場合は、load.ymlファイルを修正して再度プレビューしてください。

### ロードジョブの実行

データのサイズによっては、ジョブの実行に数時間かかる場合があります。データを保存するTreasure Dataのデータベースとテーブルを必ず指定してください。

Treasure Dataのストレージは時間によってパーティション分割されているため（[data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)を参照）、*--time-column*オプションの指定を推奨します。このオプションが指定されていない場合、データ統合は最初の*long*または*timestamp*列をパーティション分割時間として選択します。*--time-column*で指定する列のタイプは、*long*型または*timestamp*型である必要があります。

データに時間列がない場合は、*add_time*フィルタオプションを使用して時間列を追加できます。詳細については、[add_time filter plugin](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)を参照してください。


```bash
$ td connector:issue load.yml --database td_sample_db --table td_sample_table \
--time-column created_at
```

connector:issueコマンドは、データベース(*td_sample_db)*とテーブル(td_sample_table)が既に作成されていることを前提としています。TDにデータベースまたはテーブルが存在しない場合、このコマンドは失敗します。データベースとテーブルを手動で作成するか、td connector:issueコマンドで--auto-create-tableオプションを使用してデータベースとテーブルを自動作成してください。


```
$ td connector:issue load.yml --database td_sample_db --table td_sample_table
 --time-column created_at --auto-create-table
```

データ統合はサーバー側でレコードをソートしません。時間ベースのパーティション分割を効果的に使用するには、事前にファイル内のレコードをソートしてください。

timeという名前のフィールドがある場合は、--time-columnオプションを指定する必要はありません。


```
$ td connector:issue load.yml --database td_sample_db --table td_sample_table
```

### インポートモード

load.ymlファイルのout:セクションでファイルインポートモードを指定できます。out:セクションは、Treasure Dataテーブルへのデータのインポート方法を制御します。たとえば、Treasure Dataの既存のテーブルにデータを追加するか、データを置き換えるかを選択できます。

| **モード** | **説明** | **例** |
|  --- | --- | --- |
| Append | レコードがターゲットテーブルに追加されます。 | in:    ...  out:    mode: append |
| AlwaysReplace | ターゲットテーブル内のデータを置き換えます。ターゲットテーブルに対して手動で行われたスキーマ変更はそのまま保持されます。 | in:    ...  out:    mode: replace |
| Replace on new data | インポートする新しいデータがある場合にのみ、ターゲットテーブル内のデータを置き換えます。 | in:    ...  out:    mode: replace_on_new_data |


### 実行のスケジュール設定

定期的なデータ統合の実行をスケジュール設定できます。

Treasure Dataは、高可用性を確保するためにスケジューラーを慎重に設定しています。

### TD Toolbeltを使用したスケジュールの作成

td connector:createコマンドを使用して新しいスケジュールを作成できます。


```
$ td connector:create daily_import "10 0 * * *" \
 td_sample_db td_sample_table load.yml
```

Treasure Dataのストレージは時間によってパーティション分割されているため（[data partitioning](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)も参照）、--time-columnオプションの指定を推奨します。


```
$ td connector:create daily_import "10 0 * * *" \
 td_sample_db td_sample_table load.yml \
 --time-column created_at
```

cronパラメータは、@hourly、@daily、@monthlyの3つの特殊オプションも受け入れます。

デフォルトでは、スケジュールはUTCタイムゾーンで設定されます。-tまたは--timezoneオプションを使用して、タイムゾーンでスケジュールを設定できます。--timezoneオプションは、'Asia/Tokyo'、'America/Los_Angeles'などの拡張タイムゾーン形式のみをサポートしています。PST、CSTなどのタイムゾーン略語はサポートされておらず、予期しないスケジュールにつながる可能性があります。