diff --git a/Directory.Packages.props b/Directory.Packages.props index c870cc0..6722a8c 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -7,6 +7,7 @@ + diff --git a/SuperSocket.MQTT.sln b/SuperSocket.MQTT.sln index 877dd99..a96527c 100644 --- a/SuperSocket.MQTT.sln +++ b/SuperSocket.MQTT.sln @@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT", "src\Sup EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Server", "src\SuperSocket.MQTT.Server\SuperSocket.MQTT.Server.csproj", "{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Client", "src\SuperSocket.MQTT.Client\SuperSocket.MQTT.Client.csproj", "{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}" +EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{0C88DD14-F956-CE84-757C-A364CCF449FC}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Tests", "test\SuperSocket.MQTT.Tests\SuperSocket.MQTT.Tests.csproj", "{23CC3568-7338-490F-96DD-D5731B3AF059}" @@ -47,6 +49,18 @@ Global {B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x64.Build.0 = Release|Any CPU {B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x86.ActiveCfg = Release|Any CPU {B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x86.Build.0 = Release|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x64.ActiveCfg = Debug|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x64.Build.0 = Debug|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x86.ActiveCfg = Debug|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x86.Build.0 = Debug|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|Any CPU.Build.0 = Release|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x64.ActiveCfg = Release|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x64.Build.0 = Release|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x86.ActiveCfg = Release|Any CPU + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x86.Build.0 = Release|Any CPU {23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|Any CPU.Build.0 = Debug|Any CPU {23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -66,6 +80,7 @@ Global GlobalSection(NestedProjects) = preSolution {549EA105-D941-4923-85B8-FA777444A1A9} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE} {B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE} + {A1B2C3D4-E5F6-4789-ABCD-EF0123456789} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE} {23CC3568-7338-490F-96DD-D5731B3AF059} = {0C88DD14-F956-CE84-757C-A364CCF449FC} EndGlobalSection EndGlobal diff --git a/src/SuperSocket.MQTT.Client/MQTTClient.cs b/src/SuperSocket.MQTT.Client/MQTTClient.cs new file mode 100644 index 0000000..a5d1945 --- /dev/null +++ b/src/SuperSocket.MQTT.Client/MQTTClient.cs @@ -0,0 +1,205 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.Net; +using System.Threading; +using System.Threading.Tasks; +using SuperSocket.Client; +using SuperSocket.MQTT.Packets; + +namespace SuperSocket.MQTT.Client +{ + /// + /// MQTT client implementation based on SuperSocket.Client. + /// Uses the same pipeline filter as the MQTT server for protocol compatibility. + /// + public class MQTTClient : IAsyncDisposable + { + private readonly IEasyClient _client; + private ushort _packetIdentifier = 0; // Will be incremented to 1 before first use (0 is invalid per MQTT spec) + + /// + /// Creates a new MQTT client instance. + /// + public MQTTClient() + { + var pipelineFilter = new MQTTPipelineFilter(); + _client = new EasyClient(pipelineFilter); + } + + /// + /// Connects to an MQTT broker at the specified endpoint. + /// + /// The endpoint of the MQTT broker. + /// Cancellation token. + /// True if the connection was successful, false otherwise. + public async ValueTask ConnectAsync(EndPoint endPoint, CancellationToken cancellationToken = default) + { + return await _client.ConnectAsync(endPoint, cancellationToken); + } + + /// + /// Sends an MQTT CONNECT packet and waits for CONNACK response. + /// + /// The client identifier. + /// Keep alive interval in seconds. + /// Cancellation token. + /// The CONNACK packet received from the broker. + public async ValueTask SendConnectAsync(string clientId, short keepAlive = 60, CancellationToken cancellationToken = default) + { + var connectPacket = new ConnectPacket + { + Type = ControlPacketType.CONNECT, + ProtocolName = "MQTT", + ProtocolLevel = 4, + ClientId = clientId, + KeepAlive = keepAlive + }; + + await _client.SendAsync(MQTTPacketEncoder.Default, connectPacket); + + var response = await _client.ReceiveAsync(); + return response as ConnAckPacket; + } + + /// + /// Sends an MQTT PINGREQ packet and waits for PINGRESP response. + /// + /// Cancellation token. + /// The PINGRESP packet received from the broker. + public async ValueTask SendPingAsync(CancellationToken cancellationToken = default) + { + var pingPacket = new PingReqPacket + { + Type = ControlPacketType.PINGREQ + }; + + await _client.SendAsync(MQTTPacketEncoder.Default, pingPacket); + + var response = await _client.ReceiveAsync(); + return response as PingRespPacket; + } + + /// + /// Sends an MQTT SUBSCRIBE packet and waits for SUBACK response. + /// + /// The topics to subscribe to. + /// Cancellation token. + /// The SUBACK packet received from the broker. + public async ValueTask SendSubscribeAsync(IEnumerable topicFilters, CancellationToken cancellationToken = default) + { + var packetId = GetNextPacketIdentifier(); + var subscribePacket = new SubscribePacket + { + Type = ControlPacketType.SUBSCRIBE, + Flags = 0x02, // SUBSCRIBE must have flag bits set to 0010 + PacketIdentifier = packetId, + TopicFilters = new List(topicFilters) + }; + + await _client.SendAsync(MQTTPacketEncoder.Default, subscribePacket); + + var response = await _client.ReceiveAsync(); + return response as SubAckPacket; + } + + /// + /// Sends an MQTT SUBSCRIBE packet for a single topic and waits for SUBACK response. + /// + /// The topic to subscribe to. + /// The QoS level for the subscription. + /// Cancellation token. + /// The SUBACK packet received from the broker. + public async ValueTask SendSubscribeAsync(string topic, byte qos = 0, CancellationToken cancellationToken = default) + { + return await SendSubscribeAsync(new[] { new TopicFilter { Topic = topic, QoS = qos } }, cancellationToken); + } + + /// + /// Sends an MQTT PUBLISH packet. + /// + /// The topic to publish to. + /// The message payload. + /// The QoS level. + /// Whether to retain the message. + /// Cancellation token. + /// A task representing the async operation. For QoS > 0, returns the acknowledgement packet. + public async ValueTask SendPublishAsync(string topic, byte[] payload, byte qos = 0, bool retain = false, CancellationToken cancellationToken = default) + { + var packetId = qos > 0 ? GetNextPacketIdentifier() : (ushort)0; + + byte flags = (byte)((qos & 0x03) << 1); + if (retain) + { + flags |= 0x01; + } + + var publishPacket = new PublishPacket + { + Type = ControlPacketType.PUBLISH, + Flags = flags, + TopicName = topic, + PacketIdentifier = packetId, + Qos = qos, + Retain = retain, + Payload = new ReadOnlyMemory(payload) + }; + + await _client.SendAsync(MQTTPacketEncoder.Default, publishPacket); + + if (qos > 0) + { + var response = await _client.ReceiveAsync(); + return response; + } + + return null; + } + + /// + /// Sends an MQTT DISCONNECT packet. + /// + /// Cancellation token. + public async ValueTask SendDisconnectAsync(CancellationToken cancellationToken = default) + { + var disconnectPacket = new DisconnectPacket + { + Type = ControlPacketType.DISCONNECT + }; + + await _client.SendAsync(MQTTPacketEncoder.Default, disconnectPacket); + } + + /// + /// Receives an MQTT packet from the broker. + /// + /// Cancellation token. + /// The received MQTT packet. + public async ValueTask ReceiveAsync(CancellationToken cancellationToken = default) + { + return await _client.ReceiveAsync(); + } + + /// + /// Closes the connection to the broker. + /// + /// A task representing the async close operation. + public async ValueTask CloseAsync() + { + await _client.CloseAsync(); + } + + /// + /// Disposes the MQTT client asynchronously. + /// + public async ValueTask DisposeAsync() + { + await CloseAsync(); + } + + private ushort GetNextPacketIdentifier() + { + return ++_packetIdentifier; + } + } +} diff --git a/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs new file mode 100644 index 0000000..61312a6 --- /dev/null +++ b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs @@ -0,0 +1,79 @@ +using System; +using System.Buffers; +using SuperSocket.ProtoBase; + +namespace SuperSocket.MQTT.Client +{ + /// + /// Encodes MQTT packets to bytes for transmission. + /// This class is stateless and should be used as a singleton. + /// + public class MQTTPacketEncoder : IPackageEncoder + { + /// + /// Singleton instance of the encoder. + /// MQTTPacketEncoder is stateless, so a single instance can be safely reused. + /// + public static readonly MQTTPacketEncoder Default = new MQTTPacketEncoder(); + + /// + /// Encodes an MQTT packet into the specified buffer writer. + /// + /// The buffer writer to write the encoded packet to. + /// The MQTT packet to encode. + /// The number of bytes written to the buffer. + public int Encode(IBufferWriter writer, MQTTPacket pack) + { + var totalBytes = 0; + + // First, encode the body to determine its length + var bodyWriter = new ArrayBufferWriter(); + var bodyLength = pack.EncodeBody(bodyWriter); + var bodyData = bodyWriter.WrittenSpan; + + // Calculate the fixed header + var packetTypeAndFlags = ((byte)pack.Type << 4) | (pack.Flags & 0x0F); + + // Write fixed header + writer.GetSpan(1)[0] = (byte)packetTypeAndFlags; + writer.Advance(1); + totalBytes++; + + // Write remaining length (variable length encoding) + totalBytes += WriteRemainingLength(writer, bodyLength); + + // Write body + if (bodyLength > 0) + { + var destSpan = writer.GetSpan(bodyLength); + bodyData.CopyTo(destSpan); + writer.Advance(bodyLength); + totalBytes += bodyLength; + } + + return totalBytes; + } + + private static int WriteRemainingLength(IBufferWriter writer, int length) + { + var bytesWritten = 0; + do + { + var encodedByte = length % 128; + length /= 128; + + if (length > 0) + { + encodedByte |= 0x80; + } + + writer.GetSpan(1)[0] = (byte)encodedByte; + writer.Advance(1); + bytesWritten++; + } + while (length > 0); + + return bytesWritten; + } + } +} diff --git a/src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj b/src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj new file mode 100644 index 0000000..0f673ea --- /dev/null +++ b/src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj @@ -0,0 +1,6 @@ + + + + + + diff --git a/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs b/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs index f832d83..b1acfe6 100644 --- a/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs +++ b/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs @@ -1,4 +1,5 @@ using System; +using System.Reflection; using Microsoft.Extensions.DependencyInjection; using SuperSocket.Command; using SuperSocket.Server; @@ -33,8 +34,8 @@ public static ISuperSocketHostBuilder UseMQTT(this ISuperSocketHostB .UseSession() .UseCommand(options => { - // Add all command classes from the current assembly - options.AddCommandAssembly(typeof(MQTTPacket).Assembly); + // Add all command classes from the current executing assembly + options.AddCommandAssembly(Assembly.GetExecutingAssembly()); }) .UseMiddleware(sp => sp.GetRequiredService()) .ConfigureServices((ctx, services) => diff --git a/src/SuperSocket.MQTT/MQTTPipelineFilter.cs b/src/SuperSocket.MQTT/MQTTPipelineFilter.cs index 8d8f03b..cbbf1be 100644 --- a/src/SuperSocket.MQTT/MQTTPipelineFilter.cs +++ b/src/SuperSocket.MQTT/MQTTPipelineFilter.cs @@ -6,7 +6,19 @@ namespace SuperSocket.MQTT { public class MQTTPipelineFilter : IPipelineFilter { - public IPackageDecoder Decoder { get; set; } + /// + /// Singleton MQTTPacketDecoder instance shared across all MQTTPipelineFilter instances. + /// MQTTPacketDecoder is stateless, so a single instance can be safely reused. + /// + private static readonly MQTTPacketDecoder _sharedDecoder = new MQTTPacketDecoder(); + + private IPackageDecoder _decoder; + + public IPackageDecoder Decoder + { + get => _decoder ?? _sharedDecoder; + set => _decoder = value; + } public IPipelineFilter NextFilter { get; private set; } diff --git a/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs b/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs new file mode 100644 index 0000000..eafe65b --- /dev/null +++ b/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs @@ -0,0 +1,204 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using SuperSocket.MQTT.Packets; +using SuperSocket.MQTT.Server; +using SuperSocket.MQTT.Client; +using SuperSocket.Server; +using SuperSocket.Server.Host; + +namespace SuperSocket.MQTT.Tests +{ + /// + /// End-to-end integration tests using the MQTTClient to connect to the MQTT server. + /// These tests verify the full request/response flow between client and server. + /// + public class MQTTClientE2ETests : IAsyncLifetime + { + private IHost? _host; + private const int TestPort = 21883; + private readonly IPEndPoint _serverEndPoint = new IPEndPoint(IPAddress.Loopback, TestPort); + + public async Task InitializeAsync() + { + _host = SuperSocketHostBuilder + .Create() + .UseMQTT() + .UseInProcSessionContainer() + .ConfigureAppConfiguration((hostCtx, configApp) => + { + configApp.AddInMemoryCollection(new Dictionary + { + { "serverOptions:name", "MQTTTestServer" }, + { "serverOptions:listeners:0:ip", "Any" }, + { "serverOptions:listeners:0:port", TestPort.ToString() } + }); + }) + .Build(); + + await _host.StartAsync(); + // Give the server time to start listening + await Task.Delay(100); + } + + public async Task DisposeAsync() + { + if (_host != null) + { + await _host.StopAsync(); + _host.Dispose(); + } + } + + [Fact] + public async Task E2E_ConnectToServer_ShouldReceiveConnAck() + { + // Arrange + await using var client = new MQTTClient(); + + // Act + var connected = await client.ConnectAsync(_serverEndPoint); + Assert.True(connected, "Should connect to server"); + + var connAck = await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]); + + // Assert + Assert.NotNull(connAck); + Assert.Equal(ControlPacketType.CONNACK, connAck.Type); + Assert.Equal(0, connAck.ReturnCode); // Connection accepted + } + + [Fact] + public async Task E2E_PingServer_ShouldReceivePingResp() + { + // Arrange + await using var client = new MQTTClient(); + var connected = await client.ConnectAsync(_serverEndPoint); + Assert.True(connected, "Should connect to server"); + + await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]); + + // Act + var pingResp = await client.SendPingAsync(); + + // Assert + Assert.NotNull(pingResp); + Assert.Equal(ControlPacketType.PINGRESP, pingResp.Type); + } + + [Fact] + public async Task E2E_SubscribeToTopic_ShouldReceiveSubAck() + { + // Arrange + await using var client = new MQTTClient(); + var connected = await client.ConnectAsync(_serverEndPoint); + Assert.True(connected, "Should connect to server"); + + await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]); + + // Act + var subAck = await client.SendSubscribeAsync("test/topic", qos: 0); + + // Assert + Assert.NotNull(subAck); + Assert.Equal(ControlPacketType.SUBACK, subAck.Type); + Assert.Single(subAck.ReturnCodes); + Assert.Equal(0, subAck.ReturnCodes[0]); // QoS 0 granted + } + + [Fact] + public async Task E2E_SubscribeMultipleTopics_ShouldReceiveSubAckWithAllReturnCodes() + { + // Arrange + await using var client = new MQTTClient(); + var connected = await client.ConnectAsync(_serverEndPoint); + Assert.True(connected, "Should connect to server"); + + await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]); + + var topicFilters = new List + { + new TopicFilter { Topic = "home/temperature", QoS = 0 }, + new TopicFilter { Topic = "home/humidity", QoS = 1 }, + new TopicFilter { Topic = "home/pressure", QoS = 2 } + }; + + // Act + var subAck = await client.SendSubscribeAsync(topicFilters); + + // Assert + Assert.NotNull(subAck); + Assert.Equal(ControlPacketType.SUBACK, subAck.Type); + Assert.Equal(3, subAck.ReturnCodes.Count); + Assert.Equal(0, subAck.ReturnCodes[0]); // QoS 0 granted + Assert.Equal(1, subAck.ReturnCodes[1]); // QoS 1 granted + Assert.Equal(2, subAck.ReturnCodes[2]); // QoS 2 granted + } + + [Fact] + public async Task E2E_PublishQoS0_ShouldSucceed() + { + // Arrange + await using var client = new MQTTClient(); + var connected = await client.ConnectAsync(_serverEndPoint); + Assert.True(connected, "Should connect to server"); + + await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]); + + // Act - QoS 0 publish should not return an acknowledgement packet + var result = await client.SendPublishAsync("test/topic", Encoding.UTF8.GetBytes("Hello MQTT!"), qos: 0); + + // Assert - for QoS 0, no acknowledgement is expected + Assert.Null(result); + } + + [Fact] + public async Task E2E_DisconnectFromServer_ShouldComplete() + { + // Arrange + await using var client = new MQTTClient(); + var connected = await client.ConnectAsync(_serverEndPoint); + Assert.True(connected, "Should connect to server"); + + await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]); + + // Act - disconnect should complete without exception + await client.SendDisconnectAsync(); + } + + [Fact] + public async Task E2E_FullFlow_ConnectSubscribePingDisconnect() + { + // Arrange + await using var client = new MQTTClient(); + + // Act & Assert - Connect + var connected = await client.ConnectAsync(_serverEndPoint); + Assert.True(connected, "Should connect to server"); + + // Act & Assert - MQTT Connect + var connAck = await client.SendConnectAsync("TestClient_FullFlow"); + Assert.NotNull(connAck); + Assert.Equal(0, connAck.ReturnCode); + + // Act & Assert - Subscribe + var subAck = await client.SendSubscribeAsync("test/fullflow", qos: 1); + Assert.NotNull(subAck); + Assert.Single(subAck.ReturnCodes); + + // Act & Assert - Ping + var pingResp = await client.SendPingAsync(); + Assert.NotNull(pingResp); + Assert.Equal(ControlPacketType.PINGRESP, pingResp.Type); + + // Act & Assert - Disconnect + await client.SendDisconnectAsync(); + } + } +} diff --git a/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj b/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj index cb3fdde..8375f1e 100644 --- a/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj +++ b/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj @@ -20,6 +20,7 @@ +