Screenshots were taken at different hours, so expect light/dark mode switches.
Prepare Airflow DAGs
We can use the TaskFlow API to create DAGs instead of the “old school” PythonOperator, BashOperator, etc.; it uses decorators to turn plain Python functions into tasks.
We define the @dag decorator, where we can also parameterize the script; this is especially useful when we intend to reuse it. Set render_template_as_native_obj=True to preserve parameter types; by default Airflow converts them to strings.
dags/s3_upload_test.py
```python
# ...
default_args = {"owner": "airflow", "start_date": datetime.now(timezone.utc), "retries": 0}


@dag(
    dag_id="upload_to_s3_test",
    default_args=default_args,
    schedule=None,  # Manual trigger only
    catchup=False,
    start_date=datetime.now(timezone.utc),
    tags=["s3", "upload", "test"],
    params={
        "name": Param(
            "Hello",
            type="string",
            title="Name",
            description="Name of the file to upload to S3",
        ),
        "contents": Param(
            "Hi! I'm a test file",
            type="string",
            title="Contents",
            description="Contents of the file to upload to S3",
        ),
    },
    render_template_as_native_obj=True,
)
```
Functions are wrapped in tasks, which reside inside the main DAG function, in this case upload_to_s3_test().
Tip
If we need custom logging, we must define log = logging.getLogger("airflow.task") at the beginning of each task and then log what we need.
Such logs will be marked with source=airflow.task.
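As a minimal sketch (the task name and log message are made up for illustration):

```python
import logging

from airflow.decorators import task


@task
def process_data():
    # Use Airflow's task logger so the message shows up in the task log
    # with source=airflow.task.
    log = logging.getLogger("airflow.task")
    log.info("Processing started")
```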
get_params() - Here we implement a parameter-processing function so that downstream tasks can consume the values much like argparse arguments. Notice the use of multiple_outputs=True; it tells Airflow to store each key of the returned dictionary as a separate XCom entry. Possible bodies for both tasks are sketched after the skeleton below.
save_objects_to_s3() - With S3Hook we can use the previously created S3 connection within Airflow. This script assumes we provided bucket_name during connection creation.
Lastly, we define the task execution order by simply calling the task functions. The dag_instance.test() part is useful if we want to run the script in a local environment, e.g. from the terminal.
dags/s3_upload_test.py
```python
# ...
@dag(
    # ...
)
def upload_to_s3_test():
    @task(multiple_outputs=True)
    def get_params(**context):
        ...  # body omitted here; a possible version is sketched below

    @task
    def save_objects_to_s3(name: str, contents: str):
        ...  # body omitted here; a possible version is sketched below

    # Define task instances and dependencies
    params = get_params()
    save_to_s3 = save_objects_to_s3(name=params["name"], contents=params["contents"])


# Execute the DAG
dag_instance = upload_to_s3_test()

# Enable local execution and testing
if __name__ == "__main__":
    dag_instance.test()
```
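For reference, possible bodies for the two tasks (defined inside upload_to_s3_test()) could look like the sketch below. The aws_conn_id value and the object key naming are illustrative assumptions, not the post's exact code, and no bucket_name is passed because the post assumes it was provided when the connection was created:

```python
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


@task(multiple_outputs=True)
def get_params(**context):
    # Read the DAG-level params from the task context; with multiple_outputs=True
    # each key of the returned dict becomes its own XCom entry.
    params = context["params"]
    return {"name": params["name"], "contents": params["contents"]}


@task
def save_objects_to_s3(name: str, contents: str):
    # "aws_default" is an assumed connection id; the bucket name is expected
    # to come from the connection configuration, as described above.
    hook = S3Hook(aws_conn_id="aws_default")
    hook.load_string(string_data=contents, key=f"{name}.txt", replace=True)
```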
Deploy via Helm
Airflow allows us to extend the official image to add more Python packages or additional configuration. Let’s do both.
Suppose we need mlflow and a couple of other libraries we already have in our project:
```dockerfile
FROM apache/airflow:3.0.2
# Another way is to define the image as a build argument:
# ARG AIRFLOW_IMAGE_NAME
# FROM ${AIRFLOW_IMAGE_NAME}
# and then pass the arg: docker build ... --build-arg AIRFLOW_IMAGE_NAME=$AIRFLOW_IMAGE_NAME

COPY pyproject.toml ./
RUN uv pip install --no-cache --group airflow-server
```
Production Guide — helm-chart Documentation
One simple way to extend it is to use a Helm values file (values.yaml, see Helm | Values Files) to store configuration information; let’s point Airflow to look for DAGs in a remote GitHub repository (you would need to specify credentials for a private one). A minimal example is sketched below.
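A sketch of the relevant values.yaml section, assuming the official Airflow Helm chart's gitSync options (the repository URL, branch, and secret name are placeholders):

```yaml
dags:
  gitSync:
    enabled: true
    repo: https://github.com/<your-org>/<your-repo>.git  # placeholder
    branch: main
    subPath: dags
    # For a private repository, reference a Kubernetes secret with credentials:
    # credentialsSecret: git-credentials
```

The chart then picks this configuration up when installing or upgrading with -f values.yaml.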
The first thing I saw once logged into the console was a set of DAG import errors; these came from the tests_common module provided by Airflow: one script couldn’t load a module. Generally, DAG import errors are easy to fix; for me it’s usually either forgetting to remove an argument after refactoring or a TaskFlow task-ordering issue.
Airflow prints the import status and a stack trace for each failing DAG.