Apache Airflow 入门
从小白到略知一二。
背景
都说 Airflow 很强大,用 python 语言写 DAG。
对于我,语言不重要,了解他的运行架构和设计思想更重要。
先安装一个单机版本试试。
安装
在 Mac M1 上默认安装各种编译错误,然后使用 conda 重新安装了 python,终于搞定。
安装过程复制自官方:
AIRFLOW_VERSION=2.4.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
启动一个开发版本试试:
airflow standalone
成功启动后,控制台打印了一些有用信息。本机 Web 端默认为: http://localhost:8080
。
登录账号也在控制台中打印。
standalone |
standalone | Airflow is ready
standalone | Login with username: admin password: xxxxxxxxxx
standalone | Airflow Standalone is for development purposes only. Do not use this in production!
standalone |
进入之后,有很多现成的 DAG,一头雾水。
或者走下面这个步骤,都差不多。
初始化数据库:
airflow db init
这个过程中,有个警告,是说要安装 kubernetes 的 excutor。顺手给他装一下:
pip install apache-airflow-providers-cncf-kubernetes
创建一个新账号
airflow users create \
--username xie \
--firstname Cloudbeer \
--lastname Xie \
--role Admin \
--email cloudbeer@gmail.com
换个端口启动 Web Server
airflow webserver --port 9000
换个 终端窗口启动 Scheduler:
airflow scheduler
编写第一个 DAG
Airflow 的 hello world 代码如下:
from datetime import datetime, timedelta
from textwrap import dedent
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
'hellodag',
default_args={
'depends_on_past': False,
'email': ['cloudbeer@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
},
description='这是一个简单的 DAG',
schedule=timedelta(days=1),
start_date=datetime(2022, 12, 1),
catchup=False,
tags=['example'],
) as dag:
t1 = BashOperator(
task_id='print_date',
bash_command='date',
)
t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3,
)
t1.doc_md = dedent(
"""\
#### Task Documentation
You can document your task using the attributes `doc_md` (markdown),
`doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
rendered in the UI's Task Instance Details page.
![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
**Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
"""
)
dag.doc_md = """
这是一个简单的 DAG。
"""
templated_command = dedent(
"""
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
{% endfor %}
"""
)
t3 = BashOperator(
task_id='templated',
depends_on_past=False,
bash_command=templated_command,
)
t1 >> [t2, t3]
- 上面的代码修改 DAG 的名字为 hellodag
- 文件名命名为 hellodag.py
将这个文件放到 ~/airflow/dags 目录下。
使用 python 验证一下:
python hellodag.py
没有错误。
Airflow 任务测试
dag 文件放到了对应的目录了,现在查看一下 dags。(又要开一个终端)
airflow dags list
- hellodag 已经出现在列表了。
airflow tasks list hellodag --tree
- 这个命令可以看到这个 dag 包含了三个任务
print_date
,sleep
,templated
- –tree 显示了依赖关系。
<Task(BashOperator): print_date>
<Task(BashOperator): sleep>
<Task(BashOperator): templated>
测试一下任务:
airflow tasks test hellodag print_date 2015-06-01
airflow tasks test hellodag sleep 2015-06-01
airflow tasks test hellodag templated 2015-06-01
运行 backfill (回填)
airflow dags backfill hellodag \
--start-date 2015-06-01 \
--end-date 2015-06-07
好了。入了个门。我去研究 Airflow in K8S 了。