如何在我的 mqttnet 代理中实现冗余?

How to achieve redundancy within my mqttnet broker?

提问人:IMEN KAABACHI 提问时间:7/7/2023 更新时间:7/7/2023 访问量:86

问:

我想构建一个支持高可用性的 mqttnet 代理。我想将冗余功能添加到我的服务器。 我正在构建一个具有 2 个节点的 mqtt 代理,我想在 2 个节点之间复制消息。 节点是具有一些基本功能的服务器,它们只是接收和显示来自客户端的消息。 以下是其中一台服务器的代码:

using System.Text;
using System.Text.Json;
using MQTTnet;
// using MQTTnet.Protocol;
using MQTTnet.Server;
using static System.Console;

namespace Server2;

internal abstract class Server2
{
    static string storePath = "Server2/RetainedMessages.json";

    private static async Task Main(string[] args)
    {
// Create the options for our MQTT Broker
        var options = new MqttServerOptionsBuilder()
            // set endpoint to localhost
            .WithDefaultEndpoint()
            .WithDefaultEndpointPort(1884).WithPersistentSessions();

// creates a new mqtt server     
        var server = new MqttFactory().CreateMqttServer(options.Build());
        
        
        server.LoadingRetainedMessageAsync += ServerOnLoadingRetainedMessageAsync;

// start the server with options  
        await server.StartAsync(); 
        
// handler for new connections
        server.ClientConnectedAsync += ClientConnectedEventArgs;
        
// handler for new messages
        server.InterceptingPublishAsync += ServerInterceptingPublishAsync;

// keep application running until user press a key
        WriteLine("Press any key to stop the server...");
        ReadLine();

//handlers implementation

        Task ServerInterceptingPublishAsync(InterceptingPublishEventArgs arg)
        {
            // Convert Payload to string
            var payloadSegment = arg.ApplicationMessage.PayloadSegment;
            var payload = Encoding.UTF8.GetString(payloadSegment.Array!, payloadSegment.Offset, payloadSegment.Count);

            WriteLine(
                " TimeStamp: {0} -- Message: ClientId = {1}, Topic = {2}, Payload = {3}, QoS = {4}, Retain-Flag = {5}",
                DateTime.Now,
                arg.ClientId,
                arg.ApplicationMessage?.Topic,
                payload,
                arg.ApplicationMessage?.QualityOfServiceLevel,
                arg.ApplicationMessage?.Retain);

            return Task.CompletedTask;
        }
        
        
        static Task ClientConnectedEventArgs(ClientConnectedEventArgs arg)
        {
            WriteLine("New connection: ClientId = {0}, Endpoint = {1}", arg.ClientId, arg.Endpoint);
            return Task.CompletedTask;
        }

    }

    private static async Task ServerOnLoadingRetainedMessageAsync(LoadingRetainedMessagesEventArgs arg)
    {
        var models = arg.LoadedRetainedMessages.Select(MqttRetainedMessageModel.Create);
        var buffer = JsonSerializer.SerializeToUtf8Bytes(models);
        await File.WriteAllBytesAsync(storePath, buffer);
        WriteLine("Retained messages saved.");
    }
}
高可用 冗余 MQTTNet

评论


答: 暂无答案