Skip to content

Commit b2ff3f7

Browse files
committed
Add TCP network components that can configure properties
1 parent 3ded4d8 commit b2ff3f7

37 files changed

+1914
-2
lines changed

src/Surging.Core/Surging.Core.CPlatform/Configurations/ProtocolPortOptions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,7 @@ public class ProtocolPortOptions
1111
public int GrpcPort { get; set; }
1212

1313
public int UdpPort { get; set; }
14+
15+
public int TcpPort { get; set; }
1416
}
1517
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace Surging.Core.CPlatform.Network
8+
{
9+
public interface INetwork
10+
{
11+
12+
string Id { get; set; }
13+
14+
Task StartAsync();
15+
/**
16+
* @return 网络类型
17+
* @see DefaultNetworkType
18+
*/
19+
NetworkType GetType();
20+
21+
/**
22+
* 关闭网络组件
23+
*/
24+
void Shutdown();
25+
26+
/**
27+
* @return 是否存活
28+
*/
29+
bool IsAlive();
30+
31+
/**
32+
* 当{@link Network#isAlive()}为false是,是否自动重新加载.
33+
*
34+
* @return 是否重新加载
35+
* @see NetworkProvider#reload(Network, Object)
36+
*/
37+
bool IsAutoReload();
38+
}
39+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace Surging.Core.CPlatform.Network
8+
{
9+
public interface INetworkProvider<T>
10+
{
11+
12+
/**
13+
* @return 类型
14+
* @see DefaultNetworkType
15+
*/
16+
NetworkType GetNetworkType();
17+
18+
INetwork CreateNetwork(T properties);
19+
20+
/**
21+
* 重新加载网络组件
22+
*
23+
* @param network 网络组件
24+
* @param properties 配置信息
25+
*/
26+
void Reload(INetwork network, T properties);
27+
28+
29+
IDictionary<string, object> GetConfigMetadata();
30+
}
31+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
7+
namespace Surging.Core.CPlatform.Network
8+
{
9+
public enum NetworkType
10+
{
11+
TcpClient,//TCP客户端
12+
TcpServer,//TCP服务
13+
MqttClient,//MQTT客户端
14+
MqttServer,//MQTT服务
15+
HttpClient,//HTTP客户端
16+
HttpServer,//HTTP服务
17+
WebSocketClient,//WebSocket客户端
18+
WebSocketServer,//WebSocket服务
19+
UDP,//UDP
20+
CoapClient,//CoAP客户端
21+
CoapServer//CoAP服务
22+
}
23+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
using DotNetty.Buffers;
2+
using DotNetty.Handlers.Timeout;
3+
using DotNetty.Transport.Channels;
4+
using Microsoft.Extensions.Logging;
5+
using Surging.Core.CPlatform.Network;
6+
using Surging.Core.Protocol.Tcp.Runtime;
7+
using System;
8+
using System.Collections.Generic;
9+
using System.Linq;
10+
using System.Text;
11+
using System.Threading.Tasks;
12+
13+
namespace Surging.Core.Protocol.Tcp.Adapter
14+
{
15+
public class ConnectionChannelHandlerAdapter : ChannelHandlerAdapter
16+
{
17+
private readonly ILogger _logger;
18+
private readonly IDeviceProvider _deviceProvider;
19+
private readonly ITcpServiceEntryProvider _tcpServiceEntryProvider;
20+
private readonly TcpServerProperties _tcpServerProperties;
21+
public ConnectionChannelHandlerAdapter(ILogger logger, IDeviceProvider deviceProvider, ITcpServiceEntryProvider tcpServiceEntryProvider, TcpServerProperties tcpServerProperties)
22+
{
23+
_logger = logger;
24+
_deviceProvider = deviceProvider;
25+
_tcpServiceEntryProvider = tcpServiceEntryProvider;
26+
_tcpServerProperties= tcpServerProperties;
27+
28+
}
29+
30+
31+
public override void ChannelActive(IChannelHandlerContext ctx)
32+
{
33+
_deviceProvider.Register(ctx);
34+
var tcpEntry=_tcpServiceEntryProvider.GetEntry();
35+
tcpEntry.Behavior.DeviceStatusProcess(DeviceStatus.Connected, ctx.Channel.Id.AsLongText(), _tcpServerProperties);
36+
if (_logger.IsEnabled(LogLevel.Information))
37+
_logger.LogInformation("channel active:" + ctx.Channel.RemoteAddress);
38+
39+
}
40+
41+
public override void ChannelInactive(IChannelHandlerContext ctx)
42+
{
43+
_deviceProvider.Unregister(ctx);
44+
var tcpEntry = _tcpServiceEntryProvider.GetEntry();
45+
tcpEntry.Behavior.DeviceStatusProcess(DeviceStatus.Closed, ctx.Channel.Id.AsLongText(), _tcpServerProperties);
46+
if (_logger.IsEnabled(LogLevel.Information))
47+
_logger.LogInformation("channel inactive:" + ctx.Channel.RemoteAddress);
48+
49+
}
50+
51+
52+
public override void ExceptionCaught(IChannelHandlerContext ctx, Exception exception)
53+
{
54+
_deviceProvider.Unregister(ctx);
55+
var tcpEntry = _tcpServiceEntryProvider.GetEntry();
56+
tcpEntry.Behavior.DeviceStatusProcess(DeviceStatus.Abnormal, ctx.Channel.Id.AsLongText(), _tcpServerProperties);
57+
if (_logger.IsEnabled(LogLevel.Error))
58+
_logger.LogError("channel exceptionCaught:" + ctx.Channel.RemoteAddress, exception);
59+
60+
}
61+
}
62+
}
63+
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
using DotNetty.Buffers;
2+
using DotNetty.Codecs;
3+
using DotNetty.Transport.Channels;
4+
using System;
5+
using System.Collections.Generic;
6+
using System.Linq;
7+
using System.Text;
8+
using System.Threading.Tasks;
9+
10+
namespace Surging.Core.Protocol.Tcp.Codecs
11+
{
12+
public class FixedLengthFrameDecoder : ByteToMessageDecoder
13+
{
14+
private readonly int frameLength;
15+
public FixedLengthFrameDecoder(int frameLength)
16+
{
17+
if (frameLength <= 0)
18+
throw new ArgumentOutOfRangeException(nameof(frameLength));
19+
this.frameLength = frameLength;
20+
}
21+
protected override void Decode(IChannelHandlerContext ctx, IByteBuffer input, List<object> output)
22+
{
23+
object decoded = this.Decode(ctx, input);
24+
if (decoded != null)
25+
output.Add(decoded);
26+
}
27+
28+
protected virtual object Decode(IChannelHandlerContext ctx, IByteBuffer buffer)
29+
{
30+
if (buffer.ReadableBytes < frameLength)
31+
{
32+
return null;
33+
}
34+
else
35+
{
36+
return buffer.ReadRetainedSlice(frameLength);
37+
}
38+
}
39+
}
40+
}
41+
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
using DotNetty.Buffers;
2+
using DotNetty.Transport.Channels;
3+
using Surging.Core.CPlatform.Messages;
4+
using Surging.Core.CPlatform.Transport;
5+
using Surging.Core.Protocol.Tcp.Runtime;
6+
using System;
7+
using System.Collections.Generic;
8+
using System.Linq;
9+
using System.Text;
10+
using System.Threading.Tasks;
11+
12+
namespace Surging.Core.Protocol.Tcp
13+
{
14+
public abstract class DotNettyTcpMessageSender
15+
{
16+
17+
protected DotNettyTcpMessageSender()
18+
{
19+
}
20+
21+
protected IByteBuffer GetByteBuffer(TransportMessage message)
22+
{
23+
var data = message.GetContent<byte[]>();
24+
return Unpooled.WrappedBuffer(data);
25+
}
26+
}
27+
28+
#region Implementation of IMessageSender
29+
30+
/// <summary>
31+
/// 基于DotNetty服务端的消息发送者。
32+
/// </summary>
33+
public class DotNettyTcpServerMessageSender : DotNettyTcpMessageSender, IMessageSender
34+
{
35+
private readonly IChannelHandlerContext _context;
36+
37+
public DotNettyTcpServerMessageSender(IChannelHandlerContext context) : base()
38+
{
39+
_context = context;
40+
}
41+
42+
/// <summary>
43+
/// 发送消息。
44+
/// </summary>
45+
/// <param name="message">消息内容。</param>
46+
/// <returns>一个任务。</returns>
47+
public async Task SendAsync(TransportMessage message)
48+
{
49+
var buffer = GetByteBuffer(message);
50+
await _context.WriteAsync(buffer);
51+
}
52+
53+
/// <summary>
54+
/// 发送消息并清空缓冲区。
55+
/// </summary>
56+
/// <param name="message">消息内容。</param>
57+
/// <returns>一个任务。</returns>
58+
public async Task SendAndFlushAsync(TransportMessage message)
59+
{
60+
var buffer = GetByteBuffer(message);
61+
if (_context.Channel.RemoteAddress != null)
62+
await _context.WriteAndFlushAsync(buffer);
63+
}
64+
65+
66+
}
67+
#endregion Implementation of IMessageSender
68+
69+
public class TcpServerMessageSender : ITcpMessageSender
70+
{
71+
private readonly IChannelHandlerContext _context;
72+
73+
public TcpServerMessageSender(IChannelHandlerContext context) : base()
74+
{
75+
_context = context;
76+
}
77+
78+
/// <summary>
79+
/// 发送消息。
80+
/// </summary>
81+
/// <param name="message">消息内容。</param>
82+
/// <returns>一个任务。</returns>
83+
public async Task SendAsync(object message, Encoding encoding)
84+
{
85+
if (message != null)
86+
{
87+
var buffer = Unpooled.WrappedBuffer(encoding.GetBytes(message.ToString()));
88+
await SendAsync(buffer);
89+
}
90+
}
91+
92+
/// <summary>
93+
/// 发送消息并清空缓冲区。
94+
/// </summary>
95+
/// <param name="message">消息内容。</param>
96+
/// <returns>一个任务。</returns>
97+
public async Task SendAndFlushAsync(object message, Encoding encoding)
98+
{
99+
if (message != null)
100+
{
101+
var buffer = Unpooled.WrappedBuffer(encoding.GetBytes(message.ToString()));
102+
await SendAndFlushAsync(buffer);
103+
}
104+
}
105+
106+
public async Task SendAsync(object message)
107+
{
108+
await SendAsync(message, Encoding.UTF8);
109+
}
110+
111+
public async Task SendAndFlushAsync(object message)
112+
{
113+
await SendAndFlushAsync(message, Encoding.UTF8);
114+
}
115+
116+
public async Task SendAndFlushAsync(IByteBuffer buffer)
117+
{
118+
if (_context.Channel.RemoteAddress != null)
119+
await _context.WriteAndFlushAsync(buffer);
120+
}
121+
122+
public async Task SendAsync(IByteBuffer buffer)
123+
{
124+
if (_context.Channel.RemoteAddress != null)
125+
await _context.WriteAsync(buffer);
126+
}
127+
}
128+
}

0 commit comments

Comments
 (0)