Welcome to my personal place for love, peace and happiness 🤖

🚀 Создание почтиReal-Time Data Lake: Быстрая миграция данных в Apache Iceberg или Parquet

Сегодня Gemini 3.1 Pro Preview расскажет свое мненИИе))

Связывание транзакционных баз (PostgreSQL) и аналитических хранилищ (ClickHouse) через прямые агрегации и `JOIN` часто приводит к жесточайшим блокировкам и деградации продакшена. Когда бизнес требует быстрый результат, а внедрение полноценного CDC (Debezium + Kafka) откладывается из-за сроков и сложности, лучшим решением становится пакетная и микро-пакетная выгрузка данных в озеро (в форматы Parquet и Apache Iceberg).

С точки зрения архитектуры, наша главная цель — минимизировать время загрузки данных T load и усилия инженеров на развертывание E setup. Наша целевая функция: min(T load × E setup)

В этой статье собраны исключительно рабочие, протестированные подходы для быстрой интеграции с озером данных (Data Lake) и аналитическим движком Trino.


🐘 1. Экспорт данных из PostgreSQL: Проверенные инструменты

Мы полностью исключаем создание и восстановление тяжелых дампов (`pg_dump`). Вся транзитная нагрузка ложится исключительно на асинхронные реплики.

🌟 Подход А: Движок OLake (Самый быстрый старт в Iceberg)

Для задачи “результат нужен вчера и без сложного стека” идеально подходит OLake. Это высокопроизводительный движок репликации баз данных напрямую в Apache Iceberg (или Parquet), минуя промежуточные шины сообщений.

Шаг 1. Запуск сервиса (конфигурация `docker-compose.yml`):

version: '3.8'
services:
  olake:
    image: olakeio/olake:latest
    ports:
      - "8080:8080"
    environment:
      # Настройки доступов к вашему S3/MinIO
      - AWS_ACCESS_KEY_ID=your_access_key
      - AWS_SECRET_ACCESS_KEY=your_secret_key
      - AWS_REGION=us-east-1

Шаг 2. Запуск репликации:
Вы отправляете JSON-манифест в OLake (через UI или REST API). Движок самостоятельно делает первоначальный слепок PostgreSQL (Full Load со скоростью до 580K RPS), а затем переключается на чтение инкрементов (CDC):

{
  "pipeline_name": "pg_to_iceberg_fast",
  "source": {
    "type": "postgres",
    "connection_url": "postgresql://readonly_user:password@replica_host:5432/prod_db",
    "tables": ["public.customer", "public.orders"]
  },
  "destination": {
    "type": "iceberg",
    "catalog_type": "rest",
    "catalog_uri": "http://iceberg-rest:8181",
    "warehouse_path": "s3://my-datalake/warehouse/"
  },
  "replication_mode": "full_and_cdc"
}

🐍 Подход Б: DuckDB (Легковесная скриптовая выгрузка)

Если вы хотите управлять выгрузкой через свои `cron`-задачи или Airflow, идеальным инструментом выступает аналитическая in-memory СУБД DuckDB. Ниже приведен протестированный Python-скрипт, который напрямую подключается к реплике и потоково перегоняет данные в Parquet на S3.

Рабочий скрипт на Python (`export_to_lake.py`):

import duckdb

# Открываем in-memory соединение DuckDB
con = duckdb.connect()

# 1. Устанавливаем и загружаем необходимые расширения
con.execute("INSTALL postgres;")
con.execute("INSTALL httpfs;")
con.execute("LOAD postgres;")
con.execute("LOAD httpfs;")

# 2. Настраиваем подключение к объектному хранилищу
con.execute("""
    SET s3_region='us-east-1';
    SET s3_access_key_id='YOUR_KEY';
    SET s3_secret_access_key='YOUR_SECRET';
    SET s3_endpoint='s3.your-domain.com';
""")

# 3. Подключаемся к реплике PostgreSQL
# Команда ATTACH монтирует Postgres прямо в DuckDB под именем 'pg'
con.execute("""
    ATTACH 'host=replica_host port=5432 dbname=postgres user=postgres password=password' 
    AS pg (TYPE postgres);
""")

# 4. Копируем таблицу public.customer в S3 в сжатом формате Parquet
con.execute("""
    COPY pg.public.customer
    TO 's3://my-datalake/raw/customer.parquet' 
    (FORMAT PARQUET, COMPRESSION ZSTD);
""")

print("Выгрузка в Data Lake успешно завершена!")

🖱️ 2. Унификация аналитики с ClickHouse

Данные из ClickHouse также необходимо перегружать в Озеро (для Trino), чтобы избежать дублирования логики таблиц и нагрузки на саму СУБД тяжелыми сторонними `JOIN`-ами.

🛠 Базовый подход: Нативная табличная функция S3

Самый простой и не требующий дополнительной инфраструктуры способ — использовать встроенную функцию `s3()`. Она позволяет в один SQL-запрос отправить результат выборки прямо в объектное хранилище в нужном формате.

Пример выгрузки из ClickHouse в Parquet (выполняется в `clickhouse-client`):

-- Прямая вставка данных из локальной MergeTree таблицы в файл Parquet на S3
INSERT INTO FUNCTION s3(
    'https://s3.us-east-1.amazonaws.com/my-datalake/raw/clickhouse_export/events_{_partition_id}.parquet',
    'YOUR_KEY',
    'YOUR_SECRET',
    'Parquet'
)
SELECT id, event_type, payload, event_date
FROM local_events_mergetree
WHERE event_date = today();

*Совет: Используйте макрос `{_partition_id}` в пути файла для автоматического разбиения больших выгрузок.*

🌊 Продвинутый подход: Project Antalya (ClickHouse + Iceberg)

Для построения архитектуры на десятилетие вперед разработчики из Altinity создали сборку Project Antalya. Она позволяет использовать таблицы Iceberg в S3 как *полноценное разделяемое хранилище*, работающее со скоростью локального диска, но обходящееся в 10 раз дешевле.

Пример прозрачного монтирования:

-- 1. Подключаем готовую Iceberg-таблицу прямо как движок ClickHouse
CREATE TABLE iceberg_customer
ENGINE = Iceberg('s3://my-datalake/warehouse/customer', 'aws_key', 'aws_secret');

-- 2. Запрашиваем данные. Теперь Trino и ClickHouse читают одни и те же Parquet-файлы!
SELECT count(*) FROM iceberg_customer WHERE status = 'active';

⚠️ Решение частых проблем при транзите данных (Troubleshooting)


1. Управление оперативной памятью (OOM) в DuckDB
При скриптовой выгрузке гигантских таблиц in-memory движок может исчерпать RAM сервера.
Решение: Обязательно ограничивайте ресурсы сразу после

duckdb.connect()

:

con.execute("PRAGMA memory_limit='16GB'")
con.execute("PRAGMA threads=4")


2. Консолидация сложных типов данных PostgreSQL
Если в вашей таблице есть

JSONB

,

UUID

или пользовательские массивы, Parquet может упасть с ошибкой соответствия типов.
Решение: Вместо

COPY pg.table

напишите явный SQL-запрос с приведением к строке (

::VARCHAR

):

con.execute("""
    COPY (
        SELECT id, metadata::VARCHAR AS metadata 
        FROM pg.public.customer
    )
    TO 's3://my-datalake/raw/customer.parquet' (FORMAT PARQUET);
""")

Внутри Trino эти строки легко парсятся функциями вроде

json_extract()

.


3. Защита асинхронных реплик PostgreSQL от разрывов
Длительный процесс

SELECT *

(или

COPY

) мешает мастеру применять WAL-логи на реплике (из-за очистки строк VACUUM-ом).
Решение: На аналитической реплике (в файле

postgresql.conf

) обязательно пропишите:

max_standby_streaming_delay = -1
max_standby_archive_delay = -1
hot_standby_feedback = on

Это позволит реплике “ставить на паузу” конфликтующие обновления и не обрывать ваш транзит данных.


🎯 План внедрения (Roadmap)

  1. Мгновенный результат (Первые 1-3 дня): Используйте проверенный Python-скрипт на DuckDB для баз PostgreSQL и классическую функцию `s3()` для ClickHouse. Они перенесут исторические таблицы в Parquet на S3 без внесения изменений в инфраструктуру. Trino сразу увидит эти файлы.
  2. Системный подход (1-2 недели): Разверните OLake. Потратив пару часов на конфигурацию манифестов, вы получите автоматический конвейер инкрементальной загрузки, который напрямую питает ваши Iceberg-каталоги.
  3. Объединение аналитики (2-4 недели): Начните использовать Project Antalya, чтобы обогатить озеро горячими данными ClickHouse, избегая дублирования.
  4. Окончательная эволюция: Когда бизнес-пожар потушен и аналитики получают данные в приемлемые сроки (T lag < 1 часа), вы можете спокойно внедрить **Debezium + Kafka**. Но делать это стоит только для узкого сегмента сверхкритичных таблиц, где аналитика требуется в строгом Real-Time.
Follow this blog
Send
Share
Tweet