# Snowflake Bulk Unloading インポート インテグレーション

Snowflake はクラウドベースのデータプラットフォームを提供し、ほぼ無制限のスケール、同時実行性、パフォーマンスでデータの価値を引き出すことができます。Treasure Data の Snowflake Bulk Unloading インテグレーションは、Snowflake の[Bulk Data Unloading](https://docs.snowflake.com/en/guides-overview-unloading-data) 機能を活用して、Snowflake テーブルから Treasure Data へ大規模データセットをインポートするための高パフォーマンスソリューションを提供します。このインテグレーションは、大規模データセットに対して標準の Snowflake インポートインテグレーションと比較して最大 **300% のパフォーマンス向上** を実現します。また、[Treasure Data の Snowflake インポート インテグレーション](/ja/int/snowflake-import-integration)および[Snowflake エクスポート インテグレーション](/ja/int/snowflake-export-integration)についても詳しく確認できます。

This feature is in BETA version. For more information, contact your Customer Success Representative.

## 前提条件

- Treasure Data の基本知識
- テーブル/ビューへのクエリおよびステージ管理の適切な権限を持つ Snowflake データウェアハウスの既存アカウント
- 大規模データセットの場合、最適なパフォーマンスのために適切なリソース割り当て（特に CPU）を確保してください


## Treasure コンソール を使用して接続を作成する

### 新しい接続を作成

Integrations Hub > Catalog に移動して検索します。**Snowflake Bulk Unload** を選択します。

![](/assets/snowflake.9de61a1e773ed9029928a822ac86942f3852e822e77da3adc5a217c957908eb7.70d1d129.png)

### 新しい Snowflake Bulk Unload Connector を作成

以下のダイアログが開きます。

![](/assets/newauth.d6513201a167216ea22999e661dc2549bde18ef90b069847ac43f70dd233a892.70d1d129.png)

認証方法を選択します:

- **Basic**: Treasure Data が Snowflake に認証するための必要な認証情報を入力します: **User**、**Password**、**Account**
  - **User**: Snowflake ログインユーザー名
  - **Password**: Snowflake ログインパスワード
- **Key Pair**: 暗号化された秘密鍵の場合は **Private Key** とその **Passphrase** を入力します
  - **Private Key**: 生成された秘密鍵。[configuring-key-pair-authentication](https://docs.snowflake.com/en/user-guide/key-pair-auth) を参照してください
  - **Passphrase**: 秘密鍵のパスフレーズ。秘密鍵が暗号化されていない場合は空欄のままにします
  - **User**: Snowflake ログインユーザー名
- **Account**: Snowflake から提供されたアカウント名。[Snowflake でアカウント名を見つける方法](https://docs.snowflake.com/en/user-guide/admin-account-identifier)を参照してください
- **OPTIONS**: JDBC 接続オプション（ある場合）


**Continue** を選択します。

![](/assets/auth.635f55c0b6096eb82d5dc551ed7d9d4dd119753acda0363863934e763506a3b5.70d1d129.png)

データコネクタの名前を指定します。認証を他のユーザーと共有するかどうかを指定します。共有すると、他のチームメンバーがあなたの認証を使用してコネクタソースを作成できます。**Done** を選択します。

### Snowflake データを Treasure Data に転送

Authentications ページで **New Source** を作成します。
![](/assets/newsource.d8890eedac2a92827d452261986172ccb8a8f634de468fbfafac6d695ac2308f.70d1d129.png)
**Connection** に名前を入力し、**Next** をクリックします。

**Source Table** の詳細を入力します。
![](/assets/sourcetable.17f84d9909b1e59d78d3ae4cb8e21a17d12387059ae31675f38492cd57613981.70d1d129.png)

### Source Table

取り込みたい情報を登録する必要があります。パラメータは以下の通りです:

- **Role Name**: Snowflake で使用するデフォルトのアクセス制御ロールを指定します。デフォルトの [Role](https://docs.snowflake.com/en/user-guide/admin-user-management#user-roles) を使用する場合は空欄のままにします
- **Warehouse**: （必須）使用する仮想ウェアハウスを指定します
- **Database**: （必須）Snowflake データベース
- **Schema**: （必須）接続時に指定されたデータベースのデフォルトスキーマを指定します
- **Source Type**: **Table/View** または **Query** を選択します
  - **Query**:
    - **SELECT Query**: 実行するRAW SQLクエリ。**SELECT** タイプのクエリのみ許可されます。INSERT、UPDATE、またはデータ変更クエリは使用できません
  - **Table/View**（デフォルト）:
    - **SELECT Columns**: （必須）Table/View からすべての列を選択する場合はカンマ区切りの列名または `*` を使用します
    - **From Table/View**: （必須）転送先テーブル名
    - **WHERE Condition**: 行をフィルタリングするための WHERE 条件を指定します
    - **ORDER By Columns**: 行をソートするための ORDER BY の式
- **Incremental Loading**: パフォーマンス最適化を備えた増分ロードを有効にします
  - **Incremental Column(s)**: 増分ロードのための列名。**Timestamp** および **Numeric** 列を指定できます。指定しない場合、Primary 列（存在する場合）が使用されます
  - **ORDER BY Columns:** **Incremental Loading** を選択した場合は使用できません
- **Invalid Value Handling Mode:** テーブルに無効なデータが含まれる行がある場合、**Fail Job** または **Ignore Row** のオプションがあります。**Fail Job** を選択すると、現在のジョブが停止し ERROR ステータスになります。**Ignore Row** を選択すると、無効な値を含む行を無視して処理を継続します。指定しない場合、デフォルトで **Fail Job** が選択されます


指定した **Warehouse**、**Database**、**Schema** は事前に存在している必要があります。指定した **Role** は、指定したウェアハウス、データベース、スキーマ、テーブル/ビューへの権限を持っている必要があります。

#### 設定例


```
Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Query selected
SELECT Query: SELECT column1, column2, column3
              FROM table_test
              WHERE column4 != 1
Incremental Loading: Checked
Incremental Column(s): column1
```

#### Table View


```
Warehouse: DEMO_WH
Database: Test_DB
Schema: PUBLIC
Source Type: Table/View selected
SELECT Columns: column1, column2, column3
FROM Table/View: table_test
WHERE Condition: column4 != 1
ORDER By Columns:
Incremental Loading: Checked
Incremental Column(s): column1
```

**Next** を選択すると、データの詳細設定が表示されます

### Data Settings

Data Settings では転送のカスタマイズができます。必要に応じて以下のセクションを編集してください。

![](/assets/datasettings.7dfdca92e3f99f30f645b63a08d8200f72fbb09dea91dda3465112b649036c81.70d1d129.png)

#### パフォーマンスオプション

- **Stage File Size**: 個々のステージファイルのサイズをバイト単位で制御します。デフォルトは 100 MB（104,857,600 バイト）です。ファイルサイズを大きくすると非常に大きなデータセットのパフォーマンスが向上する場合がありますが、より多くのメモリが必要になる場合があります（500 MB ファイルは 6GB のデータセットでテスト済みで良好な結果が得られています）


#### Columns Settings

列は実際のデータセットから推測されます。列のデータタイプが実際のデータと一致しない場合は変更できます。

- **Column Name**: Snowflake の列名（変更しないでください）
- **Data Type**: 転送先のデータタイプを選択します（string、boolean、timestamp、double、long、json）
- **Timestamp Format**: 転送先タイプが string の場合のタイムスタンプ列のフォーマットパターン。例: *%Y-%m-%dT%H:%M:%S.%L%z*


#### START AFTER（値）

- **Start After (values)**: Incremental Loading が選択されている場合、この値を指定すると、この値より大きいデータのみがインポートされます。このフィールドのサイズと順序は **Incremental Columns** と同じでなければなりません


**Network Timeout**: エラーを返す前に Snowflake サービスとのやり取りでレスポンスを待つ時間を指定します。

**Query Timeout**: エラーを返す前にクエリの完了を待つ時間を指定します。ゼロ（0）は無制限に待機することを示します。

**Next** を選択すると、データの[プレビュー](https://docs.treasuredata.com/smart/project-product-documentation/about-data-preview)が表示されます。

### プレビュー

プレビューは Snowflake の [SAMPLE](https://docs.snowflake.com/en/sql-reference/constructs/sample) 機能を使用して、データセット全体を処理せずにサンプル行を効率的に表示します。これは特に大きなテーブルで有効です。

**Order By** 列を使用しているがインデックスが付いていない大きなテーブルへのクエリなど、クエリの完了に時間がかかる場合、プレビューにダミーデータが表示される場合があります。

複数の条件、テーブルの結合を含む複雑なクエリを使用する場合、プレビューに「**No records to preview**」と表示されることがありますが、ジョブの実行結果には影響しません。[プレビューの詳細](https://docs.treasuredata.com/smart/project-product-documentation/about-data-preview)を参照してください。

![](/assets/datapreview.5f170217c8ff6027af056517ee8cadc6977faaa2ef4e1f2cce7cae706422a1bc.70d1d129.png)

### Data Placement - 転送先データベースとテーブルを選択

#### ストレージ

インポートしたいデータベースとテーブルを既存のものから選択するか、新規作成します。

- **Method**: Append または Replace。既存のテーブルにレコードを**追加**するか、既存のテーブルを**置き換える**かを選択します
- **Timestamp-based Partition Key**: パーティショニング時間として long または timestamp 列を選択します。デフォルトの時間列として、add_time フィルターを使用した upload_time が使用されます


![](/assets/dataplacement.b572b786a9a888637d7384bba188cb6ae53af531b643583a6a93c4c6288dea5f.70d1d129.png)

#### スケジュール

1回限りの転送を指定するか、自動繰り返し転送をスケジュールすることができます。

パラメータ

- **Once now**: 1回限りのジョブを設定します
- **Repeat…**
  - **Schedule**: オプション: *@hourly*、*@daily*、*@monthly*、カスタム *cron*
  - **Delay Transfer**: 実行時間の遅延を追加します
  - **Scheduling TimeZone**: 'Asia/Tokyo' などの拡張タイムゾーン形式をサポートします


![](/assets/schedule.0da876ce7e1db3c17434bca3b1b4f37ba34cdd0003cd73ed8a40779e0b9c1d08.70d1d129.png)

## CLI を使用してコネクタを設定する

コネクタを設定する前に、'td' コマンドをインストールしてください。[Treasure Data Toolbelt](https://docs.treasuredata.com/smart/project-product-documentation/installing-and-updating-td-toolbelt-and-treasure-agent) をインストールしてください。

### seed.yml ファイルの準備

in: セクションでは Snowflake からコネクタに入ってくるデータを指定し、out: セクションでは Treasure Data のデータベースにコネクタが出力するデータを指定します。

Snowflake アカウントのアクセス情報を以下のように入力します:


```yaml
in:
  type: snowflake_unload
  account_name: treasuredata
  user: Test_user
  password: xxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3
    FROM table_test
    WHERE column4 != 1
  invalid_value_option: ignore_row
out:
  mode: append
```

設定キーと説明は以下の通りです:

| 設定キー | タイプ | 必須 | 説明 |
|  --- | --- | --- | --- |
| type | string | yes | コネクタのタイプ（"snowflake_unload" である必要があります） |
| account_name | string | yes | アカウント名を指定します（Snowflake から提供されます）。Snowflake の[アカウント名](https://docs.snowflake.com/en/user-guide/admin-account-identifier)を参照してください |
| warehouse | string | yes | 使用する仮想ウェアハウスを指定します |
| db | string | yes | 使用する Snowflake のデフォルトデータベース |
| schema | string | yes | 接続後に指定されたデータベースに使用するデフォルトスキーマを指定します |
| role | string | no | ドライバーによって開始された Snowflake セッションで使用するデフォルトのアクセス制御ロール |
| auth_method | string | yes | 認証方法。現在サポート: basic、key_pair（デフォルト "basic"） |
| private_key | string | no | **auth_method** が **key_pair** の場合は必須 |
| passphrase | string | no | private_key のパスフレーズ |
| user | string | yes | Snowflake ログイン名 |
| password | string | no | **auth_method** が **basic** の場合は必須。指定したユーザーのパスワード |
| query | string | no | 実行する SQL クエリ |
| incremental | boolean | no | true の場合、ORDER BY の代わりに最大値比較を使用した増分ロードを有効にします |
| incremental_columns | strings array | no | 増分ロードの列名。指定しない場合は Primary キー列が使用されます |
| invalid_value_option | string | no | 行の無効なデータを処理するオプション。使用可能なオプション: **fail_job** および **ignore_row** |
| last_record | objects array | no | 増分ロードの最後のレコードの値 |
| stage_file_size | integer | no | ステージファイルサイズ（バイト単位）。デフォルト 104857600（100 MB） |
| network_timeout | integer | no | エラーを返す前に Snowflake サービスとのやり取りでレスポンスを待つ時間。デフォルト 0 |
| query_timeout | integer | no | エラーを返す前にクエリの完了を待つ時間。デフォルト 0 |
| select | string | no | select の式。'query' が設定されていない場合は必須 |
| table | string | no | 転送先テーブル名。'query' が設定されていない場合は必須 |
| where | string | no | 行をフィルタリングするための条件 |
| order_by | string | no | 行をソートするための ORDER BY の式 |
| columns | array | no | タイプとフォーマット仕様を含む列設定。列は Snowflake のサンプルデータに基づいて推測されます |
| stage_folder | string | no | 内部ステージフォルダの設定（自動管理） |


### スキーマ設定の推測

コネクタを実行する前に、`td connector:guess` コマンドを使用して列タイプを自動検出し、完全な設定を生成することをお勧めします。このコマンドは Snowflake からデータをサンプリングして最適な列設定を推測します。


```bash
td connector:guess seed.yml -o load.yml
```

これにより、以下を含む完全な `load.yml` ファイルが生成されます:

- 検出された列名とタイプ
- Snowflake データタイプに基づく最適なデータタイプマッピング
- 一括アンロードのパフォーマンス設定


guess コマンド実行後に生成される *load.yml* の例:


```yaml
in:
  type: snowflake_unload
  account_name: treasuredata
  user: snowflake_user
  password: xxxxx
  warehouse: DEMO_WH
  db: TEST_DB
  schema: PUBLIC
  query: |
    SELECT column1, column2, column3
    FROM table_test
    WHERE column4 != 1
  incremental: true
  incremental_columns: [column1, column3]
  last_record: [140, 1500.5]
  stage_file_size: 104857600
  invalid_value_option: ignore_row
  columns:
  - name: column1
    type: long
  - name: column2
    type: string
  - name: column3
    type: timestamp
    format: "%Y-%m-%dT%H:%M:%S.%L%z"
out:
  mode: append
```

使用可能な *out* モードの詳細については、[modes](/ja/int/snowflake-import-integration#h2_1159725868) を参照してください。

データをプレビューするには、preview コマンドを実行します


```bash
td connector:preview load.yml
+--------------+-----------------+----------------+
| COLUMN1:long | COLUMN2:string  | COLUMN3:double |
+--------------+-----------------+----------------+
| 100          | "Sample value1" | 1900.1         |
| 120          | "Sample value3" | 1700.3         |
| 140          | "Sample value5" | 1500.5         |
+--------------+-----------------+----------------+
```

### ロードジョブの実行

ロードジョブを送信します。データのサイズによっては数時間かかる場合があります。データが保存されているデータベースとテーブルを指定する必要があります。

Treasure Data のストレージは時間でパーティション分割されているため、--time-column オプションを指定することをお勧めします。オプションが指定されない場合、データコネクタは最初の *long* または *timestamp* 列をパーティショニング時間として選択します。*--time-column* で指定される列のタイプは *long* または *timestamp* タイプのいずれかである必要があります。

データに時間列がない場合は、*add_time* フィルターオプションを使用して追加できます。詳細については、[add_time フィルタープラグイン](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)を参照してください。


```bash
td connector:issue load.yml --database td_sample_db --table td_sample_table \
--time-column created_at
```

td connector:issue は *database(td_sample_db)* と *table(td_sample_table)* がすでに作成されていることを前提としています。データベースまたはテーブルが TD に存在しない場合、td connector:issue は失敗します。そのため、データベースとテーブルを手動で作成するか、*td connector:issue* コマンドの *--auto-create-table* オプションを使用してデータベースとテーブルを自動的に作成してください。


```bash
td connector:issue load.yml \
--database td_sample_db --table td_sample_table \
--time-column created_at --auto-create-table
```

*time* というフィールドがある場合は、*--time-column* オプションを指定する必要はありません。


```bash
td connector:issue load.yml --database td_sample_db --table td_sample_table
```

### スケジュール実行

定期的な Snowflake 一括アンロードインポートのために、データコネクタの定期実行をスケジュールすることができます。

#### スケジュールの作成

*td connector:create* コマンドを使用して新しいスケジュールを作成できます。以下が必要です:

- スケジュールの名前
- cron 形式のスケジュール
- データが保存されるデータベースとテーブル
- Data Connector 設定ファイル



```bash
td connector:create \
    daily_bulk_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml
```

Treasure Data のストレージは時間でパーティション分割されているため、*--time-column* オプションを指定することもお勧めします（[データパーティショニング](https://docs.treasuredata.com/smart/project-product-documentation/data-partitioning-in-treasure-data)も参照してください）。


```bash
td connector:create \
    daily_bulk_import \
    "10 0 * * *" \
    td_sample_db \
    td_sample_table \
    load.yml \
    --time-column created_at
```

`cron` パラメータは `@hourly`、`@daily`、`@monthly` の3つの特別なオプションも受け付けます。

デフォルトでは、スケジュールは UTC タイムゾーンで設定されます。-t または --timezone オプションを使用してタイムゾーンを指定してスケジュールを設定できます。`--timezone` オプションは 'Asia/Tokyo'、'America/Los_Angeles' などの拡張タイムゾーン形式のみをサポートします。PST、CST などのタイムゾーン略語はサポートされておらず、予期しないスケジュールが設定される可能性があります。

### スケジュールの一覧表示

*td connector:list* コマンドを実行することで、スケジュールされたエントリの一覧を確認できます。


```bash
td connector:list
```

## 付録

### SSL のサポート

Snowflake への接続は、Snowflake の[公式 JDBC ドライバー](https://docs.snowflake.com/en/user-guide/jdbc-download)を介して行われます。JDBC ドライバーはデフォルトかつ必須として SSL の使用を強制します（つまり、SSL = false の接続は拒否されます）。

### Snowflake 認証

Snowflake への接続には、このツールは2つの JDBC 認証オプションを提供します:

- **Basic（ユーザー名/パスワード）:** この標準ログイン方法は、ユーザーの Snowflake アカウントで**多要素認証（MFA）が有効になっている場合は失敗します**。これは、ジョブ実行中のコネクタの認証プロセスとの競合の可能性によるものです。
- **Key Pair 認証:** この安全な方法は暗号化鍵ペアを使用します。MFA が有効かどうかに関わらず確実に動作するため、推奨されるアプローチです。


### 一括アンロードの仕組み

このコネクタは Snowflake の [Bulk Data Unloading](https://docs.snowflake.com/en/guides-overview-unloading-data) 機能を使用して高パフォーマンスのデータ転送を実現します。このプロセスには以下が含まれます:

1. **COPY INTO コマンド**: [COPY INTO <location>](https://docs.snowflake.com/en/sql-reference/sql/copy-into-location.html) コマンドを使用して、Snowflake データベーステーブルからSnowflake ステージのファイルにデータをコピーします。
2. **ファイルダウンロード**: [GET](https://docs.snowflake.com/en/sql-reference/sql/get.html) コマンドを使用して、ステージからコネクタに直接ファイルをダウンロードします。
3. **並列処理**: ダウンロードされたファイルは並列で処理され、取り込みパフォーマンスが大幅に向上します。


### 増分ロードの仕組み

一括アンロードコネクタの増分ロードは、標準の Snowflake コネクタとは異なるアプローチを使用します:

- **最大値アプローチ**: ORDER BY ステートメントを使用する代わりに、コネクタは最大値関数クエリを使用して上限と下限を決定します: `SELECT Max(inc_col) FROM table`
- **辞書順**: 複数の増分列の場合、コネクタは辞書順に並んだ列の組み合わせを使用して境界を特定します。
- **パフォーマンス向上**: このアプローチにより、大規模データセットでの高コストな ORDER BY 操作を回避できます。


2つの増分列を使用した例:


```sql
SELECT *
FROM my_table
WHERE (("region" > ?) OR ("region" = ? AND "date" > ?))
  AND (("region" < ?) OR ("region" = ? AND "date" <= ?))
```

最初の実行ではすべてのレコードがロードされ、以降の実行では最大値によって決定された新しい境界内に収まるレコードのみがロードされます。

**重要な注意事項:**

- incremental_columns としてサポートされるのは Timestamp、Datetime、数値列のみです
- 生クエリの場合、複雑なクエリでは主キーを自動的に検出できないため、incremental_columns が必要です
- 最適なパフォーマンスのために、増分列に適切なインデックスが設定されていることを確認してください


### パフォーマンスに関する考慮事項

- **リソース割り当て**: このコネクタは最適なパフォーマンスのために通常のコネクタよりも大きな CPU 割り当てが必要です
- **Stage File Size**: `stage_file_size` パラメータを調整することで、データセットサイズと利用可能なメモリに基づいてパフォーマンスを最適化できます
- **Warehouse サイズ**: より大きな Snowflake ウェアハウスを使用すると、非常に大きなデータセットのアンロードパフォーマンスが向上する場合があります
- **並列処理**: コネクタはアンロードされたファイルの並列処理を自動的に処理して最大スループットを実現します


### トラブルシューティング

- **ステージファイルの問題**: 指定したウェアハウスにアンロード操作のための十分なリソースがあることを確認してください
- **プレビューの問題**: 大きなテーブルのプレビューはサンプリングを使用するため、実際のデータ分布を反映しない場合があります
- **パフォーマンスの問題**: パフォーマンスを向上させるために、ウェアハウスのサイズを大きくするか、ステージファイルサイズを調整することを検討してください