Welcome to my personal place for love, peace and happiness 🤖

🚀 Создание почтиReal-Time Data Lake: Быстрая миграция данных в Apache Iceberg или Parquet

Сегодня Gemini 3.1 Pro Preview расскажет свое мненИИе))

Связывание транзакционных баз (PostgreSQL) и аналитических хранилищ (ClickHouse) через прямые агрегации и `JOIN` часто приводит к жесточайшим блокировкам и деградации продакшена. Когда бизнес требует быстрый результат, а внедрение полноценного CDC (Debezium + Kafka) откладывается из-за сроков и сложности, лучшим решением становится пакетная и микро-пакетная выгрузка данных в озеро (в форматы Parquet и Apache Iceberg).

С точки зрения архитектуры, наша главная цель — минимизировать время загрузки данных T load и усилия инженеров на развертывание E setup. Наша целевая функция: min(T load × E setup)

В этой статье собраны исключительно рабочие, протестированные подходы для быстрой интеграции с озером данных (Data Lake) и аналитическим движком Trino.


🐘 1. Экспорт данных из PostgreSQL: Проверенные инструменты

Мы полностью исключаем создание и восстановление тяжелых дампов (`pg_dump`). Вся транзитная нагрузка ложится исключительно на асинхронные реплики.

🌟 Подход А: Движок OLake (Самый быстрый старт в Iceberg)

Для задачи “результат нужен вчера и без сложного стека” идеально подходит OLake. Это высокопроизводительный движок репликации баз данных напрямую в Apache Iceberg (или Parquet), минуя промежуточные шины сообщений.

Шаг 1. Запуск сервиса (конфигурация `docker-compose.yml`):

version: '3.8'
services:
  olake:
    image: olakeio/olake:latest
    ports:
      - "8080:8080"
    environment:
      # Настройки доступов к вашему S3/MinIO
      - AWS_ACCESS_KEY_ID=your_access_key
      - AWS_SECRET_ACCESS_KEY=your_secret_key
      - AWS_REGION=us-east-1

Шаг 2. Запуск репликации:
Вы отправляете JSON-манифест в OLake (через UI или REST API). Движок самостоятельно делает первоначальный слепок PostgreSQL (Full Load со скоростью до 580K RPS), а затем переключается на чтение инкрементов (CDC):

{
  "pipeline_name": "pg_to_iceberg_fast",
  "source": {
    "type": "postgres",
    "connection_url": "postgresql://readonly_user:password@replica_host:5432/prod_db",
    "tables": ["public.customer", "public.orders"]
  },
  "destination": {
    "type": "iceberg",
    "catalog_type": "rest",
    "catalog_uri": "http://iceberg-rest:8181",
    "warehouse_path": "s3://my-datalake/warehouse/"
  },
  "replication_mode": "full_and_cdc"
}

🐍 Подход Б: DuckDB (Легковесная скриптовая выгрузка)

Если вы хотите управлять выгрузкой через свои `cron`-задачи или Airflow, идеальным инструментом выступает аналитическая in-memory СУБД DuckDB. Ниже приведен протестированный Python-скрипт, который напрямую подключается к реплике и потоково перегоняет данные в Parquet на S3.

Рабочий скрипт на Python (`export_to_lake.py`):

import duckdb

# Открываем in-memory соединение DuckDB
con = duckdb.connect()

# 1. Устанавливаем и загружаем необходимые расширения
con.execute("INSTALL postgres;")
con.execute("INSTALL httpfs;")
con.execute("LOAD postgres;")
con.execute("LOAD httpfs;")

# 2. Настраиваем подключение к объектному хранилищу
con.execute("""
    SET s3_region='us-east-1';
    SET s3_access_key_id='YOUR_KEY';
    SET s3_secret_access_key='YOUR_SECRET';
    SET s3_endpoint='s3.your-domain.com';
""")

# 3. Подключаемся к реплике PostgreSQL
# Команда ATTACH монтирует Postgres прямо в DuckDB под именем 'pg'
con.execute("""
    ATTACH 'host=replica_host port=5432 dbname=postgres user=postgres password=password' 
    AS pg (TYPE postgres);
""")

# 4. Копируем таблицу public.customer в S3 в сжатом формате Parquet
con.execute("""
    COPY pg.public.customer
    TO 's3://my-datalake/raw/customer.parquet' 
    (FORMAT PARQUET, COMPRESSION ZSTD);
""")

print("Выгрузка в Data Lake успешно завершена!")

🖱️ 2. Унификация аналитики с ClickHouse

Данные из ClickHouse также необходимо перегружать в Озеро (для Trino), чтобы избежать дублирования логики таблиц и нагрузки на саму СУБД тяжелыми сторонними `JOIN`-ами.

🛠 Базовый подход: Нативная табличная функция S3

Самый простой и не требующий дополнительной инфраструктуры способ — использовать встроенную функцию `s3()`. Она позволяет в один SQL-запрос отправить результат выборки прямо в объектное хранилище в нужном формате.

Пример выгрузки из ClickHouse в Parquet (выполняется в `clickhouse-client`):

-- Прямая вставка данных из локальной MergeTree таблицы в файл Parquet на S3
INSERT INTO FUNCTION s3(
    'https://s3.us-east-1.amazonaws.com/my-datalake/raw/clickhouse_export/events_{_partition_id}.parquet',
    'YOUR_KEY',
    'YOUR_SECRET',
    'Parquet'
)
SELECT id, event_type, payload, event_date
FROM local_events_mergetree
WHERE event_date = today();

*Совет: Используйте макрос `{_partition_id}` в пути файла для автоматического разбиения больших выгрузок.*

🌊 Продвинутый подход: Project Antalya (ClickHouse + Iceberg)

Для построения архитектуры на десятилетие вперед разработчики из Altinity создали сборку Project Antalya. Она позволяет использовать таблицы Iceberg в S3 как *полноценное разделяемое хранилище*, работающее со скоростью локального диска, но обходящееся в 10 раз дешевле.

Пример прозрачного монтирования:

-- 1. Подключаем готовую Iceberg-таблицу прямо как движок ClickHouse
CREATE TABLE iceberg_customer
ENGINE = Iceberg('s3://my-datalake/warehouse/customer', 'aws_key', 'aws_secret');

-- 2. Запрашиваем данные. Теперь Trino и ClickHouse читают одни и те же Parquet-файлы!
SELECT count(*) FROM iceberg_customer WHERE status = 'active';

⚠️ Решение частых проблем при транзите данных (Troubleshooting)


1. Управление оперативной памятью (OOM) в DuckDB
При скриптовой выгрузке гигантских таблиц in-memory движок может исчерпать RAM сервера.
Решение: Обязательно ограничивайте ресурсы сразу после

duckdb.connect()

:

con.execute("PRAGMA memory_limit='16GB'")
con.execute("PRAGMA threads=4")


2. Консолидация сложных типов данных PostgreSQL
Если в вашей таблице есть

JSONB

,

UUID

или пользовательские массивы, Parquet может упасть с ошибкой соответствия типов.
Решение: Вместо

COPY pg.table

напишите явный SQL-запрос с приведением к строке (

::VARCHAR

):

con.execute("""
    COPY (
        SELECT id, metadata::VARCHAR AS metadata 
        FROM pg.public.customer
    )
    TO 's3://my-datalake/raw/customer.parquet' (FORMAT PARQUET);
""")

Внутри Trino эти строки легко парсятся функциями вроде

json_extract()

.


3. Защита асинхронных реплик PostgreSQL от разрывов
Длительный процесс

SELECT *

(или

COPY

) мешает мастеру применять WAL-логи на реплике (из-за очистки строк VACUUM-ом).
Решение: На аналитической реплике (в файле

postgresql.conf

) обязательно пропишите:

max_standby_streaming_delay = -1
max_standby_archive_delay = -1
hot_standby_feedback = on

Это позволит реплике “ставить на паузу” конфликтующие обновления и не обрывать ваш транзит данных.


🎯 План внедрения (Roadmap)

  1. Мгновенный результат (Первые 1-3 дня): Используйте проверенный Python-скрипт на DuckDB для баз PostgreSQL и классическую функцию `s3()` для ClickHouse. Они перенесут исторические таблицы в Parquet на S3 без внесения изменений в инфраструктуру. Trino сразу увидит эти файлы.
  2. Системный подход (1-2 недели): Разверните OLake. Потратив пару часов на конфигурацию манифестов, вы получите автоматический конвейер инкрементальной загрузки, который напрямую питает ваши Iceberg-каталоги.
  3. Объединение аналитики (2-4 недели): Начните использовать Project Antalya, чтобы обогатить озеро горячими данными ClickHouse, избегая дублирования.
  4. Окончательная эволюция: Когда бизнес-пожар потушен и аналитики получают данные в приемлемые сроки (T lag < 1 часа), вы можете спокойно внедрить **Debezium + Kafka**. Но делать это стоит только для узкого сегмента сверхкритичных таблиц, где аналитика требуется в строгом Real-Time.

Часть 2 – Интеграция PostgreSQL, Trino и Iceberg

Эффективный ELT: Интеграция PostgreSQL, Trino и Iceberg (сравнение подходов Table Functions и pg_lake)

В современных data-архитектурах часто возникает задача переноса реляционных данных в озера данных (Data Lakes). Если ваш стек включает PostgreSQL, Trino и Iceberg (например, с REST-каталогом Lakekeeper), возникает архитектурный вопрос: как переносить данные и обращаться к ним максимально эффективно?

В этой статье мы разберем два мощных подхода: использование “нативного” для Trino проталкивания через `system.query()` и применение расширения `pg_lake` на стороне базы данных.


Проблема: Почему Trino иногда “вытягивает” всю таблицу?

Обычно в Trino мы пишем простой федеративный запрос:

SELECT * FROM postgres_catalog.public.customer WHERE acctbal > 1000;

В идеальном сценарии оптимизатор Trino считывает предикат (`acctbal > 1000`) и транслирует его в SQL-диалект PostgreSQL. Это называется Pushdown (проталкивание).

Но на практике аналитические запросы гораздо сложнее. Если запрос содержит специфичную бизнес-логику, нестандартные оконные функции, сложные JOIN-ы или функции обработки строк, которых нет в базовом словаре коннектора Trino, оптимизатор не сможет транслировать этот кусок SQL. В результате Trino принимает решение скачать всю таблицу в память своих воркеров и применить фильтрацию уже там.

Как работает Dynamic Filtering в Trino и почему он может не сработать (Детали)

Особую роль при JOIN-ах играет механизм динамической фильтрации (Dynamic Filtering). Когда вы джоините большую таблицу из Postgres с маленькой таблицей (например, справочником из Hive/Iceberg), Trino сначала читает справочник (Build side), извлекает ключи, формирует SQL-фильтр (например, `IN (1, 2, 3)`) и на лету отправляет его в Postgres (Probe side).

Два критичных параметра в конфигурации коннектора управляют этим процессом:

  • `dynamic-filtering.enabled`: Включает передачу динамических фильтров в JDBC-запросы (по умолчанию `true`).
  • `dynamic-filtering.wait-timeout`: Максимальное время, которое Trino ждет сбора фильтров из Build-стороны JOIN-а перед тем, как запустить запрос в JDBC. По умолчанию это `20s`.

В чем кроется опасность?
Если вычисление справочника на стороне Trino занимает больше времени, чем задано в `dynamic-filtering.wait-timeout` (например, 25 секунд против 20), координатор Trino прерывает ожидание. Чтобы не блокировать выполнение, он отправляет в Postgres “голый” запрос: `SELECT * FROM table`.
Вместо пары тысяч строк по сети внезапно начинают передаваться миллионы. Если загрузка сети — B, а объем таблицы PostgreSQL — V total, то время выполнения стремится к: T pull = B V total
что может привести к Out-of-Memory на воркерах Trino и падению кластера.


Решение 1: Полный Pushdown через `system.query` (Для ELT-оркестрации)

Чтобы гарантировать, что вычисления и фильтры 100% выполнятся на мощностях PostgreSQL, мы можем использовать специальную табличную функцию `system.query()`.

Этот подход разделяет обязанности: PostgreSQL занимается фильтрацией и тяжелой математикой локально, а Trino просто оркестрирует запись результата в Parquet/Iceberg.

-- Создаем таблицу в Iceberg (Lakekeeper) и наполняем её результатами из Postgres
CREATE TABLE iceberg_catalog.raw_data.customer_metrics WITH (
    format = 'PARQUET',
    partitioning = ARRAY['mktsegment']
) AS 
SELECT
    *
FROM
    TABLE(
        postgres_catalog.system.query(
            query => '
                -- Этот SQL выполняется СТРОГО внутри PostgreSQL
                SELECT 
                    custkey, 
                    name, 
                    mktsegment,
                    acctbal,
                    array_agg(acctbal) OVER (
                        PARTITION BY mktsegment 
                        ORDER BY custkey 
                        ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING
                        EXCLUDE GROUP
                    ) AS rolling_bals
                FROM public.customer
                WHERE acctbal > 1000 
                  AND created_at >= current_date - interval ''1 month''
            '
        )
    );

Преимущество: Если селективность нашего фильтра S равна 0.05 (остается 5% строк), то объем передаваемых по сети данных составит строго V total \ times S. Никакие таймауты Trino не заставят Postgres отдать лишние данные.


Решение 2: Использование `pg_lake` (Для концепции Lakehouse в PostgreSQL)

Если первый метод идеально подходит для использования Trino как движка трансформации, то зачем вообще существует проект `pg_lake`?

`pg_lake` внедряет под капот PostgreSQL движок DuckDB через `pgduck_server`. Это позволяет базе данных самостоятельно подключаться к S3 и читать/писать формат Iceberg, минуя Trino.

В чем выгода использования

pg_lake

прямо в PostgreSQL?

  1. Чтение ледяных архивов (Cold Data) без сторонних движков.
    Допустим, вы переносите старые партиции данных из Postgres в Iceberg (S3) для экономии места. С `pg_lake` база Postgres “учится” читать эти архивы. Вы можете написать обычный запрос в вашем любимом клиенте (DBeaver, DataGrip, pgAdmin):
-- Объединение горячих данных из кучи (heap) PG и холодных данных из Iceberg
SELECT * FROM public.orders_current
UNION ALL
SELECT * FROM iceberg.orders_archive WHERE order_date < '2023-01-01';
  1. Работа в родном диалекте PostgreSQL.
    Если ваши аналитики и приложения жестко завязаны на специфические функции PostgreSQL (например, PostGIS для геоданных или сложные хранимые процедуры PL/pgSQL), интеграция с `pg_lake` позволяет анализировать гигантские внешние Iceberg-файлы, используя всю мощь экосистемы PG, без необходимости переписывать SQL-код под диалект Trino.
  1. Меньше точек отказа.
    Для небольших команд, которым не нужна горизонтальная масштабируемость Trino, установка `pg_lake` позволяет построить Data Lake вообще без развертывания отдельного аналитического кластера. Postgres сам выполняет COPY-команды в S3.

Итог итогов

  • Используйте `system.query()` в Trino, если ваша цель — построить надежный, масштабируемый процесс выгрузки (ELT). Это самый безопасный паттерн: он разгружает сеть платформы данных, защищает от капризов динамической фильтрации и оставляет сервер БД свободным от сторонних плагинов.
  • Используйте `pg_lake`, если ваша бизнес-потребность — позволить самому PostgreSQL прозрачно обращаться к Data Lake. Это идеальное решение для архивации холодных данных прямо из СУБД или если ваши процессы глубоко интегрированы с инструментами, понимающими только нативный протокол Postgres.
Follow this blog
Send
Share
Tweet