Atanu Maity

Airflow + MLFlow Template

Updated: Aug 23, 2021


Airflow is a platform to programmatically author, schedule and monitor workflows. Use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks.
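To make the DAG idea concrete, here is a minimal plain-Python sketch. It is not Airflow code. It models four hypothetical tasks (`extract`, `clean`, `features`, `train` are illustrative names only) as a graph of dependencies and derives a valid execution order with the standard library's `graphlib.TopologicalSorter` (Python 3.9+). Airflow's scheduler does far more than this, but the core constraint is the same: a task runs only after all of its upstream tasks have finished, and the graph must contain no cycles.

```python
from graphlib import TopologicalSorter, CycleError

# Hypothetical tasks, each mapped to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "extract": set(),
    "clean": {"extract"},
    "features": {"extract"},
    "train": {"clean", "features"},
}

def execution_order(graph):
    """Return one valid order in which the tasks can run, or raise CycleError."""
    return list(TopologicalSorter(graph).static_order())

order = execution_order(dependencies)
print(order)  # 'extract' first, 'train' last; 'clean'/'features' may come in either order

# A cycle (a depends on b, b depends on a) is exactly what "acyclic" rules out.
try:
    execution_order({"a": {"b"}, "b": {"a"}})
except CycleError:
    print("cycle detected")
```

`clean` and `features` do not depend on each other, so either order is valid. That independence is what lets a real scheduler run such tasks in parallel.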

The four main components of Airflow are the Webserver, Scheduler, Executor, and Metadata Database. The details of these four components can be found here.

Discussing Airflow in detail would take pages, so we strongly recommend following some tutorials to gain a deeper understanding of Airflow and its functionality.


Now let's come straight to installation and initialization:

Installation: first create a virtual environment, then install Airflow inside it:

$ pip install apache-airflow

If you need extra packages for your requirements, such as Postgres or GCP support, please look here.

Initialize the DB: before you can run tasks, Airflow needs an initialized database to store its metadata. The default is a SQLite database, but you can switch to another database.

$ airflow initdb

(This is the Airflow 1.10.x command. On Airflow 2.x, use airflow db init instead.)

Run the webserver: once the DB is initialized, start the webserver:

$ airflow webserver

This starts the webserver on the default port, 8080. To use a different port, pass it with -p:

$ airflow webserver -p <port_number>

Run the scheduler: open another terminal and start the scheduler:

$ airflow scheduler

Note: for development we usually use a single machine, or more precisely a single worker, so we will not explore the worker functionality much here.

Airflow provides operators for many common tasks, including:

  • BashOperator - executes a bash command

  • PythonOperator - calls an arbitrary Python function

  • EmailOperator - sends an email

  • SimpleHttpOperator - sends an HTTP request

  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. - executes a SQL command

  • Sensor - an Operator that waits (polls) for a certain time, file, database row, S3 key, etc…
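The sensor idea in the last bullet can be illustrated with a small plain-Python sketch. It is not Airflow's `FileSensor`, and the helper name `wait_for_file` is hypothetical. It repeatedly "pokes" a condition (here, whether a file exists), sleeps between checks, and gives up after a timeout. Airflow's sensors expose similarly named `poke_interval` and `timeout` settings.

```python
import os
import tempfile
import time

def wait_for_file(path, poke_interval=1.0, timeout=10.0):
    """Poll until `path` exists. Return True if it appears, False after `timeout` seconds.

    A plain-Python illustration of the sensor pattern, not Airflow's FileSensor.
    """
    deadline = time.monotonic() + timeout
    while True:
        if os.path.exists(path):        # the "poke": check the condition once
            return True
        if time.monotonic() >= deadline:
            return False                # give up once the timeout has passed
        time.sleep(poke_interval)       # wait before checking again

# Usage: the first call times out, the second succeeds once the file exists.
with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "ready.flag")
    print(wait_for_file(target, poke_interval=0.1, timeout=0.3))  # False
    open(target, "w").close()
    print(wait_for_file(target, poke_interval=0.1, timeout=0.3))  # True
```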

Sample Airflow DAG (Just another Python Script 🙂 ):

import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# Say this is Task1
def greet():
    """A Python function that appends the current time to a text file."""
    print('Writing in file')
    with open('/Development/airflow_tutorial/greet.txt', 'a+', encoding='utf8') as f:
        now = dt.datetime.now()
        t = now.strftime("%Y-%m-%d %H:%M")
        f.write(str(t) + '\n')
    return 'Greeted'

# Say this is Task2
def respond():
    """A Python function that returns a simple greeting."""
    return 'Greet Responded Again'

# Declaring DAG default settings
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2018, 9, 24, 10, 00, 00),
    'concurrency': 1,
    'retries': 0
}

# Building the DAG; 'my_simple_dag' is the DAG id, which will be
# visible in the Airflow UI
with DAG('my_simple_dag',
         default_args=default_args,
         schedule_interval='*/10 * * * *',  # run every 10 minutes
         # schedule_interval=None,          # use this instead to trigger only manually
         catchup=False,  # don't backfill every 10-minute interval since start_date
         ) as dag:
    opr_hello = BashOperator(task_id='say_Hi',
                             bash_command='echo "Hi!!"')

    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet)

    opr_sleep = BashOperator(task_id='sleep_me',
                             bash_command='sleep 5')

    opr_respond = PythonOperator(task_id='respond',
                                 python_callable=respond)

    # Setting the task flow dependencies
    opr_hello >> opr_greet >> opr_sleep >> opr_respond
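The `>>` in the last line of the DAG is plain Python operator overloading. Below is a toy sketch of why `a >> b >> c` reads as a chain. It uses a hypothetical `Task` class, not Airflow's `BaseOperator`. Its `__rshift__` method records the dependency and returns its right-hand operand, so the next `>>` continues from there. Airflow's operators behave analogously, with `set_downstream` / `set_upstream` as the method-call equivalents.

```python
class Task:
    """Toy stand-in for an operator: it only records its downstream tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b marks b as downstream of a, then returns b so the chain continues
        self.downstream.append(other)
        return other

hello, greet, sleep, respond = (Task(t) for t in ["say_Hi", "greet", "sleep_me", "respond"])
hello >> greet >> sleep >> respond  # evaluated left to right: (hello >> greet) >> sleep ...

# Walk the chain from the first task.
node, path = hello, []
while node:
    path.append(node.task_id)
    node = node.downstream[0] if node.downstream else None
print(" -> ".join(path))  # say_Hi -> greet -> sleep_me -> respond
```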


MLflow is a framework that supports the machine learning lifecycle. It has components to monitor your model during training and while it runs, to store models, to load models in production code, and to create pipelines.

What exactly can it track?

  • Parameters - Key-value input parameters of your choice. Both keys and values are strings.

  • example - mlflow.log_param('alpha', alpha_val_of_regression)

  • Metrics - Key-value metrics where the value is numeric. Each metric can be updated throughout the course of the run (for example, to track how your model’s loss function is converging), and MLflow will record and let you visualize the metric’s full history.

  • example - mlflow.log_metric('rmse', rmse_value_of_model_train)

  • Artifacts - Output files in any format. For example, you can record images (for example, PNGs), models (for example, a pickled scikit-learn model) or even data files (for example, a parquet file) as artifacts.

  • example - mlflow.sklearn.log_model(model_ridge, 'Ridge')
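The difference between parameters and metrics can be sketched with a toy in-memory recorder. `ToyRun` is hypothetical and is not the MLflow API. It only mirrors the two rules above: parameters are stored once as strings, and every metric update is kept so the full history can be inspected later. In real MLflow, `mlflow.log_metric` accepts an optional `step` argument for this purpose.

```python
from collections import defaultdict

class ToyRun:
    """Hypothetical in-memory stand-in for a tracking run (not the MLflow API)."""

    def __init__(self):
        self.params = {}                  # key -> str
        self.metrics = defaultdict(list)  # key -> [(step, value), ...], full history

    def log_param(self, key, value):
        self.params[key] = str(value)     # parameters are stored as strings

    def log_metric(self, key, value, step=0):
        self.metrics[key].append((step, float(value)))  # keep every update

run = ToyRun()
run.log_param("alpha", 0.5)
for epoch, loss in enumerate([0.9, 0.6, 0.45]):  # made-up loss values
    run.log_metric("loss", loss, step=epoch)

print(run.params)           # {'alpha': '0.5'}
print(run.metrics["loss"])  # [(0, 0.9), (1, 0.6), (2, 0.45)]
```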


How to install:

$ pip install mlflow

How to integrate it with your code (pay particular attention to where and how the MLflow components are introduced):

import os
import warnings
import sys

import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet
from urllib.parse import urlparse
# Importing mlflow and its sklearn component
import mlflow
import mlflow.sklearn

import logging
logger = logging.getLogger(__name__)

def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2

if __name__ == "__main__":

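    # Read the wine-quality csv file from the URL
    csv_url = "..."  # the wine-quality CSV URL is not preserved in this copy of the post
    try:
        data = pd.read_csv(csv_url, sep=';')
    except Exception as e:
        logger.exception(
            "Unable to download training & test CSV, check your internet connection. Error: %s", e)
        raise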

    # Split the data into training and test sets. (0.75, 0.25) split.
    train, test = train_test_split(data)

    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]

    alpha = float(sys.argv[1]) if len(sys.argv) > 1 else 0.5
    l1_ratio = float(sys.argv[2]) if len(sys.argv) > 2 else 0.5
    # Initiating mlflow run
    with mlflow.start_run():
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)

        predicted_qualities = lr.predict(test_x)

        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)

        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)
        # parameter logging
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        # metric logging
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)

        tracking_url_type_store = urlparse(mlflow.get_tracking_uri()).scheme
        # model logging
        # Model registry does not work with file store
        if tracking_url_type_store != "file":

            # Register the model
            # There are other ways to use the Model Registry, depending on the use case;
            # please refer to the MLflow docs for more information.
            mlflow.sklearn.log_model(lr, "model", registered_model_name="ElasticnetWineModel")
        else:
            mlflow.sklearn.log_model(lr, "model")
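The three metrics computed by `eval_metrics` and logged to MLflow above can be checked by hand. The sketch below recomputes RMSE, MAE and R² from their definitions in plain Python, without NumPy or scikit-learn, on a tiny made-up example. `eval_metrics_by_hand` is a hypothetical helper for illustration only.

```python
import math

def eval_metrics_by_hand(actual, pred):
    """RMSE, MAE and R^2 computed from their definitions."""
    n = len(actual)
    errors = [p - a for a, p in zip(actual, pred)]
    rmse = math.sqrt(sum(e * e for e in errors) / n)  # root of the mean squared error
    mae = sum(abs(e) for e in errors) / n             # mean absolute error
    mean_a = sum(actual) / n
    ss_res = sum(e * e for e in errors)               # residual sum of squares
    ss_tot = sum((a - mean_a) ** 2 for a in actual)   # total sum of squares
    r2 = 1 - ss_res / ss_tot
    return rmse, mae, r2

rmse, mae, r2 = eval_metrics_by_hand([3, 5, 7], [2, 5, 8])
print(round(rmse, 4), round(mae, 4), round(r2, 4))  # 0.8165 0.6667 0.75
```

Here the errors are -1, 0 and 1, so the MSE is 2/3 and the MAE is 2/3. With a mean of 5, SS_tot is 8 and SS_res is 2, giving R² = 1 - 2/8 = 0.75.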

See the tracked results:

From a new terminal, run:

$ mlflow ui

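This command starts the MLflow tracking UI on the default port, 5000. In your browser, simply go to localhost:5000. By default, runs are recorded in an mlruns folder inside the directory where you ran the training script, so start mlflow ui from that same directory.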


Building a simple template for using an Airflow + MLflow solution in your data science ecosystem is not an easy task. To serve that purpose, and to make everyone familiar with the basic functionality, we have written three simple Python scripts.

The DAG script is built on the tasks/functions defined in the data and model preparation code.
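Each Airflow task may run in its own process, possibly on a different worker, so tasks in a template like this should not rely on shared in-memory state. Small values can travel via XCom, and larger data is usually written to storage. The sketch below is hypothetical: `prepare_data`, `train_model`, the CSV layout and the toy least-squares fit are our own assumptions, not the author's scripts. It shows two plain functions of the kind a `PythonOperator` could call, communicating only through a file path.

```python
import csv
import os
import tempfile

def prepare_data(out_path):
    """Hypothetical data-preparation task: write a small (x, y) dataset to CSV."""
    rows = [(x, 2 * x + 1) for x in range(10)]  # toy data on the line y = 2x + 1
    with open(out_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["x", "y"])
        writer.writerows(rows)
    return out_path

def train_model(in_path):
    """Hypothetical model task: read the CSV and fit y = a + b*x by least squares."""
    with open(in_path, newline="") as f:
        data = [(float(r["x"]), float(r["y"])) for r in csv.DictReader(f)]
    n = len(data)
    mx = sum(x for x, _ in data) / n
    my = sum(y for _, y in data) / n
    b = sum((x - mx) * (y - my) for x, y in data) / sum((x - mx) ** 2 for x, _ in data)
    a = my - b * mx
    return a, b

# Run the two "tasks" in dependency order, as a DAG with prepare >> train would.
with tempfile.TemporaryDirectory() as workdir:
    path = prepare_data(os.path.join(workdir, "train.csv"))
    intercept, slope = train_model(path)
print(intercept, slope)  # 1.0 2.0
```

In a real DAG, the shared path would typically be a location that every worker can reach, rather than a temporary directory.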

Where to put a DAG script:

Inside your airflow folder (which here sits inside the virtual environment), there should be another folder named ‘dags’. All DAGs that Airflow schedules and executes should be in this ‘dags’ folder.

So our template DAG and its helper scripts should also go in this ‘dags’ folder.

The structure looks like this:

└── dags
    └── model_pipeline

Inside dags, we created a folder ‘model_pipeline’ that contains the DAG and its supporting scripts.
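Here is a small sketch that locates and creates this folder layout. It assumes Airflow's defaults: `AIRFLOW_HOME` falls back to `~/airflow` when the environment variable is unset, and the `dags_folder` setting points to `$AIRFLOW_HOME/dags`. If your `airflow.cfg` overrides `dags_folder`, use that path instead. The helper name `template_dir` is hypothetical.

```python
import os
import tempfile
from pathlib import Path

def template_dir(airflow_home=None):
    """Return (and create) dags/model_pipeline under AIRFLOW_HOME.

    Assumes the default layout where dags_folder is $AIRFLOW_HOME/dags and
    AIRFLOW_HOME falls back to ~/airflow when the variable is unset.
    """
    home = Path(airflow_home or os.environ.get("AIRFLOW_HOME", "~/airflow")).expanduser()
    pipeline = home / "dags" / "model_pipeline"
    pipeline.mkdir(parents=True, exist_ok=True)  # no-op if it already exists
    return pipeline

# Usage against a throwaway directory, so nothing under ~/airflow is touched.
with tempfile.TemporaryDirectory() as tmp:
    p = template_dir(tmp)
    print(p.relative_to(tmp).parts)  # ('dags', 'model_pipeline')
```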

Note: when you initialize the Airflow DB (airflow initdb), Airflow automatically parses the DAG files and updates the metadata database accordingly.

Docker Image

For ease of use, I have created a Docker image with all the setup and configuration already done. Please follow the instructions below:

  • Pull the Docker image: ytiam20/airmlflow:latest

  • Create a container from this image with ports 8080 and 5000 exposed.

  • Once you are inside the container, run the following steps one by one:

$ conda activate Ayata/dev_proj
$ cd Ayata/dev_proj/dev/
$ bash start_airflow_dev 8080 (initializes the Airflow DB and starts the webserver and scheduler)
$ mlflow ui --host &> /dev/null & (starts the MLflow server and UI tracker on port 5000)

That completes the setup.

I have put an experiment DAG, the same one discussed in this post, in the Airflow DAG folder (Ayata/dev_proj/dev/airflow/dags). You can run your own experiments and add new DAGs accordingly. In your web browser, open localhost:8080 in one tab for Airflow and localhost:5000 in another tab for MLflow.
