---
title: 3. Creating tables manually
description:
published: true
date: 2025-05-15T20:07:34.738Z
tags: clickhouse
editor: markdown
dateCreated: 2025-05-15T20:07:34.738Z
---

All the tables for the cluster can be created manually: request the structure of each table, create it under a different name, and copy the data over from the source.

A problem can arise when the tables contain a lot of data, so a solution is given at the bottom of this article.

Creating the replicated tables

Let's walk through the first table in detail.

1. api_request_log_replicated

Request the structure:

show create table default.api_request_log

Output:

CREATE TABLE default.api_request_log
(
    `uuid` UUID DEFAULT '00000000-0000-0000-0000-000000000000',
    `ver` UInt8,
    `request_ts` DateTime64(3) DEFAULT '0000000000.000',
    `duration` Float64,
    `endpoint` String,
    `method` FixedString(5),
    `status_code` UInt32,
    `remote_addr` String,
    `username` Nullable(String),
    `error` Nullable(String),
    `request_body` Nullable(String) TTL toDateTime(request_ts) + toIntervalMonth(1),
    `query_params` Nullable(String) TTL toDateTime(request_ts) + toIntervalMonth(1),
    `response_body` Nullable(String) TTL toDateTime(request_ts) + toIntervalMonth(1)
)
ENGINE = ReplacingMergeTree(ver)
PARTITION BY toYYYYMM(request_ts)
ORDER BY uuid
SETTINGS index_granularity = 8192;

Rework the query so that the result is a replicated table:

CREATE TABLE default.api_request_log_replicated ON CLUSTER smvu2_cluster
(
    `uuid` UUID DEFAULT '00000000-0000-0000-0000-000000000000',
    `ver` UInt8,
    `request_ts` DateTime64(3) DEFAULT '0000000000.000',
    `duration` Float64,
    `endpoint` String,
    `method` FixedString(5),
    `status_code` UInt32,
    `remote_addr` String,
    `username` Nullable(String),
    `error` Nullable(String),
    `request_body` Nullable(String) TTL toDateTime(request_ts) + toIntervalMonth(1),
    `query_params` Nullable(String) TTL toDateTime(request_ts) + toIntervalMonth(1),
    `response_body` Nullable(String) TTL toDateTime(request_ts) + toIntervalMonth(1)
)
ENGINE = ReplicatedReplacingMergeTree(
    '/clickhouse/tables/{shard}/default/api_request_log_replicated',
    '{replica}',
    ver
)
PARTITION BY toYYYYMM(request_ts)
ORDER BY uuid
SETTINGS index_granularity = 8192;
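
The {shard} and {replica} placeholders are substituted from the macros configured on each server; before running the DDL it is worth checking that they are defined on every node:

SELECT * FROM system.macros;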

Copy the data:

INSERT INTO default.api_request_log_replicated
SELECT * FROM default.api_request_log
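
After the insert, compare the row counts of the source and the replicated table (the same check applies to every table below). Note that for a ReplacingMergeTree table the counts can diverge slightly once background merges deduplicate rows with the same ORDER BY key:

SELECT
    (SELECT count() FROM default.api_request_log) AS src_rows,
    (SELECT count() FROM default.api_request_log_replicated) AS dst_rows;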

2. event_replicated

show create table default.event
CREATE TABLE default.event_replicated ON CLUSTER smvu2_cluster
(
    `event_id` UUID,
    `unipoll_id` UUID,
    `sensor_id` UUID,
    `sensor_type_id` UUID,
    `sensor_type_name` String,
    `sensor_activation_status_name` String,
    `sensor_logic_status_id` UUID,
    `sensor_logic_status_name` String,
    `sensor_comment` Nullable(String),
    `command` String,
    `priority_name` String,
    `pub_ts` DateTime,
    `sub_ts` DateTime64(3, 'Europe/Moscow'),
    `state_id` UInt8,
    `state_name` String,
    `status_id` UInt8,
    `status_name` String,
    `unitype_id` UInt16,
    `unitype_name` String,
    `system_id` UInt16,
    `system_short_name` String,
    `system_name` String,
    `checked` UInt8,
    `object_id` UUID,
    `object_type_name` String,
    `data_id` UInt8,
    `value` Nullable(String),
    `raw_value` Nullable(String),
    `data` String,
    `region_id` UUID,
    `region_name` String,
    `complex_id` UUID,
    `complex_name` String,
    `collector_id` Nullable(UUID),
    `collector_name` Nullable(String),
    `control_room_id` Nullable(UUID),
    `control_room_name` Nullable(String),
    `picket_id` Nullable(UUID),
    `picket_name` Nullable(String),
    `picket_n` Nullable(UInt32),
    `cstate_id` UUID,
    `cstate_name` String,
    `cstate_color_id` UInt8,
    `sub_ms` UInt64,
    `linked_channel_state` Nullable(String),
    `on_guard` Nullable(UInt8),
    `internal_id` Nullable(String),
    `floor_number` Nullable(Int32)
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/default/event_replicated',
    '{replica}'
)
PARTITION BY toYYYYMM(pub_ts)
ORDER BY (priority_name, sensor_type_id, cstate_id, object_id, sensor_id, pub_ts, sub_ts, event_id)
SETTINGS index_granularity = 8192;

Copy the data:

INSERT INTO default.event_replicated
SELECT * FROM default.event

3. event_automation_replicated

show create table default.event_automation
CREATE TABLE default.event_automation_replicated ON CLUSTER smvu2_cluster
(
    `event_id` UUID DEFAULT '00000000-0000-0000-0000-000000000000',
    `unipoll_id` UUID DEFAULT '00000000-0000-0000-0000-000000000000',
    `automation_id` UUID DEFAULT '00000000-0000-0000-0000-000000000000',
    `automation_name` String,
    `automation_mode` UInt8,
    `pub_ts` DateTime DEFAULT '0000000000',
    `sub_ts` DateTime64(3) DEFAULT '0000000000.000',
    `sub_ms` UInt64,
    `state_id` UInt8,
    `state_name` String,
    `status_id` UInt8,
    `unitype_id` UInt8,
    `unitype_name` String,
    `group_id` UUID DEFAULT '00000000-0000-0000-0000-000000000000',
    `group_name` String,
    `data` String,
    `cstate_id` UUID DEFAULT '00000000-0000-0000-0000-000000000000',
    `cstate_name` String,
    `cstate_color_id` UInt8
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/default/event_automation_replicated',
    '{replica}'
)
PARTITION BY toYYYYMMDD(pub_ts)
ORDER BY (event_id, pub_ts, sub_ts)
SETTINGS index_granularity = 8192;

Copy the data:

INSERT INTO default.event_automation_replicated
SELECT * FROM default.event_automation

4. event_confirmation_replicated

show create table default.event_confirmation
CREATE TABLE default.event_confirmation_replicated ON CLUSTER smvu2_cluster
(
    `uuid` UUID,
    `event_id` UUID,
    `reason` String DEFAULT '',
    `event_checked` Int8,
    `username` String,
    `datetime` DateTime,
    `confirmation_group` String,
    `node_id` UUID
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/default/event_confirmation_replicated',
    '{replica}'
)
PARTITION BY toYYYYMM(datetime)
ORDER BY datetime
SETTINGS index_granularity = 8192;

Copy the data:

INSERT INTO default.event_confirmation_replicated
SELECT * FROM default.event_confirmation

5. event_objects_replicated

show create table default.event_objects
CREATE TABLE default.event_objects_replicated ON CLUSTER smvu2_cluster
(
    `event_id` UUID,
    `node_id` UUID,
    `pub_ts` DateTime,
    `sub_ts` DateTime64(3, 'UTC'),
    `cstate_id` UUID,
    `source_id` UUID,
    `source_name` String DEFAULT '',
    `source_type_id` UUID,
    `event_data` String,
    `complex_id` UUID
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/default/event_objects_replicated',
    '{replica}'
)
PARTITION BY toYYYYMM(pub_ts)
PRIMARY KEY event_id
ORDER BY (event_id, pub_ts, sub_ts)
SETTINGS index_granularity = 8192;

Copy the data:

INSERT INTO default.event_objects_replicated
SELECT * FROM default.event_objects

6. infi_clickhouse_orm_migrations_replicated

show create table default.infi_clickhouse_orm_migrations
CREATE TABLE default.infi_clickhouse_orm_migrations_replicated ON CLUSTER smvu2_cluster
(
    `package_name` String,
    `module_name` String,
    `applied` Date DEFAULT '1970-01-01'
)
ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/default/infi_clickhouse_orm_migrations_replicated',
    '{replica}'
)
PARTITION BY toYYYYMM(applied)
ORDER BY (package_name, module_name)
SETTINGS index_granularity = 8192;

Copy the data:

INSERT INTO default.infi_clickhouse_orm_migrations_replicated
SELECT * FROM default.infi_clickhouse_orm_migrations

7. unchecked_event_replicated

show create table default.unchecked_event
CREATE TABLE default.unchecked_event_replicated ON CLUSTER smvu2_cluster
(
    `event_id` UUID,
    `unipoll_id` Nullable(UUID),
    `sensor_id` Nullable(UUID),
    `sensor_type_id` Nullable(UUID),
    `sensor_type_name` Nullable(String),
    `sensor_activation_status_name` Nullable(String),
    `sensor_logic_status_id` Nullable(UUID),
    `sensor_logic_status_name` Nullable(String),
    `sensor_comment` Nullable(String),
    `command` Nullable(String),
    `priority_name` Nullable(String),
    `pub_ts` Nullable(DateTime),
    `sub_ts` DateTime64(3),
    `state_id` Nullable(UInt8),
    `state_name` Nullable(String),
    `status_id` Nullable(UInt8),
    `status_name` Nullable(String),
    `unitype_id` Nullable(UInt16),
    `unitype_name` Nullable(String),
    `system_id` Nullable(UInt16),
    `system_short_name` Nullable(String),
    `system_name` Nullable(String),
    `checked` Nullable(UInt8),
    `object_id` Nullable(UUID),
    `object_type_name` Nullable(String),
    `data_id` Nullable(UInt8),
    `data` Nullable(String),
    `region_id` Nullable(UUID),
    `region_name` Nullable(String),
    `complex_id` Nullable(UUID),
    `complex_name` Nullable(String),
    `collector_id` Nullable(UUID),
    `collector_name` Nullable(String),
    `control_room_id` Nullable(UUID),
    `control_room_name` Nullable(String),
    `picket_id` Nullable(UUID),
    `picket_name` Nullable(String),
    `picket_n` Nullable(UInt32),
    `cstate_id` Nullable(UUID),
    `cstate_name` Nullable(String),
    `cstate_color_id` Nullable(UInt8),
    `sub_ms` Nullable(UInt64),
    `linked_channel_state` Nullable(String),
    `on_guard` Nullable(UInt8),
    `internal_id` Nullable(String),
    `floor_number` Nullable(Int32),
    `sign` Int8,
    `value` Nullable(String),
    `raw_value` Nullable(String)
)
ENGINE = ReplicatedCollapsingMergeTree(
    '/clickhouse/tables/{shard}/default/unchecked_event_replicated',
    '{replica}',
    sign
)
PARTITION BY toYYYYMM(sub_ts)
PRIMARY KEY event_id
ORDER BY event_id
SETTINGS index_granularity = 8192;

Copy the data:

INSERT INTO default.unchecked_event_replicated
SELECT * FROM default.unchecked_event;
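
For the CollapsingMergeTree tables a plain row count says little; a quick sanity check is to compare the sum of sign between source and replica:

SELECT
    (SELECT sum(sign) FROM default.unchecked_event) AS src_sign,
    (SELECT sum(sign) FROM default.unchecked_event_replicated) AS dst_sign;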

8. unchecked_objects_event_replicated

show create table default.unchecked_objects_event;
CREATE TABLE default.unchecked_objects_event_replicated ON CLUSTER smvu2_cluster
(
    `event_id` UUID,
    `node_id` Nullable(UUID),
    `complex_id` Nullable(UUID),
    `pub_ts` Nullable(DateTime),
    `sub_ts` DateTime64(3, 'UTC'),
    `cstate_id` Nullable(UUID),
    `source_id` Nullable(UUID),
    `source_name` Nullable(String),
    `source_type_id` Nullable(UUID),
    `event_data` Nullable(String),
    `sign` Int8
)
ENGINE = ReplicatedCollapsingMergeTree(
    '/clickhouse/tables/{shard}/default/unchecked_objects_event_replicated',
    '{replica}',
    sign
)
PARTITION BY toYYYYMM(sub_ts)
PRIMARY KEY event_id
ORDER BY event_id
SETTINGS index_granularity = 8192;

Copy the data:

INSERT INTO default.unchecked_objects_event_replicated
SELECT * FROM default.unchecked_objects_event;

Checking the replication state after the tables are created:

SELECT database, table, is_leader, total_replicas, active_replicas
  FROM system.replicas
  WHERE database = 'default'
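
If a replica lags behind, the backlog can be inspected per table:

SELECT database, table, count() AS queued
FROM system.replication_queue
GROUP BY database, table;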

Check and compare how much space the tables in the default database occupy:

SELECT
    name AS table_name,
    formatReadableSize(total_bytes) AS size_readable,
    total_rows AS rows_count
FROM system.tables
WHERE database = 'default'
ORDER BY total_bytes DESC;
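
total_bytes in system.tables can be NULL for engines that do not store data themselves (such as Distributed), so an alternative is to sum the active parts:

SELECT
    table,
    formatReadableSize(sum(bytes_on_disk)) AS size_readable,
    sum(rows) AS rows_count
FROM system.parts
WHERE active AND database = 'default'
GROUP BY table
ORDER BY sum(bytes_on_disk) DESC;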

Rename the source tables

rename table api_request_log to api_request_log_old;
rename table event to event_old;
rename table event_objects to event_objects_old;
rename table event_confirmation to event_confirmation_old;
rename table audit_new to audit_new_old;
rename table event_automation to event_automation_old;
rename table infi_clickhouse_orm_migrations to infi_clickhouse_orm_migrations_old;
rename table unchecked_event to unchecked_event_old;
rename table unchecked_objects_event to unchecked_objects_event_old;
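
The renames above run on a single node; if the original tables also exist on the other nodes of the cluster, each rename can be executed everywhere at once, for example:

rename table event to event_old on cluster smvu2_cluster;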

Create the Distributed tables

CREATE TABLE default.api_request_log ON CLUSTER smvu2_cluster AS default.api_request_log_replicated 
ENGINE = Distributed(smvu2_cluster, default, api_request_log_replicated, rand());
CREATE TABLE default.event ON CLUSTER smvu2_cluster AS default.event_replicated 
ENGINE = Distributed(smvu2_cluster, default, event_replicated, rand());
CREATE TABLE default.audit_new ON CLUSTER smvu2_cluster AS default.audit_new_replicated 
ENGINE = Distributed(smvu2_cluster, default, audit_new_replicated, rand());
CREATE TABLE default.event_objects ON CLUSTER smvu2_cluster AS default.event_objects_replicated 
ENGINE = Distributed(smvu2_cluster, default, event_objects_replicated, rand());
CREATE TABLE default.event_automation ON CLUSTER smvu2_cluster AS default.event_automation_replicated 
ENGINE = Distributed(smvu2_cluster, default, event_automation_replicated, rand());
CREATE TABLE default.event_confirmation ON CLUSTER smvu2_cluster AS default.event_confirmation_replicated 
ENGINE = Distributed(smvu2_cluster, default, event_confirmation_replicated, rand());
CREATE TABLE default.infi_clickhouse_orm_migrations ON CLUSTER smvu2_cluster AS default.infi_clickhouse_orm_migrations_replicated 
ENGINE = Distributed(smvu2_cluster, default, infi_clickhouse_orm_migrations_replicated, rand());
CREATE TABLE default.unchecked_event ON CLUSTER smvu2_cluster AS default.unchecked_event_replicated 
ENGINE = Distributed(smvu2_cluster, default, unchecked_event_replicated, rand());
CREATE TABLE default.unchecked_objects_event ON CLUSTER smvu2_cluster AS default.unchecked_objects_event_replicated 
ENGINE = Distributed(smvu2_cluster, default, unchecked_objects_event_replicated, rand());
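
A quick smoke test after the switch-over: a query through the new Distributed table should return the same data as the renamed original, for example:

SELECT
    (SELECT count() FROM default.event) AS via_distributed,
    (SELECT count() FROM default.event_old) AS via_old;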

Solving the table size problem

1. By limiting the number of rows sent per insert; below is a script that does this automatically.

#!/bin/bash

# List of tables to replicate
TABLES=(
  api_request_log
  audit_new
  event
  event_automation
  event_confirmation
  event_objects
  infi_clickhouse_orm_migrations
)

PASSWORD="q1"
BATCH_SIZE=100000  # Batch size for each insert
MAX_THREADS=4      # Maximum number of insert threads

for table in "${TABLES[@]}"; do
  replicated_table="${table}_replicated"
  echo "Processing $table to $replicated_table"

  # 1. Get the total number of rows
  total_rows=$(clickhouse-client --password "$PASSWORD" --query="
    SELECT count() FROM remote('localhost', 'default', '$table')")

  echo "Total rows to copy: $total_rows"

  # 2. Copy the data in batches
  for ((offset=0; offset<total_rows; offset+=BATCH_SIZE)); do
    echo "Copying rows $((offset+1))-$((offset+BATCH_SIZE))..."

    clickhouse-client --password "$PASSWORD" --query="
      INSERT INTO default.$replicated_table
      SELECT * FROM remote('localhost', 'default', '$table')
      LIMIT $BATCH_SIZE OFFSET $offset
      SETTINGS
        max_insert_threads=$MAX_THREADS,
        max_memory_usage=10000000000" # ~10 GB, as a plain byte count

    if [ $? -ne 0 ]; then
      echo "Error copying batch $((offset/BATCH_SIZE+1))"
      break
    fi
  done

  # 3. Check the number of copied rows
  copied_rows=$(clickhouse-client --password "$PASSWORD" --query="
    SELECT count() FROM default.$replicated_table")

  echo "Successfully copied $copied_rows/$total_rows rows from $table to $replicated_table"
done
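
Note that LIMIT/OFFSET paging is only reliable while the source table is not being written to, since the batches are not ordered. For the partitioned tables a more robust alternative is to copy partition by partition, so that each batch is bounded by the partition key; a sketch for the event table (the month value is illustrative):

SELECT DISTINCT partition FROM system.parts
WHERE database = 'default' AND table = 'event' AND active;

INSERT INTO default.event_replicated
SELECT * FROM default.event
WHERE toYYYYMM(pub_ts) = 202505;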

2. Copying with the clickhouse-copier utility

Create a config.xml with the connection settings for the cluster (Keeper):

<yandex>
    <logger>
        <level>warning</level>
        <log>/var/log/clickhouse-server/clickhouse-copier.log</log>
        <errorlog>/var/log/clickhouse-server/clickhouse-copier.err.log</errorlog>
    </logger>

    <zookeeper>
        <!-- For ClickHouse Keeper (recommended) -->
        <node>
            <host>172.16.212.41</host>
            <port>9181</port>
        </node>
    </zookeeper>

    <distributed_ddl>
        <path>/clickhouse/task_queue/ddl</path>
    </distributed_ddl>

    <http_port>9999</http_port>
</yandex>
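
Before launching the copier it is worth verifying that the Keeper node from the config is reachable, for example by querying it from any ClickHouse server:

SELECT name FROM system.zookeeper WHERE path = '/';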

Create a tasks folder and place a file there for each table, for example api_request_log.xml:

<yandex>
    <!-- Configuration of clusters as in an ordinary server config -->
    <remote_servers>
        <source_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                    <replica>
                        <host>172.16.212.14</host>
                        <port>9000</port>
                        <user>default</user>
                        <password>q1</password>
                        <secure>0</secure>
                    </replica>
            </shard>
        </source_cluster>

        <destination_cluster>
            <shard>
                <internal_replication>true</internal_replication>
                    <replica>
                        <host>172.16.212.14</host>
                        <port>9000</port>
                        <user>default</user>
                        <password>q1</password>
                        <secure>0</secure>
                    </replica>
            </shard>
        </destination_cluster>
    </remote_servers>

    <!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
    <max_workers>2</max_workers>

    <!-- Setting used to fetch (pull) data from source cluster tables -->
    <settings_pull>
        <readonly>1</readonly>
    </settings_pull>

    <!-- Setting used to insert (push) data to destination cluster tables -->
    <settings_push>
        <readonly>0</readonly>
    </settings_push>

    <!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
         They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
    <settings>
        <connect_timeout>3</connect_timeout>
        <!-- Sync insert is set forcibly, leave it here just in case. -->
        <insert_distributed_sync>1</insert_distributed_sync>
    </settings>

    <tables>
        <!-- A table task, copies one table. -->
        <api_request_log>
            <!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
            <cluster_pull>source_cluster</cluster_pull>
            <database_pull>default</database_pull>
            <table_pull>api_request_log</table_pull>

            <!-- Destination cluster name and tables in which the data should be inserted -->
            <cluster_push>destination_cluster</cluster_push>
            <database_push>default</database_push>
            <table_push>api_request_log_replicated</table_push>

            <engine>
                ENGINE = ReplicatedReplacingMergeTree(
                    '/clickhouse/tables/{shard}/default/api_request_log_replicated',
                    '{replica}',
                    ver
                )
                PARTITION BY toYYYYMM(request_ts)
                ORDER BY uuid
                SETTINGS index_granularity = 8192;
            </engine>
            <sharding_key>rand()</sharding_key>
        </api_request_log>
    </tables>
</yandex>

In each file, the database and table names in the tables section are adjusted accordingly. After that, start the copy; the --task-path must be unique for each table:

clickhouse-copier --config config.xml --task-file ./tasks/api_request_log.xml --task-path /clickhouse/copier/task1
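
The copier stores its task state under the given --task-path in Keeper, so progress can be inspected from any ClickHouse node, for example:

SELECT name FROM system.zookeeper WHERE path = '/clickhouse/copier/task1';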