Welcome to my personal place for love, peace and happiness 🤖

Обработка логов Trino из Kafka с помощью Vector для удаления полей

В современных архитектурах данных, построенных на Kafka, часто возникает задача обработки или фильтрации потока событий “на лету”. Один из распространенных кейсов — удаление чувствительной информации из логов перед их передачей в следующую систему (например, в SIEM или систему долгосрочного хранения).

Kafka: https://hub.docker.com/r/apache/kafka
Vector: https://vector.dev/docs

Рассмотрим реальный пример:

  • Кластер Trino (или Presto) пишет подробные логи о каждом выполненном запросе в топик Kafka.
  • Эти логи содержат как полезные метаданные (пользователь, время, объем данных), так и полную текстовую версию самого SQL-запроса в поле, например, `query`.
  • Задача: Переложить эти логи в другой топик Kafka, но уже без** поля `query`, чтобы система-подписчик не имела доступа к потенциально конфиденциальной информации в текстах запросов.

Для решения этой задачи мы воспользуемся Vector — легковесным и сверхбыстрым инструментом для обработки данных.

План действий

  1. Создадим два топика в Kafka: `trino-logs-raw` (для сырых логов) и `trino-logs-cleaned` (для очищенных).
  2. Настроим Vector для чтения из первого топика, удаления поля `query` и всех служебных метаданных.
  3. Настроим Vector на запись результата во второй топик.
  4. Запустим всю цепочку в Docker и протестируем.

Шаг 1: Подготовка Kafka

Предполагается, что у вас уже запущен Kafka-брокер в Docker. На основе нашего примера, у вас есть контейнер с именем `broker1`, который является частью Docker-сети `minimal_iceberg_net`.

Откройте терминал и подключитесь к контейнеру Kafka, чтобы создать топики:

Создадим сеть 

docker network create my_net 

Запускаем брокер broker:

docker run -d \
  --name broker3 \
  --network=my_net \
  -p 8893:9092 \
  -e KAFKA_NODE_ID=3 \
  -e KAFKA_PROCESS_ROLES='broker,controller' \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS='3@broker3:9093' \
  -e KAFKA_LISTENERS='INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://broker3:9093' \
  -e KAFKA_ADVERTISED_LISTENERS='INTERNAL://broker3:29092,EXTERNAL://localhost:8893' \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP='INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT' \
  -e KAFKA_INTER_BROKER_LISTENER_NAME='INTERNAL' \
  -e KAFKA_CONTROLLER_LISTENER_NAMES='CONTROLLER' \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
  -e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
  apache/kafka:latest


docker exec --workdir /opt/kafka/bin/ -it broker3 sh

Теперь, находясь внутри контейнера, выполните команды:

# Создаем "сырой" топик для входящих логов Trino
./kafka-topics.sh --create --topic trino-logs-raw --bootstrap-server localhost:29092 --partitions 1 --replication-factor 1

# Создаем "чистый" топик для обработанных логов
./kafka-topics.sh --create --topic trino-logs-cleaned --bootstrap-server localhost:29092 --partitions 1 --replication-factor 1

*Обратите внимание: я использую внутренний порт брокера `29092`, который узнали ранее.*

Выйдите из контейнера командой `exit`.

Шаг 2: Конфигурация Vector

На вашей локальной машине создайте структуру папок:

vector-trino-processor/
└── config/
    └── vector.toml

Поместите в файл `vector.toml` следующую конфигурацию. Это сердце нашего решения.

# vector-trino-processor/config/vector.toml

# ==================================
#          ИСТОЧНИК ДАННЫХ
# ==================================
# Читаем сырые логи из Kafka
[sources.trino_raw_logs]
  type = "kafka"
  # Подключаемся к брокеру по имени контейнера и внутреннему порту
  bootstrap_servers = "broker3:29092"
  # Указываем, какой топик слушать
  topics = ["trino-logs-raw"]
  group_id = "vector-trino-cleaner"
  # Vector автоматически распарсит входящие сообщения как JSON
  decoding.codec = "json"

# ==================================
#             ТРАНСФОРМАЦИЯ
# ==================================
# Удаляем поле `query` и служебные метаданные Vector
[transforms.clean_trino_log]
  type = "remap"
  # Получаем данные от нашего источника
  inputs = ["trino_raw_logs"]
  # Скрипт на языке Vector Remap Language (VRL)
  source = '''
  # 1. Удаляем чувствительное поле "query" из лога.
  del(.query)

  # 2. Удаляем все служебные поля, которые Vector добавляет
  #    при чтении из Kafka, чтобы на выходе был чистый JSON.
  del(.headers)
  del(.message_key)
  del(.offset)
  del(.partition)
  del(.source_type)
  del(.timestamp)
  del(.topic)
  '''

# ==================================
#           ПРИЕМНИК ДАННЫХ
# ==================================
# Пишем очищенные логи в новый топик Kafka
[sinks.trino_cleaned_logs]
  type = "kafka"
  # Принимаем на вход данные, прошедшие трансформацию
  inputs = ["clean_trino_log"]
  bootstrap_servers = "broker3:29092"
  # Указываем топик для записи
  topic = "trino-logs-cleaned"
  # Кодируем итоговое событие обратно в JSON
  encoding.codec = "json"

Шаг 3: Запуск и Тестирование

Нам понадобится три терминала.

В Терминале №1 — Запустим Vector

Перейдите в папку `vector-trino-processor` и выполните команду:

docker run \
  -d \
  --name vector-processor \
  -v "$(pwd)/config:/etc/vector/" \
  --network=my_net \
  --rm \
  timberio/vector:latest-alpine --config /etc/vector/vector.toml

Эта команда:

  • Запускает контейнер Vector в фоновом режиме (`-d`).
  • Дает ему имя `vector-processor`.
  • Монтирует ваш локальный конфиг (`-v`).
  • Подключает его к той же сети, что и Kafka (`--network`).
  • Явно указывает, какой файл конфигурации использовать (`--config`).

В Терминале №2 — Симулируем отправку лога Trino

Запустим интерактивный Kafka-продюсер.

docker exec --workdir /opt/kafka/bin -it broker3 ./kafka-console-producer.sh --topic trino-logs-raw --bootstrap-server localhost:29092

Теперь вставьте в этот терминал JSON, имитирующий лог от Trino, и нажмите Enter:

{"user":"yuriy","source":"trino-cli","queryId":"20231120_123456_00001_abcde","query":"SELECT * FROM sensitive_table a JOIN other_table b ON a.id = b.id WHERE a.credit_card = '1234-5678-9012-3456'","state":"FINISHED"}

В Терминале №3 — Проверяем результат

Запустим Kafka-консьюмер, который будет слушать очищенный топик `trino-logs-cleaned`.

docker exec --workdir /opt/kafka/bin -it broker3 ./kafka-console-consumer.sh --topic trino-logs-cleaned --bootstrap-server localhost:29092 --from-beginning

Вы практически мгновенно увидите результат работы Vector — тот же самый лог, но уже без поля `query`:

{"user":"yuriy","source":"trino-cli","queryId":"20231120_123456_00001_abcde","state":"FINISHED"}

Мы построили простой, но мощный конвейер для обработки данных в режиме реального времени, решив поставленную задачу с минимальными усилиями.

Follow this blog
Send
Share
Pin
18 h   big data   Data   Data Engineer   Security