Atanu Maity
Airflow + MLFlow Template
Updated: Aug 23, 2021
Airflow
Airflow is a platform to programmatically author, schedule and monitor workflows. Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks.
The 4 main components of Airflow are the Webserver, Scheduler, Executor, and Metadata Database. The details about these 4 components can be found here
Discussing Airflow in full detail would take many pages, so we strongly recommend following some tutorials to build a deeper understanding of Airflow and its functionality
Tutorials:
Apache Airflow: The Hands-On Guide (a paid course on Udemy; it is highly recommended to purchase and complete it)
Now let's come straight to the installation and initiation part:
Installation - First create a virtual environment, and then install Airflow inside that environment
$ pip install apache-airflow
If you want extra packages installed as per your requirements, such as Postgres or GCP support, please look here
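For example, an extra can be added in the same install (the postgres extra is shown here as an assumption; check the extras list for your Airflow version):
$ pip install 'apache-airflow[postgres]'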
Initiate a DB: Airflow requires a database to be initialized to store its metadata before you can run tasks. The default is a SQLite database; you can change it to another DB too
$ airflow initdb
Run Webserver: Once the DB initialization is done, you need to start the webserver
$ airflow webserver
It will start the webserver on the default port 8080. If you want to run the webserver on some other port, specify it with
$ airflow webserver -p <port_number>
Run Scheduler: Open another terminal, and run the scheduler
$ airflow scheduler
Note: For development purposes we will usually use a single machine, or more precisely a single worker, so we will not play much with the worker functionality.
Airflow provides operators for many common tasks, including:
BashOperator - executes a bash command
PythonOperator - calls an arbitrary Python function
EmailOperator - sends an email
SimpleHttpOperator - sends an HTTP request
MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command
Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
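As a quick illustration of the last one, here is a minimal sketch of a FileSensor task that waits for a file to appear before downstream tasks run. The import path is the Airflow 1.10.x one (in Airflow 2.x it lives in airflow.sensors.filesystem), the file path is just an example, and the task would sit inside a DAG like the sample below.
from airflow.contrib.sensors.file_sensor import FileSensor

# Poke every 30 seconds, give up after 10 minutes, and only let
# downstream tasks run once the file exists
wait_for_file = FileSensor(task_id='wait_for_greet_file',
                           filepath='/Development/airflow_tutorial/greet.txt',
                           poke_interval=30,
                           timeout=600)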
Sample Airflow DAG (Just another Python Script 🙂 ):
import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


# Say this is Task1
def greet():
    '''
    A python function to write a text file
    '''
    print('Writing in file')
    with open('/Development/airflow_tutorial/greet.txt', 'a+', encoding='utf8') as f:
        now = dt.datetime.now()
        t = now.strftime("%Y-%m-%d %H:%M")
        f.write(str(t) + '\n')
    return 'Greeted'


# Say this is task2
def respond():
    '''
    A python function to return a simple greeting
    '''
    return 'Greet Responded Again'


# Declaring DAG default settings
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 9, 24, 10, 00, 00),
    'concurrency': 1,
    'retries': 0
}

# Building the DAG, 'my_simple_dag' is the dag id, which will be
# visible in the airflow ui
with DAG('my_simple_dag',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         # schedule_interval=None,
         ) as dag:

    opr_hello = BashOperator(task_id='say_Hi',
                             bash_command='echo "Hi!!"')
    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet)
    opr_sleep = BashOperator(task_id='sleep_me',
                             bash_command='sleep 5')
    opr_respond = PythonOperator(task_id='respond',
                                 python_callable=respond)

    # Setting the task flow dependencies
    opr_hello >> opr_greet >> opr_sleep >> opr_respond
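Once this file is saved in the dags folder, a single task can be exercised without waiting for the scheduler by running it for a given execution date. The command below is the Airflow 1.10.x CLI (in Airflow 2.x the equivalent is airflow tasks test), and the date is just an example:
$ airflow test my_simple_dag greet 2021-08-23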
MLFlow
MLflow is a framework that supports the machine learning lifecycle: it has components to monitor your model during training and running, the ability to store models, to load a model in production code, and to create pipelines.
What exactly it can track:
Parameters - Key-value input parameters of your choice. Both keys and values are strings.
example - mlflow.log_param('alpha', alpha_val_of_regression)
Metrics - Key-value metrics where the value is numeric. Each metric can be updated throughout the course of the run (for example, to track how your model’s loss function is converging), and MLflow will record and let you visualize the metric’s full history.
example - mlflow.log_metric('rmse', rmse_value_of_model_train)
Artifacts - Output files in any format. For example, you can record images (for example, PNGs), models (for example, a pickled scikit-learn model) or even data files (for example, a parquet file) as artifacts.
example - mlflow.sklearn.log_model(model_ridge, 'Ridge')
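Putting the three together, a minimal tracking sketch could look like the following (the parameter, metric value, and artifact file are made up purely for illustration):
import mlflow

with mlflow.start_run():
    # a hyper-parameter of the run
    mlflow.log_param('alpha', 0.5)
    # a numeric result; can be logged repeatedly to build up a history
    mlflow.log_metric('rmse', 0.82)
    # any local file can be stored as an artifact (a small text file here)
    with open('notes.txt', 'w') as f:
        f.write('example artifact')
    mlflow.log_artifact('notes.txt')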
Tutorials:
How to install:
$ pip install mlflow
How to integrate it with code (pay particular attention to the lines where and how the MLflow components are introduced):
import os
import warnings
import sys

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse

# Importing mlflow and its sklearn component
import mlflow
import mlflow.sklearn

import logging

logging.basicConfig(level=logging.WARN)
logger = logging.getLogger(__name__)


def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2


if __name__ == "__main__":
    warnings.filterwarnings("ignore")
    np.random.seed(40)

    # Read the wine-quality csv file from the URL
    csv_url = \
        'http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv'
    try:
        data = pd.read_csv(csv_url, sep=';')
    except Exception as e:
        logger.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s", e)

    # Split the data into training and test sets. (0.75, 0.25) split.
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
    l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5

    # Initiating mlflow run
    with mlflow.start_run():
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        predicted_qualities = lr.predict(test_x)

        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print(" RMSE: %s" % rmse)
        print(" MAE: %s" % mae)
        print(" R2: %s" % r2)

        # parameter logging
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)

        # metric logging
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme

        # model logging
        # Model registry does not work with file store
        if tracking_url_type_store != "file":
            # Register the model
            # There are other ways to use the Model Registry, which depends on the use case,
            # please refer to the doc for more information:
            # https://mlflow.org/docs/latest/model-registry.html#api-workflow
            mlflow.sklearn.log_model(lr, "model", registered_model_name="ElasticnetWineModel")
        else:
            mlflow.sklearn.log_model(lr, "model")
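Assuming the script above is saved as, say, train.py (the file name is just an example), it can be run with the two optional command-line arguments for alpha and l1_ratio:
$ python train.py 0.6 0.4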
See the tracked results:
From a new terminal,
$ mlflow ui
The above command will start the MLflow UI at the default port 5000. In your browser, simply go to localhost:5000
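If port 5000 is already taken, the UI can be started on another port (the port number below is just an example), and client code can be pointed at a running tracking server by calling mlflow.set_tracking_uri('http://localhost:5001') before mlflow.start_run():
$ mlflow ui -p 5001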
Template
Building an easy and simple template for an Airflow + MLFlow solution in your data science ecosystem is not a trivial task. To serve the purpose, and to make everyone familiar with the basic functionality, we have made three simple Python (.py) scripts:
a data preparation script (https://github.com/ytiam/setup_scripts/blob/master/airflow/dags/model_pipeline/data_process.py)
a model preparation script (https://github.com/ytiam/setup_scripts/blob/master/airflow/dags/model_pipeline/model.py)
the DAG script (https://github.com/ytiam/setup_scripts/blob/master/airflow/dags/model_pipeline/data_demo_dag.py)
The DAG script is built upon the tasks/functions defined in the data and model preparation scripts.
Where you need to put a DAG script:
Inside your virtual environment, where your airflow folder is located, there should be another folder named ‘dags’. All the DAGs that will be scheduled and executed by Airflow should be placed in this ‘dags’ folder.
So our template DAG and its helper scripts should all be in this ‘dags’ folder too.
The structure would look like
airflow/
└── dags
    └── model_pipeline
        ├── data_demo_dag.py
        ├── data_process.py
        └── model.py
Inside dags, we made another folder ‘model_pipeline’ and this contains the DAG and other supporting scripts.
Note: while initializing the Airflow DB (airflow initdb), Airflow will automatically parse the DAG information and update the metadata database accordingly.
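After placing the scripts, you can confirm that Airflow has picked up the template DAG by listing the registered DAGs (Airflow 1.10.x CLI shown; in Airflow 2.x this is airflow dags list):
$ airflow list_dags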
Docker Image
For everyone's ease of use, I have created a docker image with all the setups and configurations done. Please follow the below instructions:
Pull the docker image: ytiam20/airmlflow:latest
Create a container from the above image, keeping ports 8080 and 5000 exposed.
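A minimal sketch of such a run command (the trailing bash assumes the image ships with a bash shell; adjust the flags to your setup):
$ docker run -it -p 8080:8080 -p 5000:5000 ytiam20/airmlflow:latest bash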
Once you are in the container created from this image, follow the below steps one by one:
$ cd
$ conda activate Ayata/dev_proj
$ cd Ayata/dev_proj/dev/
$ bash start_airflow_dev 8080 (this will initialize the Airflow DB and start the webserver and scheduler)
$ mlflow ui --host 0.0.0.0 &> /dev/null & (this will start the MLflow server and UI tracker on port 5000)
Now you are done with all the setups.
I have put an example DAG in the Airflow dags folder (Ayata/dev_proj/dev/airflow/dags), the same one discussed above. You can do your own experiments and add new DAGs accordingly. In your web browser, open localhost:8080 in one tab for Airflow, and localhost:5000 in another tab for MLflow.