Welcome to my personal place for love, peace and happiness 🤖

Архитектура Client Spooling: Как быстро выгружать гигантские датасеты в Trino и Apache DataFusion

Работа с Big Data часто упирается в классическое “узкое горлышко”: кластер может обработать терабайты данных за секунды, но передача результатов (Result Set) обратно на сторону клиента (например, в Jupyter или скрипт) занимает часы. На дворе апрель 2026 года, и современные аналитические движки предлагают эффективные методы обхода этой проблемы — концепцию Spooling.

В этой статье мы разберем, как передавать результаты запросов через промежуточное S3-хранилище, на примере движков Trino и Apache DataFusion.

Физика проблемы и математика Spooling

В классической архитектуре все воркеры кластера отправляют вычисленные строки на главный узел (Coordinator), а тот уже отдает их по одному каналу клиенту.

Если D — это объем результирующей выборки, а B c — пропускная способность сети координатора, то время выгрузки данных клиенту без спулинга равно:

T classic = B / Dc

В режиме Spooling координатор не гоняет данные через себя. Воркеры напрямую, параллельно пишут куски результата в дешевое объектное хранилище (S3/MinIO). Клиент получает лишь ссылки на эти файлы и скачивает их напрямую. Если у нас N файлов в S3, доступных для многопоточного скачивания с пропускной способностью клиента B client: T spooling ≈ min(N×B s3,B client)D

Это позволяет ускорить выгрузку в десятки раз, так как $B_{client}$ и распределенный $B_{s3}$ обычно значительно больше ограничений одного координатора.


Подготовка минимальной инфраструктуры

Для демонстрации двух подходов мы убрали из нашего кластера все тяжелые клиентские среды (Jupyter, Spark) и оставили только “голое” ядро: хранилище S3, REST-каталог и SQL-движок.

минимальный

docker-compose.yml

version: '3.8'

services:
  minio:
    image: minio/minio:latest
    ports:
      - "19000:9000"
      - "19001:9001"
    environment:
      MINIO_ROOT_USER: "minio-root-user"
      MINIO_ROOT_PASSWORD: "minio-root-password"
    command: server /data --console-address ":9001"

  minio-setup:
    image: minio/mc:latest
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      sleep 5;
      mc alias set myminio http://minio:9000 minio-root-user minio-root-password;
      mc mb myminio/warehouse || true;
      "

  lakekeeper:
    image: dalongrong/lakekeeper:latest
    ports:
      - "8181:8181"
    environment:
      - S3_ENDPOINT=http://minio:9000
      - S3_REGION=us-east-1
      - S3_ACCESS_KEY_ID=minio-root-user
      - S3_SECRET_ACCESS_KEY=minio-root-password
    depends_on:
      - minio-setup

  trino:
    image: trinodb/trino:latest
    ports:
      - "8080:8080"

Шаг 1. Настройка каталога и генерация данных (Trino)


Сначала мы генерируем данные в Trino. Запрос

CREATE CATALOG

использует динамическое подключение к Lakekeeper REST API. Скрипт записывает файлы в формате Parquet в MinIO:

config.properties

protocol.spooling.enabled=true
# 256-битный ключ в формате base64. Вы можете сгенерировать свой с помощью команды `openssl rand -base64 32`
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=

catalog.management=dynamic

spooling-manager.properties

spooling-manager.name=filesystem
# Включаем чтение/запись в S3 для Spooling
fs.s3.enabled=true
# Путь внутри MinIO (указываем через s3://)
fs.location=s3://warehouse/client-spooling/

# Системные настройки S3 (MinIO)
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minio-root-user
s3.aws-secret-key=minio-root-password
s3.path-style-access=true

-- 1. Подключение каталога Iceberg

CREATE CATALOG test_warehouse USING iceberg
WITH (
    "iceberg.catalog.type" = 'rest',
    "iceberg.rest-catalog.uri" = 'http://lakekeeper:8181/catalog/',
    "iceberg.rest-catalog.warehouse" = '00000000-0000-0000-0000-000000000000/test_warehouse',
    "iceberg.rest-catalog.security" = 'OAUTH2',
    "iceberg.rest-catalog.nested-namespace-enabled" = 'true',
    "iceberg.rest-catalog.vended-credentials-enabled" = 'true',
    "fs.native-s3.enabled" = 'true',
    "s3.region" = 'us-east-1',
    "s3.path-style-access" = 'true',
    "s3.endpoint" = 'http://minio:9000'
);

-- 2. Создание структуры

CREATE SCHEMA test_warehouse.test_schema;

CREATE TABLE test_warehouse.test_schema.my_table (
    id BIGINT,
    data VARCHAR
) WITH (format = 'PARQUET');

-- 3. Запись данных

INSERT INTO test_warehouse.test_schema.my_table VALUES (1, 'hello'), (2, 'world');

Если написать Select – должно быть как-то так

Аналог Spooling в Apache DataFusion (Через экспорт)

Trino поддерживает протокол *Client Spooling* “из коробки” — когда Python-клиент запрашивает огромный `SELECT`, Trino сам незаметно пишет куски в S3 и отдает клиенту готовые ссылки.

В Apache DataFusion (который часто работает как локальный движок `datafusion-cli` или встраиваемая библиотка поверх S3) применяется более прозрачный паттерн делегирования (Explicit Spooling). Мы вручную инструктируем движок сохранить результаты агрегации в распределенное хранилище, чтобы позже забрать их в удобном формате — например, упаковав их в `JSON` и сжав алгоритмом `ZSTD`.

1. Подключение к S3 и маппинг исходной таблицы

Запускаем `datafusion-cli`, передав доступы как переменные среды (для предотвращения ошибок парсинга опций):

AWS_ACCESS_KEY_ID="minio-root-user" \
AWS_SECRET_ACCESS_KEY="minio-root-password" \
AWS_ENDPOINT="http://localhost:19000" \
AWS_REGION="us-east-1" \
AWS_ALLOW_HTTP="true" \
datafusion-cli

Внутри консоли подключаем директорию с Parquet-файлами, сгенерированными Trino:

CREATE EXTERNAL TABLE my_parquet_data 
STORED AS PARQUET 
LOCATION 's3://warehouse/019d81a3-c2d6-7ed2-ab15-070becf62582/my_table-13e4b91a2b4e47d98f312b1384263880/data/';
2. Массовая конвертация и выгрузка (DataFusion COPY)

Вместо того чтобы тянуть миллионы строк на локальный терминал, мы просим DataFusion выполнить преобразование и записать итог запроса обратно в MinIO.

Мы выбираем построчный JSON с экстремальным сжатием:

COPY (
    -- Тут может быть любая сложная агрегация:
    -- SELECT id, count(data) FROM my_parquet_data GROUP BY id
    SELECT * FROM my_parquet_data
) 
TO 's3://warehouse/019d81a3-c2d6-7ed2-ab15-070becf62582/my_table-13e4b91a2b4e47d98f312b1384263880/json_export/' 
STORED AS JSON
OPTIONS (
    'format.compression' 'zstd'
);

Результат:

+-------+
| count |
+-------+
| 2     |
+-------+
1 row(s) fetched. 
Elapsed 0.270 seconds.

За миллисекунды (0.270 sec) DataFusion прочитал партиции, трансформировал бинарные столбцы в текст и сжал его.

В чем преимущество подхода DataFusion?

Описанный паттерн выполнения команды `COPY TO` с сохранением `.json.zst` в MinIO полностью воспроизводит механику Spooling:

  1. Отсутствие OOM (Out Of Memory): Клиент получает только метаданные `count`, а не гигабайты сырых данных в оперативную память.
  2. Параллелизм: Если исходных файлов много, DataFusion будет писать множество потоков `part-0.json.zst`, `part-1.json.zst` в бакет параллельно.
  3. Удаленное потребление: Вы можете запустить легкий Python-скрипт (Pandas) на дешевой машине, который просто прочитает эти сжатые легковесные JSON объекты напрямую из MinIO, минуя дорогостоящие вычислительные кластеры.
Follow this blog
Send
Share
Tweet
Pin