Запуск Kafka на s3
Оригинал: https://medium.com/thedeephub/how-do-we-run-kafka-100-on-the-object-storage-521c6fec6341
А тут pdf’ка
Не стал переводить
Welcome to my personal place for love, peace and happiness 🤖
Оригинал: https://medium.com/thedeephub/how-do-we-run-kafka-100-on-the-object-storage-521c6fec6341
А тут pdf’ка
Не стал переводить
Оригинал: https://blog.det.life/no-data-engineers-dont-need-dbt-30573eafa15e
Нужно ли мне изучать dbt? Я часто вижу этот вопрос на Reddit, и он меня путает. Звучит просто. Использует ли ваша компания dbt? Если да, то да. Если нет, то нет. Как и любой другой инструмент, dbt лучше всего использовать в тех сценариях, где он хорошо подходит. В то же время, я часто читаю мнения людей, которые говорят, что dbt не приносит пользы. Затем они начинают перечислять десять инструментов, которые они используют вместо этого. Должен быть какой-то средний путь.
Видите ли, здесь важен именно аспект «хорошей подгонки», а не необходимость. Мы используем инструменты для решения проблем. Позвольте повторить это. Мы используем инструменты для решения проблем. Не потому, что они крутые или мы хотим добавить их в наш набор навыков, или потому что все остальные их используют. Инструменты помогают нам решать проблемы. Давайте рассмотрим проблемы, которые помогает решать dbt, и случаи, когда он подходит. Черт возьми. Давайте также поговорим о сценариях, в которых он не подходит.
Что такое dbt — лучшее определение?
Существует распространенное заблуждение, что dbt — это инструмент ELT, который позволяет извлекать, загружать и преобразовывать данные. Это неверно. Dbt — это всего лишь инструмент для преобразования данных с использованием SQL. Если вы не используете SQL для преобразования данных, использование dbt станет огромным изменением. Возможно, это не для вас. Если у вас есть хранилище данных, где SQL является общим языком, то dbt может быть хорошим вариантом.
Говорить, что dbt — это просто инструмент для преобразования, недооценивает его ценность, потому что преобразование данных — это большая задача в корпоративной среде. Так что давайте дадим ему лучшее определение:
Dbt — это набор утилит, который позволяет управлять преобразованием данных с использованием динамического SQL в гибком языке шаблонов.
Это уже получше, хотя я не уверен, как бы это восприняли в отделе маркетинга dbt Labs. Проблема в том, что это определение говорит нам, что делает dbt, но не объясняет, как он это делает или какие проблемы он решает. Чтобы уточнить определение, давайте обсудим некоторые проблемы, возникающие при преобразовании данных. В частности, давайте обсудим четыре ключевые проблемы, с которыми мы все сталкиваемся.
Управление зависимостями
Динамический код
Материализация
Тестирование – качество данных и проверка кода
Генерация документация ( плюс от меня )
Управление зависимостями
Прежде чем мы напишем строчку SQL, мы должны знать исходные данные для наших таблиц и представлений. Это очевидно, верно? Если мы выбираем столбцы, мы должны знать, откуда мы выбираем. Это включает в себя базу данных, схему и таблицу. Таблица обычно, но не всегда, статична. Однако база данных и схема часто меняются в разных средах. Например, у нас могут быть рабочие, промежуточные и производственные среды, использующие разные базы данных.
Не большая проблема для решения, но самодельные решения будут нуждаться в реализации какой-то системы управления зависимостями. Dbt делает это с помощью функции ref(). Вот пример того, как это работает у меня на работе. Если я использую ref() в WHERE clause, dbt автоматически подберет нужную таблицу для среды, в которой мы работаем.
dev: dev_lgodin_dw.common.date_d
staging: dw_staging.common.date_d
prod: dw.common.date_d
```sql
select
date,
year,
month
from {{ ref(‘date_d’) }}
```
Управление зависимостями – это гораздо больше, чем просто идентификация полностью квалифицированного имени таблицы. Мы должны понимать порядок выполнения преобразований. Например, fact_sales должна выполняться перед fact_sales_monthly. Использование ref() автоматически решает эту задачу. Фактически, dbt создает граф зависимостей каждый раз при его запуске. Посмотрите на зависимости ниже для dbt-fake.
Граф зависимостей dbt
Давайте надеяться, что вы никогда не увидите что-то подобное в вашем хранилище данных. Я использую это для создания фальшивых данных, поэтому это не типичный сценарий. Используя ref() в моих моделях, dbt фиксирует зависимости за меня. Он выполнит эти модели в четыре этапа, используя параллелизм, когда это возможно.
fake_companies, fake_dates и fake_numbers
companies_base
employees_base
enterprise_orders_base
Теперь представьте хранилище данных с сотнями источников и сотнями моделей. Ручное определение зависимостей было бы непростой задачей. Вам понадобилась бы какая-то утилита для управления зависимостями. Dbt решает эту задачу из коробки. Если вы хотите пойти дальше, вы можете изучить зависимости с помощью переменной graph или manifest.json в dbt.
Динамический SQL
```sql
{{
config(
materialized = ‘incremental’,
unique_key = [‘order_date’, ‘user_id’, ‘product_id’],
)
}}
select
date as order_date,
user_id,
product_id,
num_items,
sum(num_items) over (partition by user_id) as total_weekly_items,
current_timestamp() as created_at,
current_timestamp() as updated_at
from {{ get_date_filtered_subquery(
source_model=ref(‘stg_orders’),
target_model=this,
run_at_date=get_run_at_date()) }}
```
Моя главная жалоба на SQL заключается в его слабой поддержке для написания динамических запросов. Каждый поставщик реализует свои собственные методы для динамической генерации SQL путем склеивания строк и запуска некоторых команд exec. Dbt интегрирует Jinja-SQL как часть комплексного пакета для преобразования данных. Это значит, что мы можем писать динамический SQL на языке шаблонов.
Хотели ли вы когда-нибудь использовать цикл for в SQL для написания повторяющегося кода? Вы можете сделать это. Я часто использую метаданные для генерации больших запросов. Например, у нас есть таблица Customer 360 с ~200 столбцами. Эта таблица строится динамически с использованием CSV для метаданных и Jinja для генерации SQL. Это не только экономит время, но и значительно уменьшает количество ошибок, вызванных ручным вводом.
Как насчет автоматизации логики дат в ваших преобразованиях? Да, динамический SQL спасает положение. Нужно ли запускать ваши задания для конкретных дат, возможно, для обеспечения идемпотентности? Dbt поможет. Суть в том, что SQL хорош, но динамический SQL позволяет писать запросы, как программист. С Jinja мы можем создавать переиспользуемые компоненты, называемые макросами. Мы можем использовать циклы для написания повторяющегося кода. Мы можем думать о SQL как о результате нашей работы, а не только об усилиях.
Самое распространенное, что мы делаем в хранилищах данных, — это создаем/обновляем таблицы и представления. Dbt называет это материализацией. Обычно мы используем insert или merge для этого. Хотя эти операторы не сложны, они быстро становятся утомительными. Мы находим себя пишущими один и тот же код снова и снова, просто чтобы записать разные столбцы в базу данных. Какое это пустая трата времени.
С dbt нам никогда не нужно писать операторы insert или merge. Мы пишем запросы и настраиваем материализацию. Хотите представление (view)? materialization = ‘view’. Это инкрементальная таблица? Правильно, materialization = ‘incremental’. Может быть, вам нужна медленно меняющаяся размерность типа-2? Это немного другое, но dbt имеет для этого “снимки” (snapshots). Когда все сказано и сделано, мы настраиваем, как мы хотим материализовать данные, и dbt автоматически обрабатывает это. Это мощно. Давайте посмотрим на пример. Код ниже реализует инкрементальную таблицу.
```sql
{{
config(
materialized = ‘incremental’,
unique_key = [‘order_date’, ‘user_id’, ‘product_id’],
)
}}
select
date as order_date,
user_id,
product_id,
num_items,
sum(num_items) over (partition by user_id) as total_weekly_items,
current_timestamp() as created_at,
current_timestamp() as updated_at
from {{ get_date_filtered_subquery(
source_model=ref(‘stg_orders’),
target_model=this,
run_at_date=get_run_at_date()) }}
```
Вот и все, что я написал. Теперь давайте посмотрим на код, который выполняет dbt. В первый раз, когда я запустил эту модель, таблица не существовала в базе данных, поэтому она была создана с полной историей.
```sql
create or replace table `leogodin217-dbt-tutorial`.`enterprise_sales`.`weekly_orders`
OPTIONS()
as (
select
date as order_date,
user_id,
product_id,
num_items,
sum(num_items) over (partition by user_id) as total_weekly_items,
current_timestamp() as created_at,
current_timestamp() as updated_at
from (
select
*
from `leogodin217-dbt-tutorial`.`enterprise_sales`.`stg_orders`
) as stg_orders
);
```
Во второй раз, когда я запустил модель, таблица уже существовала, и dbt знал, что нужно сделать инкрементальное обновление. Не знаю, как вы, но я ненавижу писать подобный код. Это скучная, монотонная работа с множеством возможностей для ошибок. Особенно с десятью или более столбцами.
```sql
merge into `leogodin217-dbt-tutorial`.`enterprise_sales`.`weekly_orders` as DBT_INTERNAL_DEST
using (
select
date as order_date,
user_id,
product_id,
num_items,
sum(num_items) over (partition by user_id) as total_weekly_items,
current_timestamp() as created_at,
current_timestamp() as updated_at
from (
select
*
from `leogodin217-dbt-tutorial`.`enterprise_sales`.`stg_orders`
where date between ‘2024-07-07’ and ‘2024-07-13’
) as stg_orders
) as DBT_INTERNAL_SOURCE
on (
DBT_INTERNAL_SOURCE.order_date = DBT_INTERNAL_DEST.order_date
) and (
DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
) and (
DBT_INTERNAL_SOURCE.product_id = DBT_INTERNAL_DEST.product_id
)
when matched then update set
`order_date` = DBT_INTERNAL_SOURCE.`order_date`,
`user_id` = DBT_INTERNAL_SOURCE.`user_id`,
`product_id` = DBT_INTERNAL_SOURCE.`product_id`,
`num_items` = DBT_INTERNAL_SOURCE.`num_items`,
`total_weekly_items` = DBT_INTERNAL_SOURCE.`total_weekly_items`,
`created_at` = DBT_INTERNAL_SOURCE.`created_at`,
`updated_at` = DBT_INTERNAL_SOURCE.`updated_at`
when not matched then insert
(`order_date`, `user_id`, `product_id`, `num_items`, `total_weekly_items`, `created_at`, `updated_at`)
values
(`order_date`, `user_id`, `product_id`, `num_items`, `total_weekly_items`, `created_at`, `updated_at`)
```
Не секрет, что хранилища данных отстают на световые годы от традиционной разработки программного обеспечения в плане тестирования. Существуют множество причин для этого. Во-первых, тестировать хранилища данных на порядки сложнее, чем общее ПО. В Python-приложении мы можем выделить отдельный модуль. Это не применимо в хранилище данных. Нам всегда нужна работающая база данных для тестирования чего-либо. В лучшем случае у нас есть интеграционные или системные тесты. Более того, помимо тестирования нашего кода, нам нужно тестировать сами данные. Это не часто входит в область ответственности разработчиков приложений. Как dbt справляется с этим?
Меня наняли на текущую должность для улучшения архитектуры dbt и качества данных. Обычно нам редко удается выполнить задачи такого рода, но мы сделали это — с огромной помощью команды. Мы не часто говорим о качестве данных в нашем хранилище, потому что мы приложили усилия, чтобы исправить это. Многие команды пытаются внедрить более сложные решения, но именно мы первыми находим проблемы с помощью тестов dbt.
Существуют и другие типы тестов. Важно отметить, что тесты качества данных проверяют данные и зависят от входящих источников. Мы не можем тестировать крайние случаи, если они не включены в наши данные. Проверка качества данных касается данных, но не гарантирует, что наш код корректен. Здесь на помощь приходит валидация кода.
Мы кратко обсудили концепцию юнит-тестов, и, честно говоря, у dbt нет настоящих юнит-тестов. Но у него есть возможность контролировать состояние входных данных нашей модели и сравнивать результаты с известными значениями. Это сложная задача в хранилищах данных. Проблема, которая не была хорошо решена в отрасли, но dbt предоставляет разумное решение.
Мы обсудили четыре ключевые проблемы, с которыми сталкиваемся при преобразовании данных и как dbt решает их. У dbt есть много других функций, и стоит посетить их веб-сайт для получения дополнительной информации. Там вы найдете больше о контроле версий, документации, графических операторах для запуска подмножеств моделей и многое другое. Но мы знаем достаточно, чтобы улучшить наше определение.
Что такое dbt?
Dbt — это комплексный пакет, позволяющий инженерам сосредоточиться на реализации бизнес-логики в преобразованиях хранилища данных.
Как работает dbt?
Dbt работает, абстрагируя общие шаблоны хранилищ данных в автоматизацию, управляемую конфигурацией, и предоставляя набор инструментов для упрощения SQL-преобразований, тестов и документации.
Не существует единого инструмента, который подошел бы всем. Мы говорили об этом. Мы используем инструменты для решения проблем. Dbt не решает все проблемы. В каких случаях dbt не является правильным инструментом для работы?
Хотите лучше понять dbt? Попробуйте мой список статей по промежуточному и продвинутому уровню dbt.
Надеюсь, к этому моменту вы поняли, что мы все время задавали неправильный вопрос. Никому не нужен dbt, если это не требуется для их работы. Гораздо лучше задать вопрос: поможет ли dbt решить проблемы, с которыми мы сталкиваемся? Если dbt делает вашу работу проще, то, вероятно, это хороший выбор. Dbt помог нам улучшить качество данных, уменьшить количество сбоев задач и улучшить нашу дисциплину разработки программного обеспечения. По этим причинам он хорошо подходит для нас. Нам не нужен dbt, но он, безусловно, решает множество проблем.
Перевод: https://medium.com/@ashishverma_93245/data-mesh-data-fabric-better-together-15a8b70f4a9e
Оригинал: Data Mesh & Data Fabric : лучше вместе!
Эта статья изучает, как два популярных подхода к разработке платформ данных — Data Mesh и Data Fabric — могут эффективно использоваться вместе. Хотя некоторые уже понимают, как эти подходы дополняют друг друга, многое из существующего контента представляет их как конкурирующие решения. Этот блог нацелен опровергнуть такое восприятие, подчеркивая их синергию. Хотя некоторое содержание о их взаимодополняющей природе уже существует, отсутствуют реальные сценарии, иллюстрирующие, как они работают вместе и с какими потенциальными вызовами они сталкиваются. Поэтому в данной статье будет исследоваться, как интегрировать эти подходы, реальные сценарии их использования и ловушки, которых следует избегать.
Отказ от ответственности: высказанные в этой статье мнения являются моими и не представляют взгляды моих предыдущих, текущих или будущих работодателей. В статье несколько раз упоминается Microsoft Fabric, и поскольку на данный момент я работаю в Microsoft, это может рассматриваться как конфликт интересов. Я выделяю Microsoft Fabric, потому что это единственная реализация Data Fabric, с которой я ознакомлен, и считаю, что она эффективно решает поставленные задачи.
Структура обсуждения следующая:
До появления Data Mesh большинство организаций, использующих облачные платформы данных, полагались на одну центральную платформу для решения всех задач. Независимо от зрелости платформ — некоторые боролись с базовым управлением озерами данных, в то время как у других были отдельные платформы для каждого домена — общим элементом была центральная команда, управляющая всеми техническими обязанностями. Это часто приводило к множеству проблем, по мере масштабирования платформы с целью поддержки растущего количества задач:
Задержки в выпуске новых функций и задач. В модели центральной платформы все технические обязанности ложатся на команду платформы. Это работает хорошо, когда задач мало, а зрелость данных низка. Однако по мере увеличения количества задач и стремления бизнес-команд изучать больше источников данных, backlog команды платформы быстро растет. Они оказываются растянутыми между добавлением функций в основополагающую платформу, поддержкой новых и существующих задач и устранением проблем. Это чрезмерная нагрузка часто приводит к задержкам, негативно влияющим на бизнес, не позволяя реализовать ранее выявленные возможности.
Снижение производительности и морального духа центральной команды платформы. По мере увеличения рабочих нагрузок центральной команды с увеличением числа задач и требований на новые функции, приоритет отдается предоставлению бизнес-ценности, оставляя мало времени на улучшение технологического стека. Это создает порочный круг: концентрация на удовлетворении немедленных бизнес-потребностей препятствует техническим улучшениям, которые могут повысить эффективность. Со временем это приводит к увеличению времени отклика на запросы, а моральный дух команды страдает из-за постоянных жалоб, несмотря на их усердную работу и длинные часы.
Управление и технологический долг отходит на второй план. Как уже отмечено, команда платформы часто работает в режиме реагирования, сосредотачиваясь на запросах функций, фундаментальных обновлениях и устранении проблем. Это оставляет мало времени для устранения технического долга или внедрения надлежащего управления. Последствия включают в себя ухудшающуюся производительность, растущие неучтенные расходы и проблемы с безопасностью из-за неправильно отслеженного доступа к данным, среди прочих возможных последствий.
Data Mesh решает эти вызовы, предлагая следующие принципы:
Владение доменами. Команды доменов, такие как маркетинг и финансы, владеют своими данными и ресурсами, необходимыми для извлечения из них ценности. Эти ресурсы включают в себя персонал, такой как инженеры и ученые по данным, а также технологические инструменты, предоставляемые центральной командой. Эта структура позволяет доменным командам внедрять новые функции или использовать их без необходимости полагаться на центральную команду платформы, используя свой собственный талант.
Данные как продукт. Data Mesh продвигает идею обращения с данными как с продуктом, применяя к данным принципы управления продуктом. Data Product включает в себя наборы данных, метаданные, вспомогательную инфраструктуру и механизмы доставки, такие как API или потоки. Data Products подразделяются на два типа: ingestion (выравненные с источником) и consumption (выравненные с задачами). Этот подход обеспечивает эффективное управление, требующее от каждого набора данных выполнять свои функции или быть выведенным из эксплуатации. Продуктовые команды, находящиеся в домене, создают и поддерживают эти Data Products, используя инструменты, предоставляемые центральной командой платформы.
Самообслуживание. Самодостаточная инфраструктура позволяет доменным командам самостоятельно создавать и управлять своими Data Products, взаимодействуя с базовой облачной инфраструктурой платформы Data Mesh. Основная команда платформы оптимизирует облачные услуги и программное обеспечение с открытым исходным кодом, чтобы повысить производительность, соответствие и безопасность, обеспечивая легкую доступность этих инструментов для продуктовых команд через интерфейс самообслуживания.
Федеративное вычислительное управление. Управление децентрализовано, но стандартизировано на уровне всей организации, чтобы обеспечить интероперабельность и соответствие как организационным, так и промышленным стандартам. Оно не должно быть второстепенной мыслью; вместо этого, его следует интегрировать на этапе первоначального планирования и поддерживать на ранних этапах использования.
Вот такими представляют себе Data Mesh (немного больше, чем в кратком изложении).
Разрастание стека данных. Управление объемом данных становится значительно сложной задачей, когда данные и инструменты для их управления распределены по различным облачным платформам и локальным системам. Ключевыми проблемами здесь являются:
а. Размножение инструментов. Использование множества инструментов для загрузки, хранения, обработки и анализа данных приводит к:
— путанице и дублированию
— сложностям в отслеживании происхождения данных
— трудностям в демонстрации бизнес-эффективности проектов
— потенциальным рискам безопасности и соответствия
b. Дублирование данных. Наборы данных часто копируются несколько раз в различных системах, усложняя стремление найти единый источник истины.
c. Отсутствие стандартизации. Без стандартизированных инструментов организации сталкиваются со сложностями в:
— контроле издержек и обеспечения безопасности
— поддержании единых стандартов качества на уровне всего объема данных
— росте затрат на лицензирование
d. Ограниченная видимость затрат. Обеспечение единого обзора затрат на уровень всего объема данных является редкостью из-за:
— отсутствия механизма отслеживания стоимости на уровне дизайна
— слабая интеграция различных инструментов и панель управления
Это ограничение видимости мешает директорами по технологиям (CTO) и директорами по данным (CDO) выявлять стратегические возможности для инвестиций и области для оптимизации затрат, усложняя эффективное управление объемом данных.
Обнаружение данных/инсайтов. Эта проблема в основном затрагивает бизнес-сторону организации. Без централизованного хаба для обнаружения данных бизнес-пользователи сталкиваются с значительными препятствиями:
— затруднения в исследовании и доступе к инсайтам
— неопределенность относительно точности, релевантности и своевременности данных
— ограниченная возможность делиться достижениями относительно данных
Это отсутствие координации и прозрачности значительно препятствует экспериментам и исследованиям, в конечном итоге ограничивая способность организации получать ценность из своих данных.
Любое программное обеспечение, созданное для решения бизнес-задачи, является всего лишь частью общего решения. Полное решение требует гораздо большего, начиная с спонсорства проекта, четких бизнес-требований и так далее.
Схема выше иллюстрирует соответствующие уровни окончательного решения, которые относятся к четырем областям: бизнес, технологии и данные, процесс и люди. Она также подчеркивает уровни, на которых действуют Data Mesh и Data Fabric.
Ключевые моменты о взаимодополняющей природе Data Mesh и Data Fabric:
Команды Data Product:
б. Дополнение существующей платформы Data Mesh с помощью Data Fabric : рассмотрим организацию с уже построенной платформой Data Mesh, поддерживающей многочисленные продукты данных, и дополнительными базами данных, как облачными, так и локальными. Учитывая этот обширный объем данных, должны быть веские причины для инвестирования в новую технологию. Некоторые из таких задач или причин могут быть:
b. Возможности самообслуживания:
— Упрощает исследование данных для нетехнических пользователей
— Включает AI-ассистентов (например, Copilot от Microsoft Fabric) для запросов на естественном языке и отчетности
Решение этих проблем с открытием данных может существенно повлиять на организацию, позволив выявить новые продуктовые линии и источники дохода.
Теперь, когда мы обосновали необходимость включения Fabric в дополнение к DataMesh, реализация — это еще одна история. Если вы захотите получить детали по конкретной реализации, пожалуйста, оставляйте комментарии ниже. Давайте быстро ознакомимся с ловушками.
Аналогично, решения Data Fabric, такие как Microsoft Fabric, предоставляют интегрированный набор инструментов для операций, таких как загрузка данных, обработка, хранение и визуализация из одного централизованного, универсального решения. Microsoft Fabric расширяет эту концепцию “центрального управления”, предоставляя доступ к обработке данных, хранящихся на других системах, без необходимости их копирования непосредственно в Fabric.
Заключение
В заключение, я верю, что оба этих подхода могут работать вместе для достижения значительной бизнес-ценности. Если у вас есть опыт с такими реализациями, пожалуйста, свяжитесь со мной... мне бы хотелось узнать больше. И, пожалуйста, не стесняйтесь делиться любыми отзывами. Спасибо за внимание!
Давно откопал этого китайского друга уже успело выйти пару версий. Не могу сказать, что хорошо с ним знаком, но чем то он меня по прежнему манит, то ли своей солидностью гибкой архитектуры, то ли масштабом охвата и в то же время акценте на синхронизацию данных. Сложно сказать. Одно ясно, что достаточно легко его запускать как в локальном режиме, так и в кластером если нужно. А текстовые конфиги заданий вообще мечта, а еще есть даже sql формат для заданий.
В общем знакомимся: https://seatunnel.apache.org Next-generation high-performance, distributed, massive data integration tool.
Вспомнил я его из за последнего релиза, где добавили LLM трансформер. А изначально была идея делать на нем синхронизацию данных из Кафки в s3 iceberg прямиком. Идея еще жива и потихоньку обрастает пылью. Но когда нибудь наступит час и все случится :) но не сегодня.
Пробуем записать файл 10gb в csv в s3:
Пишем конфигурацию:
env {
parallelism = 1
job.mode = "batch"
# checkpoint.interval = 30000
# checkpoint.timeout = 5000
}
# read csv
source {
LocalFile {
schema {
fields {
vendorid = string
tpep_pickup_datetime = string
tpep_dropoff_datetime = string
passenger_count = string
trip_distance = string
ratecodeid = string
store_and_fwd_flag = string
pulocationid = string
dolocationid = string
payment_type = string
fare_amount = string
extra = string
mta_tax = string
tip_amount = string
tolls_amount = string
improvement_surcharge = string
total_amount = string
}
}
path = "./2018_Yellow_Taxi_Trip_Data.csv"
file_format_type = "csv"
field_delimiter = ","
# datetime_format = "dd/MM/yyyy hh:mm:ss"
skip_header_row_number = 1
}
}
transform {
}
# csv to iceberg
sink {
iceberg {
catalog_name = "iceberg"
iceberg.catalog.config={
"type"="hive"
"uri" = "thrift://metastore:9083"
"warehouse"="s3a://test/iceberg_p_listner5_podman/"
}
hadoop.config={
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
"fs.s3a.endpoint" = "gateway.storjshare.io"
"fs.s3a.access.key" = "jvrKukurukutratatabq"
"fs.s3a.secret.key" = "jzwnieshotutblablabla44imhhs"
"fs.defaultFS" = "s3a://test/iceberg_p_listner5_podman/"
"fs.s3a.impl"="org.apache.hadoop.fs.s3a.S3AFileSystem"
}
namespace = "my_schema_i"
table = "taxi5"
iceberg.table.write-props={
write.format.default="parquet"
write.parquet.compression-codec="snappy"
write.target-file-size-bytes=136870912
}
# iceberg.table.primary-keys="id"
# iceberg.table.upsert-mode-enabled=true
# iceberg.table.schema-evolution-enabled=true
# case_sensitive=true
# result_table_name = "test_table"
}
}
Запускаем конфигурацию:
./bin/seatunnel.sh --config ./config/V2.LLM.csv-ice1.config.template -m local
пьем чай пару минут
***********************************************
Job Statistic Information
***********************************************
Start Time : 2024-09-12 20:38:36
End Time : 2024-09-12 20:55:45
Total Time(s) : 1028
Total Read Count : 112234626
Total Write Count : 112234626
Total Failed Count : 0
***********************************************
Готово.
Файл был про такси Нью Йорка
VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount
1,02/06/2018 02:05:59 PM,02/06/2018 02:13:00 PM,1,1.1,1,N,161,100,2,6.5,0,0.5,0,0,0.3,7.3
1,02/06/2018 02:33:25 PM,02/06/2018 02:43:47 PM,1,1.1,1,N,170,161,1,8,0,0.5,1.75,0,0.3,10.55
1,02/06/2018 02:46:31 PM,02/06/2018 02:53:13 PM,1,0.9,1,N,161,170,2,6.5,0,0.5,0,0,0.3,7.3
1,02/06/2018 02:55:34 PM,02/06/2018 03:13:34 PM,1,6.1,1,N,170,231,1,20.5,0,0.5,2,0,0.3,23.3
1,02/06/2018 02:00:41 PM,02/06/2018 02:16:07 PM,1,1.4,1,N,163,170,1,10.5,0,0.5,2.25,0,0.3,13.55
1,02/06/2018 02:24:55 PM,02/06/2018 02:31:25 PM,1,0.7,1,N,170,161,1,6,0,0.5,1,0,0.3,7.8
1,02/06/2018 02:37:16 PM,02/06/2018 02:53:57 PM,1,2.2,1,N,162,262,2,12.5,0,0.5,0,0,0.3,13.3
1,02/06/2018 02:57:05 PM,02/06/2018 03:10:26 PM,1,1.8,1,N,262,162,1,11,0,0.5,2.35,0,0.3,14.15
1,02/06/2018 02:01:55 PM,02/06/2018 02:04:56 PM,1,0.4,1,N,239,143,2,4,0,0.5,0,0,0.3,4.8
и так далее до 112 234 626 строки
В Трино все хорошо
Но пришлось немного подрулить java в файле ./config/jvm_client_options
# JVM Heap
-Xms556m
-Xmx1512m
Задание после этого шло более стабильно.
Промежуточный контрольный лог выглядит норм:
2024-09-12 20:53:16,054 INFO [c.h.i.d.HealthMonitor ] [hz.main.HealthMonitor] - [localhost]:5801 [seatunnel-872250] [5.1] processors=6, physical.memory.total=14.5G, physical.memory.free=419.0M, swap.space.total=0, swap.space.free=0, heap.memory.used=1.0G, heap.memory.free=437.8M, heap.memory.total=1.4G, heap.memory.max=1.4G, heap.memory.used/total=70.25%, heap.memory.used/max=70.25%, minor.gc.count=5327, minor.gc.time=32576ms, major.gc.count=7, major.gc.time=677ms, load.process=40.78%, load.system=41.68%, load.systemAverage=2.35, thread.count=151, thread.peakCount=163, cluster.timeDiff=0, event.q.size=0, executor.q.async.size=0, executor.q.client.size=0, executor.q.client.query.size=0, executor.q.client.blocking.size=0, executor.q.query.size=0, executor.q.scheduled.size=0, executor.q.io.size=0, executor.q.system.size=0, executor.q.operations.size=0, executor.q.priorityOperation.size=0, operations.completed.count=764, executor.q.mapLoad.size=0, executor.q.mapLoadAllKeys.size=0, executor.q.cluster.size=0, executor.q.response.size=0, operations.running.count=0, operations.pending.invocations.percentage=0.00%, operations.pending.invocations.count=0, proxy.count=10, clientEndpoint.count=1, connection.active.count=0, client.connection.count=0, connection.count=0
Важно добавить:
Задание падало не ясно почему. Хазлкаст куда-то девался и писал проблемы с тайм-аутами.
тут все изложил, но пока все описывал нашел ошибки. https://github.com/apache/seatunnel/issues/7650
Писал слишком маленькие файлы и накосячил с полями, они оказывается чувствительные к регистру.
А еще заметил, что когда задание падает, то Селесты в трико не проходят. Точнее показывают пустую таблицу, но вот в s3 файлы есть. Команда optimize ничего не дает, все равно остаются.
в итоге пока не нашел как почистить ошибочные файлы.
а вот еще пример с моделью llm gpt4o. Для доступа к api я использовать сервис http://proxyapi.ru – вроде не очень дорого и удобно платить с России и расходовать по мере использования. Еще пользуюсь этим http://openrouter.ai там большое моделей, но платить чуть сложнее, можно криптой оплатить.
И так, вот пример конфига:
# Set the basic configuration of the task to be performed111
env {
parallelism = 1
job.mode = "batch"
# checkpoint.interval = 30000
# checkpoint.timeout = 5000
}
# Create a source to connect to Clickhouse
source {
Clickhouse {
host = "some-clickhouse-server:8123"
database = "default"
sql = "select * from test_table"
username = ""
password = ""
server_time_zone = "UTC"
# result_table_name = "test_table"
clickhouse.config = {
"socket_timeout": "300000"
}
}
}
transform {
LLM {
model_provider = OPENAI
model = gpt-4o
api_key = sk-GIkitutblablabalkakayatoest5D33l5a
prompt = "Determine whether someone is Chinese, American or Russian, give a feedback in json string with quatation"
openai.api_path = "https://api.proxyapi.ru/openai/v1/chat/completions"
output_data_type = "STRING"
}
}
# Console printing of the read Clickhouse data
sink {
iceberg {
catalog_name = "iceberg"
iceberg.catalog.config={
"type"="hive"
"uri" = "thrift://metastore:9083"
"warehouse"="s3a://test/iceberg_p_listner5_podman/"
}
hadoop.config={
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
"fs.s3a.endpoint" = "gateway.storjshare.io"
"fs.s3a.access.key" = "jvr2blablablayjrbq"
"fs.s3a.secret.key" = "jzwnleotuteshokayayatohrenbilaqskh3323imhhs"
"fs.defaultFS" = "s3a://test/iceberg_p_listner5_podman/"
"fs.s3a.impl"="org.apache.hadoop.fs.s3a.S3AFileSystem"
}
namespace = "my_schema_i"
table = "test_table6"
iceberg.table.write-props={
write.format.default="parquet"
write.parquet.compression-codec="snappy"
write.target-file-size-bytes=536870
}
iceberg.table.primary-keys="id"
iceberg.table.upsert-mode-enabled=true
iceberg.table.schema-evolution-enabled=true
case_sensitive=true
# result_table_name = "test_table"
}
}
Таблица в клике была такая:
схемка эта:
CREATE TABLE default.test_table
(
`id` Int32,
`name` String
)
ENGINE = MergeTree
ORDER BY id
SETTINGS index_granularity = 8192
......
insert into `default`.test_table values (7, 'Petya')
insert into `default`.test_table values (8, 'Dasha')
insert into `default`.test_table values (9, 'Dima')
insert into `default`.test_table values (10, 'Tony')
insert into `default`.test_table values (11, 'Fekla')
insert into `default`.test_table values (12, 'Jekie Chan')
insert into `default`.test_table values (13, 'ben')
insert into `default`.test_table values (14, 'Howard')
insert into `default`.test_table values (16, 'Semen')
insert into `default`.test_table values (17, 'Katya')
insert into `default`.test_table values (18, 'Kostya')
insert into `default`.test_table values (19, 'Natasha')
insert into `default`.test_table values (20, 'Tonya')
insert into `default`.test_table values (21, 'Tanya')
.....
Что выходит в итоге:
Данные были прочитаны из clickhouse и отправлены в модель построчно.
Модель ответила и заполнила колонку llm_outputю.
2024-09-12 21:47:12,075 INFO [s.c.s.s.c.ClientExecuteCommand] [main] -
***********************************************
Job Statistic Information
***********************************************
Start Time : 2024-09-12 21:46:33
End Time : 2024-09-12 21:47:12
Total Time(s) : 38
Total Read Count : 21
Total Write Count : 21
Total Failed Count : 0
***********************************************
В общем если хотите пробуйте SeaTunnel. Норм работает и качество улучшается.
Iceberg например не получалось у меня загрузить в прошлой версии 2.3.7, пришлось собрать свежую по рекомендации. в Ней еще надо было добавить пару либ. Они не все нужны, но обязательны для hive внизу плагины hive-exec-3.1.3.jar и libfb303-0.9.3.jar, ну и рабочая либа seatunnel-hadoop3-3.1.4-uber.jar из за которой как раз и пришлось все собирать вручную.
Портал продуктов данных: интеграция с вашей платформой данных
Несколько недель назад мы объявили о выпуске Портала продуктов данных в качестве репозитория с открытым исходным кодом. Портал продуктов данных — это инструмент с открытым исходным кодом, предназначенный для помощи организациям в создании и управлении продуктами данных в широком масштабе. Он интуитивно понятен, гибок и направлен на упрощение и повышение эффективности управления продуктами данных.
Пример интерфейса портала продуктов данных
Цель его заключается в интеграции всех аспектов данных, связанных с управлением, платформами данных и управлением пользователями, в единое решение. Мы часто получаем вопросы о том, как портал продуктов данных взаимодействует с платформами данных и инструментами. Цель этого блога — объяснить именно это. Он охватывает такие вопросы, как: как портал продуктов данных будет взаимодействовать с моей платформой данных, как определяются продукты данных, как люди могут взаимодействовать с продуктами данных и их объемом доступа к данным, как это переводится в ресурсы платформы данных, такие как хранилища данных, базы данных и инструменты для создания продуктов данных.
Пример в этом блоге основан на AWS. В будущих версиях портала продуктов данных мы планируем добавить поддержку других платформ данных, таких как Databricks, Snowflake, Azure и других. Будем рады вашим вкладам, если вы хотите ускорить разработку!
Концепции портала продуктов данных
В анонсирующем блоге мы описали продукт данных как: Инициатива с ясной целью, находящаяся под ответственностью отдела или домена, состоящая из: доступа к данным, инструментов и артефактов, в которой члены команды работают вместе. Результаты продуктов данных могут быть разделены для использования с другими продуктами данных. Эти концепции продуктов данных можно визуализировать следующим образом:
Как продукты данных взаимодействуют друг с другом
Одним из важных последствий является то, что доступ к данным организован на уровне продукта данных, а не на уровне человека. Это означает, что объемные разрешения на доступ к данным и другие правила управления делятся между обработкой, инструментами и людьми в рамках этого продукта данных. Это приносит дополнительные преимущества, но если вы хотите узнать больше об этом, вы можете прочитать следующий блог.
Сложность, связанная с подходом к мышлению о продукте данных, заключается в разработке практической реализации, которая сочетает эти концепции вместе. Как настроить объем продукта данных для вашей платформы данных, требования к управлению данными и людей, которым нужно взаимодействовать с продуктом данных, — это сложная задача для решения.
Как портал стремится объединить платформы данных, управление и людей в единое понятие
Концепции интеграции
Здесь вступает в игру портал продуктов данных. Это инструмент процесса, который предоставляет четкие и простые концепции, которые переводятся в практическую реализацию. Это помогает как техническим специалистам, работающим над продуктами данных, так и людям из бизнес-отделов контролировать и предоставлять информацию о том, как их данные используются в организации.
Концепция интеграции на высоком уровне
На диаграмме выше люди, работающие с данными и бизнес-пользователи, взаимодействуют с порталом продуктов данных через наш интерфейс процесса для настройки своих продуктов данных и организации доступа к данным, произведенным другими продуктами данных. Эта конфигурация хранится и управляется в бэкэнде портала продуктов данных. Логика конфигурации платформы данных может взаимодействовать с API портала продуктов данных для извлечения этой конфигурации. Эта логика будет переводить эту конфигурацию в практическую реализацию того, как:
Команды платформ данных могут писать свою собственную логику конфигурации на основе этих концепций, но они также могут использовать логику конфигурации по умолчанию, предоставляемую порталом продуктов данных и написанную в Terraform. С логикой конфигурации по умолчанию мы предлагаем интеграции с AWS и Conveyor из коробки, но намерены расширять эти интеграции для Databricks, Azure, Snowflake, Tableau, Collibra и других релевантных инструментов.
Шаги интеграции
Если вы хотите использовать интеграцию по умолчанию с AWS с использованием terraform, предоставленную порталом продуктов данных, вы можете сделать это, выполнив следующие шаги. Больше информации доступно в нашем репозитории open source.
Заключительные мысли
Из опыта мы узнали, что многие организации испытывают трудности с переводом своей стратегии мышления о самообслуживании продуктов данных в практическую реализацию. Мы надеемся, что портал продуктов данных вдохновит вас на то, как это на самом деле достичь, и предлагаем вам его попробовать! Если вы хотите узнать больше, пожалуйста, ознакомьтесь с нашим репозиторием на GitHub или напрямую взаимодействуйте с нами в нашем Slack-канале.
Перевод: https://medium.com/conveyordata/data-product-portal-integrating-with-your-data-platform-41bf9fcf1fc1
Введение
Саммит Airflow не за горами (10–12 сентября), и поэтому время было идеальным для нового важного релиза Airflow. Этот минорный релиз, “2.10”, не разочаровывает, и эта короткая статья расскажет о основных функциях и исправлениях, представленных в этом релизе. Если вы хотите изучить все, что предлагает Airflow 2.10, эта статья для вас.
Датасеты в Airflow
Напоминаем, что датасет в Airflow — это логическая группировка данных. Задачи-производители на верхних уровнях могут обновлять датасеты, и обновления в датасетах способствуют планированию DAG’ов на нижних уровнях потребления. В этом релизе добавлено несколько новых функций для датасетов Airflow.
Динамическое определение датасета
Алиасы датасетов могут использоваться для генерации событий датасета с ассоциацией с алиасами. DAG’и на нижних уровнях могут зависеть от разрешённых датасетов. Эта функция позволяет определять сложные зависимости для выполнения DAG’ов на основе обновлений датасетов.
Представьте, что у вас есть задача, которая генерирует выходные файлы в S3-хранилище, причём расположение этих файлов зависит от даты выполнения (ds). Ранее вам необходимо было определять датасет статически, но с новым DatasetAlias вы можете динамически создавать путь к датасету на основе контекста выполнения:
@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, ds, outlet_events):
outlet_events["my-task-outputs"].add(Dataset(f"s3://bucket/my-task/{ds}"))
@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata(*, ds):
s3_dataset = Dataset(f"s3://bucket/my-task/{ds}")
yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")
Датасеты больше не активируют неактивные DAG’и
Ранее, когда DAG был приостановлен или удалён, входящие события датасета всё равно активировали его, и DAG запускался, когда он был возвращён из паузы или добавлен обратно в файл DAG’а. Это было изменено; теперь расписание датасета DAG’а может быть выполнено только событию, которое произошло во время активного состояния DAG’а.
Рассмотрите сценарий, когда у вас есть несколько DAG’ов, потребляющих один и тот же датасет, но некоторые из этих DAG’ов приостановлены на техническое обслуживание или больше не активны. С помощью этого обновления вы можете гарантировать, что события датасета запускают только активные, не приостановленные DAG’и, предотвращая ненужные выполнения и снижая риск ошибок.
Обновление в представлении датасетов
Представление датасетов в Airflow было значительно обновлено, чтобы обеспечить более интуитивный и детальный обзор событий датасетов и их влияния на рабочие процессы. Это обновление особенно важно для пользователей, управляющих сложными конвейерами данных, где важно понимать поток данных и его эффекты на запуск.
Ключевые улучшения:
Гибридные экзекьюторы
Эта функция экспериментальная. Ранее известные как гибридные экзекьюторы, эта новая функция позволяет Airflow использовать несколько экзекьюторов одновременно. DAG’и, или даже отдельные задачи, могут быть настроены для использования конкретного экзекьютора, который лучше всего отвечает их потребностям. Один DAG может содержать задачи, все они используют разные экзекьюторы.
Запуск нескольких экзекьюторов позволяет лучше использовать сильные стороны всех доступных экзекьюторов и избегать их слабостей. Другими словами, вы можете использовать определённый экзекьютор для определённого набора задач, где его особенные достоинства и преимущества наиболее уместны для данного случая использования.
Чтобы определить экзекьютор для задачи, используйте параметр “executor” в операторах Airflow:
BashOperator(
task_id="hello_world",
executor="LocalExecutor",
bash_command="echo 'hello world!'",
)
Тёмный режим
Мы ждали до версии 2.10, чтобы в Airflow появился тёмный режим. “Всё приходит к тем, кто умеет ждать”, как говорится в пословице. Ну, не о чем больше говорить о этой очевидной функции, и далее приведён скриншот этого тёмного режима:
Вы можете переключать тёмный/светлый режим, нажав на значок полумесяца справа от панели навигации.
Новый метод “concat()”
Значительное дополнение — введение метода concat() для объектов XComArg. Эта новая функция улучшает способ манипуляции и объединения данных, передаваемых между задачами в DAG, делая процесс более эффективным и менее ресурсоёмким.
Метод concat() позволяет пользователям объединять несколько ссылок XComArg в один объект XComArg. Этот метод особенно полезен, когда нужно выполнить операцию на нескольких списках или последовательностях элементов, не создавая дополнительных задач в вашем DAG. По сути, concat() работает как itertools.chain в Python, но с дополнительным преимуществом поддержки доступа по индексу к объединённым результатам.
Когда вы вызываете concat() на объекте XComArg, он объединяет его с одним или несколькими другими объектами XComArg, создавая новый объект ConcatXComArg. Этот объект ведёт себя как последовательность, к которой можно получить доступ, используя индексы, что упрощает работу с объединёнными данными, как если бы это был один список или словарь.
Например, если у вас есть два или более XComArg, представляющие списки файлов, вы теперь можете объединить их и затем обработать объединённый список в задаче нижнего уровня, без необходимости вручную объединять списки в отдельной задаче. Это не только упрощает DAG, но также экономит ресурсы, уменьшая количество данных, хранимых в XCom, и объём памяти, необходимый для выполнения задач.
Новые разрешения на уровне DAG
Airflow 2.10 вводит расширенный контроль над разрешениями на уровне DAG, предоставляя более подробный подход к управлению доступом в ваших рабочих процессах. Это обновление расширяет существующую функцию access_control, чтобы охватывать ресурсы DAG Run, позволяя администраторам определять специфические разрешения для запуска и удаления DAG’ов на уровне каждого отдельного DAG.
В предыдущих версиях Airflow, access_control позволял role-based access management (RBAC) на уровне DAG, но детализация разрешений была ограничена. С новыми обновлениями в версии 2.10, разрешения теперь могут быть назначены не только для DAG’ов в целом, но также для конкретных действий внутри этих DAGов, таких как создание и удаление DAG посещений.
С этой новой возможностью администраторы могут указать, какие роли имеют разрешение выполнять определённые действия на конкретных DAG’ах. Например, вы можете разрешить роли пользователя запускать DAG, но запретить им его удалять. Это особенно полезно в средах, где определённые операции должны быть ограничены для определённых пользователей или команд.
with DAG(
'tutorial',
...
access_control={
'User': {
'DAG Runs': {'can_create'}
}
}
) as dag:
В этом случае пользователям с ролью “User” предоставляется разрешение создаватирибовать выполнения DAG для DAG tutorial, но не обязательно удалять сам DAG.
with DAG(
'tutorial2',
...
access_control={
'User': {'can_delete'} # Старый стиль: {'DAGs': {'can_delete'}}
) as dag:
Здесь пользователи с ролью “User” могут удалить DAG tutorial2, демонстрируя возможность применения разрешений как на уровне DAG, так и на уровне DAG Run.
UI с выключением кнопок с работающими функциями delete и trigger, когда это разрешено:
Jinja-шаблонизация с ObjectStoragePath
Теперь Jinja-шаблонизация поддерживается для ObjectStoragePath, используемого в качестве аргумента оператора. Это улучшение позволяет пользователям динамически рендерить пути на основе переменных, таких как дата выполнения, идентификаторы задач или пользовательские параметры, предоставляя большую гибкость и адаптивность в определении путей хранения в их DAGах.
Например, теперь вы можете определить ObjectStoragePath следующим образом:
path = ObjectStoragePath("s3://my-bucket/data/{{ ds }}/{{ task_id }}.csv")
В этом примере путь будет динамически отображаться с конкретной датой (ds) и идентификатором задачи, что облегчает организацию и доступ к файлам в системах объектного хранения. Эта функция упрощает интеграцию Airflow с облачными сервисами хранения, улучшая общую эффективность ваших рабочих процессов.
Цвета для строк логов в UI для ошибок и предупреждений
Чтение и поиск ошибок/предупреждений в логах станет намного проще. Ошибки и предупреждения теперь будут выделены разными цветами (красным и синим соответственно) и жирным шрифтом.
Новые декораторы run_if и skip_if
Airflow 2.10 вводит декораторы run_if и skip_if, предоставляя более краткий и читаемый способ коэффического выполнения или пропуска задач в DAG. Эти декораторы работают как синтаксический сахар, упрощая логику задания выполнения задач на основе конкретных условий.
Декоратор run_if позволяет определить условие, при котором задача должна быть выполнена. Если условие не выполняется, задача автоматически пропускается. Это особенно полезно для сценариев, в которых выполнение задачи зависит от динамических условий, оцененных во время выполнения.
Вот пример, как использовать декоратор run_if:
from __future__ import annotations
from pendulum import datetime
from airflow.decorators import dag, task
@dag(start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def condition_sample():
@task.run_if(lambda context: context["task_instance"].task_id.endswith("_do_run"))
@task.bash()
def echo() -> str:
return "echo 'run'"
echo.override(task_id="echo_do_run")()
echo.override(task_id="echo_do_not_run")()
condition_sample()
В этом примере DAG задача echo выполнится только если её task_id оканчивается на _do_run. Декоратор run_if оценивает условие, и если оно True, задача выполняется; в противном случае она пропускается.
Заключение
Каждые 4-5 месяцев Airflow выпускает новую минорную версию, и это действительно захватывающее нововведение, чтобы увидеть, как много усилий прикладывается для постоянного улучшения этого отличного инструмента оркестрации с открытым исходным кодом. Сила Airflow заключается в его сильном сообществе и стремлении сделать продукт всегда лучше. Этот релиз пошёл в этом направлении, и компаниям не стоит ждать, чтобы мигрировать на эту новую версию, поскольку новые функции могут значительно упростить/улучшить ваши текущие/новые DAGи и общее управление Airflow.
Благодарность
Если вы с удовольствием прочли эту статью, следите за обновлениями, поскольку мы регулярно публикуем технические статьи об Airflow и Data Stack в целом. Подписывайтесь на Astrafy в LinkedIn, Medium и Youtube, чтобы быть в курсе выхода следующей статьи.
Если вам нужна поддержка по Airflow или вашему Data Stack в целом, не стесняйтесь обращаться к нам по адресу sales@astrafy.io.
перевод: https://medium.astrafy.io/airflow-2-10-is-just-wow-cde50c20553e
Знали ли вы, что Databricks открыли код Unity Catalog? Если нет, вас можно понять. В конце концов, в ту же неделю, когда это было объявлено на Databricks Data + AI Summit, новостной цикл был заполнен новостями об их приобретении компании Tabular. Несмотря на то, что Tabular привлекло всеобщее внимание, оба этих решения взаимосвязаны. Это намекает на более фундаментальный сдвиг в направлении Databricks, который изменит как сферу open source, так и коммерческий ландшафт для данных и ИИ.
В следующих разделах мы объясним, почему Databricks открыли код Unity Catalog, что это будет значить для разработки вашей архитектуры данных, и предоставим информацию о планах Databricks в свете этого объявления.
Поговорим о Tabular
Прежде чем углубиться в ситуацию с Unity Catalog, нам нужно обсудить приобретение Tabular. Как это связано с Unity Catalog? Здесь важны три основные вещи:
Борьба за право покупки Tabular
Много можно сказать о процессе приобретения Tabular, но стоит сфокусироваться на участниках этого процесса. Databricks были не единственными, кто делал ставки на Tabular. На самом деле, учитывая, что Tabular была продана за более чем $1 миллиард, ясно, что интерес к компании был высок и хорошо финансирован. Особенно в конкуренции выделялась компания Snowflake, еще один лидер в области аналитики. В сочетании с уже серьезными техническими инвестициями в Iceberg со стороны Databricks и Snowflake и их анонсами на саммитах Unity Catalog и Polaris, мы наблюдаем серьезные изменения в пространстве lakehouse и open source. Это будет продолжать оказывать эффект домино. Где идут Databricks и Snowflake, туда пойдет и весь остальной рынок аналитики.
Какое будущее у Apache Iceberg?
Много спекуляций относительно влияния приобретения Tabular на будущее Iceberg. Да, Tabular — коммерческая разработка, а Iceberg — open source, и даже вклад Tabular в Iceberg не является самым значительным. Однако отрицать любое влияние на Iceberg со стороны приобретения Tabular было бы неверно. Технические вклады — это одно (и это важно), но успешные проекты также требуют сильного и поддерживаемого сообщества для поощрения постоянных инвестиций и сосредоточенности на разработке нужных функций. Именно благодаря поддержке и вовлеченности сообщества Tabular был явным лидером.
Теперь, под управлением Databricks, будущее менее определенно. Пока нет оснований считать, что Databricks не станет хорошим управляющим для усилий Tabular до приобретения, они покупали Tabular ради бизнес-решений, а не из альтруистических побуждений. Как только деятельность Tabular в отношении Iceberg перестанет быть необходимой, сложно предсказать, что сделает Databricks. Однако одно можно сказать точно: в отличие от Tabular, Databricks не имеет неразрывной связи с Apache Iceberg.
Хорошие новости для открытых вычислений
Однако, это не все пессимистично и неопределенно. Тот факт, что Databricks владеет Tabular, действительно запутывает фокус Tabular на Iceberg, но это также означает, что у Databricks есть еще больший интерес в поддержке Iceberg и открытой экосистемы lakehouse, проект которой они помогают развивать. Если что-то и изменится, то теперь у команды Tabular будет больше ресурсов для укрепления Iceberg в будущем. Это еще больше повысит жизнеспособность концепции открытого lakehouse.
Каждый из этих моментов важен, но вместе они показывают, что между гигантами рынка аналитики разворачивается конкуренция, связанные с возможностями роста в пространстве lakehouse. То, насколько эти события переплетены с open source, добавляет интриги. Это особенно явно в недавних объявлениях о переходе к открытым исходным кодам Unity Catalog и Polaris.
Matei Zaharia, CTO Databricks, объявляет о версии Unity Catalog с открытым исходным кодом на Databricks Data + AI Summit 2024
Конкурирующие объявления: Unity Catalog против Polaris
Вот интересный факт: конференции Snowflake Summit и Databricks Data + AI Summit часто пересекаются, и в этом году Databricks заранее решили провести свою конференцию в другое время, чтобы избежать деления внимания СМИ и рынка. Это имеет смысл. Эти конференции предназначены для большого количества анонсов, сосредоточенных на максимально возможном освещении новостей, которые важны для этих компаний.
Обе компании сделали значимые объявления, сигнализируя о будущем открытых lakehouse-хранилищ данных: Unity Catalog стал open source, и Snowflake сделала то же самое с Polaris. Если связать это с борьбой за Tabular, которая завершилась в течение двух недель обеих конференций, то можно увидеть некоторое соответствие в стратегических видениях обеих компаний.
Причем речь идет не только о продолжающемся росте принятия концепции lakehouse, но и о том, что более открытый подход вызывает интерес. Настолько, что два крупнейших игрока в аналитике данных серьезно инвестируют и делают пожертвования проектам на основе open source.
Новая эра для открытых lakehouse-хранилищ
Что будет дальше с Databricks и Snowflake, еще предстоит увидеть, но немедленное воздействие на легитимность и ресурсы, вкладываемые в концепцию открытого lakehouse, нельзя игнорировать. Это огромное благо для сообщества lakehouse. Databricks и Snowflake оказывают значительное влияние на инвестиции почти каждой другой компании в области аналитики. Куда идут они, туда последуют и другие, и пользователи open lakehouse получат возможность воспользоваться преимуществами. Больше инструментов, больше выбора и поддержка сделают концепцию open lakehouse более доступной и источником новых проектов. Ожидайте больше положительных изменений в этой области в ближайшие недели, месяцы и годы.
Почему Databricks сделал Unity Catalog open source
Собирая вышеперечисленные пункты, становится проще понять, почему Databricks решили открыть исходный код и передать Unity Catalog:
Что дальше для Unity Catalog
Переход Unity Catalog на open source отмечает значительный этап, но это также вызывает вопрос: что будет дальше? Ответ лежит в расширении экосистемы решений, которые уже поддерживают видение Unity Catalog с открытым исходным кодом. Раннее принятие ведущими игроками свидетельствует о многообещающем будущем для архитектуры open lakehouse.
Архитектурные опции Unity Catalog
Многие заметные компании выразили поддержку Unity Catalog OSS, включая AWS, Nvidia, Confluent, LanceDB, StarRocks и многих других. Эти организации признают ценность открытой системы каталогов и готовы интегрироваться и инновационно развивать это основание.
Что делать с этой информацией?
Это призыв к действию для всех инженеров, находящихся на обочине, и тех, кто только начинает пробовать себя в lakehouse, чтобы серьезно отнестись к этому мощному подходу к своей архитектуре данных. Никогда не было лучшего времени, чтобы перейти к открытым решениям.
С чего начать? Начните свое расследование с движков запросов для lakehouse. От Apache Iceberg до Unity Catalog большинство производительности зависит от выбора правильного движка. Для этого вам следует обратить внимание на StarRocks и присоединиться к Slack StarRocks, чтобы получить все советы от сообщества, которые помогут вам ориентироваться в этой новой эре для открытых lakehouse.
Перевод: https://medium.com/starrocks-engineering/why-did-databricks-open-source-unity-catalog-b228bd9be367
Давно уже прошел вебинар про DuckDB, а я еще обещал ответить на вопросы.
Один из них был про работу с postgres. Напомню, что DuckDB это встраиваемая аналитическая база данных.
т.е. Так как она встраиваемая и уже встроена в DBeaver, то пробовать я это буду именно там. И так приступим.
Создаю новую утиную базу
База пока пустая
Надо бы что то в ней создать. Сделаем пару insert и подключим внешний каталог Postgres.
Базу Postgres я подниму локально в Docker этой командой
docker run --name some-postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
Так как она тоже пустая там понадобятся insert. А еще я хотел попробовать прочитать данные DuckDB с s3 и записать их через подключенный postgres прямо в него. Ну например данные такси нью йорка, там где-то 2 гигабайта, будет хороший кейс нагрузки.
Запускаю Postgres
docker run --name some-postgres -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword -d postgres
Подключаю бобра.
Теперь сделаем табличку и запишем туда пару
CREATE TABLE public.test (
a bigint,
b varchar,
c float,
cdate date
)
INSERT INTO public.test VALUES (1,'hello', 1.24, now());
INSERT INTO public.test VALUES (2,'hello', 2.24, now());
INSERT INTO public.test VALUES (3,'hello', 3.24, now());
INSERT INTO public.test VALUES (4,'hello', 4.24, now());
INSERT INTO public.test VALUES (5,'hello', 5.24, now());
INSERT INTO public.test VALUES (6,'hello', 6.24, now());
INSERT INTO public.test VALUES (7,'hello', 7.24, now());
INSERT INTO public.test VALUES (8,'hello', 8.24, now());
INSERT INTO public.test VALUES (9,'hello', 9.24, now());
отлично
Возвращаемся в утку и будем настраивать.
Установим для начала плагины
INSTALL postgres;
LOAD postgres;
Судя по документации можно использоваться простую команду для локального postgres
ATTACH '' AS postgres_db (TYPE POSTGRES);
Но я решил использовать чуть подробное описание с параметрами.
ATTACH 'dbname=postgres user=postgres host=127.0.0.1 password=mysecretpassword port=5432' AS db (TYPE POSTGRES);
Дополнительные параметры можно узнать из доки тут https://duckdb.org/docs/extensions/postgres
Делаю SHOW ALL TABLES и вижу что то уже из Утки.
Пробуем сделать Select и кстати в дереве каталогов уже появилась моя табличка.
select * from db.public.test t
Класс, работает.
можно даже скопировать всю таблицу в утку из postgres.
Теперь попробуем пример из вебинара, прочитать данные с s3 уткой и записать их в Postgres.
Настраиваю s3:
INSTALL httpfs;
LOAD httpfs;
CREATE SECRET secret1 (
TYPE S3,
KEY_ID 'jvvgблаблачтотоещеjuma',
SECRET 'jyehmo3kfитуткакаятофигняещеmikcfak3v4lv6',
ENDPOINT 'gateway.storjshare.io'
);
Работает.
ну и пробуем писать в postgres.
CREATE table db.public.test2 as SELECT * FROM read_parquet('s3://duckdb/parquettest/tos4.parquet');
Сходу не получилось
Ошибка:
SQL Error: Invalid Error: Failed to prepare COPY "
COPY (SELECT "City", "count_star()", "Sum" FROM "public"."test2" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid) TO STDOUT (FORMAT binary);
": ERROR: column "City" does not exist
LINE 2: COPY (SELECT "City", "count_star()", "Sum" FROM "public"."t...
^
HINT: Perhaps you meant to reference the column "test2.city".
Попробуем чуть попроще типы указать и наименования полей. Вдруг поможет. Для начала так:
SELECT City, "count_star()" Cnt, ceiling(Sum) Sum FROM read_parquet('s3://duckdb/parquettest/tos4.parquet');
Запрос отработал, но это пока еще ничего не значит.
Пробуем select но он не хочет.
SQL Error: Invalid Error: Failed to prepare COPY "
COPY (SELECT "City", "Cnt", "Sum" FROM "public"."test3" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid) TO STDOUT (FORMAT binary);
": ERROR: column "City" does not exist
LINE 2: COPY (SELECT "City", "Cnt", "Sum" FROM "public"."test3" WHE...
^
HINT: Perhaps you meant to reference the column "test3.city".
А если через COPY? Тоже не получилось, написал что то такое.
copy (SELECT City, "count_star()" Cnt, ceiling(Sum) Sum FROM read_parquet('s3://duckdb/parquettest/tos4.parquet')) TO db.public.test4
SQL Error: java.sql.SQLException: Parser Error: syntax error at or near "."
org.jkiss.dbeaver.model.sql.DBSQLException: SQL Error: java.sql.SQLException: Parser Error: syntax error at or near "."
at org.jkiss.dbeaver.model.impl.jdbc.exec.JDBCStatementImpl.executeStatement(JDBCStatementImpl.java:133)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.executeStatement(SQLQueryJob.java:615)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.lambda$2(SQLQueryJob.java:506)
at org.jkiss.dbeaver.model.exec.DBExecUtils.tryExecuteRecover(DBExecUtils.java:192)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.executeSingleQuery(SQLQueryJob.java:525)
at org.jkiss.dbeaver.ui.editors.sql.execute.SQLQueryJob.extractData(SQLQueryJob.java:977)
at org.jkis
А вот первый пример заработает как-то))
select * from db.public.test2
Пробуем еще раз:
CREATE table db.public.test5 as SELECT City, "count_star()" Cnt, ceiling(Sum) Sum FROM read_parquet('s3://duckdb/parquettest/tos4.parquet');
ошибка на select * from db.public.test5
SQL Error: Invalid Error: Failed to prepare COPY "
COPY (SELECT "City", "Cnt", "Sum" FROM "public"."test5" WHERE ctid BETWEEN '(0,0)'::tid AND '(4294967295,0)'::tid) TO STDOUT (FORMAT binary);
": ERROR: column "City" does not exist
LINE 2: COPY (SELECT "City", "Cnt", "Sum" FROM "public"."test5" WHE...
^
HINT: Perhaps you meant to reference the column "test5.city".
Хм)) ну как то же db.public.test2 заполнилась. Какая-то магия. Еще пока неизвестная.
Попробуем с того как начинали, но пока никак.
Кстати вот так работает: COPY db.public.test7 FROM ‘s3://duckdb/parquettest/tos4.parquet’;
Но прочитать таблицу все равно не дает.
Ну а вот и сама магия.
Если зайти в сам постгрес, то все Талицы на месте. и даже данные там есть.
Вероятно есть еще некие ошибки при записи данных через приатачченый постгрес, но их скоро поправят.
Думаю если заново переаттачить это постгрес, то данные можно будет прочитать корректнее.
Ну да, так и получилось
В общем вариант рабочий, но есть еще баги. Ждем фиксов.
Ну а следующий вебинар сделаю про “УТИлизацию Табло” – Будем готовить утку с Табло в s3шном соусе. :))
таблица большая в итоге загрузилась с s3.
Долго было
Книги про искусство 📖
Замечено на канале: https://t.me/NapasioWorldwide
Всем привет! Давайте поделимся друг с другом интересными книгами на тему современного искусства.
Я рекомендую:
Отличный вариант для тех, кто хочет сделать первые шаги к понимаю современного искусства и истории его развития.
Книга рассказывает о том как устроен галерейный бизнес и мир арт-аукционов, а также о ценообразовании в сфере изобразительного искусства.
Интересное повествование от первого лица в формате писем к брату художника. Много личных размышлений автора на тему искусства и взаимоотношениях с коллегами-современниками создают интересный опыт от прочтения.
Можно прямо под заказ для себя сделать и материалы выбрать.
Рюкзаки и тоже сумки.