Yuriy Gavrilov

Welcome to my personal place for love, peace and happiness 🤖

QueryFlux: Universal SQL Proxy для аналитических движков

В этой статье я расскажу, как поднять полноценную инфраструктуру для аналитических запросов, используя QueryFlux — высокопроизводительный SQL-прокси на Rust, который умеет принимать запросы по разным протоколам (Trino HTTP, PostgreSQL wire, MySQL wire) и маршрутизировать их на различные бэкенды (Trino, StarRocks, DuckDB, Athena). Мы соберем стек: Trino как основной движок, Lakekeeper как Iceberg REST-каталог, MinIO как S3-хранилище, StarRocks как альтернативный MPP-движок, и наконец сам QueryFlux, который предоставит единую точку входа для клиентов.

Все конфигурации взяты из реального рабочего проекта, запущенного на macOS с Podman (но совместимы и с Docker). Детально разберем файлы, шаги запуска, решим типичные проблемы, покажем интерфейс управления и сравним QueryFlux с Trino Gateway и другими решениями.

https://github.com/lakeops-org/queryflux/blob/main/examples/full-stack/docker-compose.yml


1. Что такое QueryFlux и зачем он нужен

Современные data-платформы часто состоят из нескольких движков: Trino для федеративных запросов, StarRocks/ClickHouse для низкой задержки, DuckDB для ad-hoc аналитики, Athena для serverless-задач. Каждый движок имеет свой wire-протокол, свой диалект SQL и свои настройки аутентификации. Клиенты вынуждены либо подключаться напрямую к каждому движку, создавая $N \times M$ интеграций, либо использовать «костыли» в коде.

QueryFlux решает эту проблему, становясь единым шлюзом:

  • Принимает запросы по протоколам: Trino HTTP, PostgreSQL Wire, MySQL Wire, Arrow Flight SQL.
  • Маршрутизирует запросы по правилам (протокол, заголовки, regex, Python-скрипты).
  • Ограничивает конкурентность (через параметр `maxRunningQueries`), ведет очереди, отдает метрики в Prometheus.
  • Поддерживает аутентификацию (OIDC, static, LDAP) и авторизацию (OpenFGA).

Документация: queryflux.dev


2. Наша лабораторная конфигурация

Мы развернем следующий стек через `podman-compose` (или `docker-compose`):

Сервис Назначение Порт на хосте
trino Движок запросов (федерация + Iceberg) 8081 (прямой доступ)
starrocks Альтернативный MPP-движок 9030 (MySQL протокол)
lakekeeper Iceberg REST-каталог 8181
minio S3-совместимое хранилище (данные Iceberg) 19000 (API), 19001 (консоль)
postgres БД метаданных Lakekeeper 5433
queryflux Прокси-сервер 8080 (Trino), 5434 (PG wire), 3306 (MySQL), 9000 (Admin API), 3000 (Studio UI)

3. Математика планирования нагрузки (ограничение ресурсов)

Одним из важных аспектов настройки QueryFlux является управление конкурентностью (concurrency limit) через параметр `maxRunningQueries`.

Если мы обозначим лимит конкурентных запросов в группе маршрутизации как N, а среднее время выполнения одного запроса на бэкенде как T (в секундах), то теоретическая максимальная пропускная способность группы (Throughput, обозначается как R, в запросах в секунду) рассчитывается так:

R = N /T

Например, в нашем файле `config.yaml` мы задаем N = 100. Если средний аналитический запрос отрабатывает за T = 2.5 секунды, то пропускная способность нашей Trino-группы составит R = 40 запросов в секунду. Запросы сверх этого лимита попадают в очередь на стороне самого QueryFlux.


4. Конфигурационные файлы

Создайте папку `queryflux-demo/examples/full-stack` и перейдите в нее. Ниже приведены все необходимые файлы.


📄 Показать содержимое файла

docker-compose.yml

(Полный стек)

name: queryflux-example-full

services:
  queryflux:
    image: ghcr.io/lakeops-org/queryflux:latest
    platform: linux/amd64
    ports:
      - "8080:8080"   # Trino HTTP через QueryFlux
      - "9000:9000"   # Admin API
      - "3000:3000"   # QueryFlux Studio
      - "3306:3306"   # MySQL wire
      - "5434:5434"   # PostgreSQL wire
    volumes:
      - ./config.yaml:/etc/queryflux/config.yaml:ro
    environment:
      RUST_LOG: ${RUST_LOG:-queryflux=info,queryflux_frontend=info}
    depends_on:
      postgres:
        condition: service_healthy
      trino:
        condition: service_healthy
      starrocks:
        condition: service_healthy
    restart: unless-stopped

  trino:
    image: trinodb/trino:latest
    platform: linux/amd64
    environment:
      CATALOG_MANAGEMENT: dynamic
    ports:
      - "8081:8080"
    healthcheck:
      test: ["CMD", "curl", "-sf", "http://localhost:8080/v1/info"]
      interval: 10s
      timeout: 5s
      retries: 15
      start_period: 30s
    volumes:
      - ./trino-config/access-control.properties:/etc/trino/access-control.properties:ro

  starrocks:
    image: starrocks/allin1-ubuntu:latest
    platform: linux/amd64
    ports:
      - "9030:9030"
      - "8030:8030"
    healthcheck:
      test: ["CMD", "curl", "-sf", "http://localhost:8030/api/health"]
      interval: 15s
      timeout: 10s
      retries: 20
      start_period: 60s

  postgres:
    image: postgres:16-alpine
    platform: linux/amd64
    ports:
      - "5433:5432"
    environment:
      POSTGRES_DB: queryflux
      POSTGRES_USER: queryflux
      POSTGRES_PASSWORD: queryflux
    volumes:
      - queryflux-pg:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U queryflux"]
      interval: 5s
      timeout: 3s
      retries: 10

  lakekeeper-db:
    image: postgres:17
    platform: linux/amd64
    environment:
      POSTGRES_PASSWORD: postgres
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -p 5432 -d postgres"]
      interval: 2s
      timeout: 10s
      retries: 10
      start_period: 10s

  minio:
    image: minio/minio:latest
    platform: linux/amd64
    environment:
      MINIO_ROOT_USER: minio-root-user
      MINIO_ROOT_PASSWORD: minio-root-password
    command: ["server", "--console-address", ":9001", "/data"]
    ports:
      - "19000:9000"
      - "19001:9001"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/ready"]
      interval: 2s
      timeout: 10s
      retries: 20
      start_period: 15s

  createbuckets:
    image: minio/mc:latest
    platform: linux/amd64
    depends_on:
      minio:
        condition: service_healthy
    restart: on-failure
    entrypoint: >
      /bin/sh -c "
      /usr/bin/mc alias set local http://minio:9000 minio-root-user minio-root-password;
      /usr/bin/mc mb --ignore-existing local/warehouse;
      exit 0;
      "

  migrate:
    image: quay.io/lakekeeper/catalog:latest-main
    platform: linux/amd64
    pull_policy: always
    environment:
      LAKEKEEPER__PG_ENCRYPTION_KEY: dev-key-not-secure
      LAKEKEEPER__PG_DATABASE_URL_READ: postgresql://postgres:postgres@lakekeeper-db:5432/postgres
      LAKEKEEPER__PG_DATABASE_URL_WRITE: postgresql://postgres:postgres@lakekeeper-db:5432/postgres
    restart: "no"
    command: ["migrate"]
    depends_on:
      lakekeeper-db:
        condition: service_healthy

  lakekeeper:
    image: quay.io/lakekeeper/catalog:latest-main
    platform: linux/amd64
    pull_policy: always
    environment:
      LAKEKEEPER__PG_ENCRYPTION_KEY: dev-key-not-secure
      LAKEKEEPER__PG_DATABASE_URL_READ: postgresql://postgres:postgres@lakekeeper-db:5432/postgres
      LAKEKEEPER__PG_DATABASE_URL_WRITE: postgresql://postgres:postgres@lakekeeper-db:5432/postgres
    command: ["serve"]
    ports:
      - "8181:8181"
    healthcheck:
      test: ["CMD", "/home/nonroot/lakekeeper", "healthcheck"]
      interval: 2s
      timeout: 10s
      retries: 30
      start_period: 10s
    depends_on:
      migrate:
        condition: service_completed_successfully
      lakekeeper-db:
        condition: service_healthy
      minio:
        condition: service_healthy
      createbuckets:
        condition: service_completed_successfully

  bootstrap:
    image: alpine/curl
    platform: linux/amd64
    tty: true
    stdin_open: true
    depends_on:
      lakekeeper:
        condition: service_healthy
    restart: "no"
    entrypoint: /bin/sh
    command:
      - -c
      - |
        curl -sv -X POST http://lakekeeper:8181/management/v1/bootstrap \
          -H 'Content-Type: application/json' \
          --data '{"accept-terms-of-use": true}'
        exit 0

  initialwarehouse:
    image: alpine/curl
    platform: linux/amd64
    tty: true
    stdin_open: true
    depends_on:
      lakekeeper:
        condition: service_healthy
      bootstrap:
        condition: service_completed_successfully
    restart: "no"
    entrypoint: /bin/sh
    command:
      - -c
      - |
        curl -sv -X POST http://lakekeeper:8181/management/v1/warehouse \
          -H 'Content-Type: application/json' \
          --data @/config/create-warehouse.json
        exit 0
    volumes:
      - ./create-warehouse.json:/config/create-warehouse.json:ro

  sentinel:
    image: alpine
    platform: linux/amd64
    command: ["tail", "-f", "/dev/null"]
    depends_on:
      lakekeeper:
        condition: service_healthy
      initialwarehouse:
        condition: service_completed_successfully
    healthcheck:
      test: ["CMD", "true"]
      interval: 1s
      retries: 1
      start_period: 0s

  data-loader:
    image: trinodb/trino:476
    platform: linux/amd64
    profiles: ["loader"]
    environment:
      TPCH_SCALE: ${TPCH_SCALE:-tiny}
    entrypoint: ["/bin/bash", "-c"]
    command:
      - |
        set -euo pipefail
        sed "s/FROM tpch\\.tiny\\./FROM tpch.$${TPCH_SCALE}./g" /test-data/init.sql > /tmp/init.run.sql
        exec trino --server http://trino:8080 --user loader --file /tmp/init.run.sql
    volumes:
      - ../../docker/fixtures/init.docker-network.sql:/test-data/init.sql:ro
    depends_on:
      trino:
        condition: service_healthy
      sentinel:
        condition: service_healthy

  starrocks-catalog-setup:
    image: mysql:8.0
    platform: linux/amd64
    profiles: ["loader"]
    entrypoint: ["/bin/bash", "-c"]
    command: ["mysql -h starrocks -P 9030 -u root --connect-timeout=30 < /setup/starrocks-setup.sql"]
    volumes:
      - ../../docker/fixtures/starrocks-setup.sql:/setup/starrocks-setup.sql:ro
    depends_on:
      starrocks:
        condition: service_healthy

volumes:
  queryflux-pg:


📄 Вспомогательные конфигурационные файлы

Файл `config.yaml` (настройки QueryFlux):

queryflux:
  externalAddress: http://localhost:8080
  frontends:
    trinoHttp:
      enabled: true
      port: 8080
    postgresWire:
      enabled: true
      port: 5434
  persistence:
    type: inMemory

clusters:
  trino-1:
    engine: trino
    endpoint: http://trino:8080
    enabled: true
    auth:
      type: basic
      username: trino
      password: ""

clusterGroups:
  trino-default:
    enabled: true
    maxRunningQueries: 100
    members: [trino-1]

routers:
  - type: protocolBased
    trinoHttp: trino-default
    postgresWire: trino-default

routingFallback: trino-default

Файл `trino-config/access-control.properties`:

access-control.name=allow-all

Этот файл монтируется в `trino` и разрешает имперсонацию и чтение системных таблиц – иначе статистика в QueryFlux Studio не будет работать.

Файл `./create-warehouse.json` (инициализация warehouse Lakekeeper):

{
  "warehouse-name": "test_warehouse",
  "project-id": "00000000-0000-0000-0000-000000000000",
  "storage-profile": {
    "type": "s3",
    "bucket": "warehouse",
    "endpoint": "http://minio:9000",
    "region": "us-east-1",
    "path-style-access": true,
    "flavor": "minio",
    "sts-enabled": false
  },
  "storage-credential": {
    "type": "s3",
    "credential-type": "access-key",
    "aws-access-key-id": "minio-root-user",
    "aws-secret-access-key": "minio-root-password"
  }
}


5. Запуск стека и проверка

Запускаем весь стек в фоновом режиме:

cd examples/full-stack
podman-compose up -d --wait

5.1. Тест прямого доступа к Trino

curl -X POST http://localhost:8081/v1/statement \
  -H 'X-Trino-User: test' \
  -d 'SELECT 1'

5.2. Тест через QueryFlux (PostgreSQL wire)

Подключимся через стандартный клиент `psql` к порту `5434`, который прослушивает QueryFlux:

psql -h localhost -p 5434 -U trino -d trino

Сначала выполним простой запрос для проверки работоспособности протокола:

SELECT 42;

А теперь проверим аналитический потенциал стека. Выполним тяжелый запрос к таблице `call_center` в БД Iceberg, сгенерированной по стандарту TPC-DS:

SELECT cc_call_center_sk, cc_call_center_id, cc_rec_start_date, cc_rec_end_date, 
       cc_closed_date_sk, cc_open_date_sk, cc_name, cc_class, cc_employees, 
       cc_sq_ft, cc_hours, cc_manager, cc_mkt_id, cc_mkt_class, cc_mkt_desc, 
       cc_market_manager, cc_division, cc_division_name, cc_company, 
       cc_company_name, cc_street_number, cc_street_name, cc_street_type, 
       cc_suite_number, cc_city, cc_county, cc_state, cc_zip, cc_country, 
       cc_gmt_offset, cc_tax_percentage
FROM tpcds.sf10.call_center;

*Скриншот успешного выполнения запроса через psql*

*Рис. 1 – Запрос `SELECT count(*) FROM system.runtime.queries` успешно выполняется через QueryFlux, статистика сразу же фиксируется и видна в Studio.*

5.3. Проверка QueryFlux Studio

Откройте браузер и перейдите на `http://localhost:3000`. Логин по умолчанию: `admin` / `admin`.

*Главная панель (Dashboard)*

*Рис. 2 – Дашборд QueryFlux Studio: количество запросов, ошибки, средняя длительность, статус кластеров.*

*Список кластеров*

*Рис. 3 – Страница кластеров: виден наш кластер `trino-1`, его группа `trino-default`, состояние и уровень загрузки.*

*Группы кластеров*

*Рис. 4 – Управление группами: здесь можно задать ограничение `maxRunningQueries`, список участников и стратегии балансировки. Пока группы инициализируются из in-memory конфигурации.*

*Скрипты (translation fixups)*

*Рис. 5 – Скрипты для трансляции диалектов SQL “на лету” (в этой демке не используются).*

*Guardrails (ограничения)*

*Рис. 6 – Глобальные и групповые guardrails для инспекции и фильтрации SQL перед отправкой в движок.*

*Протоколы (frontends)*

*Рис. 7 – Включённые фронтенды: Trino HTTP (8080) и PostgreSQL wire (5434).*

*Маршрутизация*

*Рис. 8 – Правила маршрутизации: `protocolBased` направляет Trino HTTP и PostgreSQL wire в нашу группу `trino-default`.*

*Admin API (Swagger)*

*Рис. 9 – Документация Admin API: эндпоинты для управления кластерами, группами, конфигурациями и получения статистики.*

6. Решение типичных проблем


🐞 1. Ошибка

internal libpod error

для одноразовых контейнеров на macOS


Причина: podman-compose на macOS иногда имеет баг с `tty` и `stdin_open`.
Решение: Параметры уже добавлены в наш `docker-compose.yml`, но если баг не ушел, выполните инициализацию Lakekeeper вручную:

podman run --rm --network queryflux-example-full_default alpine/curl \
  -X POST http://lakekeeper:8181/management/v1/bootstrap \
  -H 'Content-Type: application/json' \
  -d '{"accept-terms-of-use": true}'


🐞 2. PostgreSQL Extended Query Protocol
QueryFlux поддерживает только Simple Query Protocol (сообщение `Q`). Extended Query (Parse/Bind/Execute) не поддерживается.

  • `psql` работает “из коробки”.
  • JDBC-драйверы: добавьте параметр `prepareThreshold=0` в строку подключения, чтобы переключиться в Simple Query режим.
    Пример:
jdbc:postgresql://localhost:5434/trino?prepareThreshold=0


🐞 3. Ошибка

Access Denied: User trino cannot impersonate user queryflux-running-query-reconcile


Причина: Trino не разрешает имперсонацию для системных запросов QueryFlux.
Решение: Мы добавили файл `access-control.properties` со свойством `access-control.name=allow-all`. После этого статистика в Studio заработала (см. Рис. 1 и Рис. 2).


7. Мониторинг и управление

QueryFlux предоставляет три основных интерфейса для наблюдения:

  • QueryFlux Studio (порт 3000) – веб-интерфейс для просмотра истории запросов, управления кластерами, группами, маршрутами, скриптами и guardrails.
  • Admin API (порт 9000) – REST API для автоматизации (логин: admin/admin). Документация OpenAPI доступна на `/docs`.
  • Prometheus метрики (порт 9000/metrics) – стандартные метрики для интеграции с Grafana.

Рекомендуемая практика: для production используйте `persistence: postgres`, чтобы конфигурация групп и маршрутов сохранялась при перезапусках, а история запросов накапливалась.


8. Сравнение QueryFlux с альтернативами

8.1. Trino Gateway (официальный)

Характеристика QueryFlux Trino Gateway
Поддерживаемые протоколы клиента Trino HTTP, PostgreSQL wire, MySQL wire, Arrow Flight SQL Только Trino HTTP
Бэкенды Trino, DuckDB, StarRocks, Athena, ClickHouse (planned) Только Trino
Маршрутизация По протоколу, заголовкам, тегам, regex, Python скриптам По весам, группам, header `X-Trino-Routing-Group`
SQL трансляция Да (sqlglot) – из PostgreSQL в Trino и наоборот Нет
Конкурентность и очереди `maxRunningQueries` на группу, очередь на прокси, spillover `maxConcurrentQueries` на кластер, очереди нет
Auth/AuthZ OIDC, LDAP, Static, OpenFGA Базовая поддержка `X-Trino-User`
Метрики Prometheus, Grafana, Admin API, Studio Prometheus (JMX), менее развит
GUI управления Полноценный веб-интерфейс (Studio) Отсутствует (только конфигурация API)

Плюсы QueryFlux: гетерогенность (один шлюз на разные виды движков), гибкая маршрутизация, встроенный перевод диалектов, PostgreSQL wire, наличие красивого веб-интерфейса.
Минусы: молодой проект (версия 0.1.2), не поддерживается Extended Query Protocol для PostgreSQL, требует настройки доступа к системным таблицам Trino.

8.2. Другие альтернативы

  • Trino + многокаталожность – простейшее решение, но требует доработки приложений для переключения на trino диалект.
  • Apache Linkis – тяжеловесный ETL-ориентированный шлюз, не подходит для лёгкой ad-hoc аналитики.
  • Nginx + Lua + sqlglot – сложно поддерживать, требует глубокой кастомной разработки.
  • Коммерческие решения (Starburst, Dremio) – дорогостоящие, но предоставляют готовую маршрутизацию, закрытый код и полноценный SLA. но 100% всего не решает так как это готовые коробки. явно захочется что-то под себя подкрутить.

и еще много с акцентов на gateway: Hoop.dev кстати интересный и GatewayD

  • GatewayD и ProxySQL: Не заменяют Trino, но отлично решают вашу задачу с логированием. GatewayD работает с PostgreSQL и может проверять запросы через Casbin, а ProxySQL предоставляет детальное логирование запросов (время, строки, IP и т.д.). Логирование — есть (аудит запросов), диалект Postgres — полный, подключение к 90 БД — сложно (нужно настраивать 90 подключений).
  • Mammoth и JumpWire: Специализированные прокси для PostgreSQL. Первый упрощает аудит, логируя каждую команду, второй позволяет гибко настраивать политики доступа и маскировать данные. Логирование — есть, диалект Postgres — полный, подключение к 90 БД — сложно (на каждый экземпляр нужен свой прокси).
  • Hoop.dev: Платформа для контролируемого доступа к базам данных с сильным акцентом на аудит и безопасность. Логирует все: от попыток входа до полного текста запросов и даже планов выполнения. Логирование — детальное, диалект Postgres — полный, подключение к 90 БД — сложно (требует развёртывания на каждую базу).
  • Уже посмотрели QueryFlux: Это решение ближе всего к Trino, но работает как высокоуровневый шлюз. На входе может принимать запросы через “PostgreSQL wire”, а на выходе автоматически транслировать диалект под Trino, Clickhouse и другие системы. Логирование — ограниченное, диалект Postgres — только как входной интерфейс (запросы уходят в Trino), подключение к 90 БД — замена Trino (шлюз к 90 разным источникам).
  • SQL Gateway (CData): Позволяет представить любые ODBC-источники как виртуальную PostgreSQL или SQL Server базу. Логирование — только общее, диалект Postgres — виртуальный (эмуляция), подключение к 90 БД — сложно (настройка ODBC).
  • Cloud Service Gateways (Infisical и др.): Специализированные облачные решения. Обещают централизованный доступ и аудит, но их возможности нативных диалектов сильно привязаны к конкретному провайдеру.
  • Native PostgreSQL Gateways: Как сборник технологий (например, PgCat), из которых можно построить своё решение. Позволяет гибко настраивать подключения и логи, но требует ручной сборки и высокой квалификации.
    Интеграция с Keycloak: К сожалению, прямой интеграции с Keycloak для аутентификации SQL-запросов практически нет. Keycloak используется для аутентификации доступа к веб-интерфейсам административных консолей, но не для самих SQL-клиентов. Исключение — GatewayD, который, хотя и не интегрируется с Keycloak, позволяет реализовать схожую логику через Casbin.

Вывод: QueryFlux идеален, если у вас уже есть несколько движков и вы хотите дать единую точку входа для бизнес-пользователей и аналитиков (особенно тех, кто привык к `psql`). Для production, где критична поддержка prepare-statements, стоит использовать Trino JDBC напрямую или использовать дополнительный прокси (например, `trino-pg-gateway`).


9. Итоги и рекомендации

Мы успешно запустили полноценный аналитический стек с Lakekeeper (Iceberg), Trino и StarRocks, а QueryFlux обеспечил единый вход через HTTP и PostgreSQL wire. Ключевые достижения:

  • ✅ QueryFlux принимает Trino HTTP и PostgreSQL wire запросы, направляя их в Trino.
  • ✅ Клиент `psql` выполняет сложные `SELECT`-запросы к Iceberg таблицам (даже TPC-DS) через порт 5434.
  • ✅ Статистика в Studio отображается корректно.
  • ✅ Маршрутизация по протоколу (`protocolBased`) работает как задумано.
  • ✅ Веб-интерфейс Studio даёт полный контроль над кластерами, группами, маршрутами и скриптами.

Рекомендации для production:

  1. Замените `persistence: inMemory` на `persistence: postgres` и настройте репликацию БД конфигурации (чтобы не терять историю и настройки).
  2. Включите аутентификацию OIDC (Keycloak) и авторизацию OpenFGA для разграничения доступа к группам кластеров.
  3. Рассчитайте `maxRunningQueries` по формуле N = R \times T, исходя из планируемой нагрузки и SLA.
  4. Для PostgreSQL-клиентов с GUI (DataGrip/DBeaver) используйте параметр `prepareThreshold=0` (через JDBC) или переключитесь на официальный Trino JDBC драйвер.
  5. Настройте сбор метрик в Prometheus и дашборды Grafana для мониторинга длины очередей и задержек.

Заключение: QueryFlux — очень перспективный и многообещающий инструмент для построения унифицированного доступа к аналитическим движкам. Несмотря на молодость, он уже пригоден для некоторых сценариев, особенно если вы готовы ограничиться simple query protocol при использовании PostgreSQL wire. В связке с Iceberg-каталогами и объектным хранилищем он образует мощную open-source альтернативу дорогим коммерческим решениям.

Распределённые вычисления с Ray и отчетики

Введение в распределённые вычисления с Ray

ПредположенИИе 🤖

Ray — это унифицированный фреймворк с открытым исходным кодом для масштабирования AI- и Python-приложений. Он предоставляет простой API для создания распределённых приложений, которые могут масштабироваться от одного ноутбука до целого кластера без изменения кода. Ray эффективно обрабатывает разнообразные рабочие нагрузки: от пакетной обработки данных и распределённого обучения моделей до гиперпараметрической оптимизации и serving-а инференса моделей в продакшене. Ray не ограничивается только задачами ML: он также предоставляет Ray Data и потоковые примитивы для эффективных входных пайплайнов, пакетной обработки и онлайн-инференса.

Ключевые возможности Ray

  • Единый фреймворк: Одна кодовая база охватывает все этапы жизненного цикла AI — от обработки данных до развёртывания моделей, устраняя сложность интеграции разнородных систем.
  • Масштабируемость: Бесшовный переход от локальной разработки к кластеру из тысяч ядер без переписывания кода.
  • Производительность: На некоторых ML-задачах Ray показывает результаты лучше, чем Spark и Dask, а на одном узле — на ~10% быстрее стандартной многопроцессорной обработки Python.
  • Гибкость: Поддержка как stateful (сохраняющих состояние), так и stateless (не сохраняющих состояние) рабочих нагрузок с помощью задач (tasks) и акторов (actors).

Фронтенд для дашбордов: Streamlit и Marimo

Для визуализации данных и создания интерактивных дашбордов на Python сегодня доступны два мощных инструмента: проверенный временем Streamlit и современный Marimo.

Streamlit: Классика для data-приложений

Streamlit — это open-source Python-библиотека, которая позволяет превратить скрипты анализа данных в полноценные веб-приложения за считанные минуты, без необходимости писать HTML, CSS или JavaScript. Streamlt поддерживает:

  • Виджеты: Слайдеры, кнопки, текстовые поля для создания интерактивных интерфейсов.
  • Кэширование: Декораторы `st.cache_data` и `st.cache_resource` для оптимизации загрузки данных и управления тяжёлыми объектами (моделями, подключениями к БД).
  • Многозатраничность: Возможность создавать приложения с несколькими страницами через папку `pages/`.

Marimo: Реактивная альтернатива

Marimo — это реактивный Python-ноутбук нового поколения, который также можно использовать для создания веб-приложений. Главное отличие от Streamlit — реактивная модель выполнения: при изменении одной ячейки или взаимодействии с UI-элементом автоматически пересчитываются только зависимые ячейки, а не весь скрипт. Marimo подходит для сложного исследовательского анализа и интерактивных дашбордов, где важна производительность и детальный контроль выполнения.

Сравнение Streamlit и Marimo

  • Подход: Streamlit — это фреймворк для data-приложений, тогда как Marimo — ноутбук-среда, которую можно запускать как приложение.
  • Производительность: Marimo часто показывает лучшую производительность, так как перезапускает только зависимые части, в отличие от Streamlit, который выполняет весь скрипт заново при каждом взаимодействии.
  • Сценарии использования: Streamlt идеален для быстрой разработки надёжных бизнес-дашбордов, а Marimo — для исследовательских задач и сложной аналитики.

Архитектура системы: Ray как бэкенд для дашбордов

Рассмотрим практический пример построения масштабируемой системы отчётности, где Ray выступает в роли мощного бэкенда для обработки и serving-а данных, а Streamlit (или Marimo) — в роли фронтенда для визуализации. Код визуализаций хранится в Git, что упрощает версионирование, совместную работу и развёртывание.

Основные компоненты

  1. Бэкенд (Ray): Распределённое приложение (деплоймент) на Ray Serve, которое подключается к источнику данных (например, Trino), выполняет сложные агрегации и возвращает результат.
  2. Фронтенд (Streamlit/Marimo): Веб-приложение, которое отправляет HTTP-запросы к Ray-бэкенду, получает данные и отображает их в интерактивных дашбордах.
  3. Внешнее хранилище состояния (опционально): Redis или база данных для хранения состояния сессий пользователей.
  4. Git: Репозиторий для хранения кода фронтенда (Streamlit-скрипты, Marimo-ноутбуки) и конфигураций.

Пример кода: Бэкенд на Ray Serve

import ray
from ray import serve
from fastapi import FastAPI, HTTPException
import trino
import pandas as pd

app = FastAPI()

@serve.deployment(
    ray_actor_options={"num_cpus": 0.5},
    autoscaling_config={"min_replicas": 1, "max_replicas": 2},
)
@serve.ingress(app)
class TrinoQuery:
    def __init__(self):
        self.conn = trino.dbapi.connect(
            host="192.168.0.125",
            port=9999,
            user="jupyter",
            catalog="test_warehouse",
            schema="test_schema",
            http_scheme="http",
        )
        print("Соединение с Trino установлено.")

    @app.get("/query")
    async def execute_query(self, query: str):
        if not query:
            raise HTTPException(status_code=400, detail="Query parameter is required.")
        try:
            cursor = self.conn.cursor()
            cursor.execute(query)
            rows = cursor.fetchall()
            col_names = [desc[0] for desc in cursor.description]
            df = pd.DataFrame(rows, columns=col_names)
            return df.to_dict(orient="records")
        except Exception as e:
            raise HTTPException(status_code=500, detail=str(e))

ray.init(ignore_reinit_error=True)
serve.start(http_options={"host": "0.0.0.0", "port": 8000})
serve.run(TrinoQuery.bind(), blocking=True)

Пример кода: Фронтенд на Streamlit

import streamlit as st
import pandas as pd
import requests

BACKEND_URL = "http://127.0.0.1:8000/query"

st.set_page_config(page_title="Аналитическая панель", layout="wide")
st.title("Дашборд данных из Trino через Ray")

with st.sidebar:
    st.header("Параметры запроса")
    query = st.text_area(
        "SQL-запрос:",
        value="SELECT nationkey, COUNT(*) as cnt FROM test_warehouse.test_schema.my_table1 GROUP BY nationkey",
        height=200,
    )
    execute_button = st.button("Выполнить запрос", type="primary")

if execute_button:
    if not query:
        st.warning("Введите SQL-запрос.")
    else:
        with st.spinner("Выполняется запрос через Ray Serve..."):
            try:
                response = requests.get(BACKEND_URL, params={"query": query}, timeout=30)
                response.raise_for_status()
                data = response.json()
                if data:
                    df = pd.DataFrame(data)
                    st.success(f"Запрос выполнен. Получено строк: {len(df)}")
                    st.dataframe(df, use_container_width=True)
                    if df.select_dtypes(include='number').shape[1] > 0:
                        st.subheader("Статистика по числовым колонкам")
                        st.dataframe(df.describe(), use_container_width=True)
                else:
                    st.info("Запрос вернул пустой результат.")
            except Exception as e:
                st.error(f"Ошибка: {e}")

Хранение визуализаций в Git

Код фронтенда (Streamlit-скрипты или Marimo-ноутбуки) должен храниться в Git-репозитории. Это обеспечивает:

  • Версионирование: Возможность отслеживать изменения, откатываться к предыдущим версиям.
  • Совместную работу: Команда разработчиков может одновременно работать над разными частями дашборда.
  • Автоматизацию развёртывания: CI/CD пайплайны могут автоматически деплоить новую версию дашборда на сервер при пуше в определённую ветку.

Репозиторий может иметь следующую структуру:

.
├── app.py                 # Основной файл Streamlit-приложения
├── pages/                 # Дополнительные страницы (если используются)
├── marimo_notebooks/      # Marimo-ноутбуки (если используются)
├── requirements.txt       # Зависимости
├── .gitignore
└── README.md

Управление состоянием: как построить систему отчётов

В распределённой системе, где множество пользователей одновременно обращаются к дашборду, а сам бэкенд масштабируется на множество реплик, управление состоянием (state management) становится критически важным. Ошибка может привести к тому, что пользователь увидит чужие данные или потеряет свой прогресс в сессии.

Stateless vs. Stateful: Основной выбор

Ray поддерживает оба подхода:

  • Stateless бэкенд (рекомендуемый): Бэкенд не хранит состояние пользователей. Вся сессионная информация (например, результаты фильтрации, текущая страница) хранится во фронтенде или во внешнем хранилище. Любая реплика Ray может обработать любой запрос. Это делает систему простой и отказоустойчивой, но требует, чтобы состояние было “лёгким” (например, хранилось в cookies или `session_state`).
  • Stateful бэкенд: Бэкенд хранит состояние в своей памяти. В этом случае необходимо обеспечить, чтобы все запросы от одного пользователя направлялись на одну и ту же реплику (sticky sessions).

Рекомендуемая архитектура: Stateless бэкенд + Session State во фронтенде

Для большинства BI-дашбордов идеальна следующая схема:

  1. Бэкенд (Ray): Полностью stateless. Он принимает запрос, выполняет вычисления и возвращает результат. Он не помнит, какие запросы делал пользователь ранее.
  2. Фронтенд (Streamlit/Marimo): Хранит состояние сессии локально. В Streamlit для этого используется `st.session_state`. Например, вы можете сохранить в `session_state` фильтры, выбранные пользователем, чтобы они применялись при каждом взаимодействии.
  3. Внешнее хранилище: Для кэширования результатов тяжёлых запросов или для хранения общего состояния (например, результатов обучения модели) используйте Redis или базу данных.

Если требуется Stateful бэкенд (например, кэш в памяти реплики)

Иногда возникает необходимость, чтобы бэкенд хранил какое-то состояние для повышения производительности. Например, каждая реплика может загружать большую модель машинного обучения в свою память. В таком случае используется подход Soft Session Affinity: все запросы от одного пользователя направляются на одну и ту же реплику, используя уникальный ключ (`X-SERVE-SHARD-KEY`).

Сценарий: Долгоживущий отчёт (Report as a Service)

Рассмотрим сценарий, где бизнес-пользователь хочет “заказать” отчёт, который генерируется 10 минут, и вернуться за ним через час. Stateless архитектура здесь не подойдёт, так как бэкенд “забудет” о задаче.

  1. Stateful бэкенд (Ray Actor): Используется долгоживущий Ray Actor (актор), который хранит состояние задачи и её результат.
  2. Хранилище задач: База данных (например, PostgreSQL) используется для хранения информации о задаче (статус, результат). Актор периодически обновляет статус.
  3. Фронтенд: Пользователь запускает задачу, получает её `task_id`, а затем периодически опрашивает эндпоинт `GET /task/{task_id}/status`, который возвращает статус и, при готовности, результат.

Преимущества использования Ray в архитектуре отчётов

  1. Масштабируемость под нагрузку: Ray может автоматически масштабировать количество реплик бэкенда в зависимости от количества запросов. Если вашим дашбордом пользуется 10 или 10 000 человек, Ray адаптируется.
  2. Производительность: Ray оптимизирован для параллельных вычислений и может обрабатывать большие объёмы данных быстрее, чем традиционные инструменты.
  3. Единая кодовая база: Вы можете использовать Ray не только для serving-а данных, но и для их предварительной обработки, обучения моделей и т.д. Это упрощает инфраструктуру.
  4. Отказоустойчивость: Ray автоматически перезапускает упавшие реплики, обеспечивая высокую доступность ваших дашбордов.
  5. Гибкость управления ресурсами: Вы можете точно указать, сколько CPU и GPU нужно выделить для каждого компонента системы.

Заключение

Ray, Streamlit и Marimo образуют мощный тандем для построения современных систем отчётности и аналитики. Ray обеспечивает масштабируемый и производительный бэкенд, способный обрабатывать большие объёмы данных. Streamlit и Marimo предоставляют удобные средства для создания интерактивных и красивых дашбордов, а Git гарантирует контроль версий и простоту развёртывания. Ключом к успешной архитектуре является правильный выбор стратегии управления состоянием: в большинстве случаев подходит stateless бэкенд с хранением состояния во фронтенде, что обеспечивает простоту и отказоустойчивость. Для более сложных сценариев (долгие задачи, кэширование моделей) можно использовать stateful подход с Ray акторами и внешним хранилищем.

Если вы хотите увидеть полный рабочий пример с кодом, архитектурной схемой и инструкцией по развёртыванию, дайте знать — я подготовлю подробный гайд.

ИИгрушки 🤖

Сегодня еще кстати крылатое выражение на уме или цитата, как хотите. «Когда выручка не растет, кровати 🛌 передвинуты, ш..х сменили и все против вас, то на помощь приходят ИИгрушки) 😁☺️😉 (с)

Утиные истории: часть 2. Экосистема DuckDB в 2026 году

В первой части Утиных историй мы детально разбирали, как DuckDB переворачивает принципы локальной и встраиваемой аналитики. Сегодня на календаре 19 апреля 2026 года, и экосистема «утки» развивается с невероятной скоростью. На днях вышел юбилейный, 40-й выпуск информационного бюллетеня от команды MotherDuck.

В этой статье мы разберем самые горячие новинки обновления: релиз DuckLake 1.0, нативную поддержку протокола PostgreSQL, векторный поиск и то, как DuckDB покоряет новые горизонты программирования (от Elixir к Rust).


🦆 DuckLake 1.0: Озерный формат (Lakehouse) готов к продакшену

Главная новость апреля — релиз DuckLake 1.0. Это lakehouse-формат, в котором все метаданные хранятся непосредственно в каталоге базы данных (в PostgreSQL, SQLite или самой DuckDB), а не в разрозненных файлах, как это сделано в Delta Lake или Apache Iceberg.

Что под капотом?

  • Сортированные таблицы и Bucket-партиционирование: Оптимизируют чтение и ускоряют аналитику.
  • Решение проблемы “маленьких файлов”: Мелкие транзакции (где количество строк N≤10 по умолчанию) сохраняются напрямую (inlining) в каталог. Для сброса в объектное хранилище используется команда `CHECKPOINT`.
  • Векторы удаления (Deletion vectors): Поддержка совместимости с Iceberg.
  • Новый тип Variant: Позволяет работать с полуструктурированными данными, автоматически “раскладывая” их на примитивные типы для быстрого выполнения запросов.

Ускорение в цифрах

Отказ от чтения разрозненных файлов метаданных дает феноменальный прирост производительности базовых операций агрегации. Если сравнивать время выполнения запросов до оптимизации (T old) и с использованием чтения исключительного из каталога метаданных DuckLake (T new), то выигрыш в скорости можно выразить формулой:

Speedup =T new / T old

Для запросов вида `COUNT(*)` этот Speedup составляет от 8 до 258 раз! А вызов системной функции `duckdb_views()` ускорился примерно в 70 раз.

Неудивительно, что DuckLake уже входит в топ-10 расширений по количеству скачиваний и поддерживается клиентами Apache DataFusion, Spark, Trino и Pandas. Издательство O’Reilly даже готовит книгу *“DuckLake: The Definitive Guide”*. (Фича доступна в DuckDB v1.5.2).


🐘 MotherDuck теперь говорит на языке Postgres

Чтобы внедрить мощь DuckDB в свою инфраструктуру разработчикам часто приходилось искать специальные драйверы и коннекторы. Это в прошлом!

MotherDuck запустили PostgreSQL wire-protocol endpoint. Теперь вы можете выполнять аналитические SQL-запросы к DuckDB, используя совершенно любой клиент, пулер (pooler) или BI-инструмент, совместимый с Postgres. Устанавливать библиотеки DuckDB на клиент больше не нужно!

Достаточно направить ваш текущий клиент по адресу:

pg.us-east-1-aws.motherduck.com:5432

Авторизация происходит с помощью токена MotherDuck. При этом диалект SQL остается утиным (хотя он в значительной степени и совместим с PostgreSQL). Миграция данных возможна через обычные ETL-утилиты или расширение `pg_duckdb`.


🦀 `quack-rs`: Пишем расширения на чистом Rust

Мощным толчком для развития комьюнити-плагинов стал релиз `quack-rs`. До сих пор написание расширений для DuckDB на Rust требовало создания слоев совместимости (C++ glue) и возни с CMake.

`quack-rs` — это SDK на чистом Rust, который оборачивает *C Extension API* (v1.1+). Инструмент предоставляет безопасные абстракции и устраняет 16 задокументированных проблем с FFI (Foreign Function Interface), предотвращая “тихую” порчу данных через NULL и ошибки “double-free” в callback-функциях агрегации.

Для старта нового расширения достаточно вызвать функцию:

generate_scaffold();

Она сгенерирует все 11 файлов, необходимых для подачи плагина в репозиторий сообщества. Теперь безопасность памяти Rust и скорость DuckDB идут рука об руку.


🛠️ Важные новости комьюнити и новые инструменты (Нажмите, чтобы развернуть)

1. Lance Extension и векторный поиск

Открытый колоночный формат Lance, оптимизированный под ML и векторный поиск, теперь доступен и в DuckDB! Hao Ding реализовал поддержку чтения и записи таблиц Lance.

Писать данные можно так:

COPY (...) TO 'path/dataset.lance' (FORMAT lance, MODE 'overwrite');

Для поиска доступны функции: `lance_vector_search()`, `lance_fts()` и `lance_hybrid_search()`.

2. Dux: Распределенные DataFrame для Elixir

Появилась библиотека `dux` — lazy-by-default (ленивые по умолчанию) датафреймы для Elixir поверх DuckDB. Конвейеры данных аккумулируются в AST структуре `%Dux{}` и компилируются в SQL CTE. Заявлено, что на тестах ($10$ млн строк, Apple M4 Max) Dux обгоняет Polars (Explorer) до 2.5 раз на операциях фильтрации.

3. eBPF трассировка с ИИ (`systing 1.0`)

Инструмент для трассировки ядра Linux `systing` (написанный Josef Bacik) перешел с сохранения логов Perfetto на прямую запись в DuckDB. А интеграция с Claude Code MCP (Model Context Protocol) позволяет ИИ динамически анализировать эти базы данных DuckDB в реальном времени.

4. Jupyter и DuckDB Kernel на Go

Создано полноценное Go-ядро DuckDB для Jupyter, которое напрямую отправляет поток данных (Arrow IPC) во встроенный WASM-просмотрщик `hugr-perspective-viewer`. На панели также агрегируются метрики без написания SQL: `approx_unique`, `avg`, `min`, `max`, `count`.

5. Web-framework, Neovim и игры

  • `neovim-web`: Фреймворк для создания статических сайтов с горячими клавишами Vim. Фишка — встроенная консоль DuckDB-Wasm (команда `:sql`) прямо в браузере.
  • `connections.duckdb`: Аналог игры “Connections” от NYT, целиком реализованный на SQL макросах.

💻 Бенчмарки: Большие данные на самом дешевом MacBook Neo

Способен ли базовый ноутбук переваривать серьезную аналитику? Gábor проверил работу DuckDB на новом MacBook Neo с процессором Apple A18 Pro.

Бенчмарк Параметры Результат (медиана)
ClickBench 100M строк, лимит RAM: 5GB < 1 секунды (cold run)
TPC-DS SF100 1.63 секунды на запрос
TPC-DS SF300 79 минут (высокий disk spill)

Даже при 5 гигабайтах оперативной памяти DuckDB демонстрирует субсекундные ответы, эффективно утилизируя NVMe-память, когда RAM исчерпан (disk spill).


🎓 Внедрение в Академическую Среду

Стоит отдельно отметить профессора Dr. Torsten Grust из Тюбингенского университета (Германия). Его исследовательская группа, стоящая на стыке баз данных и технологий языков программирования, недавно запустила открытый курс DiDi (*Design and Implementation of DuckDB Internals*).

Курс использует DuckDB для обучения студентов архитектуре СУБД: от управления памятью и векторизованного исполнения до оптимизации запросов (включает около 50 рабочих примеров кода).


🗓 Ближайшие Мероприятия

  • 21 апреля 2026 (Онлайн): Стрим MotherDuck Now Speaks Postgres: Fast Analytics Without Changing Your Stack. Демонстрация нового PG wire-protocol.
  • 30 апреля 2026 (Сан-Франциско): DuckDB + MotherDuck Meetup. Разговоры про DuckLake 1.0 и распределенный DuckDB (проект OpenDuck).

Экосистема DuckDB перестала быть просто *“SQLite для аналитики”*. С релизом DuckLake, нативной интеграцией протокола Postgres и появлением SDK для Rust, “утка” окончательно закрепилась как основополагающий инструмент в стеке современных данных.

🚀 Создание почтиReal-Time Data Lake: Быстрая миграция данных в Apache Iceberg или Parquet

Сегодня Gemini 3.1 Pro Preview расскажет свое мненИИе))

Связывание транзакционных баз (PostgreSQL) и аналитических хранилищ (ClickHouse) через прямые агрегации и `JOIN` часто приводит к жесточайшим блокировкам и деградации продакшена. Когда бизнес требует быстрый результат, а внедрение полноценного CDC (Debezium + Kafka) откладывается из-за сроков и сложности, лучшим решением становится пакетная и микро-пакетная выгрузка данных в озеро (в форматы Parquet и Apache Iceberg).

С точки зрения архитектуры, наша главная цель — минимизировать время загрузки данных T load и усилия инженеров на развертывание E setup. Наша целевая функция: min(T load × E setup)

В этой статье собраны исключительно рабочие, протестированные подходы для быстрой интеграции с озером данных (Data Lake) и аналитическим движком Trino.


🐘 1. Экспорт данных из PostgreSQL: Проверенные инструменты

Мы полностью исключаем создание и восстановление тяжелых дампов (`pg_dump`). Вся транзитная нагрузка ложится исключительно на асинхронные реплики.

🌟 Подход А: Движок OLake (Самый быстрый старт в Iceberg)

Для задачи “результат нужен вчера и без сложного стека” идеально подходит OLake. Это высокопроизводительный движок репликации баз данных напрямую в Apache Iceberg (или Parquet), минуя промежуточные шины сообщений.

Шаг 1. Запуск сервиса (конфигурация `docker-compose.yml`):

version: '3.8'
services:
  olake:
    image: olakeio/olake:latest
    ports:
      - "8080:8080"
    environment:
      # Настройки доступов к вашему S3/MinIO
      - AWS_ACCESS_KEY_ID=your_access_key
      - AWS_SECRET_ACCESS_KEY=your_secret_key
      - AWS_REGION=us-east-1

Шаг 2. Запуск репликации:
Вы отправляете JSON-манифест в OLake (через UI или REST API). Движок самостоятельно делает первоначальный слепок PostgreSQL (Full Load со скоростью до 580K RPS), а затем переключается на чтение инкрементов (CDC):

{
  "pipeline_name": "pg_to_iceberg_fast",
  "source": {
    "type": "postgres",
    "connection_url": "postgresql://readonly_user:password@replica_host:5432/prod_db",
    "tables": ["public.customer", "public.orders"]
  },
  "destination": {
    "type": "iceberg",
    "catalog_type": "rest",
    "catalog_uri": "http://iceberg-rest:8181",
    "warehouse_path": "s3://my-datalake/warehouse/"
  },
  "replication_mode": "full_and_cdc"
}

🐍 Подход Б: DuckDB (Легковесная скриптовая выгрузка)

Если вы хотите управлять выгрузкой через свои `cron`-задачи или Airflow, идеальным инструментом выступает аналитическая in-memory СУБД DuckDB. Ниже приведен протестированный Python-скрипт, который напрямую подключается к реплике и потоково перегоняет данные в Parquet на S3.

Рабочий скрипт на Python (`export_to_lake.py`):

import duckdb

# Открываем in-memory соединение DuckDB
con = duckdb.connect()

# 1. Устанавливаем и загружаем необходимые расширения
con.execute("INSTALL postgres;")
con.execute("INSTALL httpfs;")
con.execute("LOAD postgres;")
con.execute("LOAD httpfs;")

# 2. Настраиваем подключение к объектному хранилищу
con.execute("""
    SET s3_region='us-east-1';
    SET s3_access_key_id='YOUR_KEY';
    SET s3_secret_access_key='YOUR_SECRET';
    SET s3_endpoint='s3.your-domain.com';
""")

# 3. Подключаемся к реплике PostgreSQL
# Команда ATTACH монтирует Postgres прямо в DuckDB под именем 'pg'
con.execute("""
    ATTACH 'host=replica_host port=5432 dbname=postgres user=postgres password=password' 
    AS pg (TYPE postgres);
""")

# 4. Копируем таблицу public.customer в S3 в сжатом формате Parquet
con.execute("""
    COPY pg.public.customer
    TO 's3://my-datalake/raw/customer.parquet' 
    (FORMAT PARQUET, COMPRESSION ZSTD);
""")

print("Выгрузка в Data Lake успешно завершена!")

🖱️ 2. Унификация аналитики с ClickHouse

Данные из ClickHouse также необходимо перегружать в Озеро (для Trino), чтобы избежать дублирования логики таблиц и нагрузки на саму СУБД тяжелыми сторонними `JOIN`-ами.

🛠 Базовый подход: Нативная табличная функция S3

Самый простой и не требующий дополнительной инфраструктуры способ — использовать встроенную функцию `s3()`. Она позволяет в один SQL-запрос отправить результат выборки прямо в объектное хранилище в нужном формате.

Пример выгрузки из ClickHouse в Parquet (выполняется в `clickhouse-client`):

-- Прямая вставка данных из локальной MergeTree таблицы в файл Parquet на S3
INSERT INTO FUNCTION s3(
    'https://s3.us-east-1.amazonaws.com/my-datalake/raw/clickhouse_export/events_{_partition_id}.parquet',
    'YOUR_KEY',
    'YOUR_SECRET',
    'Parquet'
)
SELECT id, event_type, payload, event_date
FROM local_events_mergetree
WHERE event_date = today();

*Совет: Используйте макрос `{_partition_id}` в пути файла для автоматического разбиения больших выгрузок.*

🌊 Продвинутый подход: Project Antalya (ClickHouse + Iceberg)

Для построения архитектуры на десятилетие вперед разработчики из Altinity создали сборку Project Antalya. Она позволяет использовать таблицы Iceberg в S3 как *полноценное разделяемое хранилище*, работающее со скоростью локального диска, но обходящееся в 10 раз дешевле.

Пример прозрачного монтирования:

-- 1. Подключаем готовую Iceberg-таблицу прямо как движок ClickHouse
CREATE TABLE iceberg_customer
ENGINE = Iceberg('s3://my-datalake/warehouse/customer', 'aws_key', 'aws_secret');

-- 2. Запрашиваем данные. Теперь Trino и ClickHouse читают одни и те же Parquet-файлы!
SELECT count(*) FROM iceberg_customer WHERE status = 'active';

⚠️ Решение частых проблем при транзите данных (Troubleshooting)


1. Управление оперативной памятью (OOM) в DuckDB
При скриптовой выгрузке гигантских таблиц in-memory движок может исчерпать RAM сервера.
Решение: Обязательно ограничивайте ресурсы сразу после

duckdb.connect()

:

con.execute("PRAGMA memory_limit='16GB'")
con.execute("PRAGMA threads=4")


2. Консолидация сложных типов данных PostgreSQL
Если в вашей таблице есть

JSONB

,

UUID

или пользовательские массивы, Parquet может упасть с ошибкой соответствия типов.
Решение: Вместо

COPY pg.table

напишите явный SQL-запрос с приведением к строке (

::VARCHAR

):

con.execute("""
    COPY (
        SELECT id, metadata::VARCHAR AS metadata 
        FROM pg.public.customer
    )
    TO 's3://my-datalake/raw/customer.parquet' (FORMAT PARQUET);
""")

Внутри Trino эти строки легко парсятся функциями вроде

json_extract()

.


3. Защита асинхронных реплик PostgreSQL от разрывов
Длительный процесс

SELECT *

(или

COPY

) мешает мастеру применять WAL-логи на реплике (из-за очистки строк VACUUM-ом).
Решение: На аналитической реплике (в файле

postgresql.conf

) обязательно пропишите:

max_standby_streaming_delay = -1
max_standby_archive_delay = -1
hot_standby_feedback = on

Это позволит реплике “ставить на паузу” конфликтующие обновления и не обрывать ваш транзит данных.


🎯 План внедрения (Roadmap)

  1. Мгновенный результат (Первые 1-3 дня): Используйте проверенный Python-скрипт на DuckDB для баз PostgreSQL и классическую функцию `s3()` для ClickHouse. Они перенесут исторические таблицы в Parquet на S3 без внесения изменений в инфраструктуру. Trino сразу увидит эти файлы.
  2. Системный подход (1-2 недели): Разверните OLake. Потратив пару часов на конфигурацию манифестов, вы получите автоматический конвейер инкрементальной загрузки, который напрямую питает ваши Iceberg-каталоги.
  3. Объединение аналитики (2-4 недели): Начните использовать Project Antalya, чтобы обогатить озеро горячими данными ClickHouse, избегая дублирования.
  4. Окончательная эволюция: Когда бизнес-пожар потушен и аналитики получают данные в приемлемые сроки (T lag < 1 часа), вы можете спокойно внедрить **Debezium + Kafka**. Но делать это стоит только для узкого сегмента сверхкритичных таблиц, где аналитика требуется в строгом Real-Time.

Часть 2 – Интеграция PostgreSQL, Trino и Iceberg

Эффективный ELT: Интеграция PostgreSQL, Trino и Iceberg (сравнение подходов Table Functions и pg_lake)

В современных data-архитектурах часто возникает задача переноса реляционных данных в озера данных (Data Lakes). Если ваш стек включает PostgreSQL, Trino и Iceberg (например, с REST-каталогом Lakekeeper), возникает архитектурный вопрос: как переносить данные и обращаться к ним максимально эффективно?

В этой статье мы разберем два мощных подхода: использование “нативного” для Trino проталкивания через `system.query()` и применение расширения `pg_lake` на стороне базы данных.


Проблема: Почему Trino иногда “вытягивает” всю таблицу?

Обычно в Trino мы пишем простой федеративный запрос:

SELECT * FROM postgres_catalog.public.customer WHERE acctbal > 1000;

В идеальном сценарии оптимизатор Trino считывает предикат (`acctbal > 1000`) и транслирует его в SQL-диалект PostgreSQL. Это называется Pushdown (проталкивание).

Но на практике аналитические запросы гораздо сложнее. Если запрос содержит специфичную бизнес-логику, нестандартные оконные функции, сложные JOIN-ы или функции обработки строк, которых нет в базовом словаре коннектора Trino, оптимизатор не сможет транслировать этот кусок SQL. В результате Trino принимает решение скачать всю таблицу в память своих воркеров и применить фильтрацию уже там.

Как работает Dynamic Filtering в Trino и почему он может не сработать (Детали)

Особую роль при JOIN-ах играет механизм динамической фильтрации (Dynamic Filtering). Когда вы джоините большую таблицу из Postgres с маленькой таблицей (например, справочником из Hive/Iceberg), Trino сначала читает справочник (Build side), извлекает ключи, формирует SQL-фильтр (например, `IN (1, 2, 3)`) и на лету отправляет его в Postgres (Probe side).

Два критичных параметра в конфигурации коннектора управляют этим процессом:

  • `dynamic-filtering.enabled`: Включает передачу динамических фильтров в JDBC-запросы (по умолчанию `true`).
  • `dynamic-filtering.wait-timeout`: Максимальное время, которое Trino ждет сбора фильтров из Build-стороны JOIN-а перед тем, как запустить запрос в JDBC. По умолчанию это `20s`.

В чем кроется опасность?
Если вычисление справочника на стороне Trino занимает больше времени, чем задано в `dynamic-filtering.wait-timeout` (например, 25 секунд против 20), координатор Trino прерывает ожидание. Чтобы не блокировать выполнение, он отправляет в Postgres “голый” запрос: `SELECT * FROM table`.
Вместо пары тысяч строк по сети внезапно начинают передаваться миллионы. Если загрузка сети — B, а объем таблицы PostgreSQL — V total, то время выполнения стремится к: T pull = B V total
что может привести к Out-of-Memory на воркерах Trino и падению кластера.


Решение 1: Полный Pushdown через `system.query` (Для ELT-оркестрации)

Чтобы гарантировать, что вычисления и фильтры 100% выполнятся на мощностях PostgreSQL, мы можем использовать специальную табличную функцию `system.query()`.

Этот подход разделяет обязанности: PostgreSQL занимается фильтрацией и тяжелой математикой локально, а Trino просто оркестрирует запись результата в Parquet/Iceberg.

-- Создаем таблицу в Iceberg (Lakekeeper) и наполняем её результатами из Postgres
CREATE TABLE iceberg_catalog.raw_data.customer_metrics WITH (
    format = 'PARQUET',
    partitioning = ARRAY['mktsegment']
) AS 
SELECT
    *
FROM
    TABLE(
        postgres_catalog.system.query(
            query => '
                -- Этот SQL выполняется СТРОГО внутри PostgreSQL
                SELECT 
                    custkey, 
                    name, 
                    mktsegment,
                    acctbal,
                    array_agg(acctbal) OVER (
                        PARTITION BY mktsegment 
                        ORDER BY custkey 
                        ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING
                        EXCLUDE GROUP
                    ) AS rolling_bals
                FROM public.customer
                WHERE acctbal > 1000 
                  AND created_at >= current_date - interval ''1 month''
            '
        )
    );

Преимущество: Если селективность нашего фильтра S равна 0.05 (остается 5% строк), то объем передаваемых по сети данных составит строго V total \ times S. Никакие таймауты Trino не заставят Postgres отдать лишние данные.


Решение 2: Использование `pg_lake` (Для концепции Lakehouse в PostgreSQL)

Если первый метод идеально подходит для использования Trino как движка трансформации, то зачем вообще существует проект `pg_lake`?

`pg_lake` внедряет под капот PostgreSQL движок DuckDB через `pgduck_server`. Это позволяет базе данных самостоятельно подключаться к S3 и читать/писать формат Iceberg, минуя Trino.

В чем выгода использования

pg_lake

прямо в PostgreSQL?

  1. Чтение ледяных архивов (Cold Data) без сторонних движков.
    Допустим, вы переносите старые партиции данных из Postgres в Iceberg (S3) для экономии места. С `pg_lake` база Postgres “учится” читать эти архивы. Вы можете написать обычный запрос в вашем любимом клиенте (DBeaver, DataGrip, pgAdmin):
-- Объединение горячих данных из кучи (heap) PG и холодных данных из Iceberg
SELECT * FROM public.orders_current
UNION ALL
SELECT * FROM iceberg.orders_archive WHERE order_date < '2023-01-01';
  1. Работа в родном диалекте PostgreSQL.
    Если ваши аналитики и приложения жестко завязаны на специфические функции PostgreSQL (например, PostGIS для геоданных или сложные хранимые процедуры PL/pgSQL), интеграция с `pg_lake` позволяет анализировать гигантские внешние Iceberg-файлы, используя всю мощь экосистемы PG, без необходимости переписывать SQL-код под диалект Trino.
  1. Меньше точек отказа.
    Для небольших команд, которым не нужна горизонтальная масштабируемость Trino, установка `pg_lake` позволяет построить Data Lake вообще без развертывания отдельного аналитического кластера. Postgres сам выполняет COPY-команды в S3.

Итог итогов

  • Используйте `system.query()` в Trino, если ваша цель — построить надежный, масштабируемый процесс выгрузки (ELT). Это самый безопасный паттерн: он разгружает сеть платформы данных, защищает от капризов динамической фильтрации и оставляет сервер БД свободным от сторонних плагинов.
  • Используйте `pg_lake`, если ваша бизнес-потребность — позволить самому PostgreSQL прозрачно обращаться к Data Lake. Это идеальное решение для архивации холодных данных прямо из СУБД или если ваши процессы глубоко интегрированы с инструментами, понимающими только нативный протокол Postgres.

Немного сборной сборки про качество и ML

Немного сборной сборки про качество и ML

https://github.com/andkret/Cookbook

https://podcast.ru/e/3Ldlf9-6ebG

https://habr.com/ru/companies/vtb/news/762384/

Полезные ресурсы и ссылки:
Курс MLOps (OTUS): https://otus.ru/lessons/ml-bigdata/
Основные идеи из книги «Сотрудничество в DevOps-культуре»: http://agilemindset.ru/основные-идеи-из-книги-сотрудничест/
MLOps: Continuous delivery and automation pipelines in machine learning: https://cloud.google.com/architecture/mlops-continuous-delivery-and-automation-pipelines-in-machine-learning
Как создавать качественные ML-системы. Часть 1: каждый проект должен начинаться с плана: https://habr.com/ru/companies/vk/articles/749850/
Как создавать качественные ML-системы. Часть 2: приручаем хаос: https://habr.com/ru/companies/vk/articles/749852/
The Data Engineering Cookbook: https://github.com/andkret/Cookbook
Стандарты:
ISO/IEC DIS 5259-1: https://www.iso.org/standard/81088.html
SO/IEC DIS 5259-4: https://www.iso.org/standard/81093.html
ISO/IEC 8183:2023: https://www.iso.org/standard/83002.html

Архитектура Client Spooling: Как быстро выгружать гигантские датасеты в Trino и Apache DataFusion

Работа с Big Data часто упирается в классическое “узкое горлышко”: кластер может обработать терабайты данных за секунды, но передача результатов (Result Set) обратно на сторону клиента (например, в Jupyter или скрипт) занимает часы. На дворе апрель 2026 года, и современные аналитические движки предлагают эффективные методы обхода этой проблемы — концепцию Spooling.

Немного душноты: https://www.starburst.io/blog/trino-spooling-protocol/

Архитектура Client Spooling в Trino создавалась с параноидальным акцентом на безопасность, в S3 выкидываются куски сырых, возможно, чувствительных данных.

Когда Trino решает сбросить данные в объектное хранилище, он всегда шифрует их на лету.
Для этого используется механизм S3 SSE-C (Server-Side Encryption with Customer-provided keys). Trino генерирует уникальный случайный AES-ключ для каждого запроса, отправляет его в MinIO вместе с данными, а клиенту (вашему Jupyter) отдает ссылку + этот же ключ для расшифровки.
Если мы используем локальный MinIO по адресу http://minio:9000 (без SSL/TLS), сервер MinIO видит, что ему пытаются передать секретный пароль (SSE-C ключ) по открытому незащищенному HTTP-каналу.
MinIO (как и настоящий AWS S3) строго запрещает это по спецификации. Он возвращает HTTP 400 Bad Request с ошибкой: “Requests specifying Server Side Encryption... must be made over a secure connection”. Поэтому тестировать лучше на реальном s3. И еще

Мгновенное удаление (Сборка мусора)

Главное правило Client Spooling: Trino удаляет файлы сразу же, как только они были прочитаны клиентом.
Как только ваш Python-скрипт или Jupyter получает ссылку на файл, скачивает его и отправляет координатору Trino HTTP-сигнал (ACK), что кусок получен, координатор дает команду немедленно удалить этот объект из S3.
Если запрос отменен или упал с ошибкой, Trino тоже моментально зачищает за собой fs.location. Вы просто не успеете их там увидеть.

Данных слишком мало (Thresholds)

Писать 10 строк в S3, генерировать для них Pre-signed URLs и отдавать клиенту — это дольше, чем просто плюнуть эти 10 строк текстом через координатор. Trino использует эвристику: если Result Set маленький, он отдается “инлайн” (внутри JSON-ответа самого координатора), и S3 не задействуется.

В этой статье мы разберем, как передавать результаты запросов через промежуточное S3-хранилище, на примере движков Trino и Apache DataFusion.

Физика проблемы и математика Spooling

В классической архитектуре все воркеры кластера отправляют вычисленные строки на главный узел (Coordinator), а тот уже отдает их по одному каналу клиенту.

Если D — это объем результирующей выборки, а B c — пропускная способность сети координатора, то время выгрузки данных клиенту без спулинга равно:

T classic = B / Dc

В режиме Spooling координатор не гоняет данные через себя. Воркеры напрямую, параллельно пишут куски результата в дешевое объектное хранилище (S3/MinIO). Клиент получает лишь ссылки на эти файлы и скачивает их напрямую. Если у нас N файлов в S3, доступных для многопоточного скачивания с пропускной способностью клиента B client: T spooling ≈ min(N×B s3,B client)D

Это позволяет ускорить выгрузку в десятки раз, так как $B_{client}$ и распределенный $B_{s3}$ обычно значительно больше ограничений одного координатора.


Подготовка минимальной инфраструктуры

Для демонстрации двух подходов мы убрали из нашего кластера все тяжелые клиентские среды (Jupyter, Spark) и оставили только “голое” ядро: хранилище S3, REST-каталог и SQL-движок.

минимальный

docker-compose.yml

version: '3.8'

services:
  minio:
    image: minio/minio:latest
    ports:
      - "19000:9000"
      - "19001:9001"
    environment:
      MINIO_ROOT_USER: "minio-root-user"
      MINIO_ROOT_PASSWORD: "minio-root-password"
    command: server /data --console-address ":9001"

  minio-setup:
    image: minio/mc:latest
    depends_on:
      - minio
    entrypoint: >
      /bin/sh -c "
      sleep 5;
      mc alias set myminio http://minio:9000 minio-root-user minio-root-password;
      mc mb myminio/warehouse || true;
      "

  lakekeeper:
    image: dalongrong/lakekeeper:latest
    ports:
      - "8181:8181"
    environment:
      - S3_ENDPOINT=http://minio:9000
      - S3_REGION=us-east-1
      - S3_ACCESS_KEY_ID=minio-root-user
      - S3_SECRET_ACCESS_KEY=minio-root-password
    depends_on:
      - minio-setup

  trino:
    image: trinodb/trino:latest
    ports:
      - "8080:8080"

Шаг 1. Настройка каталога и генерация данных (Trino)


Сначала мы генерируем данные в Trino. Запрос

CREATE CATALOG

использует динамическое подключение к Lakekeeper REST API. Скрипт записывает файлы в формате Parquet в MinIO:

config.properties

protocol.spooling.enabled=true
# 256-битный ключ в формате base64. Вы можете сгенерировать свой с помощью команды `openssl rand -base64 32`
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=

catalog.management=dynamic

spooling-manager.properties

spooling-manager.name=filesystem
# Включаем чтение/запись в S3 для Spooling
fs.s3.enabled=true
# Путь внутри MinIO (указываем через s3://)
fs.location=s3://warehouse/client-spooling/

# Системные настройки S3 (MinIO)
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minio-root-user
s3.aws-secret-key=minio-root-password
s3.path-style-access=true

-- 1. Подключение каталога Iceberg

CREATE CATALOG test_warehouse USING iceberg
WITH (
    "iceberg.catalog.type" = 'rest',
    "iceberg.rest-catalog.uri" = 'http://lakekeeper:8181/catalog/',
    "iceberg.rest-catalog.warehouse" = '00000000-0000-0000-0000-000000000000/test_warehouse',
    "iceberg.rest-catalog.security" = 'OAUTH2',
    "iceberg.rest-catalog.nested-namespace-enabled" = 'true',
    "iceberg.rest-catalog.vended-credentials-enabled" = 'true',
    "fs.native-s3.enabled" = 'true',
    "s3.region" = 'us-east-1',
    "s3.path-style-access" = 'true',
    "s3.endpoint" = 'http://minio:9000'
);

-- 2. Создание структуры

CREATE SCHEMA test_warehouse.test_schema;

CREATE TABLE test_warehouse.test_schema.my_table (
    id BIGINT,
    data VARCHAR
) WITH (format = 'PARQUET');

-- 3. Запись данных

INSERT INTO test_warehouse.test_schema.my_table VALUES (1, 'hello'), (2, 'world');

Если написать Select – должно быть как-то так

Аналог Spooling в Apache DataFusion (Через экспорт)

Trino поддерживает протокол *Client Spooling* “из коробки” — когда Python-клиент запрашивает огромный `SELECT`, Trino сам незаметно пишет куски в S3 и отдает клиенту готовые ссылки.

В Apache DataFusion (который часто работает как локальный движок `datafusion-cli` или встраиваемая библиотка поверх S3) применяется более прозрачный паттерн делегирования (Explicit Spooling). Мы вручную инструктируем движок сохранить результаты агрегации в распределенное хранилище, чтобы позже забрать их в удобном формате — например, упаковав их в `JSON` и сжав алгоритмом `ZSTD`.

1. Подключение к S3 и маппинг исходной таблицы

Запускаем `datafusion-cli`, передав доступы как переменные среды (для предотвращения ошибок парсинга опций):

AWS_ACCESS_KEY_ID="minio-root-user" \
AWS_SECRET_ACCESS_KEY="minio-root-password" \
AWS_ENDPOINT="http://localhost:19000" \
AWS_REGION="us-east-1" \
AWS_ALLOW_HTTP="true" \
datafusion-cli

Внутри консоли подключаем директорию с Parquet-файлами, сгенерированными Trino:

CREATE EXTERNAL TABLE my_parquet_data 
STORED AS PARQUET 
LOCATION 's3://warehouse/019d81a3-c2d6-7ed2-ab15-070becf62582/my_table-13e4b91a2b4e47d98f312b1384263880/data/';
2. Массовая конвертация и выгрузка (DataFusion COPY)

Вместо того чтобы тянуть миллионы строк на локальный терминал, мы просим DataFusion выполнить преобразование и записать итог запроса обратно в MinIO.

Мы выбираем построчный JSON с экстремальным сжатием:

COPY (
    -- Тут может быть любая сложная агрегация:
    -- SELECT id, count(data) FROM my_parquet_data GROUP BY id
    SELECT * FROM my_parquet_data
) 
TO 's3://warehouse/019d81a3-c2d6-7ed2-ab15-070becf62582/my_table-13e4b91a2b4e47d98f312b1384263880/json_export/' 
STORED AS JSON
OPTIONS (
    'format.compression' 'zstd'
);

Результат:

+-------+
| count |
+-------+
| 2     |
+-------+
1 row(s) fetched. 
Elapsed 0.270 seconds.

За миллисекунды (0.270 sec) DataFusion прочитал партиции, трансформировал бинарные столбцы в текст и сжал его.

В чем преимущество подхода DataFusion?

Описанный паттерн выполнения команды `COPY TO` с сохранением `.json.zst` в MinIO полностью воспроизводит механику Spooling:

  1. Отсутствие OOM (Out Of Memory): Клиент получает только метаданные `count`, а не гигабайты сырых данных в оперативную память.
  2. Параллелизм: Если исходных файлов много, DataFusion будет писать множество потоков `part-0.json.zst`, `part-1.json.zst` в бакет параллельно.
  3. Удаленное потребление: Вы можете запустить легкий Python-скрипт (Pandas) на дешевой машине, который просто прочитает эти сжатые легковесные JSON объекты напрямую из MinIO, минуя дорогостоящие вычислительные кластеры.

Еще немного про Fault-Tolerant Execution (FTE), нужно провести важную границу между архитектурой Trino (готовый распределенный кластер) и архитектурой DataFusion (ядро/библиотека выполнения запросов).

В самом “голом” ядре DataFusion (которое вы запускаете в `datafusion-cli` или в Jupyter) нет встроенного механизма Task Retries, потому что процессы выполняются на одной машине в рамках одного приложения. Если сервер падает — запрос прерывается.

Однако, в экосистеме DataFusion есть механизмы отказоустойчивости, которые делятся на два уровня: локальный (Spilling) и распределенный (Apache Ballista / Ray).


1. Локальная отказоустойчивость (защита от OOM)

В Trino частой причиной падения задач является нехватка памяти (Out of Memory). В DataFusion реализован мощный механизм управления памятью.

Если DataFusion понимает, что оперативной памяти для агрегации или JOIN’а не хватает, он не “роняет” задачу, а начинает сбрасывать промежуточные данные на диск (Spill to Disk).

  • Это настраивается через конфигурацию `datafusion.execution.disk_manager`.
  • Это аналог локального `spill-enabled = true` в Trino. Запрос замедлится, но выполнится до конца, не упав с ошибкой.

2. Распределенная отказоустойчивость (Аналог Trino FTE)

Trino использует архитектуру Fault-Tolerant Execution (FTE), при которой промежуточные результаты (Shuffle Exchange) пишутся в S3, а упавшие воркеры заменяются, и их задачи (Tasks) перезапускаются координатором.

В мире DataFusion эту задачу решает не само ядро, а распределенные планировщики, построенные поверх него:

А. Apache Ballista (Официальный распределенный DataFusion)

Ballista — это надстройка над DataFusion, превращающая его в полноценный кластер (с Coordinator и Executors), архитектурно очень похожая на Apache Spark и Trino.

  • Task Retries: Если один из Executor’ов теряется из-за сбоя сети или железа, Ballista Coordinator замечает это и переназначает задачу (Task) другому воркеру.
  • Shuffle Spilling: Промежуточные данные между стадиями (Stages) записываются во временные файлы. Следовательно, если упала только последняя стадия, кластеру не нужно пересчитывать весь запрос с нуля — он прочитает промежуточные Shuffle-файлы и повторит только упавший кусок.
Б. DataFusion on Ray (datafusion-ray)

Сейчас огромную популярность набирает запуск DataFusion поверх кластера Ray.
Ray — это супер-устойчивый распределенный фреймворк. Интеграция `datafusion-ray` позволяет разбить SQL-запрос на граф задач прямо в Ray.

  • За отказоустойчивость, Retry-логику и восстановление упавших узлов (Actor/Task) здесь отвечает сам Ray, который делает это на уровне индустриального стандарта.
  • Это максимально близко к концепции отказоустойчивого кластера.

Резюме: Как получить “Trino-like” Fault Tolerance в DataFusion?

  1. Если вы используете локальный DataFusion (в Python или CLI): Отказоустойчивости уровня узлов нет, но есть защита от падений по памяти (Spill to Disk). Если упадет процесс — нужно перезапускать запрос руками.
  2. Если вам нужен настоящий Task Repeat / Fault Tolerance на сотнях серверов, где падение серверов — норма: вы используете движок DataFusion вместе с кластерным менеджером Apache Ballista или Ray, которые прозрачно обеспечат перезапуск задач (Retries) и сохранение промежуточных состояний (Shuffle), полностью повторяя логику Trino FTE.

UPD: В локальном тестировании есть некоторые особенности. Когда контейнеры внутри имеют свою сеть, то трино посылает в dbeaver ссылки. А есть хост не знает что это за минива или localstack-spooling, то оно отдаст кусок данных, а остальные части просто не доедут. Квери упадет как отмененная, так как клиент получил не все результаты. Короче, надо просто так сделать

sudo nano /etc/hosts

и вставить строку вашего s3 хоста.

127.0.0.1       localstack-spooling

то есть при спулинге клиент должен не только иметь сетевую связанность с s3 но различать dns имена корректно.

Короче сравния строк пройдено, все сошлося :)

со спулингом 2.2 сек
без спулинга 4.4 сек

Питончик 2.16 сек с чанками

в самом трино еще быстрее

все строки на месте: 150тыщъ

код !!


from trino.dbapi import connect
import json

Конфигурация –

TRINO_HOST = “localhost”
TRINO_PORT = 9999
TRINO_USER = “trino”
TRINO_CATALOG = “test_warehouse”
TRINO_SCHEMA = “test_schema”
OUTPUT_FILE = “output.json”
CHUNK_SIZE = 10000 # Количество строк, обрабатываемых за один раз

def export_to_json():
conn = connect(
host=TRINO_HOST,
port=TRINO_PORT,
user=TRINO_USER,
catalog=TRINO_CATALOG,
schema=TRINO_SCHEMA,
)
cursor = conn.cursor()

try:

Отключаем Fault-Tolerant Execution

cursor.execute(“SET SESSION retry_policy = ‘NONE’”)
cursor.execute(“SELECT * FROM my_table2”)

column_names = [desc[0] for desc in cursor.description]
row_count = 0

with open(OUTPUT_FILE, “w”, encoding=“utf-8”) as f:

Используем fetchmany для чанков

while True:
rows = cursor.fetchmany(CHUNK_SIZE)
if not rows:
break
for row in rows:
row_dict = dict(zip(column_names, row))
f.write(json.dumps(row_dict, ensure_ascii=False, default=str) + “\n”)
row_count += len(rows)
print(f“Processed {row_count} rows...”)

print(f“Successfully exported {row_count} rows to {OUTPUT_FILE}”)

finally:
cursor.close()
conn.close()

if __name__ == “__main__”:
export_to_json()

Вот еще с уточкой и чанками

код


import duckdb
import json

OUTPUT_FILE = “/home/jovyan/examples/output_duckdb.json”
CHUNK_SIZE = 10000

conn = duckdb.connect()

расширения и настройки (как у вас)

conn.execute(“INSTALL httpfs; LOAD httpfs;”)
conn.execute(“INSTALL iceberg; LOAD iceberg;”)
conn.execute(“SET memory_limit = ‘4GB’;”)
conn.execute(“SET s3_region = ‘us-east-1’;”)

conn.execute(“‘’
CREATE OR REPLACE SECRET minio_secret (
TYPE S3,
KEY_ID ‘minio-root-user’,
SECRET ‘minio-root-password’,
ENDPOINT ‘minio:9000’,
USE_SSL false,
URL_STYLE ‘path’
);
‘‘’)

conn.execute(‘‘’
CREATE OR REPLACE SECRET iceberg_secret (
TYPE ICEBERG,
TOKEN ‘dummy’
);
‘‘’)

conn.execute(‘‘’
ATTACH ‘test_warehouse’ AS lakekeeper_db (
TYPE ICEBERG,
ENDPOINT ’http://lakekeeper:8181/catalog/',
ACCESS_DELEGATION_MODE ‘none’,
SECRET iceberg_secret
);
‘‘’)

Используем cursor и fetchmany для чанков

cursor = conn.cursor()
cursor.execute(‘SELECT * FROM lakekeeper_db.test_schema.my_table2’)

Получаем имена колонок

col_names = [desc[0] for desc in cursor.description]

total_rows = 0
with open(OUTPUT_FILE, ‘w’, encoding=’utf-8’) as f:
while True:
rows = cursor.fetchmany(CHUNK_SIZE)
if not rows:
break
for row in rows:
row_dict = dict(zip(col_names, row))
f.write(json.dumps(row_dict, ensure_ascii=False, default=str) + ‘\n’)
total_rows += len(rows)
print(f’Обработано строк: {total_rows}’)

print(f’✅ Загружено и сохранено строк: {total_rows}”)
print(f“📁 Данные сохранены в {OUTPUT_FILE}”)
conn.close()

Можно даже так внутри уточки


import duckdb

OUTPUT_FILE = “/home/jovyan/examples/output_duckdb_direct.json”

conn = duckdb.connect()

Расширения и настройки

conn.execute(“INSTALL httpfs; LOAD httpfs;”)
conn.execute(“INSTALL iceberg; LOAD iceberg;”)
conn.execute(“SET memory_limit = ‘4GB’;”)
conn.execute(“SET s3_region = ‘us-east-1’;”)

Секрет для MinIO

conn.execute(“‘’
CREATE OR REPLACE SECRET minio_secret (
TYPE S3,
KEY_ID ‘minio-root-user’,
SECRET ‘minio-root-password’,
ENDPOINT ‘minio:9000’,
USE_SSL false,
URL_STYLE ‘path’
);
‘‘’)

Секрет для Iceberg REST

conn.execute(‘‘’
CREATE OR REPLACE SECRET iceberg_secret (
TYPE ICEBERG,
TOKEN ‘dummy’
);
‘‘’)

Подключение каталога Lakekeeper

conn.execute(‘‘’
ATTACH ‘test_warehouse’ AS lakekeeper_db (
TYPE ICEBERG,
ENDPOINT ’http://lakekeeper:8181/catalog/',
ACCESS_DELEGATION_MODE ‘none’,
SECRET iceberg_secret
);
‘‘’)

Экспорт в JSON (массив)

conn.execute(f’’’
COPY (
SELECT * FROM lakekeeper_db.test_schema.my_table2
) TO ‘{OUTPUT_FILE}’ (FORMAT JSON);
‘‘’)

print(f’✅ Данные сохранены в {OUTPUT_FILE}’)
conn.close()

К конце концов я использовал

localstack-spooling

protocol.spooling.enabled=true
# 256-битный ключ в формате base64. Вы можете сгенерировать свой с помощью команды `openssl rand -base64 32`
protocol.spooling.shared-secret-key=jxTKysfCBuMZtFqUf8UJDQ1w9ez8rynEJsJqgJf66u0=
catalog.management=dynamic

так

spooling-manager.name=filesystem
fs.s3.enabled=true
fs.location=s3://spooling-bucket/client-spooling/

s3.endpoint=http://localstack-spooling:4566
s3.region=us-east-1
s3.aws-access-key=test
s3.aws-secret-key=test
s3.path-style-access=true

и так

services:

  trino:
    build: ./trino
    environment:
      - CATALOG_MANAGEMENT=dynamic
      - LANCE_ALLOW_HTTP=true
      - AWS_ALLOW_HTTP=true
      - AWS_ACCESS_KEY_ID=minio-root-user
      - AWS_SECRET_ACCESS_KEY=minio-root-password
      - AWS_REGION=us-east-1
      - AWS_ENDPOINT_URL=http://minio:9000
      - CATALOG_MANAGEMENT=dynamic
      - JDK_JAVA_OPTIONS=--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED
    healthcheck:
      test: ["CMD", "curl", "-I", "http://localhost:8080/v1/status"]
      interval: 2s
      timeout: 10s
      retries: 2
      start_period: 10s
    ports:
      - "9999:8080"
    volumes:
      - ./lance5.properties:/etc/trino/catalog/lance5.properties
      - ./lance_rest.properties:/etc/trino/catalog/lance_rest.properties
      - ./lance_ice.properties:/etc/trino/catalog/lance_ice.properties
      # --- ДОБАВЬТЕ ЭТУ СТРОКУ ---
      - ./spooling-manager.properties:/etc/trino/spooling-manager.properties
      # (При необходимости пробросьте и config.properties, если он не копируется при build: ./trino)
      - ./config.properties:/etc/trino/config.properties
      - spooling-data:/tmp/spooling
    networks:
      - lakekeeper-network
    depends_on:
      localstack-setup:    # <--- Trino ждет, пока AWS CLI не создаст бакет!
        condition: service_completed_successfully

  localstack-spooling:
    image: localstack/localstack:3.4.0    # Жестко фиксируем бесплатную рабочую версию!
    container_name: localstack-spooling
    ports:
      - "4566:4566"
    environment:
      - SERVICES=s3
      - AWS_DEFAULT_REGION=us-east-1
    networks:
      - lakekeeper-network

  localstack-setup:
    image: amazon/aws-cli:latest
    container_name: localstack-setup
    depends_on:
      - localstack-spooling
    restart: "no"
    environment:
      - AWS_ACCESS_KEY_ID=test
      - AWS_SECRET_ACCESS_KEY=test
      - AWS_DEFAULT_REGION=us-east-1
    entrypoint: >
      /bin/sh -c "
        echo 'Waiting for LocalStack to fully start...';
        sleep 10;
        aws --endpoint-url=http://localstack-spooling:4566 s3 mb s3://spooling-bucket;
        echo 'LocalStack bucket created successfully!';
      "
    networks:
      - lakekeeper-network
      
  jupyter:
    image: quay.io/jupyter/pyspark-notebook:2024-10-14
    depends_on:
      lakekeeper:
        condition: service_healthy
      # Исправлено: теперь зависим от рабочего setup сервиса
      lakekeeper-setup:
        condition: service_completed_successfully
      trino:
        condition: service_healthy
      # Удалено: starrocks (сервис не описан в compose файле)
    command: start-notebook.sh --NotebookApp.token=''
    volumes:
      - ./notebooks:/home/jovyan/examples/
      - spooling-data:/tmp/spooling
    networks:
      - lakekeeper-network
    ports:
      - "8888:8888"

  # Сервис initialwarehouse УДАЛЕН, так как он дублировал lakekeeper-setup 
  # и ссылался на несуществующие сервисы (bootstrap, createbuckets).

  postgres-lakekeeper:
    image: postgres:17
    container_name: postgres-lakekeeper
    environment:
      POSTGRES_USER: lakekeeper
      POSTGRES_PASSWORD: lakekeeper
      POSTGRES_DB: lakekeeper
    ports:
      - "5435:5432"
    volumes:
      - lakekeeper-postgres-data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U lakekeeper -d lakekeeper"]
      interval: 2s
      timeout: 10s
      retries: 5
    networks:
      - lakekeeper-network

  minio:
    image: minio/minio:latest
    container_name: minio-lakekeeper
    environment:
      MINIO_ROOT_USER: minio-root-user
      MINIO_ROOT_PASSWORD: minio-root-password
      # MINIO_DOMAIN: minio
    command: server /data --console-address ":9001"
    ports:
      - "19000:9000"
      - "19001:9001"
    volumes:
      - lakekeeper-minio-data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 2s
      timeout: 10s
      retries: 5
    networks:
      - lakekeeper-network

  minio-setup:
    image: minio/mc:latest
    container_name: minio-setup
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      /bin/sh -c "
        mc alias set myminio http://minio:9000 minio-root-user minio-root-password &&
        mc mb myminio/warehouse --ignore-existing &&
        echo 'MinIO bucket created'
      "
    networks:
      - lakekeeper-network

  lakekeeper-migrate:
    image: quay.io/lakekeeper/catalog:latest-main
    container_name: lakekeeper-migrate
    depends_on:
      postgres-lakekeeper:
        condition: service_healthy
    environment:
      - LAKEKEEPER__PG_ENCRYPTION_KEY=test-encryption-key-not-secure
      - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://lakekeeper:lakekeeper@postgres-lakekeeper:5432/lakekeeper
      - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://lakekeeper:lakekeeper@postgres-lakekeeper:5432/lakekeeper
    restart: "no"
    command: ["migrate"]
    networks:
      - lakekeeper-network

  lakekeeper:
    image: quay.io/lakekeeper/catalog:latest-main
    container_name: lakekeeper
    depends_on:
      lakekeeper-migrate:
        condition: service_completed_successfully
      minio-setup:
        condition: service_completed_successfully
    environment:
      - LAKEKEEPER__PG_ENCRYPTION_KEY=test-encryption-key-not-secure
      - LAKEKEEPER__PG_DATABASE_URL_READ=postgresql://lakekeeper:lakekeeper@postgres-lakekeeper:5432/lakekeeper
      - LAKEKEEPER__PG_DATABASE_URL_WRITE=postgresql://lakekeeper:lakekeeper@postgres-lakekeeper:5432/lakekeeper
      - LAKEKEEPER__AUTHZ_BACKEND=allowall
      - RUST_LOG=info
    command: ["serve"]
    healthcheck:
      test: ["CMD", "/home/nonroot/lakekeeper", "healthcheck"]
      interval: 2s
      timeout: 10s
      retries: 5
      start_period: 5s
    ports:
      - "8282:8181"
    networks:
      - lakekeeper-network

  lakekeeper-bootstrap:
    image: curlimages/curl
    container_name: lakekeeper-bootstrap
    depends_on:
      lakekeeper:
        condition: service_healthy
    restart: "no"
    command:
      - -w
      - "%{http_code}"
      - "-X"
      - "POST"
      - "-v"
      - "http://lakekeeper:8181/management/v1/bootstrap"
      - "-H"
      - "Content-Type: application/json"
      - "--data"
      - '{"accept-terms-of-use": true}'
      - "-o"
      - "/dev/null"
    networks:
      - lakekeeper-network

  lakekeeper-setup:
    image: curlimages/curl
    container_name: lakekeeper-setup
    depends_on:
      lakekeeper-bootstrap:
        condition: service_completed_successfully
    restart: "no"
    entrypoint: ["/bin/sh", "-c"]
    command:
      - |
        echo "Creating test_warehouse..."
        curl -sf -X POST "http://lakekeeper:8181/management/v1/warehouse" \
          -H "Content-Type: application/json" \
          -d '{
            "warehouse-name": "test_warehouse",
            "project-id": "00000000-0000-0000-0000-000000000000",
            "storage-profile": {
              "type": "s3",
              "bucket": "warehouse",
              "endpoint": "http://minio:9000",
              "region": "us-east-1",
              "path-style-access": true,
              "flavor": "minio",
              "sts-enabled": false
            },
            "storage-credential": {
              "type": "s3",
              "credential-type": "access-key",
              "aws-access-key-id": "minio-root-user",
              "aws-secret-access-key": "minio-root-password"
            }
          }' && echo "Warehouse created successfully" || echo "Failed to create warehouse"
    networks:
      - lakekeeper-network

volumes:
  lakekeeper-postgres-data:
  lakekeeper-minio-data:
  spooling-data:
  
networks:
  lakekeeper-network:
    driver: bridge
Earlier Ctrl + ↓