ここに書いてある通りなのだが、なかなか見つからなかったので。
Airflow v2.2以前で、@taskデコレータで生成したタスクを ForLoop で複数生成したい場合:
from airflow.decorators import task
...
@task
def say_hi(name):
print("Hi", name)
names = ['Bob', 'Charlie', 'Jane']
for name in names:
start >> say_hi(name)
これでコードとしては動くが、loop で生成されるtask_id はsay_hi_1, say_hi_2, …と単に連番が振られるだけになってしまう。
そこで以下のようにすれば、好みのtask_id を設定できる:
...
for name in names:
start >> task(say_hi)(task_id=f"say_hi_to_{name}")(name)
何のことはない、@taskは decorator なので、関数を引数とした関数として使えるということ。
少々読みずらくはなってしまうが、Airflow v2.2 以前で@taskを使うには、いろいろと不便が伴うので仕方ない。
Airflow v2.3 以降について
Airflow v2.3以降では、以下のようにoverrideメソッドでtask_idを設定できるようだ:
@task
def say_hi(name):
print("Hi", name)
names = ['Bob', 'Charlie', 'Jane']
for name in names:
say_hi.override(task_id=f"say_hi_to_{name}")
start >> say_hi()
また、少し話はそれるが、v2.3 以降なら ForLoop の代わりにexpandメソッドが使える:
@task
def say_hi(name):
print("Hi", name)
names = ['Bob', 'Charlie', 'Jane']
say_hi.expand(name=names)
が、どうやらtask_idは最初の例のように、say_hi_1, say_hi_2, …と連番で振られてしまい、任意の値は設定できないようだ。
Astronomer のページ でもtask_idはマップ不可と言及されている:
Some parameters can’t be mapped. For example, task_id, pool, and many BaseOperator arguments.
既存の DAG でexpandを取り入れる場合、task_idが変わってしまえば、過去タスクのログが見えずらくなってしまう。
expandメソッドは便利だが、task_idが任意の値で設定できない点は注意が必要そうだ。