业务流程协调程序在 AzureDurablefunction C 中的每个 Timertrigger 中调用多次#

Orchestrator is calling multiple times in each Timertrigger in AzureDurablefunction C#

提问人:Shashimk 提问时间:11/9/2023 最后编辑:Shashimk 更新时间:11/9/2023 访问量:120

问:

在 Azure Durable Function 中,当 Timertrigger 每分钟触发一次时,Orchestrator 会多次调用,因为此 SubOrchestrator 也会多次调用并多次获取 SubOrchestrator 的日志。

定时触发器

 [Function("TimerTrigger_EveryMinute")]
        public async Task TimerTrigger_EveryMinute(
            [TimerTrigger("0 * * * * *")] TimerInfo timerInfo,
            [DurableClient] DurableTaskClient starter)
        {
            _logger.LogInformation($"Running timer trigger");

            string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync(RunOrchestrator);
            await starter.WaitForInstanceCompletionAsync(instanceId);

            _logger.LogInformation($"Completed timer trigger orchestration with ID = '{instanceId}'");
        }

业务流程协调程序

     [Function("RunOrchestrator")]
        public async Task RunOrchestrator(
            [OrchestrationTrigger] TaskOrchestrationContext context)
        {
            var ongoingResponse = await context.CallActivityAsync<ResponseModel>(RESPONSEMODEL, $"{DateTime.Now.ToString()}");

            if (ongoingResponse == null || ongoingResponse.Count == 0)
                return;

            var sessionId = ongoingResponse.Where(x => x.Id != null).OrderBy(x => x.TId).GroupBy(tx => tx.Id).Select(g => g.ToList());

            var chunkedSessions = sessionId.Chunk(1);

            var setCTasks = new List<Task<List<OngoingStatus>>>();

            foreach (var chunk in chunkedSessions)
            {
                setCTasks.Add(context.CallSubOrchestratorAsync<List<OngoingStatus>>(PROCESS, chunk.First()));
            }

            await Task.WhenAll(setCTasks);
        }

子编排器

[Function("PROCESS")]
        public async Task<List<OngoingStatus>> PROCESS([OrchestrationTrigger] TaskOrchestrationContext context, List<OngoingCResult> ongoingSessions)
        {
            List<OngoingStatus> response = new List<OngoingStatus>();
            TimeSpan timeout = TimeSpan.FromSeconds(1);
            DateTime deadline = context.CurrentUtcDateTime.Add(timeout);

            using (var cts = new CancellationTokenSource())
            {
                var timeoutUsingContext = context.CreateTimer(deadline, cts.Token);

                var newSessions = ongoingSessions.Select(item => item.TransactionId).ToList();

                var newSessionsTask = new List<Task<OngoingStatus>>();
                foreach (var session in newSessions)
                {
                    setProfileInput setProfileInput = new setProfileInput();
                   
                    setProfileInput.Id = Id;

                    newSessionsTask.Add(context.CallActivityAsync<OngoingStatus>("SetProfile", setProfileInput));
                }

                var newSessionsWinnerTask = await Task.WhenAny(Task.WhenAll(newSessionsTask), timeoutUsingContext);
                if (newSessionsWinnerTask == timeoutUsingContext)
                    _logger.LogError($"timed out in suborchestrator ");
                else
                    _logger.LogError($" Completed in suborchestrator ");

            }
            return response;
        }
    

活动触发器

[Function("SetProfile")]
        public async Task<OngoingStatus> SetProfile([ActivityTrigger] SetProfileInput setProfileInput)
        {
            var sessionStatus = new OngoingStatus();

            await Task.Delay(TimeSpan.FromSeconds(3));

            _logger.LogError($"running  in SetProfile");
            return sessionStatus;
        }
    }

}

输出: https://i.stack.imgur.com/Frp0U.jpg .

我需要的是:
1)在每个 Timertrigger 中,它应该只在该触发器中调用一次 Orchestrator。

C# .NET Azure-Durable-Functions 业务流程

评论

1赞 Zdeněk Jelínek 11/9/2023
澄清一下,计时器触发器是多次调用还是多次调用业务流程协调程序?业务流程协调程序和子业务流程协调程序逻辑的多次运行是持久函数的核心功能。例如,参见 learn.microsoft.com/en-us/azure/azure-functions/durable/...
0赞 Shashimk 11/9/2023
计时器触发器每分钟仅调用一次,但 Orchestrator 在该分钟内多次调用,因此多次获取 Orchestrator 和 SubOrchestrator 日志
1赞 Zdeněk Jelínek 11/9/2023
请查看我链接的文档。它包括有关如何处理由于业务流程协调程序重播而导致的重复日志记录的参考。
0赞 Thomas 11/9/2023
这是预期的行为,请参阅 learn.microsoft.com/en-us/azure/azure-functions/durable/... 和 learn.microsoft.com/en-us/azure/azure-functions/durable/...

答:

0赞 SaiVamshiKrishna 11/9/2023 #1

如果要阻止后续 Orchestrator 调用,可以使用内存中标志变量来跟踪上次执行时间。

这将确保业务流程协调程序在每次计时器触发器调用中仅执行一次。该变量跟踪上次执行业务流程协调程序的时间。如果当前时间大于指定的时间间隔 () ,则将启动业务流程协调程序。否则,代码将记录一条消息,指示业务流程协调程序已在指定的时间间隔内执行。lastExecutionTimetimeIntervallastExecutionTime

    using Microsoft.Azure.Functions.Worker;
    using Microsoft.Azure.Functions.Worker.Http;
    using Microsoft.Azure.WebJobs.Extensions.DurableTask;
    using Microsoft.Azure.WebJobs;
    using Microsoft.DurableTask;
    using Microsoft.DurableTask.Client;
    using Microsoft.Extensions.Logging;
    using System.Threading.Tasks;
    using System;
    
    namespace DurableApp
    {
        public static class Function1
        {
            private static DateTime lastExecutionTime = DateTime.MinValue; // Initialize with an earlier date
    
            [FunctionName("MyTimerTrigger")]
            public static async Task Run(
                [TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, // Replace with your desired schedule
                [DurableClient] IDurableOrchestrationClient starter,
                ILogger log)
            {
                TimeSpan timeInterval = TimeSpan.FromMinutes(60); // Specify the time interval
    
                if (DateTime.UtcNow - lastExecutionTime >= timeInterval)
                {
                    // Replace "MyOrchestratorFunction" with the name of your Orchestrator function.
                    string instanceId = await starter.StartNewAsync("MyOrchestratorFunction", null);
    
                    log.LogInformation($"Started Orchestrator with ID = {instanceId}");
    
                    // Update the last execution time to prevent subsequent Orchestrator calls within the time interval.
                    lastExecutionTime = DateTime.UtcNow;
                }
                else
                {
                    log.LogInformation("Orchestrator already executed within the specified time interval.");
                }
            }
    
            [FunctionName("MyOrchestratorFunction")]
            public static async Task<string> MyOrchestratorFunction(
                [OrchestrationTrigger] IDurableOrchestrationContext context)
            {
                // Add your Orchestrator logic here
                // This function can be called only once per TimerTrigger execution
    
                return "Orchestrator completed";
            }
        }
    }

结果 enter image description here

此机制通过确保仅当自上次执行以来经过足够多的时间时才调用业务流程协调程序来防止重复的业务流程协调程序执行。

带活动功能

    using Microsoft.Azure.Functions.Worker;
    using Microsoft.Azure.Functions.Worker.Http;
    using Microsoft.Azure.WebJobs.Extensions.DurableTask;
    using Microsoft.Azure.WebJobs;
    using Microsoft.DurableTask;
    using Microsoft.DurableTask.Client;
    using Microsoft.Extensions.Logging;
    using System.Threading.Tasks;
    using System;
    
    namespace DurableApp
    {
        public static class Function1
        {
            private static DateTime lastExecutionTime = DateTime.MinValue; // Initialize with an earlier date
    
            [FunctionName("MyTimerTrigger")]
            public static async Task Run(
                [TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, // Replace with your desired schedule
                [DurableClient] IDurableOrchestrationClient starter,
                ILogger log)
            {
                TimeSpan timeInterval = TimeSpan.FromMinutes(60); // Specify the time interval
    
                if (DateTime.UtcNow - lastExecutionTime >= timeInterval)
                {
                    string instanceId = await starter.StartNewAsync("MyOrchestratorFunction", null);
    
                    log.LogInformation($"Started Orchestrator with ID = {instanceId}");
    
                    // Update the last execution time to prevent subsequent Orchestrator calls within the time interval.
                    lastExecutionTime = DateTime.UtcNow;
                }
                else
                {
                    log.LogInformation("Orchestrator already executed within the specified time interval.");
                }
            }
    
            [FunctionName("MyOrchestratorFunction")]
            public static async Task<string> MyOrchestratorFunction(
                [OrchestrationTrigger] IDurableOrchestrationContext context)
            {
    
                string output = await context.CallActivityAsync<string>("MyActivityFunction", null);
                return output;
    
            }
    
            
            [FunctionName("MyActivityFunction")]
            public static string MyActivityFunction([ActivityTrigger] string name, ILogger log)
            {
                log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");
                return $"C# Timer trigger function executed at: {DateTime.Now}";
            }
        }
    }

结果 enter image description here

评论

0赞 Shashimk 11/9/2023
有没有其他方法可以通过阻止重播在一分钟内仅调用一次 Orchestrator.If I don't use SubOrchestrator 它工作正常。但是当我使用 SubOrchestrator 时,如果 SubOrchestrator 超时,那么它只会多次调用 Orchestrator 并多次打印 Orchestrator 日志。如果 SubOrchestrator 在“时间”内完成,则其工作正常。
0赞 SaiVamshiKrishna 11/9/2023
可以使用 Durable Function 检查点来防止在 SubOrchestrator 超时时重播 Orchestrator。如果 SubOrchestrator 超时,将从检查点重播 Orchestrator 函数。但是,检查点只会创建一次,因此不会多次重播 Orchestrator 函数。
0赞 Shashimk 11/10/2023
我想知道为什么 Replay 在我的代码中快乐。我遵循了 Orchestrator 函数代码约束。Durable Function 检查点仅用于开发/调试目的。
0赞 SaiVamshiKrishna 11/16/2023
由于在 PROCESS 子业务流程协调程序函数中调用 SetProfile 活动函数的方式,可能会发生代码中的重播行为。调用 context 时。CallActivityAsync<OngoingStatus>(“SetProfile”, setProfileInput) 实质上是为 newSessions 列表中的每个会话启动 SetProfile 活动函数的新实例。这意味着,如果重播 PROCESS 子业务流程协调程序函数,它将再次为每个会话启动 SetProfile 活动函数的新实例,即使以前的实例已经完成。