Apache Airflow 是一个用于编排和调度任务的开源平台。它适用于创建、调度和监控数据工作流。以下是使用 Airflow 的基本步骤:
你可以通过以下命令来安装 Airflow:
1 |
pip install apache-airflow |
建议使用虚拟环境来管理 Airflow 的依赖项。
Airflow 需要一个数据库来存储任务执行状态和其他元数据信息。初始化数据库的命令:
1 |
airflow db init |
你需要创建一个管理员账户以访问 Airflow 的 web 界面:
1 2 3 4 5 6 7 |
airflow users create \ --username admin \ --password admin \ --firstname Firstname \ --lastname Lastname \ --role Admin \ --email admin@example.com |
Airflow 包含一个调度器(Scheduler)和一个 Web 服务器(Web Server)。你需要分别启动这两个服务:
启动调度器:
1 |
airflow scheduler |
启动 Web Server:
1 |
airflow webserver |
Web Server 默认在 localhost:8080 上运行,你可以通过浏览器访问它。
在 Airflow 中,工作流是通过 DAG(Directed Acyclic Graph)来定义的。一个简单的 DAG 例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def my_task(): print("This is a task") default_args = { 'start_date': datetime(2023, 9, 1), 'retries': 1 } with DAG( 'my_dag', default_args=default_args, schedule_interval='@daily' ) as dag: task = PythonOperator( task_id='my_task', python_callable=my_task ) |
你可以通过设置任务的依赖来定义任务的执行顺序。例如:
1 |
task1 >> task2 # task1 先执行,task2 后执行 |
将你定义的 DAG 文件保存到 Airflow 的 DAGs 文件夹中。这个文件夹的位置通常是 $AIRFLOW_HOME/dags/,或者你可以在 airflow.cfg 文件中配置。
访问 Airflow 的 Web 界面,你可以看到所有定义的 DAG,查看它们的执行状态,手动触发执行,并监控各个任务的日志。
触发 DAG:
1 |
airflow dags trigger my_dag |
列出 DAG:
1 |
airflow dags list |
查看任务状态:
1 |
airflow tasks list my_dag |
Airflow 是一个强大的调度和工作流管理工具,适合处理复杂的数据管道和任务依赖。