
Airflow

What is this thing

Apache Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. A workflow is expressed as a DAG (directed acyclic graph) of tasks, and the Airflow scheduler runs those tasks on a schedule while tracking their state.

Skills

  • Run an example job that logs something
  • Run the same logging job on a schedule, say every one to ten minutes
  • Run a SQL database (say Postgres), connect to it, and execute a SQL query against it
  • Connect to a Redshift database using the Postgres connector and execute a SQL query against it
  • Take an S3 bucket containing JSON or CSV data and use the Postgres connector to load that S3 data into Redshift
  • Perform transformations on the data in Redshift to calculate statistics
  • Feed the execution time into the task as an argument and set Airflow to backfill, performing the task once for each month
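The first two skills above can be sketched as a single DAG file (a sketch assuming Airflow 1.x import paths; the DAG id, function name, and five-minute interval are made up for illustration):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def log_greeting(**kwargs):
    # Anything printed here ends up in the task log;
    # execution_date is injected via the task context.
    print(f"Hello, it is {kwargs['execution_date']}")


dag = DAG(
    "log_greeting_every_five_minutes",       # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=timedelta(minutes=5),  # any interval from 1 to 10 minutes works
    catchup=False,
)

task = PythonOperator(
    task_id="log_greeting",
    python_callable=log_greeting,
    provide_context=True,  # needed on Airflow 1.x to receive **kwargs
    dag=dag,
)
```

Dropping this file into the DAGs folder and unpausing the DAG in the UI is enough; no CLI invocation per run is needed.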

Install

Udacity


# Pin Airflow and its dependencies against the official constraint file for this version
AIRFLOW_VERSION=1.10.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow[async,aws_hook,postgres,google]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
pip install awscli==1.16.17
pip install aws-xray-sdk==0.95

# Reset local state, point Airflow at the current directory, and initialise the metadata DB
rm airflow.cfg airflow.db unittests.cfg
export AIRFLOW__CORE__AIRFLOW_HOME=$(pwd)
export AIRFLOW_HOME="${AIRFLOW__CORE__AIRFLOW_HOME}"
export AIRFLOW__CORE__DAGS_FOLDER="${AIRFLOW__CORE__AIRFLOW_HOME}/dags"
export AIRFLOW__CORE__LOAD_EXAMPLES="False"
airflow initdb
# Run each service in its own terminal (or background it);
# the worker is only needed with the Celery executor
airflow webserver
airflow scheduler
airflow worker

# Full reset (also wipes logs), then reinitialise and restart
rm -rf airflow.cfg airflow.db unittests.cfg logs
airflow initdb
airflow webserver

airflow scheduler

# Airflow 1.x CLI: list registered DAGs, then the runs of one of them
airflow list_dags
airflow list_dag_runs hello_world
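The backfill skill listed at the top (one task run per month, with the execution time fed in via the context) hinges on catchup plus the per-interval execution_date. A sketch with made-up names:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def process_month(**kwargs):
    # Airflow passes one execution_date per scheduled interval,
    # so a backfill runs this once for every month since start_date.
    print(f"Processing data for {kwargs['execution_date']}")


dag = DAG(
    "monthly_backfill",             # hypothetical DAG id
    start_date=datetime(2020, 1, 1),
    schedule_interval="@monthly",
    catchup=True,  # create a run for every past month, not just the latest
)

task = PythonOperator(
    task_id="process_month",
    python_callable=process_month,
    provide_context=True,
    dag=dag,
)
```

The same months can also be replayed explicitly with the 1.x CLI, e.g. `airflow backfill -s 2020-01-01 -e 2020-06-01 monthly_backfill`.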

Tutorials

The example DAGs bundled with the package live under the install directory, e.g.:

/home/$USER/.local/lib/python3.8/site-packages/airflow/example_dags/

Examples

Operator stuff

Code Samples

# Setup: a DAG as a context manager; operators created inside
# the with-block are attached to this DAG automatically
from datetime import datetime

from airflow import DAG

with DAG("youtube",
         start_date=datetime(2021, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:
    ...  # tasks defined here

# Operators
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def hello_date(*args, **kwargs):
    # provide_context=True passes execution_date (and the rest of
    # the task context) in via kwargs on Airflow 1.x
    print(f"Hello {kwargs['execution_date']}")

divvy_dag = DAG(...)
task = PythonOperator(
    task_id='hello_date',
    python_callable=hello_date,
    provide_context=True,
    dag=divvy_dag)
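For the SQL skills, the same operator covers both plain Postgres and Redshift, since Redshift speaks the Postgres wire protocol. A sketch assuming the Airflow 1.x import path; the connection id and table name are made up (the connection would be created under Admin -> Connections in the UI):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

dag = DAG("run_redshift_query", start_date=datetime(2021, 1, 1))

count_rows = PostgresOperator(
    task_id="count_rows",
    postgres_conn_id="redshift",        # assumed connection id
    sql="SELECT COUNT(*) FROM trips;",  # hypothetical table
    dag=dag,
)
```

Point `postgres_conn_id` at a Postgres connection instead and the task runs unchanged against a local database.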

S3 Stuff

IAM policies to attach to the role/user Airflow connects with: AdministratorAccess, AmazonRedshiftFullAccess, AmazonS3FullAccess
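Getting S3 data into Redshift via the Postgres connector boils down to issuing a Redshift COPY statement. A sketch that builds the SQL (the bucket, table, and IAM role names are made up); in a DAG the resulting string could be executed with PostgresHook("redshift").run(sql) inside a PythonOperator:

```python
def build_copy_sql(table, s3_path, iam_role, fmt="JSON 'auto'"):
    """Build a Redshift COPY statement loading JSON or CSV data from S3.

    For CSV input, pass fmt="CSV" (optionally followed by IGNOREHEADER 1).
    """
    return (
        f"COPY {table} "
        f"FROM '{s3_path}' "
        f"IAM_ROLE '{iam_role}' "
        f"{fmt};"
    )


# Hypothetical names for illustration only
sql = build_copy_sql(
    table="trips",
    s3_path="s3://my-bucket/trips/2021/01.json",
    iam_role="arn:aws:iam::123456789012:role/redshift-s3-read",
)
print(sql)
```

The IAM role in the statement is what needs the S3 read policy from the list above; Redshift assumes it to fetch the objects.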

Example Projects