what to do when an airflow task inside a task group fails
checking if a task has failed on airflow is simple enough, and may be done with:
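from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.utils.state import State

@task
def check_task_failed(task_id: str) -> bool:
    context = get_current_context()
    # look up the task instance in the current dag run and compare its state
    return (
        context["dag_run"]
        .get_task_instance(task_id=task_id)
        .state == State.FAILED
    )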
however, if the task is nested within a task_group, referencing said task with task_id doesn't work.
in theory, tasks inside a task_group should be indexed with group_id.task_id, but calling get_task_instance with this id structure returns None.
the one way i have found to actually make it work is the following:
from airflow.decorators import task
from airflow.operators.python import get_current_context
from airflow.utils.state import State

@task
def check_task_failed(group_id: str, task_id: str) -> bool:
    context = get_current_context()
    # get all failed tasks
    failed_tasks = context["dag_run"].get_task_instances(state=State.FAILED)
    # check if the task from the group id shows up in failed tasks
    return f"{group_id}.{task_id}" in {
        ft.task_id for ft in failed_tasks
    }
which i was able to implement based on an answer from stackoverflow.
before finding that answer, i struggled with missing docs, llm hallucinations, and what i am fairly sure were people posting llm bullshit as if it were real.
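since wrong answers were the main obstacle here, it may help to have a version of the check that can be verified without a running dag. the following is only a sketch: group_task_failed and FakeTaskInstance are hypothetical names that are not part of airflow, and the comparison against the string "failed" assumes that this is the value behind State.FAILED.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class FakeTaskInstance:
    """hypothetical stand-in for an airflow task instance (only the fields used here)."""
    task_id: str
    state: Optional[str]


def group_task_failed(task_instances: Iterable, group_id: str, task_id: str) -> bool:
    """return True if the task `group_id.task_id` is present and in the failed state."""
    # tasks inside a task group are identified as "<group_id>.<task_id>"
    full_id = f"{group_id}.{task_id}"
    # assumption: "failed" is the string value behind airflow's State.FAILED
    return any(
        ti.task_id == full_id and ti.state == "failed"
        for ti in task_instances
    )


instances = [
    FakeTaskInstance("extract.load_file", "failed"),
    FakeTaskInstance("extract.parse_file", "success"),
    FakeTaskInstance("load_file", "success"),
]
print(group_task_failed(instances, "extract", "load_file"))   # True
print(group_task_failed(instances, "extract", "parse_file"))  # False
```

inside a real task, the same function could presumably be fed with context["dag_run"].get_task_instances(), which keeps the airflow-specific part down to a single line.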