🚀 Создание почтиReal-Time Data Lake: Быстрая миграция данных в Apache Iceberg или Parquet
Сегодня Gemini 3.1 Pro Preview расскажет свое мненИИе))
Связывание транзакционных баз (PostgreSQL) и аналитических хранилищ (ClickHouse) через прямые агрегации и `JOIN` часто приводит к жесточайшим блокировкам и деградации продакшена. Когда бизнес требует быстрый результат, а внедрение полноценного CDC (Debezium + Kafka) откладывается из-за сроков и сложности, лучшим решением становится пакетная и микро-пакетная выгрузка данных в озеро (в форматы Parquet и Apache Iceberg).
С точки зрения архитектуры, наша главная цель — минимизировать время загрузки данных T load и усилия инженеров на развертывание E setup. Наша целевая функция: min(T load × E setup)
В этой статье собраны исключительно рабочие, протестированные подходы для быстрой интеграции с озером данных (Data Lake) и аналитическим движком Trino.
🐘 1. Экспорт данных из PostgreSQL: Проверенные инструменты
Мы полностью исключаем создание и восстановление тяжелых дампов (`pg_dump`). Вся транзитная нагрузка ложится исключительно на асинхронные реплики.
🌟 Подход А: Движок OLake (Самый быстрый старт в Iceberg)
Для задачи “результат нужен вчера и без сложного стека” идеально подходит OLake. Это высокопроизводительный движок репликации баз данных напрямую в Apache Iceberg (или Parquet), минуя промежуточные шины сообщений.
Шаг 1. Запуск сервиса (конфигурация `docker-compose.yml`):
version: '3.8'
services:
olake:
image: olakeio/olake:latest
ports:
- "8080:8080"
environment:
# Настройки доступов к вашему S3/MinIO
- AWS_ACCESS_KEY_ID=your_access_key
- AWS_SECRET_ACCESS_KEY=your_secret_key
- AWS_REGION=us-east-1Шаг 2. Запуск репликации:
Вы отправляете JSON-манифест в OLake (через UI или REST API). Движок самостоятельно делает первоначальный слепок PostgreSQL (Full Load со скоростью до 580K RPS), а затем переключается на чтение инкрементов (CDC):
{
"pipeline_name": "pg_to_iceberg_fast",
"source": {
"type": "postgres",
"connection_url": "postgresql://readonly_user:password@replica_host:5432/prod_db",
"tables": ["public.customer", "public.orders"]
},
"destination": {
"type": "iceberg",
"catalog_type": "rest",
"catalog_uri": "http://iceberg-rest:8181",
"warehouse_path": "s3://my-datalake/warehouse/"
},
"replication_mode": "full_and_cdc"
}🐍 Подход Б: DuckDB (Легковесная скриптовая выгрузка)
Если вы хотите управлять выгрузкой через свои `cron`-задачи или Airflow, идеальным инструментом выступает аналитическая in-memory СУБД DuckDB. Ниже приведен протестированный Python-скрипт, который напрямую подключается к реплике и потоково перегоняет данные в Parquet на S3.
Рабочий скрипт на Python (`export_to_lake.py`):
import duckdb
# Открываем in-memory соединение DuckDB
con = duckdb.connect()
# 1. Устанавливаем и загружаем необходимые расширения
con.execute("INSTALL postgres;")
con.execute("INSTALL httpfs;")
con.execute("LOAD postgres;")
con.execute("LOAD httpfs;")
# 2. Настраиваем подключение к объектному хранилищу
con.execute("""
SET s3_region='us-east-1';
SET s3_access_key_id='YOUR_KEY';
SET s3_secret_access_key='YOUR_SECRET';
SET s3_endpoint='s3.your-domain.com';
""")
# 3. Подключаемся к реплике PostgreSQL
# Команда ATTACH монтирует Postgres прямо в DuckDB под именем 'pg'
con.execute("""
ATTACH 'host=replica_host port=5432 dbname=postgres user=postgres password=password'
AS pg (TYPE postgres);
""")
# 4. Копируем таблицу public.customer в S3 в сжатом формате Parquet
con.execute("""
COPY pg.public.customer
TO 's3://my-datalake/raw/customer.parquet'
(FORMAT PARQUET, COMPRESSION ZSTD);
""")
print("Выгрузка в Data Lake успешно завершена!")🖱️ 2. Унификация аналитики с ClickHouse
Данные из ClickHouse также необходимо перегружать в Озеро (для Trino), чтобы избежать дублирования логики таблиц и нагрузки на саму СУБД тяжелыми сторонними `JOIN`-ами.
🛠 Базовый подход: Нативная табличная функция S3
Самый простой и не требующий дополнительной инфраструктуры способ — использовать встроенную функцию `s3()`. Она позволяет в один SQL-запрос отправить результат выборки прямо в объектное хранилище в нужном формате.
Пример выгрузки из ClickHouse в Parquet (выполняется в `clickhouse-client`):
-- Прямая вставка данных из локальной MergeTree таблицы в файл Parquet на S3
INSERT INTO FUNCTION s3(
'https://s3.us-east-1.amazonaws.com/my-datalake/raw/clickhouse_export/events_{_partition_id}.parquet',
'YOUR_KEY',
'YOUR_SECRET',
'Parquet'
)
SELECT id, event_type, payload, event_date
FROM local_events_mergetree
WHERE event_date = today();*Совет: Используйте макрос `{_partition_id}` в пути файла для автоматического разбиения больших выгрузок.*
🌊 Продвинутый подход: Project Antalya (ClickHouse + Iceberg)
Для построения архитектуры на десятилетие вперед разработчики из Altinity создали сборку Project Antalya. Она позволяет использовать таблицы Iceberg в S3 как *полноценное разделяемое хранилище*, работающее со скоростью локального диска, но обходящееся в 10 раз дешевле.
Пример прозрачного монтирования:
-- 1. Подключаем готовую Iceberg-таблицу прямо как движок ClickHouse
CREATE TABLE iceberg_customer
ENGINE = Iceberg('s3://my-datalake/warehouse/customer', 'aws_key', 'aws_secret');
-- 2. Запрашиваем данные. Теперь Trino и ClickHouse читают одни и те же Parquet-файлы!
SELECT count(*) FROM iceberg_customer WHERE status = 'active';⚠️ Решение частых проблем при транзите данных (Troubleshooting)
1. Управление оперативной памятью (OOM) в DuckDB
При скриптовой выгрузке гигантских таблиц in-memory движок может исчерпать RAM сервера.
Решение: Обязательно ограничивайте ресурсы сразу после
duckdb.connect():
con.execute("PRAGMA memory_limit='16GB'")
con.execute("PRAGMA threads=4")
2. Консолидация сложных типов данных PostgreSQL
Если в вашей таблице есть
JSONB,
UUIDили пользовательские массивы, Parquet может упасть с ошибкой соответствия типов.
Решение: Вместо
COPY pg.tableнапишите явный SQL-запрос с приведением к строке (
::VARCHAR):
con.execute("""
COPY (
SELECT id, metadata::VARCHAR AS metadata
FROM pg.public.customer
)
TO 's3://my-datalake/raw/customer.parquet' (FORMAT PARQUET);
""")Внутри Trino эти строки легко парсятся функциями вроде
json_extract().
3. Защита асинхронных реплик PostgreSQL от разрывов
Длительный процесс
SELECT *(или
COPY) мешает мастеру применять WAL-логи на реплике (из-за очистки строк VACUUM-ом).
Решение: На аналитической реплике (в файле
postgresql.conf) обязательно пропишите:
max_standby_streaming_delay = -1
max_standby_archive_delay = -1
hot_standby_feedback = onЭто позволит реплике “ставить на паузу” конфликтующие обновления и не обрывать ваш транзит данных.
🎯 План внедрения (Roadmap)
- Мгновенный результат (Первые 1-3 дня): Используйте проверенный Python-скрипт на DuckDB для баз PostgreSQL и классическую функцию `s3()` для ClickHouse. Они перенесут исторические таблицы в Parquet на S3 без внесения изменений в инфраструктуру. Trino сразу увидит эти файлы.
- Системный подход (1-2 недели): Разверните OLake. Потратив пару часов на конфигурацию манифестов, вы получите автоматический конвейер инкрементальной загрузки, который напрямую питает ваши Iceberg-каталоги.
- Объединение аналитики (2-4 недели): Начните использовать Project Antalya, чтобы обогатить озеро горячими данными ClickHouse, избегая дублирования.
- Окончательная эволюция: Когда бизнес-пожар потушен и аналитики получают данные в приемлемые сроки (T lag < 1 часа), вы можете спокойно внедрить **Debezium + Kafka**. Но делать это стоит только для узкого сегмента сверхкритичных таблиц, где аналитика требуется в строгом Real-Time.
Часть 2 – Интеграция PostgreSQL, Trino и Iceberg
Эффективный ELT: Интеграция PostgreSQL, Trino и Iceberg (сравнение подходов Table Functions и pg_lake)
В современных data-архитектурах часто возникает задача переноса реляционных данных в озера данных (Data Lakes). Если ваш стек включает PostgreSQL, Trino и Iceberg (например, с REST-каталогом Lakekeeper), возникает архитектурный вопрос: как переносить данные и обращаться к ним максимально эффективно?
В этой статье мы разберем два мощных подхода: использование “нативного” для Trino проталкивания через `system.query()` и применение расширения `pg_lake` на стороне базы данных.
Проблема: Почему Trino иногда “вытягивает” всю таблицу?
Обычно в Trino мы пишем простой федеративный запрос:
SELECT * FROM postgres_catalog.public.customer WHERE acctbal > 1000;В идеальном сценарии оптимизатор Trino считывает предикат (`acctbal > 1000`) и транслирует его в SQL-диалект PostgreSQL. Это называется Pushdown (проталкивание).
Но на практике аналитические запросы гораздо сложнее. Если запрос содержит специфичную бизнес-логику, нестандартные оконные функции, сложные JOIN-ы или функции обработки строк, которых нет в базовом словаре коннектора Trino, оптимизатор не сможет транслировать этот кусок SQL. В результате Trino принимает решение скачать всю таблицу в память своих воркеров и применить фильтрацию уже там.
Особую роль при JOIN-ах играет механизм динамической фильтрации (Dynamic Filtering). Когда вы джоините большую таблицу из Postgres с маленькой таблицей (например, справочником из Hive/Iceberg), Trino сначала читает справочник (Build side), извлекает ключи, формирует SQL-фильтр (например, `IN (1, 2, 3)`) и на лету отправляет его в Postgres (Probe side).
Два критичных параметра в конфигурации коннектора управляют этим процессом:
- `dynamic-filtering.enabled`: Включает передачу динамических фильтров в JDBC-запросы (по умолчанию `true`).
- `dynamic-filtering.wait-timeout`: Максимальное время, которое Trino ждет сбора фильтров из Build-стороны JOIN-а перед тем, как запустить запрос в JDBC. По умолчанию это `20s`.
В чем кроется опасность?
Если вычисление справочника на стороне Trino занимает больше времени, чем задано в `dynamic-filtering.wait-timeout` (например, 25 секунд против 20), координатор Trino прерывает ожидание. Чтобы не блокировать выполнение, он отправляет в Postgres “голый” запрос: `SELECT * FROM table`.
Вместо пары тысяч строк по сети внезапно начинают передаваться миллионы. Если загрузка сети — B, а объем таблицы PostgreSQL — V total, то время выполнения стремится к: T pull = B V total
что может привести к Out-of-Memory на воркерах Trino и падению кластера.
Решение 1: Полный Pushdown через `system.query` (Для ELT-оркестрации)
Чтобы гарантировать, что вычисления и фильтры 100% выполнятся на мощностях PostgreSQL, мы можем использовать специальную табличную функцию `system.query()`.
Этот подход разделяет обязанности: PostgreSQL занимается фильтрацией и тяжелой математикой локально, а Trino просто оркестрирует запись результата в Parquet/Iceberg.
-- Создаем таблицу в Iceberg (Lakekeeper) и наполняем её результатами из Postgres
CREATE TABLE iceberg_catalog.raw_data.customer_metrics WITH (
format = 'PARQUET',
partitioning = ARRAY['mktsegment']
) AS
SELECT
*
FROM
TABLE(
postgres_catalog.system.query(
query => '
-- Этот SQL выполняется СТРОГО внутри PostgreSQL
SELECT
custkey,
name,
mktsegment,
acctbal,
array_agg(acctbal) OVER (
PARTITION BY mktsegment
ORDER BY custkey
ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING
EXCLUDE GROUP
) AS rolling_bals
FROM public.customer
WHERE acctbal > 1000
AND created_at >= current_date - interval ''1 month''
'
)
);Преимущество: Если селективность нашего фильтра S равна 0.05 (остается 5% строк), то объем передаваемых по сети данных составит строго V total \ times S. Никакие таймауты Trino не заставят Postgres отдать лишние данные.
Решение 2: Использование `pg_lake` (Для концепции Lakehouse в PostgreSQL)
Если первый метод идеально подходит для использования Trino как движка трансформации, то зачем вообще существует проект `pg_lake`?
`pg_lake` внедряет под капот PostgreSQL движок DuckDB через `pgduck_server`. Это позволяет базе данных самостоятельно подключаться к S3 и читать/писать формат Iceberg, минуя Trino.
pg_lakeпрямо в PostgreSQL?
- Чтение ледяных архивов (Cold Data) без сторонних движков.
Допустим, вы переносите старые партиции данных из Postgres в Iceberg (S3) для экономии места. С `pg_lake` база Postgres “учится” читать эти архивы. Вы можете написать обычный запрос в вашем любимом клиенте (DBeaver, DataGrip, pgAdmin):
-- Объединение горячих данных из кучи (heap) PG и холодных данных из Iceberg
SELECT * FROM public.orders_current
UNION ALL
SELECT * FROM iceberg.orders_archive WHERE order_date < '2023-01-01';- Работа в родном диалекте PostgreSQL.
Если ваши аналитики и приложения жестко завязаны на специфические функции PostgreSQL (например, PostGIS для геоданных или сложные хранимые процедуры PL/pgSQL), интеграция с `pg_lake` позволяет анализировать гигантские внешние Iceberg-файлы, используя всю мощь экосистемы PG, без необходимости переписывать SQL-код под диалект Trino.
- Меньше точек отказа.
Для небольших команд, которым не нужна горизонтальная масштабируемость Trino, установка `pg_lake` позволяет построить Data Lake вообще без развертывания отдельного аналитического кластера. Postgres сам выполняет COPY-команды в S3.
Итог итогов
- Используйте `system.query()` в Trino, если ваша цель — построить надежный, масштабируемый процесс выгрузки (ELT). Это самый безопасный паттерн: он разгружает сеть платформы данных, защищает от капризов динамической фильтрации и оставляет сервер БД свободным от сторонних плагинов.
- Используйте `pg_lake`, если ваша бизнес-потребность — позволить самому PostgreSQL прозрачно обращаться к Data Lake. Это идеальное решение для архивации холодных данных прямо из СУБД или если ваши процессы глубоко интегрированы с инструментами, понимающими только нативный протокол Postgres.