Skip to main content

airflow task inside task group failed

checking if a task has failed in airflow is simple enough, and may be done with:

from airflow.operators.python import get_current_context

@task
def check_task_failed(task_id: str) -> bool:
      context = get_current_context()
      return (
          context["dag_run"].get_task_instance(task_id=task_id).state
          == State.FAILED
      )

however, if the task is nested within a task_group, referencing the task with task_id doesn't work. in theory, tasks inside a task_group should be indexed with group_id.task_id1, but trying get_task_instance with this ID structure will return a NoneType.

the one way i have found to actually make it work, is with the following:

from airflow.operators.python import get_current_context

@task
def check_task_failed(group_id: str, task_id: str) -> bool:
      context = get_current_context()
      failed_tasks = context["dag_run"].get_task_instances(state=State.FAILED)
      return f"{group_id}.{task_id}" in {
          ft.task_id for ft in failed_tasks
      }

which i was able to implement based an answer on stackoverflow2.

before finding that answer, i struggled with missing docs, llm hallucinations. and what i am fairly sure were people posting llm bullshit as if it were real.

Comments