CVAT webhook + Airflow DAG 触发器

CVAT webhooks + Airflow DAG trigger

提问人:JaimeCorton 提问时间:11/17/2023 更新时间:11/17/2023 访问量:22

问:

我正在尝试通过来自 Airflow DAG 的传感器配置 Python 脚本的激活,该传感器在满足某些条件(例如,当注释任务完成时)感知 CVAT webhook 发送的 JSON。我在具有相同操作系统和内核的 2 台独立机器上部署了 CVAT 和 Airflow 和 Docker(操作系统:Ubuntu 22.04.3 LTS,内核:Linux 5.15.0-88-generic,架构:x86-64)。

我已经有一个 CVAT Webhook 的工作 url,让我们 https://direction.com/login/ 调用该 url,到目前为止,在尝试从 CVAT UI 使用 Ping 选项时,响应是正确的 (200),我可以通过 comand 看到生成的 JSON。我还创建了一个与 https://direction.com 的 Airflow 连接。此外,在我的 Airflow DAG(没有任何导入错误或类似错误)上,我使用了具有相应http_conn_id和端点 /loginhttpSensor~$ docker logs {container id}

完成所有这些操作后,预期的行为应该是,一旦 DAG 的 http 传感器在指定的 URL 上读取 Webhook 的 JSON 消息,就应该激活脚本执行。

这是我为气流 DAG 提供的代码:

@dag(
default_args={
    'owner': 'OWNER',
    'start_date': datetime(2023, 1, 1),
},
schedule_interval=None, 
tags=['cvat_dag','webhook'],
catchup=False,)

def cvat_webhook_dag():

@task
def http_sensor_task():
        return HttpSensor(
            task_id='cvat_webhook_sensor',
            http_conn_id='cvat_webhook',
            endpoint='/login/',
            method='GET',
            response_check=lambda response: "CVAT_trial_task_1" in json.dumps(response.json()), 
            timeout=120,
            poke_interval=60
        )

@task
def execute_script():
    # Execute your script here
    print('execute script triggered')

@task
def restart_dag_fun():
    restart_dag = TriggerDagRunOperator(
        task_id = 'restart_dag',
        trigger_dag_id = 'cvat_webhook_sensor',
    )

# Set the task dependencies
http_sensor_task() >> execute_script() >> restart_dag_fun()

我还更改了 HttpSensor,以便强制脚本执行独立于 JSON 内容,但它也不起作用。我检查了 Airflow 的日志,它们没有提供任何有意义的信息它只是说执行失败。response_check=lambda response: response.status_code==200

任何建议都会很棒。我对 Airflow 不是很熟悉,所以可能出了点问题,但我不知道解决方案是什么。

提前感谢您的帮助。

HTTP Airflow Webhook CVAT

评论


答: 暂无答案