Архитектура Client Spooling: Как быстро выгружать гигантские датасеты в Trino и Apache DataFusion
Работа с Big Data часто упирается в классическое “узкое горлышко”: кластер может обработать терабайты данных за секунды, но передача результатов (Result Set) обратно на сторону клиента (например, в Jupyter или скрипт) занимает часы. На дворе апрель 2026 года, и современные аналитические движки предлагают эффективные методы обхода этой проблемы — концепцию Spooling.
В этой статье мы разберем, как передавать результаты запросов через промежуточное S3-хранилище, на примере движков Trino и Apache DataFusion.
Физика проблемы и математика Spooling
В классической архитектуре все воркеры кластера отправляют вычисленные строки на главный узел (Coordinator), а тот уже отдает их по одному каналу клиенту.
Если D — это объем результирующей выборки, а B c — пропускная способность сети координатора, то время выгрузки данных клиенту без спулинга равно:
T classic = B / Dc
В режиме Spooling координатор не гоняет данные через себя. Воркеры напрямую, параллельно пишут куски результата в дешевое объектное хранилище (S3/MinIO). Клиент получает лишь ссылки на эти файлы и скачивает их напрямую. Если у нас N файлов в S3, доступных для многопоточного скачивания с пропускной способностью клиента B client: T spooling ≈ min(N×B s3,B client)D
Это позволяет ускорить выгрузку в десятки раз, так как $B_{client}$ и распределенный $B_{s3}$ обычно значительно больше ограничений одного координатора.
Подготовка минимальной инфраструктуры
Для демонстрации двух подходов мы убрали из нашего кластера все тяжелые клиентские среды (Jupyter, Spark) и оставили только “голое” ядро: хранилище S3, REST-каталог и SQL-движок.
docker-compose.yml
version: '3.8'
services:
minio:
image: minio/minio:latest
ports:
- "19000:9000"
- "19001:9001"
environment:
MINIO_ROOT_USER: "minio-root-user"
MINIO_ROOT_PASSWORD: "minio-root-password"
command: server /data --console-address ":9001"
minio-setup:
image: minio/mc:latest
depends_on:
- minio
entrypoint: >
/bin/sh -c "
sleep 5;
mc alias set myminio http://minio:9000 minio-root-user minio-root-password;
mc mb myminio/warehouse || true;
"
lakekeeper:
image: dalongrong/lakekeeper:latest
ports:
- "8181:8181"
environment:
- S3_ENDPOINT=http://minio:9000
- S3_REGION=us-east-1
- S3_ACCESS_KEY_ID=minio-root-user
- S3_SECRET_ACCESS_KEY=minio-root-password
depends_on:
- minio-setup
trino:
image: trinodb/trino:latest
ports:
- "8080:8080"
Сначала мы генерируем данные в Trino. Запрос
CREATE CATALOGиспользует динамическое подключение к Lakekeeper REST API. Скрипт записывает файлы в формате Parquet в MinIO:
config.properties
protocol.spooling.enabled=true
# 256-битный ключ в формате base64. Вы можете сгенерировать свой с помощью команды `openssl rand -base64 32`
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=
catalog.management=dynamicspooling-manager.properties
spooling-manager.name=filesystem
# Включаем чтение/запись в S3 для Spooling
fs.s3.enabled=true
# Путь внутри MinIO (указываем через s3://)
fs.location=s3://warehouse/client-spooling/
# Системные настройки S3 (MinIO)
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minio-root-user
s3.aws-secret-key=minio-root-password
s3.path-style-access=true-- 1. Подключение каталога Iceberg
CREATE CATALOG test_warehouse USING iceberg
WITH (
"iceberg.catalog.type" = 'rest',
"iceberg.rest-catalog.uri" = 'http://lakekeeper:8181/catalog/',
"iceberg.rest-catalog.warehouse" = '00000000-0000-0000-0000-000000000000/test_warehouse',
"iceberg.rest-catalog.security" = 'OAUTH2',
"iceberg.rest-catalog.nested-namespace-enabled" = 'true',
"iceberg.rest-catalog.vended-credentials-enabled" = 'true',
"fs.native-s3.enabled" = 'true',
"s3.region" = 'us-east-1',
"s3.path-style-access" = 'true',
"s3.endpoint" = 'http://minio:9000'
);-- 2. Создание структуры
CREATE SCHEMA test_warehouse.test_schema;
CREATE TABLE test_warehouse.test_schema.my_table (
id BIGINT,
data VARCHAR
) WITH (format = 'PARQUET');-- 3. Запись данных
INSERT INTO test_warehouse.test_schema.my_table VALUES (1, 'hello'), (2, 'world');Если написать Select – должно быть как-то так
Аналог Spooling в Apache DataFusion (Через экспорт)
Trino поддерживает протокол *Client Spooling* “из коробки” — когда Python-клиент запрашивает огромный `SELECT`, Trino сам незаметно пишет куски в S3 и отдает клиенту готовые ссылки.
В Apache DataFusion (который часто работает как локальный движок `datafusion-cli` или встраиваемая библиотка поверх S3) применяется более прозрачный паттерн делегирования (Explicit Spooling). Мы вручную инструктируем движок сохранить результаты агрегации в распределенное хранилище, чтобы позже забрать их в удобном формате — например, упаковав их в `JSON` и сжав алгоритмом `ZSTD`.
1. Подключение к S3 и маппинг исходной таблицы
Запускаем `datafusion-cli`, передав доступы как переменные среды (для предотвращения ошибок парсинга опций):
AWS_ACCESS_KEY_ID="minio-root-user" \
AWS_SECRET_ACCESS_KEY="minio-root-password" \
AWS_ENDPOINT="http://localhost:19000" \
AWS_REGION="us-east-1" \
AWS_ALLOW_HTTP="true" \
datafusion-cliВнутри консоли подключаем директорию с Parquet-файлами, сгенерированными Trino:
CREATE EXTERNAL TABLE my_parquet_data
STORED AS PARQUET
LOCATION 's3://warehouse/019d81a3-c2d6-7ed2-ab15-070becf62582/my_table-13e4b91a2b4e47d98f312b1384263880/data/';2. Массовая конвертация и выгрузка (DataFusion COPY)
Вместо того чтобы тянуть миллионы строк на локальный терминал, мы просим DataFusion выполнить преобразование и записать итог запроса обратно в MinIO.
Мы выбираем построчный JSON с экстремальным сжатием:
COPY (
-- Тут может быть любая сложная агрегация:
-- SELECT id, count(data) FROM my_parquet_data GROUP BY id
SELECT * FROM my_parquet_data
)
TO 's3://warehouse/019d81a3-c2d6-7ed2-ab15-070becf62582/my_table-13e4b91a2b4e47d98f312b1384263880/json_export/'
STORED AS JSON
OPTIONS (
'format.compression' 'zstd'
);Результат:
+-------+
| count |
+-------+
| 2 |
+-------+
1 row(s) fetched.
Elapsed 0.270 seconds.За миллисекунды (0.270 sec) DataFusion прочитал партиции, трансформировал бинарные столбцы в текст и сжал его.
В чем преимущество подхода DataFusion?
Описанный паттерн выполнения команды `COPY TO` с сохранением `.json.zst` в MinIO полностью воспроизводит механику Spooling:
- Отсутствие OOM (Out Of Memory): Клиент получает только метаданные `count`, а не гигабайты сырых данных в оперативную память.
- Параллелизм: Если исходных файлов много, DataFusion будет писать множество потоков `part-0.json.zst`, `part-1.json.zst` в бакет параллельно.
- Удаленное потребление: Вы можете запустить легкий Python-скрипт (Pandas) на дешевой машине, который просто прочитает эти сжатые легковесные JSON объекты напрямую из MinIO, минуя дорогостоящие вычислительные кластеры.