气流:如何在不同时间安排匕首?

Airflow : How to schedule a dag at different times?

提问人:Anis Smail 提问时间:12/28/2019 更新时间:4/7/2021 访问量:1692

问:

我想每天安排一个 dag,但在一天中的不同时间,例如:

  • 星期一 16H
  • 星期二 9H
  • 星期三 10H
  • ...
  • 星期一 15H
  • 星期二 12H
  • 等。。。

我怎样才能做到这一点?

气流 airflow-scheduler

评论

0赞 Neha Jirafe 12/28/2019
很明显,您无法使用“schedule_interval”实现临时调度。你有可以在 cron 表达式中编码的模式吗?
0赞 Anis Smail 12/28/2019
实际上,我想每天在随机时间触发一次 dag,这不能用 cron 表达式表示。
0赞 Neha Jirafe 12/28/2019
那么很明显,您无法使用“schedule_interval”实现临时调度。我建议您更新问题以提供更多详细信息。
0赞 Anis Smail 12/28/2019
在这个问题中,我没有特别要求使用schedule_interval。

答:

4赞 zglin 12/28/2019 #1

如果希望每天随机调度一次 dag,请编写 python 帮助程序代码。 在定义 dag 之前,在 dag 代码中,放置一个种子随机(不会随日期变化)以创建伪兰特。

在此示例中,我已将完整日期转换为数字日期,但您可以使用您喜欢的任何方法。

像下面的代码应该可以工作。

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
dag=Dag('TestDag,schedule_interval=randomCronString,default_args=args, catchup=False)

编辑

Airflow 每 5 秒运行一次 dag,但通过使用种子随机,您强制随机仅在种子发生变化时更改(在这种情况下,当一天翻转时),但请记住,在大多数系统中,气流都在 UTC 上。

如果一遍又一遍地运行以下代码块

import random
import datetime

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)

您将获得以下结果

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))

print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

random.seed(int(datetime.date.today().strftime('%Y%m%d')))    
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *

评论

0赞 Anis Smail 1/3/2020
这是行不通的,因为这个脚本每 5 秒通过气流运行一次,我可以有一个值 12,然后一个值 13,所以作业将每天运行两次。
0赞 zglin 1/10/2020
差一点。。。。这就是您使用种子随机的原因。这是一个非常糟糕的解决方案,但看看我的编辑。
2赞 szeta 4/7/2021 #2

添加一个延迟步骤(具有随机的睡眠间隔),并在实际做某事之前安排它怎么样?

例如

def _delay_dag():
    import random
    import time
    delay_by = random.randint(0,60*60*6) #e.g. to delay up to 6h from schedule time
    print(f'Delaying start by {delay_by} seconds..')
    time.sleep(delay_by)
    return True

def _do_something(ti, *kwargs):
    pass
    
with DAG('my_DAG_with_random_start', 
         schedule_interval='@daily', 
         tags=['test','random_start'],
         default_args=default_args) as dag:
    
    delay_dag = PythonOperator(
        task_id = 'delay_dag',
        python_callable=_delay_dag
    )

    do_something = PythonOperator(
        task_id = 'do_something',
        provide_context=True,
        python_callable=_do_something
    )
    
    delay_dag >> do_something

评论

0赞 Anis Smail 4/8/2021
这应该有效,但我会在监控中发现该作业持续了几个小时,尽管它确实运行了几分钟,因此监控将无法使用。
1赞 szeta 4/8/2021
是的,对于内置监控,这是真的。如果您有自己的监控(通过查询 Airflow DB),您当然可以过滤掉delay_dag。