提问人:Anis Smail 提问时间:12/28/2019 更新时间:4/7/2021 访问量:1692
气流:如何在不同时间安排匕首?
Airflow : How to schedule a dag at different times?
问:
我想每天安排一个 dag,但在一天中的不同时间,例如:
- 星期一 16H
- 星期二 9H
- 星期三 10H
- ...
- 星期一 15H
- 星期二 12H
- 等。。。
我怎样才能做到这一点?
答:
4赞
zglin
12/28/2019
#1
如果希望每天随机调度一次 dag,请编写 python 帮助程序代码。 在定义 dag 之前,在 dag 代码中,放置一个种子随机(不会随日期变化)以创建伪兰特。
在此示例中,我已将完整日期转换为数字日期,但您可以使用您喜欢的任何方法。
像下面的代码应该可以工作。
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
dag=Dag('TestDag,schedule_interval=randomCronString,default_args=args, catchup=False)
编辑
Airflow 每 5 秒运行一次 dag,但通过使用种子随机,您强制随机仅在种子发生变化时更改(在这种情况下,当一天翻转时),但请记住,在大多数系统中,气流都在 UTC 上。
如果一遍又一遍地运行以下代码块
import random
import datetime
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
您将获得以下结果
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
random.seed(int(datetime.date.today().strftime('%Y%m%d')))
randomCronString="* {} * * *".format(random.randint(0,24))
print(randomCronString)
* 1 * * *
评论
0赞
Anis Smail
1/3/2020
这是行不通的,因为这个脚本每 5 秒通过气流运行一次,我可以有一个值 12,然后一个值 13,所以作业将每天运行两次。
0赞
zglin
1/10/2020
差一点。。。。这就是您使用种子随机的原因。这是一个非常糟糕的解决方案,但看看我的编辑。
2赞
szeta
4/7/2021
#2
添加一个延迟步骤(具有随机的睡眠间隔),并在实际做某事之前安排它怎么样?
例如
def _delay_dag():
import random
import time
delay_by = random.randint(0,60*60*6) #e.g. to delay up to 6h from schedule time
print(f'Delaying start by {delay_by} seconds..')
time.sleep(delay_by)
return True
def _do_something(ti, *kwargs):
pass
with DAG('my_DAG_with_random_start',
schedule_interval='@daily',
tags=['test','random_start'],
default_args=default_args) as dag:
delay_dag = PythonOperator(
task_id = 'delay_dag',
python_callable=_delay_dag
)
do_something = PythonOperator(
task_id = 'do_something',
provide_context=True,
python_callable=_do_something
)
delay_dag >> do_something
评论
0赞
Anis Smail
4/8/2021
这应该有效,但我会在监控中发现该作业持续了几个小时,尽管它确实运行了几分钟,因此监控将无法使用。
1赞
szeta
4/8/2021
是的,对于内置监控,这是真的。如果您有自己的监控(通过查询 Airflow DB),您当然可以过滤掉delay_dag。
评论