From 99d8a7e619138353031b72406e9313f78ad2da6c Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Sat, 31 Jan 2026 06:21:24 +0000
Subject: [PATCH 1/5] Initial plan
From d9860f91bd7b1cf3291abbc4c4d5016101886f69 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Sat, 31 Jan 2026 06:31:56 +0000
Subject: [PATCH 2/5] Add SuperSocket.MQTT.Client project with E2E tests
Co-authored-by: kerryjiang <456060+kerryjiang@users.noreply.github.com>
---
Directory.Packages.props | 1 +
SuperSocket.MQTT.sln | 15 ++
src/SuperSocket.MQTT.Client/MQTTClient.cs | 209 ++++++++++++++++++
.../MQTTPacketEncoder.cs | 64 ++++++
.../SuperSocket.MQTT.Client.csproj | 6 +
.../MQTTPipelineFilterFactory.cs | 19 ++
.../SuperSocketHostBuilderExtensions.cs | 6 +-
src/SuperSocket.MQTT/MQTTPipelineFilter.cs | 8 +
.../MQTTClientE2ETests.cs | 207 +++++++++++++++++
.../SuperSocket.MQTT.Tests.csproj | 1 +
10 files changed, 533 insertions(+), 3 deletions(-)
create mode 100644 src/SuperSocket.MQTT.Client/MQTTClient.cs
create mode 100644 src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
create mode 100644 src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj
create mode 100644 src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs
create mode 100644 test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs
diff --git a/Directory.Packages.props b/Directory.Packages.props
index c870cc0..6722a8c 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -7,6 +7,7 @@
+
diff --git a/SuperSocket.MQTT.sln b/SuperSocket.MQTT.sln
index 877dd99..a96527c 100644
--- a/SuperSocket.MQTT.sln
+++ b/SuperSocket.MQTT.sln
@@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT", "src\Sup
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Server", "src\SuperSocket.MQTT.Server\SuperSocket.MQTT.Server.csproj", "{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Client", "src\SuperSocket.MQTT.Client\SuperSocket.MQTT.Client.csproj", "{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}"
+EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{0C88DD14-F956-CE84-757C-A364CCF449FC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Tests", "test\SuperSocket.MQTT.Tests\SuperSocket.MQTT.Tests.csproj", "{23CC3568-7338-490F-96DD-D5731B3AF059}"
@@ -47,6 +49,18 @@ Global
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x64.Build.0 = Release|Any CPU
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x86.ActiveCfg = Release|Any CPU
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x86.Build.0 = Release|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x64.Build.0 = Debug|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x86.Build.0 = Debug|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|Any CPU.Build.0 = Release|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x64.ActiveCfg = Release|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x64.Build.0 = Release|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x86.ActiveCfg = Release|Any CPU
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x86.Build.0 = Release|Any CPU
{23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|x64.ActiveCfg = Debug|Any CPU
@@ -66,6 +80,7 @@ Global
GlobalSection(NestedProjects) = preSolution
{549EA105-D941-4923-85B8-FA777444A1A9} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE}
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE}
+ {A1B2C3D4-E5F6-4789-ABCD-EF0123456789} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE}
{23CC3568-7338-490F-96DD-D5731B3AF059} = {0C88DD14-F956-CE84-757C-A364CCF449FC}
EndGlobalSection
EndGlobal
diff --git a/src/SuperSocket.MQTT.Client/MQTTClient.cs b/src/SuperSocket.MQTT.Client/MQTTClient.cs
new file mode 100644
index 0000000..a0c4727
--- /dev/null
+++ b/src/SuperSocket.MQTT.Client/MQTTClient.cs
@@ -0,0 +1,209 @@
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using SuperSocket.Client;
+using SuperSocket.MQTT.Packets;
+
+namespace SuperSocket.MQTT.Client
+{
+ ///
+ /// MQTT client implementation based on SuperSocket.Client.
+ /// Uses the same pipeline filter as the MQTT server for protocol compatibility.
+ ///
+ public class MQTTClient : IAsyncDisposable
+ {
+ private readonly IEasyClient _client;
+ private ushort _packetIdentifier = 0;
+
+ ///
+ /// Creates a new MQTT client instance.
+ ///
+ public MQTTClient()
+ {
+ var pipelineFilter = new MQTTPipelineFilter();
+ _client = new EasyClient(pipelineFilter);
+ }
+
+ ///
+ /// Connects to an MQTT broker at the specified endpoint.
+ ///
+ /// The endpoint of the MQTT broker.
+ /// Cancellation token.
+ /// True if the connection was successful, false otherwise.
+ public async ValueTask ConnectAsync(EndPoint endPoint, CancellationToken cancellationToken = default)
+ {
+ return await _client.ConnectAsync(endPoint, cancellationToken);
+ }
+
+ ///
+ /// Sends an MQTT CONNECT packet and waits for CONNACK response.
+ ///
+ /// The client identifier.
+ /// Keep alive interval in seconds.
+ /// Cancellation token.
+ /// The CONNACK packet received from the broker.
+ public async ValueTask SendConnectAsync(string clientId, short keepAlive = 60, CancellationToken cancellationToken = default)
+ {
+ var connectPacket = new ConnectPacket
+ {
+ Type = ControlPacketType.CONNECT,
+ ProtocolName = "MQTT",
+ ProtocolLevel = 4,
+ ClientId = clientId,
+ KeepAlive = keepAlive
+ };
+
+ var data = MQTTPacketEncoder.Encode(connectPacket);
+ await _client.SendAsync(data);
+
+ var response = await _client.ReceiveAsync();
+ return response as ConnAckPacket;
+ }
+
+ ///
+ /// Sends an MQTT PINGREQ packet and waits for PINGRESP response.
+ ///
+ /// Cancellation token.
+ /// The PINGRESP packet received from the broker.
+ public async ValueTask SendPingAsync(CancellationToken cancellationToken = default)
+ {
+ var pingPacket = new PingReqPacket
+ {
+ Type = ControlPacketType.PINGREQ
+ };
+
+ var data = MQTTPacketEncoder.Encode(pingPacket);
+ await _client.SendAsync(data);
+
+ var response = await _client.ReceiveAsync();
+ return response as PingRespPacket;
+ }
+
+ ///
+ /// Sends an MQTT SUBSCRIBE packet and waits for SUBACK response.
+ ///
+ /// The topics to subscribe to.
+ /// Cancellation token.
+ /// The SUBACK packet received from the broker.
+ public async ValueTask SendSubscribeAsync(IEnumerable topicFilters, CancellationToken cancellationToken = default)
+ {
+ var packetId = GetNextPacketIdentifier();
+ var subscribePacket = new SubscribePacket
+ {
+ Type = ControlPacketType.SUBSCRIBE,
+ Flags = 0x02, // SUBSCRIBE must have flag bits set to 0010
+ PacketIdentifier = packetId,
+ TopicFilters = new List(topicFilters)
+ };
+
+ var data = MQTTPacketEncoder.Encode(subscribePacket);
+ await _client.SendAsync(data);
+
+ var response = await _client.ReceiveAsync();
+ return response as SubAckPacket;
+ }
+
+ ///
+ /// Sends an MQTT SUBSCRIBE packet for a single topic and waits for SUBACK response.
+ ///
+ /// The topic to subscribe to.
+ /// The QoS level for the subscription.
+ /// Cancellation token.
+ /// The SUBACK packet received from the broker.
+ public async ValueTask SendSubscribeAsync(string topic, byte qos = 0, CancellationToken cancellationToken = default)
+ {
+ return await SendSubscribeAsync(new[] { new TopicFilter { Topic = topic, QoS = qos } }, cancellationToken);
+ }
+
+ ///
+ /// Sends an MQTT PUBLISH packet.
+ ///
+ /// The topic to publish to.
+ /// The message payload.
+ /// The QoS level.
+ /// Whether to retain the message.
+ /// Cancellation token.
+ /// A task representing the async operation. For QoS > 0, returns the acknowledgement packet.
+ public async ValueTask SendPublishAsync(string topic, byte[] payload, byte qos = 0, bool retain = false, CancellationToken cancellationToken = default)
+ {
+ var packetId = qos > 0 ? GetNextPacketIdentifier() : (ushort)0;
+
+ byte flags = (byte)((qos & 0x03) << 1);
+ if (retain)
+ {
+ flags |= 0x01;
+ }
+
+ var publishPacket = new PublishPacket
+ {
+ Type = ControlPacketType.PUBLISH,
+ Flags = (byte)(((byte)ControlPacketType.PUBLISH << 4) | flags),
+ TopicName = topic,
+ PacketIdentifier = packetId,
+ Qos = qos,
+ Retain = retain,
+ Payload = new ReadOnlyMemory(payload)
+ };
+
+ var data = MQTTPacketEncoder.Encode(publishPacket);
+ await _client.SendAsync(data);
+
+ if (qos > 0)
+ {
+ var response = await _client.ReceiveAsync();
+ return response;
+ }
+
+ return null;
+ }
+
+ ///
+ /// Sends an MQTT DISCONNECT packet.
+ ///
+ /// Cancellation token.
+ public async ValueTask SendDisconnectAsync(CancellationToken cancellationToken = default)
+ {
+ var disconnectPacket = new DisconnectPacket
+ {
+ Type = ControlPacketType.DISCONNECT
+ };
+
+ var data = MQTTPacketEncoder.Encode(disconnectPacket);
+ await _client.SendAsync(data);
+ }
+
+ ///
+ /// Receives an MQTT packet from the broker.
+ ///
+ /// Cancellation token.
+ /// The received MQTT packet.
+ public async ValueTask ReceiveAsync(CancellationToken cancellationToken = default)
+ {
+ return await _client.ReceiveAsync();
+ }
+
+ ///
+ /// Closes the connection to the broker.
+ ///
+ /// A task representing the async close operation.
+ public async ValueTask CloseAsync()
+ {
+ await _client.CloseAsync();
+ }
+
+ ///
+ /// Disposes the MQTT client asynchronously.
+ ///
+ public async ValueTask DisposeAsync()
+ {
+ await CloseAsync();
+ }
+
+ private ushort GetNextPacketIdentifier()
+ {
+ return ++_packetIdentifier;
+ }
+ }
+}
diff --git a/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
new file mode 100644
index 0000000..fffac6a
--- /dev/null
+++ b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
@@ -0,0 +1,64 @@
+using System;
+using System.Buffers;
+
+namespace SuperSocket.MQTT.Client
+{
+ ///
+ /// Encodes MQTT packets to bytes for transmission.
+ ///
+ public static class MQTTPacketEncoder
+ {
+ ///
+ /// Encodes an MQTT packet to a byte array.
+ ///
+ /// The MQTT packet to encode.
+ /// The encoded byte array.
+ public static byte[] Encode(MQTTPacket packet)
+ {
+ var writer = new ArrayBufferWriter();
+
+ // First, encode the body to determine its length
+ var bodyWriter = new ArrayBufferWriter();
+ var bodyLength = packet.EncodeBody(bodyWriter);
+ var bodyData = bodyWriter.WrittenSpan;
+
+ // Calculate the fixed header
+ var packetTypeAndFlags = ((byte)packet.Type << 4) | (packet.Flags & 0x0F);
+
+ // Write fixed header
+ writer.GetSpan(1)[0] = (byte)packetTypeAndFlags;
+ writer.Advance(1);
+
+ // Write remaining length (variable length encoding)
+ WriteRemainingLength(writer, bodyLength);
+
+ // Write body
+ if (bodyLength > 0)
+ {
+ var destSpan = writer.GetSpan(bodyLength);
+ bodyData.CopyTo(destSpan);
+ writer.Advance(bodyLength);
+ }
+
+ return writer.WrittenSpan.ToArray();
+ }
+
+ private static void WriteRemainingLength(ArrayBufferWriter writer, int length)
+ {
+ do
+ {
+ var encodedByte = length % 128;
+ length /= 128;
+
+ if (length > 0)
+ {
+ encodedByte |= 0x80;
+ }
+
+ writer.GetSpan(1)[0] = (byte)encodedByte;
+ writer.Advance(1);
+ }
+ while (length > 0);
+ }
+ }
+}
diff --git a/src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj b/src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj
new file mode 100644
index 0000000..0f673ea
--- /dev/null
+++ b/src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs b/src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs
new file mode 100644
index 0000000..ce5e0c4
--- /dev/null
+++ b/src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs
@@ -0,0 +1,19 @@
+using SuperSocket.ProtoBase;
+
+namespace SuperSocket.MQTT.Server
+{
+ ///
+ /// Factory for creating MQTTPipelineFilter instances with properly initialized decoders.
+ ///
+ public class MQTTPipelineFilterFactory : PipelineFilterFactoryBase
+ {
+ ///
+ /// Creates a new MQTTPipelineFilter instance with the decoder properly initialized.
+ ///
+ /// A new MQTTPipelineFilter instance.
+ protected override IPipelineFilter Create()
+ {
+ return new MQTTPipelineFilter();
+ }
+ }
+}
diff --git a/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs b/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs
index f832d83..750f3e4 100644
--- a/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs
+++ b/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs
@@ -29,12 +29,12 @@ public static class SuperSocketHostBuilderExtensions
public static ISuperSocketHostBuilder UseMQTT(this ISuperSocketHostBuilder builder)
{
return builder
- .UsePipelineFilter()
+ .UsePipelineFilterFactory()
.UseSession()
.UseCommand(options =>
{
- // Add all command classes from the current assembly
- options.AddCommandAssembly(typeof(MQTTPacket).Assembly);
+ // Add all command classes from the Server assembly
+ options.AddCommandAssembly(typeof(SuperSocketHostBuilderExtensions).Assembly);
})
.UseMiddleware(sp => sp.GetRequiredService())
.ConfigureServices((ctx, services) =>
diff --git a/src/SuperSocket.MQTT/MQTTPipelineFilter.cs b/src/SuperSocket.MQTT/MQTTPipelineFilter.cs
index 8d8f03b..0420fb1 100644
--- a/src/SuperSocket.MQTT/MQTTPipelineFilter.cs
+++ b/src/SuperSocket.MQTT/MQTTPipelineFilter.cs
@@ -18,6 +18,14 @@ public class MQTTPipelineFilter : IPipelineFilter
private int _headerParsed;
+ ///
+ /// Creates a new instance of MQTTPipelineFilter with a default MQTTPacketDecoder.
+ ///
+ public MQTTPipelineFilter()
+ {
+ Decoder = new MQTTPacketDecoder();
+ }
+
public MQTTPacket Filter(ref SequenceReader reader)
{
if (_currentLenUnit >= 0)
diff --git a/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs b/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs
new file mode 100644
index 0000000..a8cc144
--- /dev/null
+++ b/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs
@@ -0,0 +1,207 @@
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Hosting;
+using SuperSocket.MQTT.Packets;
+using SuperSocket.MQTT.Server;
+using SuperSocket.MQTT.Client;
+using SuperSocket.Server;
+using SuperSocket.Server.Host;
+
+namespace SuperSocket.MQTT.Tests
+{
+ ///
+ /// End-to-end integration tests using the MQTTClient to connect to the MQTT server.
+ /// These tests verify the full request/response flow between client and server.
+ ///
+ public class MQTTClientE2ETests : IAsyncLifetime
+ {
+ private IHost? _host;
+ private const int TestPort = 21883;
+ private readonly IPEndPoint _serverEndPoint = new IPEndPoint(IPAddress.Loopback, TestPort);
+
+ public async Task InitializeAsync()
+ {
+ _host = SuperSocketHostBuilder
+ .Create()
+ .UseMQTT()
+ .UseInProcSessionContainer()
+ .ConfigureAppConfiguration((hostCtx, configApp) =>
+ {
+ configApp.AddInMemoryCollection(new Dictionary
+ {
+ { "serverOptions:name", "MQTTTestServer" },
+ { "serverOptions:listeners:0:ip", "Any" },
+ { "serverOptions:listeners:0:port", TestPort.ToString() }
+ });
+ })
+ .Build();
+
+ await _host.StartAsync();
+ // Give the server time to start listening
+ await Task.Delay(100);
+ }
+
+ public async Task DisposeAsync()
+ {
+ if (_host != null)
+ {
+ await _host.StopAsync();
+ _host.Dispose();
+ }
+ }
+
+ [Fact]
+ public async Task E2E_ConnectToServer_ShouldReceiveConnAck()
+ {
+ // Arrange
+ await using var client = new MQTTClient();
+
+ // Act
+ var connected = await client.ConnectAsync(_serverEndPoint);
+ Assert.True(connected, "Should connect to server");
+
+ var connAck = await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]);
+
+ // Assert
+ Assert.NotNull(connAck);
+ Assert.Equal(ControlPacketType.CONNACK, connAck.Type);
+ Assert.Equal(0, connAck.ReturnCode); // Connection accepted
+ }
+
+ [Fact]
+ public async Task E2E_PingServer_ShouldReceivePingResp()
+ {
+ // Arrange
+ await using var client = new MQTTClient();
+ var connected = await client.ConnectAsync(_serverEndPoint);
+ Assert.True(connected, "Should connect to server");
+
+ await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]);
+
+ // Act
+ var pingResp = await client.SendPingAsync();
+
+ // Assert
+ Assert.NotNull(pingResp);
+ Assert.Equal(ControlPacketType.PINGRESP, pingResp.Type);
+ }
+
+ [Fact]
+ public async Task E2E_SubscribeToTopic_ShouldReceiveSubAck()
+ {
+ // Arrange
+ await using var client = new MQTTClient();
+ var connected = await client.ConnectAsync(_serverEndPoint);
+ Assert.True(connected, "Should connect to server");
+
+ await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]);
+
+ // Act
+ var subAck = await client.SendSubscribeAsync("test/topic", qos: 0);
+
+ // Assert
+ Assert.NotNull(subAck);
+ Assert.Equal(ControlPacketType.SUBACK, subAck.Type);
+ Assert.Single(subAck.ReturnCodes);
+ Assert.Equal(0, subAck.ReturnCodes[0]); // QoS 0 granted
+ }
+
+ [Fact]
+ public async Task E2E_SubscribeMultipleTopics_ShouldReceiveSubAckWithAllReturnCodes()
+ {
+ // Arrange
+ await using var client = new MQTTClient();
+ var connected = await client.ConnectAsync(_serverEndPoint);
+ Assert.True(connected, "Should connect to server");
+
+ await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]);
+
+ var topicFilters = new List
+ {
+ new TopicFilter { Topic = "home/temperature", QoS = 0 },
+ new TopicFilter { Topic = "home/humidity", QoS = 1 },
+ new TopicFilter { Topic = "home/pressure", QoS = 2 }
+ };
+
+ // Act
+ var subAck = await client.SendSubscribeAsync(topicFilters);
+
+ // Assert
+ Assert.NotNull(subAck);
+ Assert.Equal(ControlPacketType.SUBACK, subAck.Type);
+ Assert.Equal(3, subAck.ReturnCodes.Count);
+ Assert.Equal(0, subAck.ReturnCodes[0]); // QoS 0 granted
+ Assert.Equal(1, subAck.ReturnCodes[1]); // QoS 1 granted
+ Assert.Equal(2, subAck.ReturnCodes[2]); // QoS 2 granted
+ }
+
+ [Fact]
+ public async Task E2E_PublishQoS0_ShouldSucceed()
+ {
+ // Arrange
+ await using var client = new MQTTClient();
+ var connected = await client.ConnectAsync(_serverEndPoint);
+ Assert.True(connected, "Should connect to server");
+
+ await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]);
+
+ // Act - QoS 0 publish should not return an acknowledgement packet
+ var result = await client.SendPublishAsync("test/topic", Encoding.UTF8.GetBytes("Hello MQTT!"), qos: 0);
+
+ // Assert - for QoS 0, no acknowledgement is expected
+ Assert.Null(result);
+ }
+
+ [Fact]
+ public async Task E2E_DisconnectFromServer_ShouldComplete()
+ {
+ // Arrange
+ await using var client = new MQTTClient();
+ var connected = await client.ConnectAsync(_serverEndPoint);
+ Assert.True(connected, "Should connect to server");
+
+ await client.SendConnectAsync("TestClient_" + Guid.NewGuid().ToString("N")[..8]);
+
+ // Act - disconnect should complete without exception
+ await client.SendDisconnectAsync();
+
+ // Assert - if we get here without exception, the test passed
+ Assert.True(true);
+ }
+
+ [Fact]
+ public async Task E2E_FullFlow_ConnectSubscribePingDisconnect()
+ {
+ // Arrange
+ await using var client = new MQTTClient();
+
+ // Act & Assert - Connect
+ var connected = await client.ConnectAsync(_serverEndPoint);
+ Assert.True(connected, "Should connect to server");
+
+ // Act & Assert - MQTT Connect
+ var connAck = await client.SendConnectAsync("TestClient_FullFlow");
+ Assert.NotNull(connAck);
+ Assert.Equal(0, connAck.ReturnCode);
+
+ // Act & Assert - Subscribe
+ var subAck = await client.SendSubscribeAsync("test/fullflow", qos: 1);
+ Assert.NotNull(subAck);
+ Assert.Single(subAck.ReturnCodes);
+
+ // Act & Assert - Ping
+ var pingResp = await client.SendPingAsync();
+ Assert.NotNull(pingResp);
+ Assert.Equal(ControlPacketType.PINGRESP, pingResp.Type);
+
+ // Act & Assert - Disconnect
+ await client.SendDisconnectAsync();
+ }
+ }
+}
diff --git a/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj b/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj
index cb3fdde..8375f1e 100644
--- a/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj
+++ b/test/SuperSocket.MQTT.Tests/SuperSocket.MQTT.Tests.csproj
@@ -20,6 +20,7 @@
+
From 1cfe5b5226ca3d7456c3e6d96708625e76ec9552 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Sat, 31 Jan 2026 06:34:32 +0000
Subject: [PATCH 3/5] Fix code review issues in MQTTClient
Co-authored-by: kerryjiang <456060+kerryjiang@users.noreply.github.com>
---
src/SuperSocket.MQTT.Client/MQTTClient.cs | 4 ++--
test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs | 3 ---
2 files changed, 2 insertions(+), 5 deletions(-)
diff --git a/src/SuperSocket.MQTT.Client/MQTTClient.cs b/src/SuperSocket.MQTT.Client/MQTTClient.cs
index a0c4727..c2896cc 100644
--- a/src/SuperSocket.MQTT.Client/MQTTClient.cs
+++ b/src/SuperSocket.MQTT.Client/MQTTClient.cs
@@ -15,7 +15,7 @@ namespace SuperSocket.MQTT.Client
public class MQTTClient : IAsyncDisposable
{
private readonly IEasyClient _client;
- private ushort _packetIdentifier = 0;
+ private ushort _packetIdentifier = 0; // Will be incremented to 1 before first use (0 is invalid per MQTT spec)
///
/// Creates a new MQTT client instance.
@@ -139,7 +139,7 @@ public async ValueTask SendPublishAsync(string topic, byte[] payload
var publishPacket = new PublishPacket
{
Type = ControlPacketType.PUBLISH,
- Flags = (byte)(((byte)ControlPacketType.PUBLISH << 4) | flags),
+ Flags = flags,
TopicName = topic,
PacketIdentifier = packetId,
Qos = qos,
diff --git a/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs b/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs
index a8cc144..eafe65b 100644
--- a/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs
+++ b/test/SuperSocket.MQTT.Tests/MQTTClientE2ETests.cs
@@ -170,9 +170,6 @@ public async Task E2E_DisconnectFromServer_ShouldComplete()
// Act - disconnect should complete without exception
await client.SendDisconnectAsync();
-
- // Assert - if we get here without exception, the test passed
- Assert.True(true);
}
[Fact]
From 89ffcf9dc1c4bb5f9deef2655de7fc9af8ded5ab Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Sat, 31 Jan 2026 06:51:59 +0000
Subject: [PATCH 4/5] Address PR review feedback: implement IPackageEncoder,
use singleton decoder, use executing assembly
Co-authored-by: kerryjiang <456060+kerryjiang@users.noreply.github.com>
---
src/SuperSocket.MQTT.Client/MQTTClient.cs | 24 ++++++++-------
.../MQTTPacketEncoder.cs | 30 ++++++++++++-------
.../MQTTPipelineFilterFactory.cs | 19 ------------
.../SuperSocketHostBuilderExtensions.cs | 7 +++--
src/SuperSocket.MQTT/MQTTPipelineFilter.cs | 22 ++++++++------
5 files changed, 50 insertions(+), 52 deletions(-)
delete mode 100644 src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs
diff --git a/src/SuperSocket.MQTT.Client/MQTTClient.cs b/src/SuperSocket.MQTT.Client/MQTTClient.cs
index c2896cc..f8c1eaa 100644
--- a/src/SuperSocket.MQTT.Client/MQTTClient.cs
+++ b/src/SuperSocket.MQTT.Client/MQTTClient.cs
@@ -1,4 +1,5 @@
using System;
+using System.Buffers;
using System.Collections.Generic;
using System.Net;
using System.Threading;
@@ -15,6 +16,7 @@ namespace SuperSocket.MQTT.Client
public class MQTTClient : IAsyncDisposable
{
private readonly IEasyClient _client;
+ private readonly MQTTPacketEncoder _encoder = new MQTTPacketEncoder();
private ushort _packetIdentifier = 0; // Will be incremented to 1 before first use (0 is invalid per MQTT spec)
///
@@ -55,8 +57,7 @@ public async ValueTask SendConnectAsync(string clientId, short ke
KeepAlive = keepAlive
};
- var data = MQTTPacketEncoder.Encode(connectPacket);
- await _client.SendAsync(data);
+ await SendPacketAsync(connectPacket);
var response = await _client.ReceiveAsync();
return response as ConnAckPacket;
@@ -74,8 +75,7 @@ public async ValueTask SendPingAsync(CancellationToken cancellat
Type = ControlPacketType.PINGREQ
};
- var data = MQTTPacketEncoder.Encode(pingPacket);
- await _client.SendAsync(data);
+ await SendPacketAsync(pingPacket);
var response = await _client.ReceiveAsync();
return response as PingRespPacket;
@@ -98,8 +98,7 @@ public async ValueTask SendSubscribeAsync(IEnumerable
TopicFilters = new List(topicFilters)
};
- var data = MQTTPacketEncoder.Encode(subscribePacket);
- await _client.SendAsync(data);
+ await SendPacketAsync(subscribePacket);
var response = await _client.ReceiveAsync();
return response as SubAckPacket;
@@ -147,8 +146,7 @@ public async ValueTask SendPublishAsync(string topic, byte[] payload
Payload = new ReadOnlyMemory(payload)
};
- var data = MQTTPacketEncoder.Encode(publishPacket);
- await _client.SendAsync(data);
+ await SendPacketAsync(publishPacket);
if (qos > 0)
{
@@ -170,8 +168,7 @@ public async ValueTask SendDisconnectAsync(CancellationToken cancellationToken =
Type = ControlPacketType.DISCONNECT
};
- var data = MQTTPacketEncoder.Encode(disconnectPacket);
- await _client.SendAsync(data);
+ await SendPacketAsync(disconnectPacket);
}
///
@@ -201,6 +198,13 @@ public async ValueTask DisposeAsync()
await CloseAsync();
}
+ private async ValueTask SendPacketAsync(MQTTPacket packet)
+ {
+ var writer = new ArrayBufferWriter();
+ _encoder.Encode(writer, packet);
+ await _client.SendAsync(writer.WrittenMemory);
+ }
+
private ushort GetNextPacketIdentifier()
{
return ++_packetIdentifier;
diff --git a/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
index fffac6a..811ced9 100644
--- a/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
+++ b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
@@ -1,36 +1,39 @@
using System;
using System.Buffers;
+using SuperSocket.ProtoBase;
namespace SuperSocket.MQTT.Client
{
///
/// Encodes MQTT packets to bytes for transmission.
///
- public static class MQTTPacketEncoder
+ public class MQTTPacketEncoder : IPackageEncoder
{
///
- /// Encodes an MQTT packet to a byte array.
+ /// Encodes an MQTT packet into the specified buffer writer.
///
- /// The MQTT packet to encode.
- /// The encoded byte array.
- public static byte[] Encode(MQTTPacket packet)
+ /// The buffer writer to write the encoded packet to.
+ /// The MQTT packet to encode.
+ /// The number of bytes written to the buffer.
+ public int Encode(IBufferWriter writer, MQTTPacket pack)
{
- var writer = new ArrayBufferWriter();
+ var totalBytes = 0;
// First, encode the body to determine its length
var bodyWriter = new ArrayBufferWriter();
- var bodyLength = packet.EncodeBody(bodyWriter);
+ var bodyLength = pack.EncodeBody(bodyWriter);
var bodyData = bodyWriter.WrittenSpan;
// Calculate the fixed header
- var packetTypeAndFlags = ((byte)packet.Type << 4) | (packet.Flags & 0x0F);
+ var packetTypeAndFlags = ((byte)pack.Type << 4) | (pack.Flags & 0x0F);
// Write fixed header
writer.GetSpan(1)[0] = (byte)packetTypeAndFlags;
writer.Advance(1);
+ totalBytes++;
// Write remaining length (variable length encoding)
- WriteRemainingLength(writer, bodyLength);
+ totalBytes += WriteRemainingLength(writer, bodyLength);
// Write body
if (bodyLength > 0)
@@ -38,13 +41,15 @@ public static byte[] Encode(MQTTPacket packet)
var destSpan = writer.GetSpan(bodyLength);
bodyData.CopyTo(destSpan);
writer.Advance(bodyLength);
+ totalBytes += bodyLength;
}
- return writer.WrittenSpan.ToArray();
+ return totalBytes;
}
- private static void WriteRemainingLength(ArrayBufferWriter writer, int length)
+ private static int WriteRemainingLength(IBufferWriter writer, int length)
{
+ var bytesWritten = 0;
do
{
var encodedByte = length % 128;
@@ -57,8 +62,11 @@ private static void WriteRemainingLength(ArrayBufferWriter writer, int len
writer.GetSpan(1)[0] = (byte)encodedByte;
writer.Advance(1);
+ bytesWritten++;
}
while (length > 0);
+
+ return bytesWritten;
}
}
}
diff --git a/src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs b/src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs
deleted file mode 100644
index ce5e0c4..0000000
--- a/src/SuperSocket.MQTT.Server/MQTTPipelineFilterFactory.cs
+++ /dev/null
@@ -1,19 +0,0 @@
-using SuperSocket.ProtoBase;
-
-namespace SuperSocket.MQTT.Server
-{
- ///
- /// Factory for creating MQTTPipelineFilter instances with properly initialized decoders.
- ///
- public class MQTTPipelineFilterFactory : PipelineFilterFactoryBase
- {
- ///
- /// Creates a new MQTTPipelineFilter instance with the decoder properly initialized.
- ///
- /// A new MQTTPipelineFilter instance.
- protected override IPipelineFilter Create()
- {
- return new MQTTPipelineFilter();
- }
- }
-}
diff --git a/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs b/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs
index 750f3e4..b1acfe6 100644
--- a/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs
+++ b/src/SuperSocket.MQTT.Server/SuperSocketHostBuilderExtensions.cs
@@ -1,4 +1,5 @@
using System;
+using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using SuperSocket.Command;
using SuperSocket.Server;
@@ -29,12 +30,12 @@ public static class SuperSocketHostBuilderExtensions
public static ISuperSocketHostBuilder UseMQTT(this ISuperSocketHostBuilder builder)
{
return builder
- .UsePipelineFilterFactory()
+ .UsePipelineFilter()
.UseSession()
.UseCommand(options =>
{
- // Add all command classes from the Server assembly
- options.AddCommandAssembly(typeof(SuperSocketHostBuilderExtensions).Assembly);
+ // Add all command classes from the current executing assembly
+ options.AddCommandAssembly(Assembly.GetExecutingAssembly());
})
.UseMiddleware(sp => sp.GetRequiredService())
.ConfigureServices((ctx, services) =>
diff --git a/src/SuperSocket.MQTT/MQTTPipelineFilter.cs b/src/SuperSocket.MQTT/MQTTPipelineFilter.cs
index 0420fb1..cbbf1be 100644
--- a/src/SuperSocket.MQTT/MQTTPipelineFilter.cs
+++ b/src/SuperSocket.MQTT/MQTTPipelineFilter.cs
@@ -6,7 +6,19 @@ namespace SuperSocket.MQTT
{
public class MQTTPipelineFilter : IPipelineFilter
{
- public IPackageDecoder Decoder { get; set; }
+ ///
+ /// Singleton MQTTPacketDecoder instance shared across all MQTTPipelineFilter instances.
+ /// MQTTPacketDecoder is stateless, so a single instance can be safely reused.
+ ///
+ private static readonly MQTTPacketDecoder _sharedDecoder = new MQTTPacketDecoder();
+
+ private IPackageDecoder _decoder;
+
+ public IPackageDecoder Decoder
+ {
+ get => _decoder ?? _sharedDecoder;
+ set => _decoder = value;
+ }
public IPipelineFilter NextFilter { get; private set; }
@@ -18,14 +30,6 @@ public class MQTTPipelineFilter : IPipelineFilter
private int _headerParsed;
- ///
- /// Creates a new instance of MQTTPipelineFilter with a default MQTTPacketDecoder.
- ///
- public MQTTPipelineFilter()
- {
- Decoder = new MQTTPacketDecoder();
- }
-
public MQTTPacket Filter(ref SequenceReader reader)
{
if (_currentLenUnit >= 0)
From a8abb9d1753e87c2857bc0b28d2f1ec1d2b6f792 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Sat, 31 Jan 2026 07:46:33 +0000
Subject: [PATCH 5/5] Use singleton encoder and send packages directly via
EasyClient
Co-authored-by: kerryjiang <456060+kerryjiang@users.noreply.github.com>
---
src/SuperSocket.MQTT.Client/MQTTClient.cs | 18 +++++-------------
.../MQTTPacketEncoder.cs | 7 +++++++
2 files changed, 12 insertions(+), 13 deletions(-)
diff --git a/src/SuperSocket.MQTT.Client/MQTTClient.cs b/src/SuperSocket.MQTT.Client/MQTTClient.cs
index f8c1eaa..a5d1945 100644
--- a/src/SuperSocket.MQTT.Client/MQTTClient.cs
+++ b/src/SuperSocket.MQTT.Client/MQTTClient.cs
@@ -16,7 +16,6 @@ namespace SuperSocket.MQTT.Client
public class MQTTClient : IAsyncDisposable
{
private readonly IEasyClient _client;
- private readonly MQTTPacketEncoder _encoder = new MQTTPacketEncoder();
private ushort _packetIdentifier = 0; // Will be incremented to 1 before first use (0 is invalid per MQTT spec)
///
@@ -57,7 +56,7 @@ public async ValueTask SendConnectAsync(string clientId, short ke
KeepAlive = keepAlive
};
- await SendPacketAsync(connectPacket);
+ await _client.SendAsync(MQTTPacketEncoder.Default, connectPacket);
var response = await _client.ReceiveAsync();
return response as ConnAckPacket;
@@ -75,7 +74,7 @@ public async ValueTask SendPingAsync(CancellationToken cancellat
Type = ControlPacketType.PINGREQ
};
- await SendPacketAsync(pingPacket);
+ await _client.SendAsync(MQTTPacketEncoder.Default, pingPacket);
var response = await _client.ReceiveAsync();
return response as PingRespPacket;
@@ -98,7 +97,7 @@ public async ValueTask SendSubscribeAsync(IEnumerable
TopicFilters = new List(topicFilters)
};
- await SendPacketAsync(subscribePacket);
+ await _client.SendAsync(MQTTPacketEncoder.Default, subscribePacket);
var response = await _client.ReceiveAsync();
return response as SubAckPacket;
@@ -146,7 +145,7 @@ public async ValueTask SendPublishAsync(string topic, byte[] payload
Payload = new ReadOnlyMemory(payload)
};
- await SendPacketAsync(publishPacket);
+ await _client.SendAsync(MQTTPacketEncoder.Default, publishPacket);
if (qos > 0)
{
@@ -168,7 +167,7 @@ public async ValueTask SendDisconnectAsync(CancellationToken cancellationToken =
Type = ControlPacketType.DISCONNECT
};
- await SendPacketAsync(disconnectPacket);
+ await _client.SendAsync(MQTTPacketEncoder.Default, disconnectPacket);
}
///
@@ -198,13 +197,6 @@ public async ValueTask DisposeAsync()
await CloseAsync();
}
- private async ValueTask SendPacketAsync(MQTTPacket packet)
- {
- var writer = new ArrayBufferWriter();
- _encoder.Encode(writer, packet);
- await _client.SendAsync(writer.WrittenMemory);
- }
-
private ushort GetNextPacketIdentifier()
{
return ++_packetIdentifier;
diff --git a/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
index 811ced9..61312a6 100644
--- a/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
+++ b/src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
@@ -6,9 +6,16 @@ namespace SuperSocket.MQTT.Client
{
///
/// Encodes MQTT packets to bytes for transmission.
+ /// This class is stateless and should be used as a singleton.
///
public class MQTTPacketEncoder : IPackageEncoder
{
+ ///
+ /// Singleton instance of the encoder.
+ /// MQTTPacketEncoder is stateless, so a single instance can be safely reused.
+ ///
+ public static readonly MQTTPacketEncoder Default = new MQTTPacketEncoder();
+
///
/// Encodes an MQTT packet into the specified buffer writer.
///