Airflow 是一個使用 Python 語言編寫的 Data Pipeline 調度和監控工作流的平臺。
Airflow 是通過 DAG(Directed acyclic graph 有向無環圖)來管理任務流程的任務調度工具,不需要知道業務數據的具體內容,設置任務的依賴關系即可實現任務調度。
這個平臺擁有和 Hive、Presto、MySQL、HDFS、Postgres 等數據源之間交互的能力,并且提供了鉤子(hook)使其擁有很好地擴展性。除了使用命令行,該工具還提供了一個 WebUI 可以可視化的查看依賴關系、監控進度、觸發任務等。
Airflow 的架構
在一個可擴展的生產環境中,Airflow 含有以下組件:
元數據庫:這個數據庫存儲有關任務狀態的信息。
調度器:Scheduler 是一種使用 DAG 定義結合元數據中的任務狀態來決定哪些任務需要被執行以及任務執行優先級的過程。調度器通常作為服務運行。
執行器:Executor 是一個消息隊列進程,它被綁定到調度器中,用于確定實際執行每個任務計劃的工作進程。有不同類型的執行器,每個執行器都使用一個指定工作進程的類來執行任務。例如,LocalExecutor 使用與調度器進程在同一臺機器上運行的并行進程執行任務。其他像 CeleryExecutor 的執行器使用存在于獨立的工作機器集群中的工作進程執行任務。
Workers:這些是實際執行任務邏輯的進程,由正在使用的執行器確定。
Airflow 解決哪些問題
通常,在一個運維系統,數據分析系統,或測試系統等大型系統中,我們會有各種各樣的依賴需求。包括但不限于:
時間依賴:任務需要等待某一個時間點觸發。
外部系統依賴:任務依賴外部系統需要調用接口去訪問。
任務間依賴:任務 A 需要在任務 B 完成后啟動,兩個任務互相間會產生影響。
資源環境依賴:任務消耗資源非常多, 或者只能在特定的機器上執行。
crontab 可以很好地處理定時執行任務的需求,但僅能管理時間上的依賴。
Airflow 是一種 WMS,即:它將任務以及它們的依賴看作代碼,按照那些計劃規范任務執行,并在實際工作進程之間分發需執行的任務。
Airflow 提供了一個用于顯示當前活動任務和過去任務狀態的優秀 UI,并允許用戶手動管理任務的執行和狀態。
Airflow 中的工作流是具有方向性依賴的任務集合。
具體說就是 Airflow 的核心概念 DAG(有向無環圖)—— 來表現工作流。
DAG 中的每個節點都是一個任務,DAG 中的邊表示的是任務之間的依賴(強制為有向無環,因此不會出現循環依賴,從而導致無限執行循環)。
Airflow 在 ETL 上的實踐
ETL,是英文 Extract,Transform,Load 的縮寫,用來描述將數據從來源端經過抽?。╡xtract)、轉換(transform)、加載(load)至目的端的過程。ETL 一詞較常用在數據倉庫,Airflow 在解決 ETL 任務各種依賴問題上的能力恰恰是我們所需要的。
在現階段的實踐中,我們使用 Airflow 來同步各個數據源數據到數倉,同時定時執行一些批處理任務及帶有數據依賴、資源依賴關系的計算腳本。
本文立意于科普介紹,故在后面的用例中只介紹了 BashOperator,PythonOperator這倆個最為易用且在我們日常使用中最為常見的 Operator。
Airflow 同時也具有不錯的集群擴展能力,可使用 CeleryExecuter 以及多個 Pool 來提高任務并發度。
Airflow在 CeleryExecuter 下可以使用不同的用戶啟動 Worker,不同的 Worker 監聽不同的 Queue,這樣可以解決用戶權限依賴問題。Worker 也可以啟動在多個不同的機器上,解決機器依賴的問題。
Airflow 可以為任意一個 Task 指定一個抽象的 Pool,每個 Pool 可以指定一個 Slot 數。每當一個 Task 啟動時,就占用一個 Slot,當 Slot 數占滿時,其余的任務就處于等待狀態。這樣就解決了資源依賴問題。
二、安裝及使用
假設:你已經安裝好了 Python 及配置好了其包管理工具 pip。
1、安裝airflow
pip install apache-airflow
在安裝airflow的時候可能會報錯:
Cannot uninstall 'PyYAML'. It is a distutils installed project and thus we cannot
忽略掉 PyYAML
# 親測可用
pip install apache-airflow --ignore-installed PyYAML
安裝成功后查看命令:
[root@quant ~]# airflow -h
usage: airflow [-h] GROUP_OR_COMMAND ...
positional arguments:
GROUP_OR_COMMAND
Groups:
celery Celery components
config View configuration
connections Manage connections
dags Manage DAGs
db Database operations
kubernetes Tools to help run the KubernetesExecutor
pools Manage pools
providers Display providers
roles Manage roles
tasks Manage tasks
users Manage users
variables Manage variables
Commands:
cheat-sheet Display cheat sheet
info Show information about current Airflow and environment
kerberos Start a kerberos ticket renewer
plugins Dump information about loaded plugins
rotate-fernet-key
Rotate encrypted connection credentials and variables
scheduler Start a scheduler instance
sync-perm Update permissions for existing roles and DAGs
version Show the version
webserver Start a Airflow webserver instance
optional arguments:
-h, --help show this help message and exit
[root@quant ~]#
2、初始化數據庫
# initialize the database
airflow db init
報這樣的錯誤:
ImportError: Something is wrong with the numpy installation. While importing we detected an older version of numpy
解決方案:
如報錯信息所說
先卸載numpy:pip uninstall numpy
再卸載numpy,直到卸載到提示信息顯示,此時完全已經沒有numpy了為止
下載numpy:pip install numpy
此時應該可用;
若不可用,查看python安裝目錄下的libs文件夾,刪除掉其中的另一個dll文件,應該可用。
3、添加用戶
airflow users create \
--username admin \
--firstname Corwien \
--lastname Wong \
--role Admin \
--email 407544577@qq.com
創建的用戶密碼為:quant
4、啟動web服務
# start the web server, default port is 8080
airflow webserver --port 8080a
5、啟動定時任務
# start the scheduler
# open a new terminal or else run webserver with ``-D`` option to run it as a daemon
airflow scheduler
# visit localhost:8080 in the browser and use the admin account you just
# created to login. Enable the example_bash_operator dag in the home page
以上就是“Python教程——Airflow 快速學習入門”的詳細內容,想要了解更多Python教程歡迎持續關注編程學習網。
掃碼二維碼 獲取免費視頻學習資料
- 本文固定鏈接: http://www.stbrigidsathleticclub.com/post/11372/
- 轉載請注明:轉載必須在正文中標注并保留原文鏈接
- 掃碼: 掃上方二維碼獲取免費視頻資料
查 看2022高級編程視頻教程免費獲取