Theory

TBD

Execution

DAG and how it works

  • Directed acyclic graph - Wikipedia
  • My brief: a graph (a math concept) that lets us create a visual representation and an order of operations (dependencies), as seen in the image below.

    It’s finite and acyclic: tasks can only depend on previous operations (see the sketch below).
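
To make this concrete, here’s a tiny plain-Python sketch (task names are made up for illustration) that models a DAG as a dependency map and derives a valid order of operations from it:

# A DAG as plain data: each node lists the nodes it depends on.
# The graph is finite and has no cycles, so a valid execution
# order (a topological order) always exists.
from graphlib import TopologicalSorter

dependencies = {
    "extract": [],                    # no upstream dependencies
    "transform": ["extract"],         # runs after extract
    "load": ["transform"],            # runs after transform
    "report": ["transform", "load"],  # runs after both
}

print(list(TopologicalSorter(dependencies).static_order()))
# ['extract', 'transform', 'load', 'report']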

Deploy

Info

Screenshots were taken at different hours, so expect light/dark mode switches.

Prepare Airflow DAGs

We can use the TaskFlow API to create DAGs instead of the “old school” PythonOperator, BashOperator, etc.; tasks are created with decorator wrappers.

We define the @dag wrapper, where we can also parameterize the script; this is especially useful when we intend to reuse it. Set render_template_as_native_obj=True to preserve parameter types, since by default Airflow converts them to strings.

dags/s3_upload_test.py
# ...
default_args = {"owner": "airflow", "start_date": datetime.now(timezone.utc), "retries": 0}
 
@dag(
    dag_id="upload_to_s3_test",
    default_args=default_args,
    schedule=None,  # Manual trigger only
    catchup=False,
    start_date=datetime.now(timezone.utc),
    tags=["s3", "upload", "test"],
    params={
        "name": Param(
            "Hello",
            type="string",
            title="Name",
            description="Name of the file to upload to S3",
        ),
        "contents": Param(
            "Hi! I'm a test file",
            type="string",
            title="Contents",
            description="Contents of the file to upload to S3",
        ),
    },
    render_template_as_native_obj=True,
)

Functions are wrapped in tasks, which reside under the main DAG function, in this case upload_to_s3_test().

Tip

If we need custom logging, we must define log = logging.getLogger("airflow.task") at the beginning of each task and then log what we need.

Such logs will be marked with source=airflow.task.

get_params() - Here we implement the parameter-processing function so that we can use the parameters, argparse-style, in the subsequent tasks. Notice the usage of multiple_outputs=True: it tells Airflow to store each key of the returned dictionary as a separate XCom entry.

save_objects_to_s3() - With S3Hook we can use the previously created S3 connection within Airflow. This script assumes we provided bucket_name in the connection’s extra field when the connection was created.
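
For reference, such a connection could be defined with bucket_name in its extra field. A minimal sketch (the bucket name is a placeholder; credentials are omitted):

# Build the "S3" connection with bucket_name stored in its extra field.
# The URI form can be exported as the AIRFLOW_CONN_S3 environment variable,
# or the same extra JSON can be entered when creating the connection in the UI.
import json
from airflow.models.connection import Connection

conn = Connection(
    conn_id="S3",
    conn_type="aws",
    extra=json.dumps({"bucket_name": "my-test-bucket"}),  # placeholder bucket
)
print(conn.get_uri())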

dags/s3_upload_test.py
# ...
@dag(
# ...
)
def upload_to_s3_test():
    @task(multiple_outputs=True)
    def get_params(**context):
        log = logging.getLogger("airflow.task")
        params = context["params"]
        name = params["name"]
        contents = params["contents"]
        log.info("Extracted parameters: name=%s, contents=%s", name, contents)
        return {"name": name, "contents": contents}
 
    @task
    def save_objects_to_s3(name: str, contents: str):
        log = logging.getLogger("airflow.task")
        hook = S3Hook(aws_conn_id="S3")
        conn = Connection.get_connection_from_secrets("S3")
        bucket_name = conn.extra_dejson.get('bucket_name')  # bucket_name lives in the connection's extra field
        hook.load_string(
            string_data=contents,
            key=f"{name}.txt",
            bucket_name=bucket_name,
        )
        log.info(f"Uploaded to s3://{bucket_name}/{name}.txt")
 
    # Define task instances and dependencies
    params = get_params()
    save_to_s3 = save_objects_to_s3(name=params["name"], contents=params["contents"])
 
 
# Instantiate the DAG
dag_instance = upload_to_s3_test()
 
if __name__ == "__main__":
    dag_instance.test()

Lastly, we define the task execution order by simply calling the tasks. The dag_instance.test() part is useful if we want to execute the script in a local environment, e.g. from the terminal.

dags/s3_upload_test.py
# ...
@dag(
# ...
)
def upload_to_s3_test():
    @task(multiple_outputs=True)
    def get_params(**context):
        ...  # body as above

    @task
    def save_objects_to_s3(name: str, contents: str):
        ...  # body as above

    # Define task instances and dependencies
    params = get_params()
    save_to_s3 = save_objects_to_s3(name=params["name"], contents=params["contents"])
 
 
# Instantiate the DAG
dag_instance = upload_to_s3_test()

# Enable local execution and testing
if __name__ == "__main__":
    dag_instance.test()
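
If we want to override the DAG params when testing locally, dag.test() also accepts a run_conf dictionary in recent Airflow versions; a minimal sketch with placeholder values:

# Run the DAG locally with overridden params (placeholder values).
if __name__ == "__main__":
    dag_instance.test(run_conf={"name": "local_test", "contents": "Triggered from the terminal"})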

Deploy via Helm

Airflow allows us to extend the base image to add more Python packages or additional configuration. Let’s do both.

Suppose we need mlflow and a couple of other libraries we have in our project:

pyproject.toml
# ...
airflow-server = [
    "mlflow>=3.1.1",
    "xgboost>=3.0.2",
    "pandas>=2.3.0",
    "hyperopt>=0.2.7",
    "scikit-learn>=1.7.0",
]
# ...

Let’s extend the image with uv pip.

airflow.dockerfile
FROM apache/airflow:3.0.2
 
# Another way is to define image as argument variable
# ARG AIRFLOW_IMAGE_NAME 
# FROM ${AIRFLOW_IMAGE_NAME}
# and then pass the arg docker build ... --build-arg AIRFLOW_IMAGE_NAME=$AIRFLOW_IMAGE_NAME 
 
COPY pyproject.toml ./
RUN uv pip install --no-cache --group airflow-server

Now we can build the image from the Dockerfile:

docker build --pull --tag my-airflow:0.0.1 . -f airflow.dockerfile

Attention

If your K8S cluster is set up via kind, you can load the extended image into it like this:

kind load docker-image my-airflow:0.0.1

To provide additional configuration, since we deploy through Helm in K8S, we extend the chart values. We can learn more from the docs:

airflow-values.yaml
dags:
  gitSync:
    enabled: true
    repo: repo_url.git
    branch: main
    subPath: folder/sub_folder
    period: 5s
    #sshKeySecret:

The configuration is ready for deployment; this will create the Airflow deployment and the boilerplate resources:

helm upgrade --install airflow apache-airflow/airflow --namespace airflow --create-namespace \
    --set images.airflow.repository=my-airflow \
    --set images.airflow.tag=0.0.1 \
    -f airflow-values.yaml

Access Airflow UI

To access Airflow API Server from host we need to port forward:

kubectl port-forward svc/airflow-api-server 8080:8080 --namespace airflow


The first thing I saw once logged into the console were DAG import errors; these came from the tests_common module provided by Airflow, where one script couldn’t load a module. Generally DAG import errors are easy to fix; for me it’s usually either forgetting to remove an argument after refactoring or a TaskFlow task-ordering issue.
Airflow prints the import status and the stack trace.

The task interface is friendly.