# PostgreSQLインポート連携

PostgreSQL用データコネクタを使用すると、PostgreSQLデータベースからTreasure Dataにデータを直接インポートできます。PostgreSQLデータのエクスポートについては、[PostgreSQLエクスポート連携](/ja/int/postgresql-export-integration)を参照してください。

## 前提条件

- Treasure Dataの基本知識
- PostgreSQLの基本知識
- **リモート**で実行されているPostgreSQLインスタンス(例:RDS上)


## Treasure Data Integration の静的 IP アドレス

セキュリティポリシーで IP ホワイトリストが必要な場合は、接続を成功させるために Treasure Data の IP アドレスを許可リストに追加する必要があります。

リージョンごとに整理された静的 IP アドレスの完全なリストは、次のリンクにあります:
[IP Addresses for Integrations](/apis/endpoints/ip-addresses-integrations-result-workers)

## Treasure コンソールを使用したPostgreSQLからのインポート

### Authenticationの作成

データ接続を設定する際、連携にアクセスするためのAuthenticationを提供します。Treasure Dataでは、Authenticationを設定してからソース情報を指定します。

1. **Treasure コンソール**を開きます。
2. **Integrations Hub** > **Catalog**に移動します。
3. PostgreSQLを検索して選択します。**Create**を選択します。


![](/assets/image-20200807-223217.e12f01c92947408583fc23ce8b3aa685bc13b3aa4dd17f5289ba97dc2aa5ff53.0a0f313e.png)
4. 次のダイアログが開きます。

![](/assets/image-20200807-223310.42cc63fd8fcebfbb09119a91a2366037d0bb3c8ac869ffac5f6d77e2aa52d6e8.0a0f313e.png)![](/assets/image-20200807-223337.b2fb9306e1c756327fc3e5a97ae3ada3e132e3006642f4937eec0fd6cd5a9034.0a0f313e.png)
5. 必要なAuthentication情報を入力し、パラメータを設定します。**Continue**を選択します。

| パラメータ | 説明 |
|  --- | --- |
| **Host** | IPアドレスなど、ソースデータベースのホスト情報。 |
| **Port** | ソースインスタンスの接続ポート。PostgreSQLのデフォルトは5432です。 |
| **User** | ソースデータベースに接続するためのユーザー名。 |
| **Password** | ソースデータベースに接続するためのパスワード。 |
| **Use SSL** | SSLを使用して接続する場合は、このボックスをチェックします。 |
| **Specify SSL version** | 接続に使用するSSLバージョンを選択します。 |
| **Socket connection timeout** | ソケット接続のタイムアウト(秒単位)(デフォルトは300)。 |
| **Network timeout** | ネットワークソケット操作のタイムアウト(秒単位)。0はタイムアウトなしを意味します。 |


1. 接続の名前を入力します。
2. **Done**を選択します。


### ソースの作成

Authentication済み接続を作成すると、自動的にAuthenticationsに移動します。

1. 作成した接続を検索します。
2. **New Source**を選択します。Create Sourceダイアログが開きます。


### **Connection**

1. Data Transferフィールドに**Source**の名前を入力します。


![](/assets/image-20200807-224143.be148c35b05d7c1f6d5e5e3b2dc090ca71204e9ea769eb48724acae4e5e22edf.0a0f313e.png)

1. **Next**をクリックします。


### **Source Table**

1. 次のパラメータを編集します


![](/assets/postgresql-import-integration-2024-03-22-3.b0dc52811d5498dffd3736b63c338b7865379687dc59aecd393c1c7286c225fa.0a0f313e.png)

| **パラメータ** | **説明** |
|  --- | --- |
| **Driver version** | PostgreSQL JDBCドライバーを選択します |
| **Database name** | データを転送するデータベースの名前。例:your_database_name。 |
| **Use custom SELECT query?** | 単純なSELECT (columns) FROM table WHERE (condition)以上のものが必要な場合に使用します。 |
| **SELECT columns** | データを取得したい特定のカラムがある場合は、ここにリストします。それ以外の場合は、すべてのカラムが転送されます。 |
| **Table** | データをインポートするテーブル。 |
| **WHERE condition** | テーブルから取得するデータに追加の条件が必要な場合は、WHERE句の一部として指定します。 |
| **ORDER BY** | 特定のフィールドでレコードを順序付けする必要がある場合は指定します。 |


### **Data Settings**

1. **Next**を選択します。Data Settingsページが開きます。
2. 必要に応じてデータ設定を編集するか、このページをスキップします。


![](/assets/image-20200807-230323.c09b390ba74f7c7597deb4080e0aa3d5d43b12238e79df926f38ed27d0af0085.0a0f313e.png)

| **パラメータ** | **説明** |
|  --- | --- |
| **Incremental**: | この転送を繰り返し実行する場合は、このチェックボックスを選択して、前回インポートが実行されてからのデータのみをインポートします。 |
| **Rows per batch** | 非常に大きなデータセットはメモリの問題を引き起こし、結果としてジョブが失敗する可能性があります。このフラグを使用して、行数でインポートジョブをバッチに分割し、メモリの問題やジョブの失敗の可能性を減らします。 |
| **Default timezone** | インポート時に使用するタイムゾーン。 |
| **After SELECT** | このSQLは、同じトランザクション内のSELECTクエリの後に実行されます。 |
| **Column Options** | このオプションを選択して、インポート前にカラムのタイプを変更します。入力したデータ設定を保存するには、**Save**を選択します。 |
| **Default Column Options** | このオプションを選択して、インポート前にデフォルトのSQLタイプに従ってデータタイプを定義します。入力したデータ設定を保存するには、**Save**を選択します。     このオプションはTreasure コンソールでは使用できません。Treasure CLIまたはTreasure ワークフローを使用してこのオプションを設定します。 |


### Data Preview

インポートを実行する前に、Generate Preview を選択してデータの[プレビュー](/products/customer-data-platform/integration-hub/batch/import/previewing-your-source-data)を表示できます。Data preview はオプションであり、選択した場合はダイアログの次のページに安全にスキップできます。

1. **Next** を選択します。Data Preview ページが開きます。
2. データをプレビューする場合は、**Generate Preview** を選択します。
3. データを確認します。


### Data Placement

データの配置について、データを配置したいターゲット database と table を選択し、インポートを実行する頻度を指定します。

1. **Next** を選択します。Storage の下で、インポートされたデータを配置する新しい database を作成するか、既存の database を選択し、新しい table を作成するか、既存の table を選択します。
2. **Database** を選択 > **Select an existing** または **Create New Database** を選択します。
3. オプションで、database 名を入力します。
4. **Table** を選択 > **Select an existing** または **Create New Table** を選択します。
5. オプションで、table 名を入力します。
6. データをインポートする方法を選択します。
  - **Append** (デフォルト) - データインポートの結果は table に追加されます。
table が存在しない場合は作成されます。
  - **Always Replace** - 既存の table の全体の内容をクエリの結果出力で置き換えます。table が存在しない場合は、新しい table が作成されます。
  - **Replace on New Data** - 新しいデータがある場合のみ、既存の table の全体の内容をクエリの結果出力で置き換えます。
7. **Timestamp-based Partition Key** 列を選択します。
デフォルトキーとは異なるパーティションキーシードを設定したい場合は、long または timestamp 列をパーティショニング時刻として指定できます。デフォルトの時刻列として、add_time フィルターで upload_time を使用します。
8. データストレージの **Timezone** を選択します。
9. **Schedule** の下で、このクエリを実行するタイミングと頻度を選択できます。


#### 一度だけ実行

1. **Off** を選択します。
2. **Scheduling Timezone** を選択します。
3. **Create & Run Now** を選択します。


#### 定期的に繰り返す

1. **On** を選択します。
2. **Schedule** を選択します。UI では、*@hourly*、*@daily*、*@monthly*、またはカスタム *cron* の 4 つのオプションが提供されます。
3. **Delay Transfer** を選択して、実行時間の遅延を追加することもできます。
4. **Scheduling Timezone** を選択します。
5. **Create & Run Now** を選択します。


転送が実行された後、**Data Workbench** > **Databases** で転送の結果を確認できます。

## 詳細情報

- [PostgreSQLデータコネクタのオプション一覧](https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql)


## CLI(Toolbelt)を使用したPostgreSQLからのインポート

### 'td'コマンドのインストール

最新の[Treasure Data Toolbelt](https://docs.treasuredata.com/smart/project-product-documentation/td-toolbelt)をインストールします。

### Seed設定ファイル(seed.yml)の作成

PostgreSQLアクセス情報で*seed.yml*を設定します:


```yaml
in:
  type: postgresql
  host: postgresql_host_name
  port: 5432
  ssl: true
  ssl_version: TLS
  user: test_user
  password: test_password
  driver_version: 42.7.x
  database: test_database
  table: test_table
  select: "*"
  default_column_options:
    TIMESTAMP: {type: string, timestamp_format: '%Y-%m-%d', timezone: '+0900'}
    BIGINT: {type: string}
  mode: replace
```

この例では、テーブル内のすべてのレコードをインポートします。[追加パラメータ](https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql)を使用して、より詳細な制御を行うことができます。

利用可能なoutモードまたは利用可能なssl_versionの詳細については、付録を参照してください。

### フィールドの推測(load.ymlの生成)

*td connector:guess*を使用します。このコマンドは、ターゲットデータを自動的に読み取り、データ形式をインテリジェントに推測します。


```bash
td connector:guess seed.yml -o load.yml
```

*load.yml*を開くと、場合によってはファイル形式、エンコーディング、カラム名、タイプを含む、推測されたファイル形式定義が表示されます。

### オプション: インポートするデータのプレビュー

*td connector:preview*コマンドを使用して、インポートするデータをプレビューできます。


```bash
td connector:preview load.yml
```

## ロードジョブの実行

ロードジョブを送信します。データサイズに応じて、ジョブの実行に数時間かかる場合があります。データが保存されているデータベースとテーブルを指定する必要があります。

Treasure Dataのストレージは時間でパーティション分割されているため、--time-columnオプションを指定することをお勧めします(データパーティショニングも参照)。オプションが指定されていない場合、データコネクタは最初のlongまたはtimestampカラムをパーティショニング時間として選択します。--time-columnで指定されたカラムのタイプは、longまたはtimestampタイプである必要があります。

データに時間カラムがない場合は、*add_time*フィルターオプションを使用して追加できます。詳細については、[add_timeフィルタープラグイン](https://docs.treasuredata.com/smart/project-product-documentation/add_time-filter-function)を参照してください。


```bash
td connector:issue load.yml --database td_sample_db --table td_sample_table
```

*connector:issue*コマンドは、*database(td_sample_db)*と*table(td_sample_table)*をすでに作成していることを前提としています。データベースまたはテーブルがTDに存在しない場合、*connector:issue*コマンドは失敗します。この場合、データベースとテーブルを[手動で](https://docs.treasuredata.com/smart/project-product-documentation/data-management)作成するか、*td connector:issue*コマンドで*--auto-create-table*オプションを使用して、データベースとテーブルを自動作成します:


```bash
td connector:issue load.yml \
--database td_sample_db --table td_sample_table \
--time-column created_at --auto-create-table
```

"--time-column"オプションで、Time FormatカラムをPartitioning Keyに割り当てることができます。

## Workflowを使用したPostgreSQLからのインポート

PostgreSQLからデータをインポートする方法を示すサンプルワークフローについては、[Treasure Boxes](https://github.com/treasure-data/treasure-boxes/tree/master/td_load/postgresql)を参照してください。

### インクリメンタルロード

テーブルからカラムを*incremental_column*sパラメータに指定することで、レコードをインクリメンタルにロードできます。オプションで、*last_record*パラメータにいくつかの初期値を指定できます。


```yaml
in:
  type: postgresql
  host: postgresql_host_name
  port: 5432
  user: test_user
  password: test_password
  database: test_database
  table: test_table
  incremental: true
  incremental_columns: [id, created_at]
  last_record: [10000, '2014-02-16T13:01:06.000000Z']
out:
  mode: append
  exec: {}
```

`incremental\_columns:`オプションを最適に使用するには、関連するカラムにインデックスを作成して、フルテーブルスキャンを回避します。この例では、次のインデックスを作成する必要があります:


```sql
CREATE INDEX embulk_incremental_loading_index ON test_table (id, created_at);
```


```sql
-- last_recordが指定されていない場合
SELECT * FROM(
    ...original query is here
)
ORDER BY id, created_at
```


```sql
-- last_recordが指定された場合
SELECT * FROM(
    ...original query is here
)
WHERE id > 10000 OR (id = 10000 AND created_at > '2014-02-16T13:01:06.000000Z')
ORDER BY id, created_at
```

コネクタは自動的に*last_record*を生成し、次のスケジュール実行時にそれを使用します。


```yaml
in:
  type: postgresql
  ...
out:
  ...

Config Diff
---
in:
  last_record:
  - 20000
  - '2015-06-16T16:32:14.000000Z'
```

`incremental: true`を設定すると、`query`オプションは使用できません。
incremental_columnsとしてサポートされているのは、文字列、整数、timestamp、およびtimestamptz(タイムゾーン付きタイムスタンプ)のみです。

### Array Columnの読み込み

PostgreSQLの`array`型は、文字列型として取得されます。

### hstore Columnの読み込み

PostgreSQLのhstore型は、データコネクタが最初に読み取るときに文字列型として取得されます。したがって、hstore型をjson型として使用する場合は、config_optionsを指定して、明示的に型をjson型に変換する必要があります。

例えば、v_hstoreがPostgreSQLでhstore型の場合:


```yaml
in:
  type: postgresql
  host: xxx
  ...
  table: my_tbl
  select: "*"
  column_options:
    v_hstore: {type: json} # 明示的な型変換: string型からjson型へ
out:
  ...
```

### SSLバージョン

`ssl_version`オプションを使用して、PostgreSQLサーバーが使用している特定のSSLバージョンを使用できます。


```yaml
in:
  type: postgresql
  ...
  ssl: true
  ssl_version: TLSv1.1
```

サポートされている値は次のとおりです。

- TLS
- TLSv1.1
- TLSv1.2
- TLSv1.3


### Default Column Options

SQLデータ型の特定のデフォルト形式を使用できます。
以下の例では、TIMESTAMPは`%Y-%m-%d`形式でタイムゾーン`+0900`の文字列に変換されます。
SQLタイプは、TIMESTAMPやBIGINTなどの大文字である必要があります。


```yaml
in:
  type: postgresql
  ...
   default_column_options:
    TIMESTAMP: {type: string, timestamp_format: '%Y-%m-%d', timezone: '+0900'}
    BIGINT: {type: string}
```