ClickHouse Distributed Database and Tables Documentation

This documentation describes how to create a distributed database and distributed tables in ClickHouse on a sharded cluster, with ZooKeeper coordinating replication and distributed DDL.

Prerequisites

  • ClickHouse is configured in sharded mode (a cluster with multiple shards is defined in the server configuration).
  • ZooKeeper coordinates replication and distributed DDL (ON CLUSTER) queries.
  • Clusters are referenced by name; this guide uses a cluster named default.

Creating the Distributed Database

First, create a database that will contain the tables across the cluster. The ON CLUSTER clause runs the statement on every node, so the database exists everywhere.

CREATE DATABASE statistics ON CLUSTER default;

This command creates the statistics database on every node of the cluster named default.
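Before running any ON CLUSTER statements, you can check that the cluster definition is visible from the node you are connected to (this assumes the cluster is named default, as in this setup):

    SELECT host_name, shard_num, replica_num
    FROM system.clusters
    WHERE cluster = 'default';

Every shard and replica of the cluster should appear in the output; if the result is empty, the cluster is not defined on this node.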

Creating Distributed Tables

For each table in the statistics database, we need to create a distributed counterpart. A Distributed table stores no data itself: it routes inserts to the local tables on each shard and fans queries out across all shards in parallel, enhancing performance and scalability.

Original Tables List

Below is the list of original tables for which we will create distributed versions:

  • analytics_stat
  • channels_stat
  • channels_stat_daily
  • channels_stat_hourly
  • dsp_messages
  • dsp_messages_stream
  • firebase_errors
  • firebase_errors_stream
  • firebase_subscription_errors
  • firebase_subscription_errors_stream
  • infi_clickhouse_orm_migrations
  • message_errors_events
  • message_errors_events_stream
  • message_events
  • message_events_amendments
  • message_events_amendments_stream
  • message_events_stream
  • message_example_stat
  • message_example_stat_old
  • message_groups_stat
  • message_responses
  • message_responses_stream
  • messagecallbackevents
  • messagecallbackevents_amendments
  • messagecallbackevents_amendments_stream
  • messagecallbackevents_stream
  • page_events
  • page_events_old
  • page_events_stream
  • sender_firebase_statistics
  • subscriber_sources_activities
  • subscriber_sources_activities_old
  • subscriber_sources_activities_stream
  • subscribers_activities
  • subscribers_activities_old
  • subscribers_activities_stream
  • subscription_events
  • subscription_events_old
  • subscription_events_stream
  • subscription_events_temp
  • user_custom_events
  • user_custom_events_stream
  • window_events
  • window_events_stream

Distributed Table Creation Commands

Below are the commands to create distributed versions of all the original tables listed above.
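All of the commands follow the same pattern; only the table name and sharding key vary. As a generic sketch (table_name and sharding_expr are placeholders, not real objects):

    CREATE TABLE statistics.table_name_dist ON CLUSTER default
    AS statistics.table_name
    ENGINE = Distributed(
        default,          -- cluster name from the server configuration
        'statistics',     -- database holding the local table on each shard
        'table_name',     -- local table that actually stores the data
        sharding_expr     -- expression deciding which shard receives each row
    );

Note that most tables below use murmurHash3_64(toYYYYMM(...)) as the sharding key, which routes all rows of a given month to the same shard. This keeps a month's data together, but it also concentrates each month's write load on a single shard; a finer-grained key may be preferable if per-month write throughput matters.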

  • analytics_stat:

    CREATE TABLE statistics.analytics_stat_dist ON CLUSTER default
    AS statistics.analytics_stat
    ENGINE = Distributed(
        default,
        'statistics',
        'analytics_stat',
        murmurHash3_64(toYYYYMM(datetime))
    );
  • channels_stat:

    CREATE TABLE statistics.channels_stat_dist ON CLUSTER default
    AS statistics.channels_stat
    ENGINE = Distributed(
        default,
        'statistics',
        'channels_stat',
        murmurHash3_64(toYYYYMM(datetime))
    );
  • channels_stat_daily:

    CREATE TABLE statistics.channels_stat_daily_dist ON CLUSTER default
    AS statistics.channels_stat_daily
    ENGINE = Distributed(
        default,
        'statistics',
        'channels_stat_daily',
        murmurHash3_64(toYYYYMM(datetime))
    );
  • channels_stat_hourly:

    CREATE TABLE statistics.channels_stat_hourly_dist ON CLUSTER default
    AS statistics.channels_stat_hourly
    ENGINE = Distributed(
        default,
        'statistics',
        'channels_stat_hourly',
        murmurHash3_64(toYYYYMM(datetime))
    );
  • dsp_messages:

    CREATE TABLE statistics.dsp_messages_dist ON CLUSTER default
    AS statistics.dsp_messages
    ENGINE = Distributed(
        default,
        'statistics',
        'dsp_messages',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • dsp_messages_stream:

    CREATE TABLE statistics.dsp_messages_stream_dist ON CLUSTER default
    AS statistics.dsp_messages_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'dsp_messages_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • firebase_errors:

    CREATE TABLE statistics.firebase_errors_dist ON CLUSTER default
    AS statistics.firebase_errors
    ENGINE = Distributed(
        default,
        'statistics',
        'firebase_errors',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • firebase_errors_stream:

    CREATE TABLE statistics.firebase_errors_stream_dist ON CLUSTER default
    AS statistics.firebase_errors_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'firebase_errors_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • firebase_subscription_errors:

    CREATE TABLE statistics.firebase_subscription_errors_dist ON CLUSTER default
    AS statistics.firebase_subscription_errors
    ENGINE = Distributed(
        default,
        'statistics',
        'firebase_subscription_errors',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • firebase_subscription_errors_stream:

    CREATE TABLE statistics.firebase_subscription_errors_stream_dist ON CLUSTER default
    AS statistics.firebase_subscription_errors_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'firebase_subscription_errors_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • infi_clickhouse_orm_migrations:

    CREATE TABLE statistics.infi_clickhouse_orm_migrations_dist ON CLUSTER default
    AS statistics.infi_clickhouse_orm_migrations
    ENGINE = Distributed(
        default,
        'statistics',
        'infi_clickhouse_orm_migrations',
        murmurHash3_64(toYYYYMM(applied))
    );
  • message_errors_events:

    CREATE TABLE statistics.message_errors_events_dist ON CLUSTER default
    AS statistics.message_errors_events
    ENGINE = Distributed(
        default,
        'statistics',
        'message_errors_events',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_errors_events_stream:

    CREATE TABLE statistics.message_errors_events_stream_dist ON CLUSTER default
    AS statistics.message_errors_events_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'message_errors_events_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_events:

    CREATE TABLE statistics.message_events_dist ON CLUSTER default
    AS statistics.message_events
    ENGINE = Distributed(
        default,
        'statistics',
        'message_events',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_events_amendments:

    CREATE TABLE statistics.message_events_amendments_dist ON CLUSTER default
    AS statistics.message_events_amendments
    ENGINE = Distributed(
        default,
        'statistics',
        'message_events_amendments',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_events_amendments_stream:

    CREATE TABLE statistics.message_events_amendments_stream_dist ON CLUSTER default
    AS statistics.message_events_amendments_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'message_events_amendments_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_events_stream:

    CREATE TABLE statistics.message_events_stream_dist ON CLUSTER default
    AS statistics.message_events_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'message_events_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_example_stat:

    CREATE TABLE statistics.message_example_stat_dist ON CLUSTER default
    AS statistics.message_example_stat
    ENGINE = Distributed(
        default,
        'statistics',
        'message_example_stat',
        murmurHash3_64(pool_id)
    );
  • message_example_stat_old:

    CREATE TABLE statistics.message_example_stat_old_dist ON CLUSTER default
    AS statistics.message_example_stat_old
    ENGINE = Distributed(
        default,
        'statistics',
        'message_example_stat_old',
        murmurHash3_64(toYYYYMM(datetime))
    );
  • message_groups_stat:

    CREATE TABLE statistics.message_groups_stat_dist ON CLUSTER default
    AS statistics.message_groups_stat
    ENGINE = Distributed(
        default,
        'statistics',
        'message_groups_stat',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_responses:

    CREATE TABLE statistics.message_responses_dist ON CLUSTER default
    AS statistics.message_responses
    ENGINE = Distributed(
        default,
        'statistics',
        'message_responses',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • message_responses_stream:

    CREATE TABLE statistics.message_responses_stream_dist ON CLUSTER default
    AS statistics.message_responses_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'message_responses_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • messagecallbackevents:

    CREATE TABLE statistics.messagecallbackevents_dist ON CLUSTER default
    AS statistics.messagecallbackevents
    ENGINE = Distributed(
        default,
        'statistics',
        'messagecallbackevents',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • messagecallbackevents_amendments:

    CREATE TABLE statistics.messagecallbackevents_amendments_dist ON CLUSTER default
    AS statistics.messagecallbackevents_amendments
    ENGINE = Distributed(
        default,
        'statistics',
        'messagecallbackevents_amendments',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • messagecallbackevents_amendments_stream:

    CREATE TABLE statistics.messagecallbackevents_amendments_stream_dist ON CLUSTER default
    AS statistics.messagecallbackevents_amendments_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'messagecallbackevents_amendments_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • messagecallbackevents_stream:

    CREATE TABLE statistics.messagecallbackevents_stream_dist ON CLUSTER default
    AS statistics.messagecallbackevents_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'messagecallbackevents_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • page_events:

    CREATE TABLE statistics.page_events_dist ON CLUSTER default
    AS statistics.page_events
    ENGINE = Distributed(
        default,
        'statistics',
        'page_events',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • page_events_old:

    CREATE TABLE statistics.page_events_old_dist ON CLUSTER default
    AS statistics.page_events_old
    ENGINE = Distributed(
        default,
        'statistics',
        'page_events_old',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • page_events_stream:

    CREATE TABLE statistics.page_events_stream_dist ON CLUSTER default
    AS statistics.page_events_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'page_events_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • sender_firebase_statistics:

    CREATE TABLE statistics.sender_firebase_statistics_dist ON CLUSTER default
    AS statistics.sender_firebase_statistics
    ENGINE = Distributed(
        default,
        'statistics',
        'sender_firebase_statistics',
        murmurHash3_64(toYYYYMM(date))
    );
  • subscriber_sources_activities:

    CREATE TABLE statistics.subscriber_sources_activities_dist ON CLUSTER default
    AS statistics.subscriber_sources_activities
    ENGINE = Distributed(
        default,
        'statistics',
        'subscriber_sources_activities',
        murmurHash3_64(toYYYYMM(datetime))
    );
  • subscriber_sources_activities_old:

    CREATE TABLE statistics.subscriber_sources_activities_old_dist ON CLUSTER default
    AS statistics.subscriber_sources_activities_old
    ENGINE = Distributed(
        default,
        'statistics',
        'subscriber_sources_activities_old',
        murmurHash3_64(toYYYYMM(datetime))
    );
  • subscriber_sources_activities_stream:

    CREATE TABLE statistics.subscriber_sources_activities_stream_dist ON CLUSTER default
    AS statistics.subscriber_sources_activities_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'subscriber_sources_activities_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • subscribers_activities:

    CREATE TABLE statistics.subscribers_activities_dist ON CLUSTER default
    AS statistics.subscribers_activities
    ENGINE = Distributed(
        default,
        'statistics',
        'subscribers_activities',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • subscribers_activities_old:

    CREATE TABLE statistics.subscribers_activities_old_dist ON CLUSTER default
    AS statistics.subscribers_activities_old
    ENGINE = Distributed(
        default,
        'statistics',
        'subscribers_activities_old',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • subscribers_activities_stream:

    CREATE TABLE statistics.subscribers_activities_stream_dist ON CLUSTER default
    AS statistics.subscribers_activities_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'subscribers_activities_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • subscription_events:

    CREATE TABLE statistics.subscription_events_dist ON CLUSTER default
    AS statistics.subscription_events
    ENGINE = Distributed(
        default,
        'statistics',
        'subscription_events',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • subscription_events_old:

    CREATE TABLE statistics.subscription_events_old_dist ON CLUSTER default
    AS statistics.subscription_events_old
    ENGINE = Distributed(
        default,
        'statistics',
        'subscription_events_old',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • subscription_events_stream:

    CREATE TABLE statistics.subscription_events_stream_dist ON CLUSTER default
    AS statistics.subscription_events_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'subscription_events_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • subscription_events_temp:

    CREATE TABLE statistics.subscription_events_temp_dist ON CLUSTER default
    AS statistics.subscription_events_temp
    ENGINE = Distributed(
        default,
        'statistics',
        'subscription_events_temp',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • user_custom_events:

    CREATE TABLE statistics.user_custom_events_dist ON CLUSTER default
    AS statistics.user_custom_events
    ENGINE = Distributed(
        default,
        'statistics',
        'user_custom_events',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • user_custom_events_stream:

    CREATE TABLE statistics.user_custom_events_stream_dist ON CLUSTER default
    AS statistics.user_custom_events_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'user_custom_events_stream',
        murmurHash3_64(event_id)
    );
  • window_events:

    CREATE TABLE statistics.window_events_dist ON CLUSTER default
    AS statistics.window_events
    ENGINE = Distributed(
        default,
        'statistics',
        'window_events',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
  • window_events_stream:

    CREATE TABLE statistics.window_events_stream_dist ON CLUSTER default
    AS statistics.window_events_stream
    ENGINE = Distributed(
        default,
        'statistics',
        'window_events_stream',
        murmurHash3_64(toYYYYMM(toDateTime64(timestamp, 0)))
    );
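Once created, the distributed tables are used like ordinary tables: SELECTs aggregate results from every shard, and INSERTs are forwarded to the shard chosen by the sharding key. For example:

    -- Counts rows across all shards holding message_events
    SELECT count() FROM statistics.message_events_dist;

Inserts should likewise go through the _dist table so that the sharding key, not the connected node, decides where each row is stored.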

Summary

  • Create a database across the cluster using CREATE DATABASE.
  • Create a distributed table for each original table so that queries and inserts are spread across the shards.

By following these steps, you will have distributed tables that spread data and query load across the shards of your ClickHouse cluster; combined with ZooKeeper-managed replication, this also provides high availability.
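To confirm that all of the _dist tables were created on every node, you can query system.tables across the cluster (the clusterAllReplicas table function is available in recent ClickHouse versions):

    SELECT hostName() AS host, name
    FROM clusterAllReplicas('default', system.tables)
    WHERE database = 'statistics' AND name LIKE '%_dist'
    ORDER BY host, name;

Each node should report the same list of distributed tables; a missing entry indicates the ON CLUSTER DDL did not complete on that node.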