Suppose you have clickstream data
and you store it in non-aggregated form.

You need to generate reports for your customers on the fly.

This is a typical ClickHouse use case.


Most customers are small, but some are rather big.
You want instant reports even for the largest customers.

Solution: define a sample key in your MergeTree table.


CREATE TABLE ... ENGINE = MergeTree
ORDER BY (CounterID, Date, intHash32(UserID))
PARTITION BY toYYYYMM(Date)
SAMPLE BY intHash32(UserID)


SELECT uniq(UserID)
FROM hits_all
WHERE CounterID = 76543
  AND EventDate BETWEEN '2018-03-25' AND '2018-04-25'

┌─uniq(UserID)─┐
│     47362335 │
└──────────────┘

1 rows in set. Elapsed: 4.571 sec. Processed 1.17 billion rows, 16.37 GB (255.88 million rows/s., 3.58 GB/s.)


SELECT uniq(UserID)
FROM hits_all
SAMPLE 1/10
WHERE CounterID = 76543
  AND EventDate BETWEEN '2018-03-25' AND '2018-04-25'

┌─uniq(UserID)─┐
│      4742578 │
└──────────────┘

1 rows in set. Elapsed: 0.638 sec. Processed 117.73 million rows, 1.65 GB (184.50 million rows/s., 2.58 GB/s.)
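A quick sanity check of the two runs, using only the numbers printed above: because SAMPLE 1/10 keeps roughly a tenth of the users, the sampled uniq() has to be multiplied by 10 to estimate the full value.

```python
# Numbers copied from the two query outputs above.
full_uniq, full_sec, full_rows = 47_362_335, 4.571, 1.17e9
sampled_uniq, sampled_sec, sampled_rows = 4_742_578, 0.638, 117.73e6

sample_factor = 10  # SAMPLE 1/10, so scale the result back up by 10
estimate = sampled_uniq * sample_factor
rel_error = abs(estimate - full_uniq) / full_uniq

print(f"estimate   = {estimate:,}")                                 # 47,425,780
print(f"rel. error = {rel_error:.2%}")                              # 0.13%
print(f"speedup    = {full_sec / sampled_sec:.1f}x")                # 7.2x
print(f"rows read  = {sampled_rows / full_rows:.1%} of full scan")  # 10.1%
```

The estimate lands within about 0.13% of the exact count while reading about a tenth of the rows.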


The sample key must be:

— included in the primary key;

— uniformly distributed in the domain of its data type:
Bad: Timestamp;
Good: intHash32(UserID);

— cheap to calculate:
Bad: cityHash64(URL);
Good: intHash32(UserID);

— not placed after high-granularity fields in the primary key:
Bad: ORDER BY (Timestamp, sample_key);
Good: ORDER BY (CounterID, Date, sample_key).
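To see why uniform distribution matters, here is a toy model. It assumes SAMPLE 1/10 keeps rows whose sample key falls in the first tenth of the key type's value range (UInt32 here). `stand_in_int_hash32` is a hypothetical substitute built on BLAKE2, not ClickHouse's intHash32. The timestamps are made-up epoch seconds starting at 2018-03-25.

```python
import hashlib

UINT32_RANGE = 2**32


def stand_in_int_hash32(user_id: int) -> int:
    """Hypothetical stand-in for intHash32: any well-mixing 32-bit hash works here."""
    digest = hashlib.blake2b(user_id.to_bytes(8, "little"), digest_size=4).digest()
    return int.from_bytes(digest, "little")


def in_sample(key: int, fraction: float) -> bool:
    """Toy model of SAMPLE <fraction>: keep keys in the first `fraction` of the UInt32 domain."""
    return key < fraction * UINT32_RANGE


user_ids = range(100_000)
# Epoch seconds starting at 2018-03-25 00:00:00 UTC: all far above 2**32 / 10.
timestamps = range(1_521_936_000, 1_521_936_000 + 100_000)

hashed_share = sum(in_sample(stand_in_int_hash32(u), 0.1) for u in user_ids) / len(user_ids)
ts_share = sum(in_sample(t, 0.1) for t in timestamps) / len(timestamps)

print(f"hashed key:    {hashed_share:.3f} of rows sampled")  # close to 0.100
print(f"raw timestamp: {ts_share:.3f} of rows sampled")      # 0.000
```

A hashed key spreads rows evenly over the whole range, so "the first tenth of the range" means "about a tenth of the rows". Raw timestamps cluster in a narrow band, so the same rule selects nothing (or everything).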


Sampling:

— is deterministic;

— works consistently across different tables;

— allows reading less data from disk.
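Determinism and cross-table consistency both follow from selecting rows by the value of the sample key rather than at random. The sketch below reuses the same toy model, with a hypothetical hash and hypothetical `hits` and `visits` tables keyed by UserID. It shows that the same users are picked on every run and in both tables, which keeps, for example, an IN or JOIN between sampled tables meaningful.

```python
import hashlib


def stand_in_int_hash32(user_id: int) -> int:
    # Hypothetical stand-in for intHash32 (not ClickHouse's actual hash function).
    digest = hashlib.blake2b(user_id.to_bytes(8, "little"), digest_size=4).digest()
    return int.from_bytes(digest, "little")


def sample(rows, fraction):
    """Keep rows whose sample key falls into the first `fraction` of the UInt32 range."""
    limit = fraction * 2**32
    return [r for r in rows if stand_in_int_hash32(r["UserID"]) < limit]


# Two hypothetical tables that share the same sample key expression.
hits = [{"UserID": u, "URL": f"/page/{u % 7}"} for u in range(10_000)]
visits = [{"UserID": u, "Duration": u % 300} for u in range(0, 10_000, 2)]

sampled_hits = sample(hits, 0.1)
sampled_visits = sample(visits, 0.1)

# Deterministic: running the same sample again returns exactly the same rows.
assert sample(hits, 0.1) == sampled_hits

# Consistent across tables: every sampled visit's user is also among the sampled hits.
hit_users = {r["UserID"] for r in sampled_hits}
visit_users = {r["UserID"] for r in sampled_visits}
print(visit_users <= hit_users)  # True
```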

SAMPLE key, bonus

SAMPLE 1/10

— select data for 1/10 of all possible sample keys;

SAMPLE 1000000

— select from approximately (but not less than) 1,000,000 rows on each shard;
— use the _sample_factor virtual column to determine the relative sample factor;

SAMPLE 1/10 OFFSET 1/10

— select the second 1/10 of all possible sample keys;

SET max_parallel_replicas = 3

— select from multiple replicas of each shard in parallel;
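One way to picture these variants, as a toy model rather than ClickHouse internals:

* SAMPLE k OFFSET m selects the slice [m, m + k) of the sample key space.
* _sample_factor is the scale 1/k to apply to counts and sums.
* max_parallel_replicas splits the slice into disjoint sub-ranges so each replica reads a different part.

The key space size assumes a UInt32 key such as intHash32(UserID). The function names are hypothetical.

```python
from fractions import Fraction

KEY_SPACE = 2**32  # assumed UInt32 sample key, e.g. intHash32(UserID)


def key_range(k: Fraction, offset: Fraction = Fraction(0)):
    """Toy model of SAMPLE k OFFSET m: the half-open slice [m, m + k) of the key space."""
    return (int(offset * KEY_SPACE), int((offset + k) * KEY_SPACE))


def sample_factor(k: Fraction) -> Fraction:
    """Toy model of _sample_factor: multiply sampled counts and sums by 1 / k."""
    return 1 / k


def split_for_replicas(lo: int, hi: int, replicas: int):
    """Toy model of max_parallel_replicas: each replica scans a disjoint sub-range."""
    step = (hi - lo) // replicas
    bounds = [lo + i * step for i in range(replicas)] + [hi]
    return list(zip(bounds[:-1], bounds[1:]))


first = key_range(Fraction(1, 10))
second = key_range(Fraction(1, 10), offset=Fraction(1, 10))
print(first, second)                    # (0, 429496729) (429496729, 858993459)
print(sample_factor(Fraction(1, 10)))   # 10
print(split_for_replicas(*first, 3))    # three adjacent sub-ranges covering `first`
```

Because the slices are adjacent and non-overlapping, successive OFFSET values can cover the data piece by piece without double counting.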

Aggregate function combiners


Example: sumIf(x, cond)

Aggregate function combiners: -If

SELECT
    uniqIf(UserID, RefererDomain = '') AS users_yandex,
    uniqIf(UserID, RefererDomain = '') AS users_google
FROM test.hits

┌─users_yandex─┬─users_google─┐
│        19731 │         8149 │
└──────────────┴──────────────┘
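Conceptually, the -If suffix turns an aggregate into one that only sees rows where the extra condition holds. Several conditional aggregates can therefore sit side by side in one query instead of needing a separate WHERE for each. Below is a small model of uniqIf and sumIf; the rows, domains, and values are hypothetical.

```python
def uniq_if(rows, value_of, cond):
    """Toy model of uniqIf(value, cond): number of distinct values over rows where cond holds."""
    return len({value_of(r) for r in rows if cond(r)})


def sum_if(rows, value_of, cond):
    """Toy model of sumIf(x, cond)."""
    return sum(value_of(r) for r in rows if cond(r))


# Hypothetical hits: (UserID, RefererDomain, x)
rows = [
    (1, "a.example", 5),
    (1, "a.example", 7),
    (2, "b.example", 1),
    (3, "a.example", 2),
    (3, "b.example", 4),
]

users_a = uniq_if(rows, lambda r: r[0], lambda r: r[1] == "a.example")  # users {1, 3}
users_b = uniq_if(rows, lambda r: r[0], lambda r: r[1] == "b.example")  # users {2, 3}
total_a = sum_if(rows, lambda r: r[2], lambda r: r[1] == "a.example")   # 5 + 7 + 2
print(users_a, users_b, total_a)  # 2 2 14
```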

Aggregate function combiners: -Array

SELECT
    uniq(arr),
    uniqArray(arr),
    groupArray(arr),
    groupUniqArray(arr),
    groupArrayArray(arr),
    groupUniqArrayArray(arr)
FROM
(
    SELECT ['hello', 'world'] AS arr
    UNION ALL
    SELECT ['goodbye', 'world']
)
FORMAT Vertical

Row 1:
──────
uniq(arr):                2
uniqArray(arr):           3
groupArray(arr):          [['hello','world'],['goodbye','world']]
groupUniqArray(arr):      [['hello','world'],['goodbye','world']]
groupArrayArray(arr):     ['hello','world','goodbye','world']
groupUniqArrayArray(arr): ['goodbye','world','hello']
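The same output can be reproduced with a few lines of Python, as a model of the semantics. Plain uniq and groupArray treat each array as one value, while the -Array versions aggregate over the array elements. groupUniqArrayArray is modeled as a set because the order of its result is not guaranteed.

```python
rows = [["hello", "world"], ["goodbye", "world"]]


def uniq(values):                    # distinct whole values (here: whole arrays)
    return len({tuple(v) for v in values})


def uniq_array(arrays):              # -Array: aggregate over the elements of each array
    return len({x for arr in arrays for x in arr})


def group_array_array(arrays):       # groupArray applied to elements: a flattened list
    return [x for arr in arrays for x in arr]


def group_uniq_array_array(arrays):  # distinct elements; order is not meaningful
    return set(group_array_array(arrays))


print(uniq(rows), uniq_array(rows))            # 2 3
print(group_array_array(rows))                 # ['hello', 'world', 'goodbye', 'world']
print(sorted(group_uniq_array_array(rows)))    # ['goodbye', 'hello', 'world']
```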

Aggregate function combiners

... they may be combined

Example: sumArrayIf, sumIfArray.

Example: sumForEachStateForEachIfArrayIfState.
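The following sketch shows how the two orderings in sumArrayIf and sumIfArray differ. It assumes the last suffix is the outermost combiner. In sumArrayIf the condition is one value per row. In sumIfArray both arguments are arrays and the condition is checked per element. The rows are made up.

```python
def sum_array_if(rows):
    """Toy model of sumArrayIf(arr, cond): one condition per row;
    sum every element of arr in rows where it holds."""
    return sum(x for arr, cond in rows if cond for x in arr)


def sum_if_array(rows):
    """Toy model of sumIfArray(arr, conds): both arguments are arrays;
    the condition is checked element by element."""
    return sum(x for arr, conds in rows for x, c in zip(arr, conds) if c)


rows_scalar_cond = [([1, 2, 3], True), ([10, 20], False), ([4], True)]
rows_array_cond = [([1, 2, 3], [True, False, True]), ([10, 20], [False, True])]

print(sum_array_if(rows_scalar_cond))  # 1 + 2 + 3 + 4 = 10
print(sum_if_array(rows_array_cond))   # 1 + 3 + 20 = 24
```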

Intermediate aggregation states are first-class citizens

Obtain an intermediate state with the -State combiner;

Example: uniqState(user_id) AS state;

— it returns a value of the AggregateFunction(...) data type;

— such values can be stored in tables;

— merge them back with the -Merge combiner;

Example: uniqMerge(state) AS result;
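Here is why you keep states instead of final values. An avg state is a (sum, count) pair, so two states merge exactly. Two finished averages cannot be combined without their counts. Below is a minimal Python model; the `AvgState` class is hypothetical and does not reflect ClickHouse's binary format.

```python
from dataclasses import dataclass


@dataclass
class AvgState:
    """Toy model of an avgState value: a running sum and count, not a finished average."""
    total: float = 0.0
    count: int = 0

    def add(self, x: float) -> "AvgState":
        self.total += x
        self.count += 1
        return self

    def merge(self, other: "AvgState") -> "AvgState":
        # -Merge combines two states field by field.
        return AvgState(self.total + other.total, self.count + other.count)

    def finalize(self) -> float:
        return self.total / self.count


def avg_state(values):
    state = AvgState()
    for v in values:
        state.add(v)
    return state


day1, day2 = [123, 456, 789], [10]
merged = avg_state(day1).merge(avg_state(day2))

print(merged.finalize())  # (123 + 456 + 789 + 10) / 4 = 344.5
# Averaging the two finished averages gives a wrong answer:
print((avg_state(day1).finalize() + avg_state(day2).finalize()) / 2)  # 233.0
```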

Intermediate aggregation states

SELECT avg(x), uniq(x)
FROM
(
    SELECT 123 AS x
    UNION ALL
    SELECT 456
)

┌─avg(x)─┬─uniq(x)─┐
│  289.5 │       2 │
└────────┴─────────┘

SELECT avgState(x), uniqState(x)
FROM
(
    SELECT 123 AS x
    UNION ALL
    SELECT 456
)

┌─avgState(x)─────┬─uniqState(x)─┐
│ C\0\0\0\0\0\0 │ \0▒�P���a� │
└─────────────────┴──────────────┘

SELECT
    toTypeName(avgState(x)),
    toTypeName(uniqState(x))
FROM
(
    SELECT 123 AS x
    UNION ALL
    SELECT 456
)
FORMAT Vertical

Row 1:
──────
toTypeName(avgState(x)):  AggregateFunction(avg, UInt16)
toTypeName(uniqState(x)): AggregateFunction(uniq, UInt16)

CREATE TABLE t
(
    users_state AggregateFunction(uniq, UInt64),
    ...
) ENGINE = AggregatingMergeTree ORDER BY ...

SELECT uniqMerge(users_state) FROM t GROUP BY ...

Main use case:

Incremental data aggregation
with the AggregatingMergeTree table engine.
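Below is a toy model of that workflow. It assumes each INSERT produces a part with one uniq state per key, and that background merges combine the states of rows with the same sorting key. Python sets stand in for uniq states, so the counts here are exact while ClickHouse's uniq is approximate. The functions and data are hypothetical.

```python
from collections import defaultdict


def aggregate_batch(batch):
    """Toy model of INSERT ... SELECT day, uniqState(UserID) ... GROUP BY day:
    one new part holding a per-key state (a set stands in for the uniq state)."""
    part = defaultdict(set)
    for day, user_id in batch:
        part[day].add(user_id)
    return dict(part)


def merge_parts(parts):
    """Toy model of a background merge: states of rows with the same key are merged."""
    merged = defaultdict(set)
    for part in parts:
        for day, state in part.items():
            merged[day] |= state
    return dict(merged)


def uniq_merge(parts):
    """Toy model of SELECT day, uniqMerge(users_state) ... GROUP BY day.
    Gives the same answer whether or not the parts were merged yet."""
    return {day: len(state) for day, state in merge_parts(parts).items()}


parts = [
    aggregate_batch([("2018-04-01", 1), ("2018-04-01", 2)]),
    aggregate_batch([("2018-04-01", 2), ("2018-04-01", 3), ("2018-04-02", 1)]),
]
print(uniq_merge(parts))                 # {'2018-04-01': 3, '2018-04-02': 1}
print(uniq_merge([merge_parts(parts)]))  # same result after a background merge
```

Each new batch only adds a small part. Queries stay correct before and after merges because they always finish with a merge of states.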

How we can make it better

— versioning of state serialization format;

— identify the cases when different aggregate functions have the same state (sumState, sumIfState must be compatible);

— allow creating an aggregation state with a function (currently, arrayReduce can be used for that purpose);

— allow inserting AggregateFunction values into a table directly as a tuple of arguments;

— adaptive index_granularity;

Consistency modes

By default, ClickHouse implements:

asynchronous, conflict-free, multi-master replication.


An INSERT is acknowledged after being written on a single replica,
and replication is done in the background.

Some replicas may lag and miss some data;

Every replica may be missing a different part of the data.

By default, you have only eventual consistency.
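Below is a toy model of the default behaviour described above; the `Replica` class and functions are hypothetical. Each INSERT is acknowledged by whichever replica received it. At any moment the replicas can each be missing different parts, and only background replication makes them converge.

```python
class Replica:
    def __init__(self, name):
        self.name = name
        self.parts = set()


def insert(replica, part):
    """Toy model of the default mode: acknowledged as soon as ONE replica wrote the part."""
    replica.parts.add(part)
    return "acknowledged"


def background_replication(replicas):
    """Eventually every replica fetches every part it is missing."""
    all_parts = set().union(*(r.parts for r in replicas))
    for r in replicas:
        r.parts |= all_parts


r1, r2, r3 = Replica("r1"), Replica("r2"), Replica("r3")
insert(r1, "part_1")  # multi-master: any replica accepts writes
insert(r2, "part_2")
print([sorted(r.parts) for r in (r1, r2, r3)])  # [['part_1'], ['part_2'], []]

background_replication([r1, r2, r3])
print([sorted(r.parts) for r in (r1, r2, r3)])  # every replica has both parts
```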

You can enable strong consistency.

SET insert_quorum = 2;

— each INSERT is acknowledged by a quorum of replicas;

— all replicas in quorum are consistent: they contain data from all previous INSERTs (INSERTs are linearized);

SET select_sequential_consistency = 1;

— allows SELECTs to read only acknowledged data from consistent replicas
(those that contain all acknowledged INSERTs).
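Below is a toy model of the two settings together, under simplifying assumptions:

* Writes go to the first `insert_quorum` online replicas.
* Those replicas first catch up on all previously acknowledged parts.
* A replica may serve sequentially consistent SELECTs only if it holds every acknowledged part.

`QuorumCluster` is hypothetical and ignores failures in the middle of a write.

```python
class Replica:
    def __init__(self, name, online=True):
        self.name, self.online, self.parts = name, online, set()


class QuorumCluster:
    """Toy model of insert_quorum plus select_sequential_consistency (hypothetical)."""

    def __init__(self, replicas, insert_quorum):
        self.replicas = replicas
        self.insert_quorum = insert_quorum
        self.acknowledged = set()  # parts whose INSERT reached the quorum

    def insert(self, part):
        online = [r for r in self.replicas if r.online]
        if len(online) < self.insert_quorum:
            raise RuntimeError("quorum not reached: INSERT is not acknowledged")
        for r in online[: self.insert_quorum]:
            r.parts |= self.acknowledged  # quorum members hold all previous INSERTs
            r.parts.add(part)
        self.acknowledged.add(part)

    def consistent_replicas(self):
        """Replicas allowed to answer SELECT: those holding every acknowledged part."""
        return [r.name for r in self.replicas if self.acknowledged <= r.parts]


r1, r2, r3 = Replica("r1"), Replica("r2"), Replica("r3", online=False)
cluster = QuorumCluster([r1, r2, r3], insert_quorum=2)
cluster.insert("part_1")  # written to r1 and r2
r2.online = False
r3.online = True
cluster.insert("part_2")  # written to r1 and r3; r3 catches up on part_1 first
print(cluster.consistent_replicas())  # ['r1', 'r3']: r2 misses part_2
```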

GROUP BY in external memory

You can simply increase max_memory_usage

Also, you can enable aggregation with external memory.
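With aggregation in external memory enabled (in ClickHouse, through the max_bytes_before_external_group_by setting), partially aggregated data is flushed to disk once memory use crosses a threshold and is merged afterwards. The sketch below shows a generic hash-partitioned spill-and-merge scheme for GROUP BY key with count(). It uses a key count instead of bytes as the threshold, the function is hypothetical, and it is not ClickHouse's actual implementation.

```python
import os
import pickle
import tempfile
from collections import Counter


def external_group_by_count(keys, max_keys_in_memory, buckets=4):
    """Toy spill-to-disk GROUP BY key -> count(). When the in-memory table exceeds
    max_keys_in_memory keys, its partial counts are written to disk split into buckets
    by hash(key); afterwards each bucket is merged on its own."""
    result = {}
    with tempfile.TemporaryDirectory() as tmpdir:
        spills = []        # one entry per spill: a list of per-bucket file paths
        table = Counter()  # the in-memory hash table

        def spill():
            paths = []
            for b in range(buckets):
                path = os.path.join(tmpdir, f"spill{len(spills)}_bucket{b}.pkl")
                part = {k: v for k, v in table.items() if hash(k) % buckets == b}
                with open(path, "wb") as f:
                    pickle.dump(part, f)
                paths.append(path)
            spills.append(paths)
            table.clear()

        for key in keys:
            table[key] += 1
            if len(table) > max_keys_in_memory:
                spill()
        if table:
            spill()  # flush whatever is left

        # Merge phase: one bucket at a time, so memory only holds that bucket's keys.
        for b in range(buckets):
            merged = Counter()
            for paths in spills:
                with open(paths[b], "rb") as f:
                    merged.update(pickle.load(f))
            result.update(merged)
    return result


keys = ["a", "b", "a", "c", "d", "a", "b", "e", "c", "a"]
print(external_group_by_count(keys, max_keys_in_memory=2))
```

Partitioning by hash guarantees that all partial counts for a given key land in the same bucket. Each bucket can then be finished independently without holding the whole result in memory.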



Geospatial functions

— pointInPolygon;

— pointInEllipses;

— greatCircleDistance;

SELECT pointInPolygon((lat, lon), [(6, 0), (8, 4), (5, 8), (0, 2), ...])
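For intuition, here are generic versions of two of these operations: an even-odd (ray casting) point-in-polygon test and the haversine great-circle distance. These are textbook algorithms, not necessarily what ClickHouse uses internally. The polygon keeps only the four vertices visible in the query above, and the Earth radius is an assumed mean value.

```python
import math


def point_in_polygon(point, polygon):
    """Even-odd rule: count how many polygon edges a ray to the right of the point crosses."""
    x, y = point
    inside = False
    n = len(polygon)
    for i in range(n):
        x1, y1 = polygon[i]
        x2, y2 = polygon[(i + 1) % n]  # wrap around to close the polygon
        if (y1 > y) != (y2 > y):       # edge straddles the horizontal line through the point
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def great_circle_distance_m(lon1, lat1, lon2, lat2, radius_m=6_371_000.0):
    """Haversine distance in meters on a sphere with an assumed mean Earth radius."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * radius_m * math.asin(math.sqrt(a))


polygon = [(6, 0), (8, 4), (5, 8), (0, 2)]  # visible vertices only, treated as closed
print(point_in_polygon((5, 4), polygon))     # True
print(point_in_polygon((9, 9), polygon))     # False
print(round(great_circle_distance_m(0, 0, 1, 0)))  # about 111 km: one degree along the equator
```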

Machine learned models

SELECT modelEvaluate('name', f1, ... fn) AS ctr_prediction

How we can make it better:

— add simple regression models;

— train models in ClickHouse directly;

— online training of models;

— parametrized models (dictionaries of multiple models);

Data processing without server

clickhouse-local tool

$ clickhouse-local \
    --input-format=CSV --output-format=PrettyCompact \
    --structure="SearchPhrase String, UserID UInt64" \
    --query="SELECT SearchPhrase, count(), uniq(UserID) FROM table \
        WHERE SearchPhrase != '' GROUP BY SearchPhrase \
        ORDER BY count() DESC LIMIT 20" < hits.csv

┌─SearchPhrase────────────┬─count()─┬─uniq(UserID)─┐
│ интерьер ванной комнаты │    2166 │            1 │
│ яндекс                  │    1655 │          478 │
│ весна 2014 мода         │    1549 │            1 │
│ фриформ фото            │    1480 │            1 │
│ анджелина джоли         │    1245 │            1 │
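What the query computes, written as plain Python over the same two-column CSV, as a sketch for readers who want to check the semantics. The function name is hypothetical, and the sets give exact distinct counts where ClickHouse's uniq is approximate.

```python
import csv
import io
from collections import defaultdict


def top_search_phrases(csv_file, limit=20):
    """Expects CSV rows of (SearchPhrase, UserID), matching --structure."""
    counts = defaultdict(int)
    users = defaultdict(set)
    for phrase, user_id in csv.reader(csv_file):
        if phrase == "":                  # WHERE SearchPhrase != ''
            continue
        counts[phrase] += 1               # count()
        users[phrase].add(int(user_id))   # uniq(UserID), exact here
    ranked = sorted(counts, key=counts.get, reverse=True)[:limit]  # ORDER BY count() DESC LIMIT
    return [(p, counts[p], len(users[p])) for p in ranked]


sample_csv = io.StringIO('"a",1\n"a",2\n"b",1\n"",3\n"a",1\n')
print(top_search_phrases(sample_csv))  # [('a', 3, 2), ('b', 1, 1)]
```

With a real file, open it with `open("hits.csv", newline="")` and pass the handle instead of the StringIO.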

Bonus: SELECT and process data from an offline server.

How we can make it better?

— add more supported formats for Date and DateTime values in text form;

— add formats like Avro, Parquet;

— customizable CSV format;

— "template" and "regexp" formats for more freeform data;



