airflow task inside task group failed
checking if a task has failed in airflow is simple enough, and may be done with:
```python
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.utils.state import State


@task
def check_task_failed(task_id: str) -> bool:
    context = get_current_context()
    return (
        context["dag_run"].get_task_instance(task_id=task_id).state == State.FAILED
    )
```
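as a minimal sketch of how that check might be wired up (the dag and task names are made up, check_task_failed is the task defined above, and the trigger_rule="all_done" override is assumed so the check still runs when its upstream task fails):

```python
from airflow.decorators import dag, task
import pendulum


@dag(start_date=pendulum.datetime(2024, 1, 1), schedule=None, catchup=False)
def failure_check_demo():
    @task
    def flaky():
        # hypothetical upstream task that fails
        raise RuntimeError("boom")

    # "all_done" lets the check run even though its upstream task failed
    flaky() >> check_task_failed.override(trigger_rule="all_done")("flaky")


failure_check_demo()
```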
however, if the task is nested within a task_group, referencing the task with task_id doesn't work.
in theory, tasks inside a task_group should be indexed as group_id.task_id¹, but calling get_task_instance with that id structure returns None.
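concretely, the lookup that comes back as None looks roughly like this (the group and task names are hypothetical):

```python
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.utils.state import State


@task
def check_grouped_task_failed() -> bool:
    context = get_current_context()
    # the group-qualified id that should, in theory, resolve the nested task
    # ("my_group" and "my_task" are made-up names)
    ti = context["dag_run"].get_task_instance(task_id="my_group.my_task")
    # as described above, this lookup comes back as None, so guard against it
    return ti is not None and ti.state == State.FAILED
```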
the one way i have found to actually make it work is with the following:
```python
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.utils.state import State


@task
def check_task_failed(group_id: str, task_id: str) -> bool:
    context = get_current_context()
    failed_tasks = context["dag_run"].get_task_instances(state=State.FAILED)
    return f"{group_id}.{task_id}" in {ft.task_id for ft in failed_tasks}
```
which i was able to implement based on an answer on stackoverflow².
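as a rough end-to-end sketch, this is how the pieces might fit together in a single dag (the dag, group, and task names are all made up, and the trigger_rule="all_done" on the check is assumed so it still runs after the upstream failure):

```python
from airflow.decorators import dag, task, task_group
from airflow.operators.python import get_current_context
from airflow.utils.state import State
import pendulum


@dag(start_date=pendulum.datetime(2024, 1, 1), schedule=None, catchup=False)
def grouped_failure_check():
    @task
    def flaky():
        # hypothetical task nested inside the group below
        raise RuntimeError("boom")

    @task_group
    def my_group():
        flaky()

    # "all_done" so the check still runs after the upstream failure
    @task(trigger_rule="all_done")
    def check_task_failed(group_id: str, task_id: str) -> bool:
        context = get_current_context()
        failed_tasks = context["dag_run"].get_task_instances(state=State.FAILED)
        return f"{group_id}.{task_id}" in {ft.task_id for ft in failed_tasks}

    my_group() >> check_task_failed("my_group", "flaky")


grouped_failure_check()
```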
before finding that answer, i struggled with missing docs, llm hallucinations, and what i am fairly sure were people posting llm bullshit as if it were real.