Skip to content

Commit 9a1206f

Browse files
committed
Add ChannelAuchenticateTimeout Event.
1 parent d7617e0 commit 9a1206f

File tree

6 files changed

+112
-92
lines changed

6 files changed

+112
-92
lines changed
Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
using Microsoft.AspNetCore.Http;
2-
using Microsoft.Extensions.Logging;
32
using Quick.Protocol.Utils;
43
using System;
54
using System.Collections;
@@ -12,7 +11,7 @@ namespace Quick.Protocol.WebSocket.Server.AspNetCore
1211
public class QpWebSocketServer : QpServer
1312
{
1413
private Queue<WebSocketContext> webSocketContextQueue = new Queue<WebSocketContext>();
15-
private AutoResetEvent waitForConnectionAutoResetEvent;
14+
private bool isStarted = false;
1615

1716
private class WebSocketContext
1817
{
@@ -32,14 +31,18 @@ public QpWebSocketServer(QpWebSocketServerOptions options) : base(options) { }
3231

3332
public override void Start()
3433
{
35-
waitForConnectionAutoResetEvent = new AutoResetEvent(false);
34+
isStarted=true;
3635
lock (webSocketContextQueue)
3736
webSocketContextQueue.Clear();
3837
base.Start();
3938
}
4039

4140
public Task OnNewConnection(System.Net.WebSockets.WebSocket webSocket, ConnectionInfo connectionInfo)
4241
{
42+
//如果还没有开始接收,则直接关闭
43+
if (!isStarted)
44+
return webSocket.CloseAsync(System.Net.WebSockets.WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);
45+
4346
var connectionInfoStr = $"WebSocket:{connectionInfo.RemoteIpAddress}:{connectionInfo.RemotePort}";
4447
var cts = new CancellationTokenSource();
4548
lock (webSocketContextQueue)
@@ -48,7 +51,6 @@ public Task OnNewConnection(System.Net.WebSockets.WebSocket webSocket, Connectio
4851
connectionInfoStr,
4952
webSocket,
5053
cts));
51-
waitForConnectionAutoResetEvent.Set();
5254
return Task.Delay(-1, cts.Token).ContinueWith(t =>
5355
{
5456
if (LogUtils.LogConnection)
@@ -58,40 +60,42 @@ public Task OnNewConnection(System.Net.WebSockets.WebSocket webSocket, Connectio
5860

5961
public override void Stop()
6062
{
63+
isStarted=false;
6164
lock (webSocketContextQueue)
6265
webSocketContextQueue.Clear();
63-
waitForConnectionAutoResetEvent?.Dispose();
6466
base.Stop();
6567
}
6668

67-
protected override Task InnerAcceptAsync(CancellationToken token)
69+
protected override async Task InnerAcceptAsync(CancellationToken token)
6870
{
69-
return Task.Run(() =>
71+
WebSocketContext[] webSocketContexts = null;
72+
lock (webSocketContextQueue)
73+
{
74+
webSocketContexts = webSocketContextQueue.ToArray();
75+
webSocketContextQueue.Clear();
76+
}
77+
//如果当前没有WebSocket连接,则等待0.1秒后再返回
78+
if (webSocketContexts == null || webSocketContexts.Length==0)
79+
{
80+
await Task.Delay(100);
81+
return;
82+
}
83+
foreach (var context in webSocketContexts)
7084
{
71-
waitForConnectionAutoResetEvent.WaitOne();
72-
WebSocketContext[] webSocketContexts = null;
73-
lock (webSocketContextQueue)
85+
try
7486
{
75-
webSocketContexts = webSocketContextQueue.ToArray();
76-
webSocketContextQueue.Clear();
87+
if (LogUtils.LogConnection)
88+
Console.WriteLine("[Connection]{0} connected.", context.ConnectionInfo);
89+
OnNewChannelConnected(new WebSocketServerStream(context.WebSocket, context.Cts), context.ConnectionInfo, token);
7790
}
78-
foreach (var context in webSocketContexts)
91+
catch (Exception ex)
7992
{
80-
try
81-
{
82-
if (LogUtils.LogConnection)
83-
Console.WriteLine("[Connection]{0} connected.", context.ConnectionInfo);
84-
OnNewChannelConnected(new WebSocketServerStream(context.WebSocket,context.Cts), context.ConnectionInfo, token);
85-
}
86-
catch (Exception ex)
87-
{
88-
if (LogUtils.LogConnection)
89-
Console.WriteLine("[Connection]Init&Start Channel error,reason:{0}", ex.ToString());
90-
try { context.WebSocket.CloseAsync(System.Net.WebSockets.WebSocketCloseStatus.InternalServerError, ex.Message, CancellationToken.None); }
91-
catch { }
92-
}
93+
if (LogUtils.LogConnection)
94+
Console.WriteLine("[Connection]Init&Start Channel error,reason:{0}", ex.ToString());
95+
try { await context.WebSocket.CloseAsync(System.Net.WebSockets.WebSocketCloseStatus.InternalServerError, ex.Message, CancellationToken.None); }
96+
catch { }
9397
}
94-
});
98+
}
9599
}
96100
}
97101
}

Quick.Protocol/QpChannel.cs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,30 @@ public abstract class QpChannel
5757
private ConcurrentDictionary<string, CommandContext> commandDict = new ConcurrentDictionary<string, CommandContext>();
5858

5959
/// <summary>
60-
/// 当时是否连接
60+
/// 当前是否连接,要连接且认证通过后,才设置此属性为true
6161
/// </summary>
6262
public bool IsConnected { get; protected set; }
6363

64+
/// <summary>
65+
/// 连接断开时
66+
/// </summary>
67+
public event EventHandler Disconnected;
68+
69+
/// <summary>
70+
/// 断开连接时
71+
/// </summary>
72+
protected virtual void Disconnect()
73+
{
74+
lock (this)
75+
{
76+
if (IsConnected)
77+
{
78+
IsConnected = false;
79+
Disconnected?.Invoke(this, QpEventArgs.Empty);
80+
}
81+
}
82+
}
83+
6484
/// <summary>
6585
/// 最后的异常
6686
/// </summary>
@@ -182,6 +202,7 @@ protected virtual void OnReadError(Exception exception)
182202
LastException = exception;
183203
LogUtils.Log("[ReadError]{0}: {1}", DateTime.Now, ExceptionUtils.GetExceptionString(exception));
184204
InitQpPackageHandler_Stream(null);
205+
Disconnect();
185206
}
186207

187208
//获取空闲的缓存

Quick.Protocol/QpClient.cs

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,7 @@ namespace Quick.Protocol
1212
public abstract class QpClient : QpChannel
1313
{
1414
private CancellationTokenSource cts = null;
15-
public QpClientOptions Options { get; private set; }
16-
17-
/// <summary>
18-
/// 连接断开时
19-
/// </summary>
20-
public event EventHandler Disconnected;
15+
public QpClientOptions Options { get; private set; }
2116

2217
public QpClient(QpClientOptions options)
2318
: base(options)
@@ -39,8 +34,6 @@ public async Task ConnectAsync()
3934
var token = cts.Token;
4035

4136
var stream = await InnerConnectAsync();
42-
IsConnected = true;
43-
4437
//初始化网络
4538
InitQpPackageHandler_Stream(stream);
4639

@@ -69,6 +62,7 @@ public async Task ConnectAsync()
6962
}, 5000, () =>
7063
{
7164
Options.OnAuthPassed();
65+
IsConnected=true;
7266
});
7367

7468
//开始心跳
@@ -96,23 +90,15 @@ private void cancellAll()
9690
}
9791
}
9892

99-
protected virtual void Disconnect()
100-
{
101-
if (IsConnected)
102-
{
103-
IsConnected = false;
104-
Disconnected?.Invoke(this, QpEventArgs.Empty);
105-
}
106-
}
107-
10893
/// <summary>
10994
/// 关闭连接
11095
/// </summary>
11196
public void Close()
11297
{
11398
cancellAll();
99+
IsConnected = false;
114100
InitQpPackageHandler_Stream(null);
115-
Disconnect();
101+
Disconnect();
116102
}
117103
}
118104
}

Quick.Protocol/QpServer.cs

Lines changed: 28 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,18 @@ public abstract class QpServer
1212
{
1313
private CancellationTokenSource cts;
1414
private QpServerOptions options;
15+
1516
private List<QpServerChannel> channelList = new List<QpServerChannel>();
16-
private List<QpServerChannel> auchenticatedChannelList = new List<QpServerChannel>();
1717

1818
/// <summary>
1919
/// 增加Tag属性,用于引用与QpServer相关的对象
2020
/// </summary>
2121
public Object Tag { get; set; }
2222

23-
/// <summary>
24-
/// 获取全部的通道
25-
/// </summary>
26-
public QpServerChannel[] Channels { get; private set; } = new QpServerChannel[0];
27-
2823
/// <summary>
2924
/// 已通过认证的通道
3025
/// </summary>
31-
public QpServerChannel[] AuchenticatedChannels { get; private set; } = new QpServerChannel[0];
26+
public QpServerChannel[] Channels { get; private set; } = new QpServerChannel[0];
3227

3328
/// <summary>
3429
/// 通道连接上时
@@ -39,6 +34,11 @@ public abstract class QpServer
3934
/// 通道连接断开时
4035
/// </summary>
4136
public event EventHandler<QpServerChannel> ChannelDisconnected;
37+
38+
/// <summary>
39+
/// 通道认证超时
40+
/// </summary>
41+
public event EventHandler<QpServerChannel> ChannelAuchenticateTimeout;
4242

4343
public QpServer(QpServerOptions options)
4444
{
@@ -60,46 +60,39 @@ internal void RemoveChannel(QpServerChannel channel)
6060
channelList.Remove(channel);
6161
Channels = channelList.ToArray();
6262
}
63-
lock (auchenticatedChannelList)
64-
if (auchenticatedChannelList.Contains(channel))
65-
{
66-
auchenticatedChannelList.Remove(channel);
67-
AuchenticatedChannels = auchenticatedChannelList.ToArray();
68-
}
6963
}
7064

7165
protected void OnNewChannelConnected(Stream stream, string channelName, CancellationToken token)
7266
{
7367
var channel = new QpServerChannel(this, stream, channelName, token, options.Clone());
74-
//将通道加入到全部通道列表里面
75-
lock (channelList)
68+
69+
//认证超时
70+
channel.AuchenticateTimeout += (sender, e) =>
7671
{
77-
channelList.Add(channel);
78-
Channels = channelList.ToArray();
79-
}
72+
if (LogUtils.LogConnection)
73+
LogUtils.Log("[Connection]{0} auchenticate timeout.", channelName);
74+
ChannelAuchenticateTimeout?.Invoke(this, channel);
75+
};
8076

81-
//认证通过后,才将通道添加到已认证通道列表里面
77+
//认证通过后,才将通道添加到已连接通道列表里面
8278
channel.Auchenticated += (sender, e) =>
8379
{
84-
lock (auchenticatedChannelList)
80+
lock (channelList)
8581
{
86-
auchenticatedChannelList.Add(channel);
87-
AuchenticatedChannels = auchenticatedChannelList.ToArray();
82+
channelList.Add(channel);
83+
Channels = channelList.ToArray();
8884
}
89-
};
90-
channel.Disconnected += (sender, e) =>
91-
{
92-
if (LogUtils.LogConnection)
93-
LogUtils.Log("[Connection]{0} Disconnected.", channelName);
94-
RemoveChannel(channel);
95-
try { stream.Dispose(); }
96-
catch { }
97-
ChannelDisconnected?.Invoke(this, channel);
98-
};
99-
Task.Run(() =>
100-
{
10185
ChannelConnected?.Invoke(this, channel);
102-
});
86+
channel.Disconnected += (sender2, e2) =>
87+
{
88+
if (LogUtils.LogConnection)
89+
LogUtils.Log("[Connection]{0} Disconnected.", channelName);
90+
RemoveChannel(channel);
91+
try { stream.Dispose(); }
92+
catch { }
93+
ChannelDisconnected?.Invoke(this, channel);
94+
};
95+
};
10396
}
10497

10598
protected abstract Task InnerAcceptAsync(CancellationToken token);

Quick.Protocol/QpServerChannel.cs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,11 @@ public class QpServerChannel : QpChannel
2525
/// <summary>
2626
/// 通过认证时
2727
/// </summary>
28-
public event EventHandler Auchenticated;
29-
28+
internal event EventHandler Auchenticated;
3029
/// <summary>
31-
/// 连接断开时
30+
/// 认证超时
3231
/// </summary>
33-
public event EventHandler Disconnected;
32+
internal event EventHandler AuchenticateTimeout;
3433

3534
public QpServerChannel(QpServer server, Stream stream, string channelName, CancellationToken cancellationToken, QpServerOptions options) : base(options)
3635
{
@@ -43,7 +42,6 @@ public QpServerChannel(QpServer server, Stream stream, string channelName, Cance
4342
cancellationToken.Register(() => Stop());
4443
//修改缓存大小
4544
ChangeBufferSize(options.BufferSize);
46-
IsConnected = true;
4745

4846
//初始化连接相关指令处理器
4947
var connectAndAuthCommandExecuterManager = new CommandExecuterManager();
@@ -56,6 +54,25 @@ public QpServerChannel(QpServer server, Stream stream, string channelName, Cance
5654
InitQpPackageHandler_Stream(stream);
5755
//开始读取其他数据包
5856
BeginReadPackage(cts.Token);
57+
58+
//如果认证超时时间后没有通过认证,则断开连接
59+
if (options.AuthenticateTimeout>0)
60+
Task.Delay(options.AuthenticateTimeout, cts.Token).ContinueWith(t =>
61+
{
62+
if (t.IsCanceled)
63+
return;
64+
if (stream!=null)
65+
{
66+
try
67+
{
68+
stream.Close();
69+
stream.Dispose();
70+
stream=null;
71+
}
72+
catch { }
73+
}
74+
AuchenticateTimeout?.Invoke(this, EventArgs.Empty);
75+
});
5976
}
6077

6178
private Commands.Connect.Response connect(QpChannel handler, Commands.Connect.Request request)
@@ -87,6 +104,7 @@ private Commands.Authenticate.Response authenticate(QpChannel handler, Commands.
87104
});
88105
throw new CommandException(1, "认证失败!");
89106
}
107+
IsConnected=true;
90108
Auchenticated?.Invoke(this, EventArgs.Empty);
91109
return new Commands.Authenticate.Response();
92110
}
@@ -147,11 +165,6 @@ protected override void OnReadError(Exception exception)
147165
}
148166
Stop();
149167
base.OnReadError(exception);
150-
if (IsConnected)
151-
{
152-
IsConnected = false;
153-
Disconnected?.Invoke(this, QpEventArgs.Empty);
154-
}
155168
}
156169
}
157170
}

Quick.Protocol/QpServerOptions.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ public class QpServerOptions : QpChannelOptions
1212
/// 缓存大小(默认128KB)
1313
/// </summary>
1414
public int BufferSize = 128 * 1024;
15-
15+
/// <summary>
16+
/// 认证超时时间,在指定的超时时间没有完成认证,则断开连接
17+
/// </summary>
18+
public int AuthenticateTimeout { get; set; } = 5000;
1619
/// <summary>
1720
/// 服务端程序
1821
/// </summary>

0 commit comments

Comments
 (0)