
what to do when an airflow task inside a task group fails

checking if a task has failed on airflow is simple enough, and may be done with:

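from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.utils.state import State

@task
def check_task_failed(task_id: str) -> bool:
    context = get_current_context()
    # look up the task instance in the current dag run and compare its state
    return (
        context["dag_run"]
        .get_task_instance(task_id=task_id)
        .state == State.FAILED
    )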

however, if the task is nested within a task_group, referencing it by its bare task_id doesn't work. in theory, tasks inside a task_group should be addressable as group_id.task_id [1], but calling get_task_instance with this id structure returns None.
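to make the id structure concrete, here is a minimal sketch of how the qualified id is put together. it assumes the task group keeps airflow's default prefix_group_id=True, and that nested groups stack their ids outermost first. the helper name qualified_task_id and the example ids are hypothetical, not part of airflow.

```python
def qualified_task_id(task_id: str, *group_ids: str) -> str:
    """join the enclosing group ids (outermost first) and the task id with dots.

    hypothetical helper: it only mirrors the naming scheme, it does not
    talk to airflow at all.
    """
    return ".".join([*group_ids, task_id])


# a task outside any group keeps its bare id
print(qualified_task_id("load"))
# a task inside one group gets the group id as a prefix
print(qualified_task_id("load", "etl"))
# nested groups are assumed to stack, outermost first
print(qualified_task_id("load", "etl", "warehouse"))
```

if a group was created with prefix_group_id=False, the prefix is not added and this sketch does not apply.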

the one way i have found to actually make it work is the following:

from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.utils.state import State

@task
def check_task_failed(group_id: str, task_id: str) -> bool:
    context = get_current_context()
    # get all failed task instances in the current dag run
    failed_tasks = context["dag_run"].get_task_instances(state=State.FAILED)
    # check if the group-qualified task id shows up among the failed tasks
    return f"{group_id}.{task_id}" in {
        ft.task_id for ft in failed_tasks
    }

which i was able to implement based on an answer from stackoverflow [2].
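the membership check itself can be tried out without a scheduler. the sketch below swaps the real task instances for SimpleNamespace stand-ins, so it is only an approximation: the function name failed_in_group, the example ids, and the plain-string states are assumptions made for illustration, and filtering happens in python here rather than through get_task_instances(state=...).

```python
from types import SimpleNamespace


def failed_in_group(task_instances, group_id: str, task_id: str) -> bool:
    """return True if group_id.task_id is among the failed task instances."""
    # keep only the ids of instances whose state is "failed"
    failed_ids = {ti.task_id for ti in task_instances if ti.state == "failed"}
    return f"{group_id}.{task_id}" in failed_ids


# hypothetical stand-ins for the objects a dag run would hand back
fake_tis = [
    SimpleNamespace(task_id="extract.download", state="failed"),
    SimpleNamespace(task_id="extract.parse", state="upstream_failed"),
    SimpleNamespace(task_id="download", state="success"),
]

print(failed_in_group(fake_tis, "extract", "download"))
print(failed_in_group(fake_tis, "extract", "parse"))
```

note that a task skipped because something upstream failed has the state upstream_failed, not failed, so it is not caught by this check.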

before finding that answer, i struggled with missing docs, llm hallucinations, and what i am fairly sure were people posting llm bullshit as if it were real.
