Повітряний потік: маловідомі поради, хитрощі та найкращі практики

З усіма інструментами, якими ви користуєтеся, є певні речі, про які ви не знатимете навіть після тривалого використання. І як тільки ви це знаєте, ви схожі на "Я хотів би, щоб я знав це раніше", як ви вже сказали своєму клієнту, що це не може бути зроблено кращим чином . Повітряний потік, як і інший інструмент, не відрізняється, є кілька прихованих дорогоцінних каменів, які можуть полегшити ваше життя та зробити розробка DAG цікавою.

Деякі з них ви, можливо, вже знаєте, і якщо ви все їх знаєте - добре, ви тоді PRO.

(1) DAG з менеджером контексту

Ви були роздратовані самим собою, коли ви забули додати dag = dag до свого завдання та помилка Airflow? Так, легко забути додавати його до кожного завдання. Також зайвим буде додати той же параметр, що показаний у наступному прикладі (файл example_dag.py):

Наведений вище приклад (файл example_dag.py) має лише 2 завдання, але якщо у вас є 10 і більше, то надмірність стає більш очевидною. Щоб уникнути цього, ви можете використовувати DAG Airflow як диспетчери контекстів, щоб автоматично призначити нових операторів цій DAG, як показано у наведеному вище прикладі (example_dag_with_context.py), використовуючи оператор.

(2) Використання списку для встановлення залежностей від задач

Коли ви хочете створити DAG, подібний до того, який показаний на зображенні нижче, вам доведеться повторити назви завдань, встановлюючи залежності задач.

Як показано у наведеному вище фрагменті коду, використання нашого звичайного способу встановлення залежностей задачі означало б, що task_two та end повторюються 3 рази. Це можна замінити за допомогою списків python для досягнення того ж результату більш елегантним способом.

(3) Використовуйте аргументи за замовчуванням, щоб уникнути повторення аргументів

Потік повітря, що дозволяє передавати словник параметрів, які були б доступні для всіх завдань у цій DAG.

Наприклад, у DataReply ми використовуємо BigQuery для всіх наших DAG-файлів, пов'язаних з DataWareshouse, і замість того, щоб передавати такі параметри, як мітки, bigquery_conn_id до кожного завдання, ми просто передаємо йому словник indefault_args, як показано в DAG нижче.

Це також корисно, коли ви хочете отримувати сповіщення про неполадки окремих завдань, а не лише про помилки DAG, про які я вже згадував у своєму останньому дописі на тему "Інтеграція сповіщень про слабкість у повітрі".

(4) Аргумент "парами"

"Params" - це словник параметрів рівня DAG, який стає доступним у шаблонах. Ці парами можуть бути замінені на рівні завдання.

Це надзвичайно корисний аргумент, і я особисто його багато використовую, оскільки можна отримати доступ до шаблонного поля з шаблоном jinja за допомогою params.param_name. Приклад використання:

Це дозволяє вам легко писати параметризований DAG замість значень жорсткого кодування. Також, як показано в прикладах вище, словник парам може бути визначений у 3 місцях: (1) В об'єкті DAG (2) У словнику default_args (3) Кожне завдання.

(5) Збереження чутливих даних у з'єднаннях

Більшість користувачів знають про це, але я все ще бачив паролі, збережені у простому тексті всередині DAG. На користь - не робіть цього. Ви повинні писати свої DAG так, щоб ви були досить впевнені, щоб зберігати свої DAG в загальнодоступному сховищі.

За замовчуванням Airflow збереже паролі для з'єднання в простому тексті в базі даних метаданих. Пакет криптовалют настійно рекомендується під час встановлення Airflow, і його можна просто виконати за допомогою pip install apache-airflow [crypto].

Потім ви можете легко отримати доступ до нього наступним чином:

з airflow.hooks.base_hook імпорту BaseHook
slack_token = BaseHook.get_connection ('слабкий') пароль

(6) Обмежте кількість змінних Airflow у вашому DAG

Змінні повітряних потоків зберігаються в базі даних метаданих, тому будь-який виклик змінних означатиме з'єднання з БД метаданих. Ваші DAG-файли аналізуються кожні X секунд. Використання великої кількості змінних у вашому DAG (а ще гірше у default_args) може означати, що ви можете наситити кількість дозволених підключень до вашої бази даних.

Щоб уникнути цієї ситуації, ви можете просто використовувати одну змінну Airflow зі значенням JSON. Оскільки змінна Airflow може містити значення JSON, ви можете зберігати всю свою конфігурацію DAG всередині однієї змінної, як показано на зображенні нижче:

Як показано на цьому скріншоті, ви можете зберігати значення в окремих змінних Airflow або під однією змінною Airflow як поле JSON

Потім ви можете отримати доступ до них, як показано нижче у розділі Рекомендований спосіб:

(7) Словник «контексту»

Користувачі часто забувають вміст контекстного словника під час використання PythonOperator з функцією дзвінка.

Контекст містить посилання на пов'язані об'єкти до екземпляра завдання та задокументований у розділі макросів API, оскільки вони також доступні для шаблонного поля.

{
      'dag': task.dag,
      'ds': ds,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      'prev_ds': prev_ds,
      'prev_ds_nodash': prev_ds_nodash,
      'ds_nodash': ds_nodash,
      'ц': ц,
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'вчора_ds': вчора_ds,
      'вчора_ds_nodash': вчора_ds_nodash,
      'завтра_ds': завтра_ds,
      'завтра_ds_nodash': завтра_ds_nodash,
      'END_DATE': ds,
      'end_date': ds,
      'dag_run': dag_run,
      'run_id': run_id,
      'Execution_date': self.execution_date,
      'prev_execution_date': prev_execution_date,
      'next_execution_date': next_execution_date,
      'остання_дача': ds,
      'макроси': макроси,
      'params': парами,
      'таблиці': таблиці,
      'завдання': завдання,
      'task_instan': самоврядування,
      'ti': я,
      'task_instan_key_str': ti_key_str,
      'conf': конфігурація,
      'test_mode': self.test_mode,
      'var': {
          'value': VariableAccessor (),
          'json': VariableJsonAccessor ()
      },
      'inlets': task.inlets,
      'торгові точки': task.outlets,
}

(8) Генерування задач динамічного повітряного потоку

Я відповідав на багато питань у StackOverflow про те, як створити динамічні завдання. Відповідь проста, вам просто потрібно створити унікальний task_id для всіх своїх завдань. Нижче наведено два приклади, як досягти цього:

(9) Запустіть "потік повітря оновлений" замість "повітряний потік initdb"

Завдяки Еш Берліну за цю пораду в його розмові на лондонському зустрічі First Apache Airflow.

Потік повітря initdb створить усі за замовчуванням з'єднання, графіки тощо, які ми можемо не використовувати і не хочемо в нашій виробничій базі даних. airflow upgradeedb замість цього просто застосує будь-які відсутні міграції до таблиці бази даних. (включаючи створення відсутніх таблиць тощо). Це також безпечно запускати кожного разу, він відстежує, які міграції вже застосовані (за допомогою модуля Alembic).

Повідомте мене в розділі коментарів нижче, якщо ви знаєте щось, що варто додати в цій публікації в блозі. Щасливий потік повітря :-)