提问人:Shashimk 提问时间:11/9/2023 最后编辑:Shashimk 更新时间:11/9/2023 访问量:120
业务流程协调程序在 AzureDurablefunction C 中的每个 Timertrigger 中调用多次#
Orchestrator is calling multiple times in each Timertrigger in AzureDurablefunction C#
问:
在 Azure Durable Function 中,当 Timertrigger 每分钟触发一次时,Orchestrator 会多次调用,因为此 SubOrchestrator 也会多次调用并多次获取 SubOrchestrator 的日志。
定时触发器
[Function("TimerTrigger_EveryMinute")]
public async Task TimerTrigger_EveryMinute(
[TimerTrigger("0 * * * * *")] TimerInfo timerInfo,
[DurableClient] DurableTaskClient starter)
{
_logger.LogInformation($"Running timer trigger");
string instanceId = await starter.ScheduleNewOrchestrationInstanceAsync(RunOrchestrator);
await starter.WaitForInstanceCompletionAsync(instanceId);
_logger.LogInformation($"Completed timer trigger orchestration with ID = '{instanceId}'");
}
业务流程协调程序
[Function("RunOrchestrator")]
public async Task RunOrchestrator(
[OrchestrationTrigger] TaskOrchestrationContext context)
{
var ongoingResponse = await context.CallActivityAsync<ResponseModel>(RESPONSEMODEL, $"{DateTime.Now.ToString()}");
if (ongoingResponse == null || ongoingResponse.Count == 0)
return;
var sessionId = ongoingResponse.Where(x => x.Id != null).OrderBy(x => x.TId).GroupBy(tx => tx.Id).Select(g => g.ToList());
var chunkedSessions = sessionId.Chunk(1);
var setCTasks = new List<Task<List<OngoingStatus>>>();
foreach (var chunk in chunkedSessions)
{
setCTasks.Add(context.CallSubOrchestratorAsync<List<OngoingStatus>>(PROCESS, chunk.First()));
}
await Task.WhenAll(setCTasks);
}
子编排器
[Function("PROCESS")]
public async Task<List<OngoingStatus>> PROCESS([OrchestrationTrigger] TaskOrchestrationContext context, List<OngoingCResult> ongoingSessions)
{
List<OngoingStatus> response = new List<OngoingStatus>();
TimeSpan timeout = TimeSpan.FromSeconds(1);
DateTime deadline = context.CurrentUtcDateTime.Add(timeout);
using (var cts = new CancellationTokenSource())
{
var timeoutUsingContext = context.CreateTimer(deadline, cts.Token);
var newSessions = ongoingSessions.Select(item => item.TransactionId).ToList();
var newSessionsTask = new List<Task<OngoingStatus>>();
foreach (var session in newSessions)
{
setProfileInput setProfileInput = new setProfileInput();
setProfileInput.Id = Id;
newSessionsTask.Add(context.CallActivityAsync<OngoingStatus>("SetProfile", setProfileInput));
}
var newSessionsWinnerTask = await Task.WhenAny(Task.WhenAll(newSessionsTask), timeoutUsingContext);
if (newSessionsWinnerTask == timeoutUsingContext)
_logger.LogError($"timed out in suborchestrator ");
else
_logger.LogError($" Completed in suborchestrator ");
}
return response;
}
活动触发器
[Function("SetProfile")]
public async Task<OngoingStatus> SetProfile([ActivityTrigger] SetProfileInput setProfileInput)
{
var sessionStatus = new OngoingStatus();
await Task.Delay(TimeSpan.FromSeconds(3));
_logger.LogError($"running in SetProfile");
return sessionStatus;
}
}
}
输出: https://i.stack.imgur.com/Frp0U.jpg .
我需要的是:
1)在每个 Timertrigger 中,它应该只在该触发器中调用一次 Orchestrator。
答:
0赞
SaiVamshiKrishna
11/9/2023
#1
如果要阻止后续 Orchestrator 调用,可以使用内存中标志变量来跟踪上次执行时间。
这将确保业务流程协调程序在每次计时器触发器调用中仅执行一次。该变量跟踪上次执行业务流程协调程序的时间。如果当前时间大于指定的时间间隔 () ,则将启动业务流程协调程序。否则,代码将记录一条消息,指示业务流程协调程序已在指定的时间间隔内执行。lastExecutionTime
timeInterval
lastExecutionTime
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
using System;
namespace DurableApp
{
public static class Function1
{
private static DateTime lastExecutionTime = DateTime.MinValue; // Initialize with an earlier date
[FunctionName("MyTimerTrigger")]
public static async Task Run(
[TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, // Replace with your desired schedule
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
TimeSpan timeInterval = TimeSpan.FromMinutes(60); // Specify the time interval
if (DateTime.UtcNow - lastExecutionTime >= timeInterval)
{
// Replace "MyOrchestratorFunction" with the name of your Orchestrator function.
string instanceId = await starter.StartNewAsync("MyOrchestratorFunction", null);
log.LogInformation($"Started Orchestrator with ID = {instanceId}");
// Update the last execution time to prevent subsequent Orchestrator calls within the time interval.
lastExecutionTime = DateTime.UtcNow;
}
else
{
log.LogInformation("Orchestrator already executed within the specified time interval.");
}
}
[FunctionName("MyOrchestratorFunction")]
public static async Task<string> MyOrchestratorFunction(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
// Add your Orchestrator logic here
// This function can be called only once per TimerTrigger execution
return "Orchestrator completed";
}
}
}
结果
此机制通过确保仅当自上次执行以来经过足够多的时间时才调用业务流程协调程序来防止重复的业务流程协调程序执行。
带活动功能
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs;
using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
using System;
namespace DurableApp
{
public static class Function1
{
private static DateTime lastExecutionTime = DateTime.MinValue; // Initialize with an earlier date
[FunctionName("MyTimerTrigger")]
public static async Task Run(
[TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, // Replace with your desired schedule
[DurableClient] IDurableOrchestrationClient starter,
ILogger log)
{
TimeSpan timeInterval = TimeSpan.FromMinutes(60); // Specify the time interval
if (DateTime.UtcNow - lastExecutionTime >= timeInterval)
{
string instanceId = await starter.StartNewAsync("MyOrchestratorFunction", null);
log.LogInformation($"Started Orchestrator with ID = {instanceId}");
// Update the last execution time to prevent subsequent Orchestrator calls within the time interval.
lastExecutionTime = DateTime.UtcNow;
}
else
{
log.LogInformation("Orchestrator already executed within the specified time interval.");
}
}
[FunctionName("MyOrchestratorFunction")]
public static async Task<string> MyOrchestratorFunction(
[OrchestrationTrigger] IDurableOrchestrationContext context)
{
string output = await context.CallActivityAsync<string>("MyActivityFunction", null);
return output;
}
[FunctionName("MyActivityFunction")]
public static string MyActivityFunction([ActivityTrigger] string name, ILogger log)
{
log.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");
return $"C# Timer trigger function executed at: {DateTime.Now}";
}
}
}
结果
评论
0赞
Shashimk
11/9/2023
有没有其他方法可以通过阻止重播在一分钟内仅调用一次 Orchestrator.If I don't use SubOrchestrator 它工作正常。但是当我使用 SubOrchestrator 时,如果 SubOrchestrator 超时,那么它只会多次调用 Orchestrator 并多次打印 Orchestrator 日志。如果 SubOrchestrator 在“时间”内完成,则其工作正常。
0赞
SaiVamshiKrishna
11/9/2023
可以使用 Durable Function 检查点来防止在 SubOrchestrator 超时时重播 Orchestrator。如果 SubOrchestrator 超时,将从检查点重播 Orchestrator 函数。但是,检查点只会创建一次,因此不会多次重播 Orchestrator 函数。
0赞
Shashimk
11/10/2023
我想知道为什么 Replay 在我的代码中快乐。我遵循了 Orchestrator 函数代码约束。Durable Function 检查点仅用于开发/调试目的。
0赞
SaiVamshiKrishna
11/16/2023
由于在 PROCESS 子业务流程协调程序函数中调用 SetProfile 活动函数的方式,可能会发生代码中的重播行为。调用 context 时。CallActivityAsync<OngoingStatus>(“SetProfile”, setProfileInput) 实质上是为 newSessions 列表中的每个会话启动 SetProfile 活动函数的新实例。这意味着,如果重播 PROCESS 子业务流程协调程序函数,它将再次为每个会话启动 SetProfile 活动函数的新实例,即使以前的实例已经完成。
评论