Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageVersion Include="SuperSocket.ProtoBase" Version="2.0.1" />
<PackageVersion Include="SuperSocket.Server" Version="2.0.1" />
<PackageVersion Include="SuperSocket.Command" Version="2.0.1" />
<PackageVersion Include="SuperSocket.Client" Version="2.0.1" />
<!-- Build and development tools -->
<PackageVersion Include="Microsoft.SourceLink.GitHub" Version="1.1.1" />
<!-- Test packages -->
Expand Down
15 changes: 15 additions & 0 deletions SuperSocket.MQTT.sln
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT", "src\Sup
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Server", "src\SuperSocket.MQTT.Server\SuperSocket.MQTT.Server.csproj", "{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Client", "src\SuperSocket.MQTT.Client\SuperSocket.MQTT.Client.csproj", "{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{0C88DD14-F956-CE84-757C-A364CCF449FC}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SuperSocket.MQTT.Tests", "test\SuperSocket.MQTT.Tests\SuperSocket.MQTT.Tests.csproj", "{23CC3568-7338-490F-96DD-D5731B3AF059}"
Expand Down Expand Up @@ -47,6 +49,18 @@ Global
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x64.Build.0 = Release|Any CPU
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x86.ActiveCfg = Release|Any CPU
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09}.Release|x86.Build.0 = Release|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x64.ActiveCfg = Debug|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x64.Build.0 = Debug|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x86.ActiveCfg = Debug|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Debug|x86.Build.0 = Debug|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|Any CPU.Build.0 = Release|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x64.ActiveCfg = Release|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x64.Build.0 = Release|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x86.ActiveCfg = Release|Any CPU
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789}.Release|x86.Build.0 = Release|Any CPU
{23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23CC3568-7338-490F-96DD-D5731B3AF059}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand All @@ -66,6 +80,7 @@ Global
GlobalSection(NestedProjects) = preSolution
{549EA105-D941-4923-85B8-FA777444A1A9} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE}
{B28F3060-D93F-4ED4-A24C-B9C1D4C7ED09} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE}
{A1B2C3D4-E5F6-4789-ABCD-EF0123456789} = {44759F26-AD7B-4D06-BBCF-C70D5AB2C4BE}
{23CC3568-7338-490F-96DD-D5731B3AF059} = {0C88DD14-F956-CE84-757C-A364CCF449FC}
EndGlobalSection
EndGlobal
205 changes: 205 additions & 0 deletions src/SuperSocket.MQTT.Client/MQTTClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using SuperSocket.Client;
using SuperSocket.MQTT.Packets;

namespace SuperSocket.MQTT.Client
{
/// <summary>
/// MQTT client implementation based on SuperSocket.Client.
/// Uses the same pipeline filter as the MQTT server for protocol compatibility.
/// </summary>
public class MQTTClient : IAsyncDisposable
{
private readonly IEasyClient<MQTTPacket> _client;
private ushort _packetIdentifier = 0; // Will be incremented to 1 before first use (0 is invalid per MQTT spec)

/// <summary>
/// Creates a new MQTT client instance.
/// </summary>
public MQTTClient()
{
var pipelineFilter = new MQTTPipelineFilter();
_client = new EasyClient<MQTTPacket>(pipelineFilter);
}

/// <summary>
/// Connects to an MQTT broker at the specified endpoint.
/// </summary>
/// <param name="endPoint">The endpoint of the MQTT broker.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>True if the connection was successful, false otherwise.</returns>
public async ValueTask<bool> ConnectAsync(EndPoint endPoint, CancellationToken cancellationToken = default)
{
return await _client.ConnectAsync(endPoint, cancellationToken);
}

/// <summary>
/// Sends an MQTT CONNECT packet and waits for CONNACK response.
/// </summary>
/// <param name="clientId">The client identifier.</param>
/// <param name="keepAlive">Keep alive interval in seconds.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The CONNACK packet received from the broker.</returns>
public async ValueTask<ConnAckPacket> SendConnectAsync(string clientId, short keepAlive = 60, CancellationToken cancellationToken = default)
{
var connectPacket = new ConnectPacket
{
Type = ControlPacketType.CONNECT,
ProtocolName = "MQTT",
ProtocolLevel = 4,
ClientId = clientId,
KeepAlive = keepAlive
};

await _client.SendAsync(MQTTPacketEncoder.Default, connectPacket);

var response = await _client.ReceiveAsync();
return response as ConnAckPacket;
}

/// <summary>
/// Sends an MQTT PINGREQ packet and waits for PINGRESP response.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The PINGRESP packet received from the broker.</returns>
public async ValueTask<PingRespPacket> SendPingAsync(CancellationToken cancellationToken = default)
{
var pingPacket = new PingReqPacket
{
Type = ControlPacketType.PINGREQ
};

await _client.SendAsync(MQTTPacketEncoder.Default, pingPacket);

var response = await _client.ReceiveAsync();
return response as PingRespPacket;
}

/// <summary>
/// Sends an MQTT SUBSCRIBE packet and waits for SUBACK response.
/// </summary>
/// <param name="topicFilters">The topics to subscribe to.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The SUBACK packet received from the broker.</returns>
public async ValueTask<SubAckPacket> SendSubscribeAsync(IEnumerable<TopicFilter> topicFilters, CancellationToken cancellationToken = default)
{
var packetId = GetNextPacketIdentifier();
var subscribePacket = new SubscribePacket
{
Type = ControlPacketType.SUBSCRIBE,
Flags = 0x02, // SUBSCRIBE must have flag bits set to 0010
PacketIdentifier = packetId,
TopicFilters = new List<TopicFilter>(topicFilters)
};

await _client.SendAsync(MQTTPacketEncoder.Default, subscribePacket);

var response = await _client.ReceiveAsync();
return response as SubAckPacket;
}

/// <summary>
/// Sends an MQTT SUBSCRIBE packet for a single topic and waits for SUBACK response.
/// </summary>
/// <param name="topic">The topic to subscribe to.</param>
/// <param name="qos">The QoS level for the subscription.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The SUBACK packet received from the broker.</returns>
public async ValueTask<SubAckPacket> SendSubscribeAsync(string topic, byte qos = 0, CancellationToken cancellationToken = default)
{
return await SendSubscribeAsync(new[] { new TopicFilter { Topic = topic, QoS = qos } }, cancellationToken);
}

/// <summary>
/// Sends an MQTT PUBLISH packet.
/// </summary>
/// <param name="topic">The topic to publish to.</param>
/// <param name="payload">The message payload.</param>
/// <param name="qos">The QoS level.</param>
/// <param name="retain">Whether to retain the message.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>A task representing the async operation. For QoS > 0, returns the acknowledgement packet.</returns>
public async ValueTask<MQTTPacket> SendPublishAsync(string topic, byte[] payload, byte qos = 0, bool retain = false, CancellationToken cancellationToken = default)
{
var packetId = qos > 0 ? GetNextPacketIdentifier() : (ushort)0;

byte flags = (byte)((qos & 0x03) << 1);
if (retain)
{
flags |= 0x01;
}

var publishPacket = new PublishPacket
{
Type = ControlPacketType.PUBLISH,
Flags = flags,
TopicName = topic,
PacketIdentifier = packetId,
Qos = qos,
Retain = retain,
Payload = new ReadOnlyMemory<byte>(payload)
};

await _client.SendAsync(MQTTPacketEncoder.Default, publishPacket);

if (qos > 0)
{
var response = await _client.ReceiveAsync();
return response;
}

return null;
}

/// <summary>
/// Sends an MQTT DISCONNECT packet.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
public async ValueTask SendDisconnectAsync(CancellationToken cancellationToken = default)
{
var disconnectPacket = new DisconnectPacket
{
Type = ControlPacketType.DISCONNECT
};

await _client.SendAsync(MQTTPacketEncoder.Default, disconnectPacket);
}

/// <summary>
/// Receives an MQTT packet from the broker.
/// </summary>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The received MQTT packet.</returns>
public async ValueTask<MQTTPacket> ReceiveAsync(CancellationToken cancellationToken = default)
{
return await _client.ReceiveAsync();
}

/// <summary>
/// Closes the connection to the broker.
/// </summary>
/// <returns>A task representing the async close operation.</returns>
public async ValueTask CloseAsync()
{
await _client.CloseAsync();
}

/// <summary>
/// Disposes the MQTT client asynchronously.
/// </summary>
public async ValueTask DisposeAsync()
{
await CloseAsync();
}

private ushort GetNextPacketIdentifier()
{
return ++_packetIdentifier;
}
}
}
79 changes: 79 additions & 0 deletions src/SuperSocket.MQTT.Client/MQTTPacketEncoder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
using System;
using System.Buffers;
using SuperSocket.ProtoBase;

namespace SuperSocket.MQTT.Client
{
/// <summary>
/// Encodes MQTT packets to bytes for transmission.
/// This class is stateless and should be used as a singleton.
/// </summary>
public class MQTTPacketEncoder : IPackageEncoder<MQTTPacket>
{
/// <summary>
/// Singleton instance of the encoder.
/// MQTTPacketEncoder is stateless, so a single instance can be safely reused.
/// </summary>
public static readonly MQTTPacketEncoder Default = new MQTTPacketEncoder();

/// <summary>
/// Encodes an MQTT packet into the specified buffer writer.
/// </summary>
/// <param name="writer">The buffer writer to write the encoded packet to.</param>
/// <param name="pack">The MQTT packet to encode.</param>
/// <returns>The number of bytes written to the buffer.</returns>
public int Encode(IBufferWriter<byte> writer, MQTTPacket pack)
{
var totalBytes = 0;

// First, encode the body to determine its length
var bodyWriter = new ArrayBufferWriter<byte>();
var bodyLength = pack.EncodeBody(bodyWriter);
var bodyData = bodyWriter.WrittenSpan;

// Calculate the fixed header
var packetTypeAndFlags = ((byte)pack.Type << 4) | (pack.Flags & 0x0F);

// Write fixed header
writer.GetSpan(1)[0] = (byte)packetTypeAndFlags;
writer.Advance(1);
totalBytes++;

// Write remaining length (variable length encoding)
totalBytes += WriteRemainingLength(writer, bodyLength);

// Write body
if (bodyLength > 0)
{
var destSpan = writer.GetSpan(bodyLength);
bodyData.CopyTo(destSpan);
writer.Advance(bodyLength);
totalBytes += bodyLength;
}

return totalBytes;
}

private static int WriteRemainingLength(IBufferWriter<byte> writer, int length)
{
var bytesWritten = 0;
do
{
var encodedByte = length % 128;
length /= 128;

if (length > 0)
{
encodedByte |= 0x80;
}

writer.GetSpan(1)[0] = (byte)encodedByte;
writer.Advance(1);
bytesWritten++;
}
while (length > 0);

return bytesWritten;
}
}
}
6 changes: 6 additions & 0 deletions src/SuperSocket.MQTT.Client/SuperSocket.MQTT.Client.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="../SuperSocket.MQTT/SuperSocket.MQTT.csproj" />
<PackageReference Include="SuperSocket.Client" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Reflection;
using Microsoft.Extensions.DependencyInjection;
using SuperSocket.Command;
using SuperSocket.Server;
Expand Down Expand Up @@ -33,8 +34,8 @@ public static ISuperSocketHostBuilder<MQTTPacket> UseMQTT(this ISuperSocketHostB
.UseSession<MQTTSession>()
.UseCommand<ControlPacketType, MQTTPacket>(options =>
{
// Add all command classes from the current assembly
options.AddCommandAssembly(typeof(MQTTPacket).Assembly);
// Add all command classes from the current executing assembly
options.AddCommandAssembly(Assembly.GetExecutingAssembly());
})
.UseMiddleware<TopicMiddleware>(sp => sp.GetRequiredService<TopicMiddleware>())
.ConfigureServices((ctx, services) =>
Expand Down
14 changes: 13 additions & 1 deletion src/SuperSocket.MQTT/MQTTPipelineFilter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,19 @@ namespace SuperSocket.MQTT
{
public class MQTTPipelineFilter : IPipelineFilter<MQTTPacket>
{
public IPackageDecoder<MQTTPacket> Decoder { get; set; }
/// <summary>
/// Singleton MQTTPacketDecoder instance shared across all MQTTPipelineFilter instances.
/// MQTTPacketDecoder is stateless, so a single instance can be safely reused.
/// </summary>
private static readonly MQTTPacketDecoder _sharedDecoder = new MQTTPacketDecoder();

private IPackageDecoder<MQTTPacket> _decoder;

public IPackageDecoder<MQTTPacket> Decoder
{
get => _decoder ?? _sharedDecoder;
set => _decoder = value;
}

public IPipelineFilter<MQTTPacket> NextFilter { get; private set; }

Expand Down
Loading
Loading