# User-defined partitioning (UDP)

User-defined partitioning provides hash partitioning for a table on one or more columns
in addition to the time range partitioning. A query that filters on the set of columns used as
user-defined partitioning keys can be more efficient because Trino can skip scanning
partitions that have matching values on that set of columns.

## Use Cases

Where the lookup and aggregations are based on one or more specific
columns, UDP can lead to:

- efficient lookup and aggregation queries
- efficient join queries


UDP can add the most value when records are filtered or joined
frequently by non-time attributes:

- a customer's ID, first name+last name+birth date, gender, or other profile values or flags
- a product's SKU number, bar code, manufacturer, or other exact-match attributes
- an address's country code; city, state, or province; or postal code


Performance benefits become more significant on tables with >100M rows.

UDP can help with these Trino query types:

- "Needle-in-a-Haystack" lookup on the partition key
- Aggregations on the partition key
- Very large joins on partition keys used in tables on both sides of the join


## Creating UDP table

Only Trino query engine supports creation of UDP tables.

### CREATE TABLE Syntax for UDP


```
CREATE TABLE table_name (...)
WITH(
  bucketed_on = array['col1' [, 'col2', ...]]
  [, bucket_count = n]
  [, max_file_size = 'nnnMB']
  [, max_time_range = 'nnd']
);
```

- `bucketed_on`: the partitioning key columns
- `bucket_count`: bucket count for the hash partitioning
  - default value is 512
  - must be a power of two
- `max_file_size`: the max partition file size
  - adaptively adjusted if not specified
- `max_time_range`: the max time range stored in a partition
  - adaptively adjusted if not specified


`bucket_count`, `max_file_size` and `max_time_range` are optional.
The default values work well in most cases.
You don't need to specify them unless you have a special reason.

For example:


```
CREATE TABLE mytable_p (
  time bigint,
  col1 VARCHAR,
  col2 VARCHAR,
  col3 bigint
) WITH (bucketed_on = array['col1', 'col2']);
```

### Choosing Bucketing Columns for UDP

Supported TD data types for UDP partition keys are int, long, and string.
These correspond to Trino data types as described in [About TD Primitive Data Types](/ja/products/customer-data-platform/data-workbench/queries/sql-reference/sql_tips#treasure-data-native-data-types).

Choose a set of columns used widely to select data for analysis.
That is, one frequently used to look up results, drill down to details, or aggregate data.
For example, depending on the most frequently used types, you might choose:

- customer_id
- Country + State/Province + City
- Postal_code
- Customer-first name + last name + date of birth


Choose a column or set of columns that have high cardinality (relative
to the number of buckets), and are frequently used with equality
predicates. For example:

- Unique values, for example, an email address or account number
- Non-unique but high-cardinality columns with relatively even distribution, for example, date of birth


### Checking For and Addressing Data Skew

The performance is inconsistent if the number of rows in each bucket is
not roughly equal. For example, if you partition on the US zip code,
urban postal codes will have more customers than rural ones.

To help determine bucket count and partition size, you can run a SQL
query that identifies distinct key column combinations and counts their
occurrences. For example:


```
SELECT
  concat(key_column1, '|', key_column2, ...) udp_key,
  COUNT(*) occurrences
FROM
  tbl
GROUP BY 1;
```

If the counts across different buckets are roughly comparable, your data is not skewed.

For consistent results, choose a combination of columns where the distribution is roughly equal.

If you do decide to use partitioning keys that do not produce an even
distribution, see [Improving Performance with Skewed Data](#improving-performance-on-skewed-data).

## Populating UDP table

### Using INSERT and INSERT OVERWRITE to UDP tables

INSERT and INSERT OVERWRITE with partitioned tables work the same as
with other tables. You can create an empty UDP table and then insert
data into it the usual way. The resulting data is partitioned.

### Partitioning an Existing Table

Tables must have partitioning specified when first created. For an
existing table, you must create a copy of the table with UDP options
configured and copy the rows over. To do this use a CTAS from the source
table.

For example:


```
drop table if EXISTS customer_p;

create table customer_p with (bucketed_on = array['customer_id'])
as select * from customer;
```

When partitioning an existing table:

- Creating a partitioned version of a very large table is likely to take hours or days.
- If the source table is continuing to receive updates, you must update it further with SQL.
For example:



```
INSERT INTO customer_p SELECT * FROM customer WHERE...
```

## Limitations

The benefits of UDP can be limited when used with complex queries.
The query optimizer might not apply optimizations for UDP.

### The Maximum Number of Partitioning Keys is Three

If the limit is exceeded, Trino causes the following error message:

'bucketed_on' must be less than 4 columns

### Imports Other than SQL

The import method provided by Treasure Data for the following does not
support UDP tables:

- Streaming Import
- Data Connector
- BulkImport


If you try to use any of these import methods, you will get an error. As
a workaround, you can use a workflow to copy data from a table that is
receiving streaming imports to the UDP table.

## Example: Creating and Using UDP Tables

Create and populate a partitioned table customers_p to speed up lookups
on "city+state" columns:


```
-- create partitioned table, bucketing on combination of city + state columns
create table customer_p with (bucketed_on = array['city', 'state']);

-- Insert new records into the partitioned table
INSERT INTO customer_p SELECT ...;

-- accelerates queries that test equality on all bucketing columns
SELECT
   *
FROM
  customer_p
WHERE
  city = 'San Francisco'
  AND state = 'California'
;

SELECT
   *
FROM
  customer_p
WHERE
  city IN(
    'San Francisco',
    'San Jose'
  )
  AND state = 'California'
  AND monthly_income > 10000
;

-- NOT accelerated: filter predicate does not use all hash columns
SELECT
   *
FROM
  customer_p
WHERE
  state = 'California'
;

SELECT
   *
FROM
  customer_p
WHERE
  city IN(
    'San Francisco',
    'San Jose'
  )
;
```

## Advanced Use Cases

### Choosing Bucket Count

A higher bucket count means dividing data among many smaller partitions, which can be less efficient
to scan. TD suggests starting with 512 for most cases. If you aren't
sure of the best bucket count, it is safer to err on the low side.

### Aggregations on the Hash Key

Using a GROUP BY key as the bucketing key, major improvements in
performance and reduction in cluster load on aggregation queries were seen.
For example, you can see that the UDP version of this query on a 1TB table

- used 10 Trino workers instead of 19
- ran in 45 seconds instead of 2 minutes 31 seconds
- processed three times as many rows per second.


The total data processed in GB was greater because the UDP version of the table occupied more storage.

**UDP Version**


```
presto: udp_tpch_sf1000 > SELECT
  COUNT(*)
FROM
  (
    SELECT
      max_by(
        l_discount,
        time
      ),
      max_by(
        l_partkey,
        time
      )
    FROM
      lineitem
    GROUP BY
      l_orderkey
  )
;

_col0------------
1500000000(
  1 row
) Query 20171227_014452_14154_sjh9g,
FINISHED,
10 nodes Splits: 517 total,
517 done(
  100.00 %
) 0: 45 [6B ROWS,
25.5GB] [134M ROWS / s,
585MB / s]
```

**Non-UDP Version**


```
presto: udp_tpch_sf1000 > SELECT
  COUNT(*)
FROM
  (
    SELECT
      max_by(
        l_discount,
        time
      ),
      max_by(
        l_partkey,
        time
      )
    FROM
      tpch_sf1000.lineitem
    GROUP BY
      l_orderkey
  )
;

_col0------------
1500000000(
  1 row
) Query 20171227_014549_14273_sjh9g,
FINISHED,
19 nodes Splits: 175 total,
175 done(
  100.00 %
) 2: 31 [6B ROWS,
18.3GB] [39.7M ROWS / s,
124MB / s]
```

### Needle-in-a-Haystack Lookup on the Hash Key

The largest improvements – 5x, 10x, or more – will be on lookup or
filter operations where the partition key columns are tested for equality.
Only partitions in the bucket from hashing the partition keys are scanned.

For example, consider:

- table `customers` is bucketed on `customer_id`
- table `contacts` is bucketed on `country_code` and `area_code`


These queries will improve:


```
SELECT ... FROM customers WHERE customer_id = 10001;
```

Here UDP Trino scans only one bucket (the one that 10001 hashes to) if
customer_id is the only bucketing key.


```
SELECT ... FROM contacts WHERE country_code='1' and area_code = '650' and phone like '555-____';
```

Here UDP Trino scans only the bucket that matches the hash of
country_code 1 + area_code 650.

These queries will not improve:


```
SELECT ... FROM customers WHERE customer_id >= 10001;
```

Here UDP will not improve performance, because the predicate doesn't use '='.


```
SELECT ... FROM contacts WHERE area_code = '650' ;
```

Here UDP will not improve performance, because the predicate does not
include both bucketing keys.

### Very Large Join Operations

Very large join operations can sometimes run out of memory. Such joins
can benefit from UDP. Distributed and colocated joins will use less
memory, CPU, and shuffle less data among Trino workers. This may enable
you to finish queries that would otherwise run out of resources. To
leverage these benefits, you must:

1. Make sure the two tables to be joined are partitioned on the same keys
2. Use equijoin across all the partitioning keys
3. Set the following options on your join using a magic comment:



```
-- set session join_distribution_type = 'PARTITIONED'
-- set session colocated_join = 'true'
```

### Improving Performance on Skewed Data

When processing a UDP query, Trino ordinarily creates one split of
filtering work per bucket (typically 512 splits, for 512 buckets). But
if data is not evenly distributed, filtering on skewed bucket could make
performance worse -- one Trino worker node will handle the filtering of
that skewed set of partitions, and the whole query lags.

To enable higher scan parallelism you can use:


```
-- set session distributed_bucket='true|false'
```

When set to true, multiple splits are used to scan the files in a bucket
in parallel, increasing performance. The tradeoff is that colocated join
is always disabled when distributed_bucket is true. As a result, some
operations such as GROUP BY will require shuffling and more memory during execution.

This query hint is most effective with needle-in-a-haystack queries.
Even if these queries perform well with the query hint, test performance
with and without the query hint in other use cases on those tables to find the best performance tradeoffs.