Welcome to my personal place for love, peace and happiness❣️

Airflow 2.10

Введение

Саммит Airflow не за горами (10–12 сентября), и поэтому время было идеальным для нового важного релиза Airflow. Этот минорный релиз, “2.10”, не разочаровывает, и эта короткая статья расскажет о основных функциях и исправлениях, представленных в этом релизе. Если вы хотите изучить все, что предлагает Airflow 2.10, эта статья для вас.

Датасеты в Airflow

Напоминаем, что датасет в Airflow — это логическая группировка данных. Задачи-производители на верхних уровнях могут обновлять датасеты, и обновления в датасетах способствуют планированию DAG’ов на нижних уровнях потребления. В этом релизе добавлено несколько новых функций для датасетов Airflow.

Динамическое определение датасета

Алиасы датасетов могут использоваться для генерации событий датасета с ассоциацией с алиасами. DAG’и на нижних уровнях могут зависеть от разрешённых датасетов. Эта функция позволяет определять сложные зависимости для выполнения DAG’ов на основе обновлений датасетов.

Представьте, что у вас есть задача, которая генерирует выходные файлы в S3-хранилище, причём расположение этих файлов зависит от даты выполнения (ds). Ранее вам необходимо было определять датасет статически, но с новым DatasetAlias вы можете динамически создавать путь к датасету на основе контекста выполнения:

@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_outlet_events(*, ds, outlet_events):
    outlet_events["my-task-outputs"].add(Dataset(f"s3://bucket/my-task/{ds}"))

@task(outlets=[DatasetAlias("my-task-outputs")])
def my_task_with_metadata(*, ds):
    s3_dataset = Dataset(f"s3://bucket/my-task/{ds}")
    yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")

Датасеты больше не активируют неактивные DAG’и

Ранее, когда DAG был приостановлен или удалён, входящие события датасета всё равно активировали его, и DAG запускался, когда он был возвращён из паузы или добавлен обратно в файл DAG’а. Это было изменено; теперь расписание датасета DAG’а может быть выполнено только событию, которое произошло во время активного состояния DAG’а.

Рассмотрите сценарий, когда у вас есть несколько DAG’ов, потребляющих один и тот же датасет, но некоторые из этих DAG’ов приостановлены на техническое обслуживание или больше не активны. С помощью этого обновления вы можете гарантировать, что события датасета запускают только активные, не приостановленные DAG’и, предотвращая ненужные выполнения и снижая риск ошибок.

Обновление в представлении датасетов

Представление датасетов в Airflow было значительно обновлено, чтобы обеспечить более интуитивный и детальный обзор событий датасетов и их влияния на рабочие процессы. Это обновление особенно важно для пользователей, управляющих сложными конвейерами данных, где важно понимать поток данных и его эффекты на запуск.

Ключевые улучшения:

  1. Список событий датасетов: новое представление включает список всех событий датасетов, что позволяет пользователям быстро видеть последние активности по всем датасетам. Это особенно полезно для мониторинга потоков данных и диагностики проблем.
  1. Карточки с информацией о событиях: вместо простой таблицы, события датасетов теперь отображаются в виде карточек с подробной информацией об источнике события, запусках на нижних уровнях и связанными метаданными. Этот более богатый дисплей предоставляет лучшее представление с первого взгляда.
  2. Вкладочный интерфейс: новый интерфейс организует информацию по вкладкам, включая события датасетов, детали, список датасетов и графическое представление. Это разделение позволяет пользователям сосредоточиться на определённом аспекте интересующих их датасетов, без излишней информации.
  3. Навигация с помощью “хлебных крошек”: внедрение навигации с помощью “хлебных крошек” улучшает пользовательский опыт, ясно показывая выбранный датасет или событие, что упрощает перемещение вперёд и назад между представлениями.
  4. Более насыщенные детали датасета: вкладка с деталями датасета теперь включает более полную информацию, такую как дополнительные метаданные, DAG’и-потребители и задачи-производители.Этот уровень детализации помогает пользователям понять полный жизненный цикл своих датасетов.

Гибридные экзекьюторы

Эта функция экспериментальная. Ранее известные как гибридные экзекьюторы, эта новая функция позволяет Airflow использовать несколько экзекьюторов одновременно. DAG’и, или даже отдельные задачи, могут быть настроены для использования конкретного экзекьютора, который лучше всего отвечает их потребностям. Один DAG может содержать задачи, все они используют разные экзекьюторы.

Запуск нескольких экзекьюторов позволяет лучше использовать сильные стороны всех доступных экзекьюторов и избегать их слабостей. Другими словами, вы можете использовать определённый экзекьютор для определённого набора задач, где его особенные достоинства и преимущества наиболее уместны для данного случая использования.

Чтобы определить экзекьютор для задачи, используйте параметр “executor” в операторах Airflow:

BashOperator(
    task_id="hello_world",
    executor="LocalExecutor",
    bash_command="echo 'hello world!'",
)

Тёмный режим

Мы ждали до версии 2.10, чтобы в Airflow появился тёмный режим. “Всё приходит к тем, кто умеет ждать”, как говорится в пословице. Ну, не о чем больше говорить о этой очевидной функции, и далее приведён скриншот этого тёмного режима:

Вы можете переключать тёмный/светлый режим, нажав на значок полумесяца справа от панели навигации.

Новый метод “concat()”

Значительное дополнение — введение метода concat() для объектов XComArg. Эта новая функция улучшает способ манипуляции и объединения данных, передаваемых между задачами в DAG, делая процесс более эффективным и менее ресурсоёмким.

Метод concat() позволяет пользователям объединять несколько ссылок XComArg в один объект XComArg. Этот метод особенно полезен, когда нужно выполнить операцию на нескольких списках или последовательностях элементов, не создавая дополнительных задач в вашем DAG. По сути, concat() работает как itertools.chain в Python, но с дополнительным преимуществом поддержки доступа по индексу к объединённым результатам.

Когда вы вызываете concat() на объекте XComArg, он объединяет его с одним или несколькими другими объектами XComArg, создавая новый объект ConcatXComArg. Этот объект ведёт себя как последовательность, к которой можно получить доступ, используя индексы, что упрощает работу с объединёнными данными, как если бы это был один список или словарь.

Например, если у вас есть два или более XComArg, представляющие списки файлов, вы теперь можете объединить их и затем обработать объединённый список в задаче нижнего уровня, без необходимости вручную объединять списки в отдельной задаче. Это не только упрощает DAG, но также экономит ресурсы, уменьшая количество данных, хранимых в XCom, и объём памяти, необходимый для выполнения задач.

Новые разрешения на уровне DAG

Airflow 2.10 вводит расширенный контроль над разрешениями на уровне DAG, предоставляя более подробный подход к управлению доступом в ваших рабочих процессах. Это обновление расширяет существующую функцию access_control, чтобы охватывать ресурсы DAG Run, позволяя администраторам определять специфические разрешения для запуска и удаления DAG’ов на уровне каждого отдельного DAG.

В предыдущих версиях Airflow, access_control позволял role-based access management (RBAC) на уровне DAG, но детализация разрешений была ограничена. С новыми обновлениями в версии 2.10, разрешения теперь могут быть назначены не только для DAG’ов в целом, но также для конкретных действий внутри этих DAGов, таких как создание и удаление DAG посещений.

С этой новой возможностью администраторы могут указать, какие роли имеют разрешение выполнять определённые действия на конкретных DAG’ах. Например, вы можете разрешить роли пользователя запускать DAG, но запретить им его удалять. Это особенно полезно в средах, где определённые операции должны быть ограничены для определённых пользователей или команд.

with DAG(
    'tutorial',
    ...
    access_control={
        'User': {
            'DAG Runs': {'can_create'}
        }
    }
) as dag:

В этом случае пользователям с ролью “User” предоставляется разрешение создаватирибовать выполнения DAG для DAG tutorial, но не обязательно удалять сам DAG.

with DAG(
    'tutorial2',
    ...
    access_control={
        'User': {'can_delete'}  # Старый стиль: {'DAGs': {'can_delete'}}
) as dag:

Здесь пользователи с ролью “User” могут удалить DAG tutorial2, демонстрируя возможность применения разрешений как на уровне DAG, так и на уровне DAG Run.

UI с выключением кнопок с работающими функциями delete и trigger, когда это разрешено:

Jinja-шаблонизация с ObjectStoragePath

Теперь Jinja-шаблонизация поддерживается для ObjectStoragePath, используемого в качестве аргумента оператора. Это улучшение позволяет пользователям динамически рендерить пути на основе переменных, таких как дата выполнения, идентификаторы задач или пользовательские параметры, предоставляя большую гибкость и адаптивность в определении путей хранения в их DAGах.

Например, теперь вы можете определить ObjectStoragePath следующим образом:

path = ObjectStoragePath("s3://my-bucket/data/{{ ds }}/{{ task_id }}.csv")

В этом примере путь будет динамически отображаться с конкретной датой (ds) и идентификатором задачи, что облегчает организацию и доступ к файлам в системах объектного хранения. Эта функция упрощает интеграцию Airflow с облачными сервисами хранения, улучшая общую эффективность ваших рабочих процессов.

Цвета для строк логов в UI для ошибок и предупреждений

Чтение и поиск ошибок/предупреждений в логах станет намного проще. Ошибки и предупреждения теперь будут выделены разными цветами (красным и синим соответственно) и жирным шрифтом.

Новые декораторы run_if и skip_if

Airflow 2.10 вводит декораторы run_if и skip_if, предоставляя более краткий и читаемый способ коэффического выполнения или пропуска задач в DAG. Эти декораторы работают как синтаксический сахар, упрощая логику задания выполнения задач на основе конкретных условий.

Декоратор run_if позволяет определить условие, при котором задача должна быть выполнена. Если условие не выполняется, задача автоматически пропускается. Это особенно полезно для сценариев, в которых выполнение задачи зависит от динамических условий, оцененных во время выполнения.

Вот пример, как использовать декоратор run_if:

from __future__ import annotations

from pendulum import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2022, 1, 1), schedule=None, catchup=False)
def condition_sample():
    @task.run_if(lambda context: context["task_instance"].task_id.endswith("_do_run"))
    @task.bash()
    def echo() -> str:
        return "echo 'run'"

    echo.override(task_id="echo_do_run")()
    echo.override(task_id="echo_do_not_run")()


condition_sample()

В этом примере DAG задача echo выполнится только если её task_id оканчивается на _do_run. Декоратор run_if оценивает условие, и если оно True, задача выполняется; в противном случае она пропускается.

Заключение

Каждые 4-5 месяцев Airflow выпускает новую минорную версию, и это действительно захватывающее нововведение, чтобы увидеть, как много усилий прикладывается для постоянного улучшения этого отличного инструмента оркестрации с открытым исходным кодом. Сила Airflow заключается в его сильном сообществе и стремлении сделать продукт всегда лучше. Этот релиз пошёл в этом направлении, и компаниям не стоит ждать, чтобы мигрировать на эту новую версию, поскольку новые функции могут значительно упростить/улучшить ваши текущие/новые DAGи и общее управление Airflow.

Благодарность

Если вы с удовольствием прочли эту статью, следите за обновлениями, поскольку мы регулярно публикуем технические статьи об Airflow и Data Stack в целом. Подписывайтесь на Astrafy в LinkedIn, Medium и Youtube, чтобы быть в курсе выхода следующей статьи.

Если вам нужна поддержка по Airflow или вашему Data Stack в целом, не стесняйтесь обращаться к нам по адресу sales@astrafy.io.

перевод: https://medium.astrafy.io/airflow-2-10-is-just-wow-cde50c20553e

Follow this blog
Send
Share
Pin