Обработка логов Trino из Kafka с помощью Vector для удаления полей
В современных архитектурах данных, построенных на Kafka, часто возникает задача обработки или фильтрации потока событий “на лету”. Один из распространенных кейсов — удаление чувствительной информации из логов перед их передачей в следующую систему (например, в SIEM или систему долгосрочного хранения).
Kafka: https://hub.docker.com/r/apache/kafka
Vector: https://vector.dev/docs
Рассмотрим реальный пример:
- Кластер Trino (или Presto) пишет подробные логи о каждом выполненном запросе в топик Kafka.
- Эти логи содержат как полезные метаданные (пользователь, время, объем данных), так и полную текстовую версию самого SQL-запроса в поле, например, `query`.
- Задача: Переложить эти логи в другой топик Kafka, но уже без** поля `query`, чтобы система-подписчик не имела доступа к потенциально конфиденциальной информации в текстах запросов.
Для решения этой задачи мы воспользуемся Vector — легковесным и сверхбыстрым инструментом для обработки данных.
План действий
- Создадим два топика в Kafka: `trino-logs-raw` (для сырых логов) и `trino-logs-cleaned` (для очищенных).
- Настроим Vector для чтения из первого топика, удаления поля `query` и всех служебных метаданных.
- Настроим Vector на запись результата во второй топик.
- Запустим всю цепочку в Docker и протестируем.
Шаг 1: Подготовка Kafka
Предполагается, что у вас уже запущен Kafka-брокер в Docker. На основе нашего примера, у вас есть контейнер с именем `broker1`, который является частью Docker-сети `minimal_iceberg_net`.
Откройте терминал и подключитесь к контейнеру Kafka, чтобы создать топики:
Создадим сеть
docker network create my_net
Запускаем брокер broker:
docker run -d \
--name broker3 \
--network=my_net \
-p 8893:9092 \
-e KAFKA_NODE_ID=3 \
-e KAFKA_PROCESS_ROLES='broker,controller' \
-e KAFKA_CONTROLLER_QUORUM_VOTERS='3@broker3:9093' \
-e KAFKA_LISTENERS='INTERNAL://0.0.0.0:29092,EXTERNAL://0.0.0.0:9092,CONTROLLER://broker3:9093' \
-e KAFKA_ADVERTISED_LISTENERS='INTERNAL://broker3:29092,EXTERNAL://localhost:8893' \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP='INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT' \
-e KAFKA_INTER_BROKER_LISTENER_NAME='INTERNAL' \
-e KAFKA_CONTROLLER_LISTENER_NAMES='CONTROLLER' \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
-e KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 \
apache/kafka:latest
docker exec --workdir /opt/kafka/bin/ -it broker3 shТеперь, находясь внутри контейнера, выполните команды:
# Создаем "сырой" топик для входящих логов Trino
./kafka-topics.sh --create --topic trino-logs-raw --bootstrap-server localhost:29092 --partitions 1 --replication-factor 1
# Создаем "чистый" топик для обработанных логов
./kafka-topics.sh --create --topic trino-logs-cleaned --bootstrap-server localhost:29092 --partitions 1 --replication-factor 1*Обратите внимание: я использую внутренний порт брокера `29092`, который узнали ранее.*
Выйдите из контейнера командой `exit`.
Шаг 2: Конфигурация Vector
На вашей локальной машине создайте структуру папок:
vector-trino-processor/
└── config/
└── vector.tomlПоместите в файл `vector.toml` следующую конфигурацию. Это сердце нашего решения.
# vector-trino-processor/config/vector.toml
# ==================================
# ИСТОЧНИК ДАННЫХ
# ==================================
# Читаем сырые логи из Kafka
[sources.trino_raw_logs]
type = "kafka"
# Подключаемся к брокеру по имени контейнера и внутреннему порту
bootstrap_servers = "broker3:29092"
# Указываем, какой топик слушать
topics = ["trino-logs-raw"]
group_id = "vector-trino-cleaner"
# Vector автоматически распарсит входящие сообщения как JSON
decoding.codec = "json"
# ==================================
# ТРАНСФОРМАЦИЯ
# ==================================
# Удаляем поле `query` и служебные метаданные Vector
[transforms.clean_trino_log]
type = "remap"
# Получаем данные от нашего источника
inputs = ["trino_raw_logs"]
# Скрипт на языке Vector Remap Language (VRL)
source = '''
# 1. Удаляем чувствительное поле "query" из лога.
del(.query)
# 2. Удаляем все служебные поля, которые Vector добавляет
# при чтении из Kafka, чтобы на выходе был чистый JSON.
del(.headers)
del(.message_key)
del(.offset)
del(.partition)
del(.source_type)
del(.timestamp)
del(.topic)
'''
# ==================================
# ПРИЕМНИК ДАННЫХ
# ==================================
# Пишем очищенные логи в новый топик Kafka
[sinks.trino_cleaned_logs]
type = "kafka"
# Принимаем на вход данные, прошедшие трансформацию
inputs = ["clean_trino_log"]
bootstrap_servers = "broker3:29092"
# Указываем топик для записи
topic = "trino-logs-cleaned"
# Кодируем итоговое событие обратно в JSON
encoding.codec = "json"Шаг 3: Запуск и Тестирование
Нам понадобится три терминала.
В Терминале №1 — Запустим Vector
Перейдите в папку `vector-trino-processor` и выполните команду:
docker run \
-d \
--name vector-processor \
-v "$(pwd)/config:/etc/vector/" \
--network=my_net \
--rm \
timberio/vector:latest-alpine --config /etc/vector/vector.tomlЭта команда:
- Запускает контейнер Vector в фоновом режиме (`-d`).
- Дает ему имя `vector-processor`.
- Монтирует ваш локальный конфиг (`-v`).
- Подключает его к той же сети, что и Kafka (`--network`).
- Явно указывает, какой файл конфигурации использовать (`--config`).
В Терминале №2 — Симулируем отправку лога Trino
Запустим интерактивный Kafka-продюсер.
docker exec --workdir /opt/kafka/bin -it broker3 ./kafka-console-producer.sh --topic trino-logs-raw --bootstrap-server localhost:29092Теперь вставьте в этот терминал JSON, имитирующий лог от Trino, и нажмите Enter:
{"user":"yuriy","source":"trino-cli","queryId":"20231120_123456_00001_abcde","query":"SELECT * FROM sensitive_table a JOIN other_table b ON a.id = b.id WHERE a.credit_card = '1234-5678-9012-3456'","state":"FINISHED"}В Терминале №3 — Проверяем результат
Запустим Kafka-консьюмер, который будет слушать очищенный топик `trino-logs-cleaned`.
docker exec --workdir /opt/kafka/bin -it broker3 ./kafka-console-consumer.sh --topic trino-logs-cleaned --bootstrap-server localhost:29092 --from-beginningВы практически мгновенно увидите результат работы Vector — тот же самый лог, но уже без поля `query`:
{"user":"yuriy","source":"trino-cli","queryId":"20231120_123456_00001_abcde","state":"FINISHED"}Мы построили простой, но мощный конвейер для обработки данных в режиме реального времени, решив поставленную задачу с минимальными усилиями.