提问人:IMEN KAABACHI 提问时间:7/7/2023 更新时间:7/7/2023 访问量:86
如何在我的 mqttnet 代理中实现冗余?
How to achieve redundancy within my mqttnet broker?
问:
我想构建一个支持高可用性的 mqttnet 代理。我想将冗余功能添加到我的服务器。 我正在构建一个具有 2 个节点的 mqtt 代理,我想在 2 个节点之间复制消息。 节点是具有一些基本功能的服务器,它们只是接收和显示来自客户端的消息。 以下是其中一台服务器的代码:
using System.Text;
using System.Text.Json;
using MQTTnet;
// using MQTTnet.Protocol;
using MQTTnet.Server;
using static System.Console;
namespace Server2;
internal abstract class Server2
{
static string storePath = "Server2/RetainedMessages.json";
private static async Task Main(string[] args)
{
// Create the options for our MQTT Broker
var options = new MqttServerOptionsBuilder()
// set endpoint to localhost
.WithDefaultEndpoint()
.WithDefaultEndpointPort(1884).WithPersistentSessions();
// creates a new mqtt server
var server = new MqttFactory().CreateMqttServer(options.Build());
server.LoadingRetainedMessageAsync += ServerOnLoadingRetainedMessageAsync;
// start the server with options
await server.StartAsync();
// handler for new connections
server.ClientConnectedAsync += ClientConnectedEventArgs;
// handler for new messages
server.InterceptingPublishAsync += ServerInterceptingPublishAsync;
// keep application running until user press a key
WriteLine("Press any key to stop the server...");
ReadLine();
//handlers implementation
Task ServerInterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
// Convert Payload to string
var payloadSegment = arg.ApplicationMessage.PayloadSegment;
var payload = Encoding.UTF8.GetString(payloadSegment.Array!, payloadSegment.Offset, payloadSegment.Count);
WriteLine(
" TimeStamp: {0} -- Message: ClientId = {1}, Topic = {2}, Payload = {3}, QoS = {4}, Retain-Flag = {5}",
DateTime.Now,
arg.ClientId,
arg.ApplicationMessage?.Topic,
payload,
arg.ApplicationMessage?.QualityOfServiceLevel,
arg.ApplicationMessage?.Retain);
return Task.CompletedTask;
}
static Task ClientConnectedEventArgs(ClientConnectedEventArgs arg)
{
WriteLine("New connection: ClientId = {0}, Endpoint = {1}", arg.ClientId, arg.Endpoint);
return Task.CompletedTask;
}
}
private static async Task ServerOnLoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
{
var models = arg.LoadedRetainedMessages.Select(MqttRetainedMessageModel.Create);
var buffer = JsonSerializer.SerializeToUtf8Bytes(models);
await File.WriteAllBytesAsync(storePath, buffer);
WriteLine("Retained messages saved.");
}
}
答: 暂无答案
评论