ASP.NET 自定义 UDP 和 TCP 通信
2026/2/13大约 6 分钟
最近的工作中实际遇到并解决的问题,需要使用 UDP 和 TCP 通信,最好能像 http 一样直接注册 Controller ,简单记录一下解决思路。
问题关键
关键点还是在依赖注入上,需要将所有 controller 注入到服务,然后根据路由去做解析,对性能要求不高,但是需要实现方便。
思路是使用反射:在服务初始化时扫描程序集,生成一个「路由-控制器」字典,并将所有控制器注册到依赖注入容器;收到消息时,根据路由从字典中取得控制器的类型,再通过该类型从容器中解析并调用对应的控制器。
但是这样实现也有局限性,路由只能定义到控制器类,而不能像 http 的控制器一样,让每个方法写 GET 或者 POST。不过问题也不大,够用了。
实现
话不多说,直接上代码。
只展示关键代码,配置和辅助等不做展示。
因为是DEMO代码,部分异常处理不够完善。
定义通信结构
| 偏移 | 定义 | 长度 |
|---|---|---|
| 0 | 固定包头 | 4 |
| 4 | 消息滚动ID | 2 |
| 6 | 消息状态码 | 2 |
| 8 | 路由长度 | 2 |
| 10 | 消息长度 | 2 |
| 12 | 路由 | n |
| 12 + n | 消息内容 | m |
根据上面的定义写包头
using System.Buffers;
using System.Diagnostics.CodeAnalysis;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using System.Text.Json;
using CommunityToolkit.HighPerformance;
namespace NetServerDemo.Models;
[StructLayout(LayoutKind.Explicit, Pack = 1)]
internal struct NetPackHead()
{
/// <summary>
/// Fixed magic value expected in the first four bytes of every packet header.
/// </summary>
public const int Head = 0x12345678;
/// <summary>
/// Size of the header in bytes, as reported by <c>Unsafe.SizeOf</c>.
/// </summary>
public static readonly int Size = Unsafe.SizeOf<NetPackHead>();
// Magic field at offset 0. The field initializer sets it to Head; note that
// "default(NetPackHead)" does not run the initializer and leaves it zero.
[FieldOffset(0)] private int _head = Head;
/// <summary>
/// Message id (offset 4, two bytes).
/// </summary>
[field: FieldOffset(4)]
public ushort MessageId { get; set; }
/// <summary>
/// Message status code (offset 6, two bytes).
/// </summary>
[field: FieldOffset(6)]
public short MessageCode { get; set; }
/// <summary>
/// Length in bytes of the route section that follows the header (offset 8).
/// </summary>
[field: FieldOffset(8)]
public ushort RouteLength { get; set; }
/// <summary>
/// Length in bytes of the message section that follows the route (offset 10).
/// </summary>
[field: FieldOffset(10)]
public ushort MessageLength { get; set; }
/// <summary>
/// Tries to read a packet header from the start of <paramref name="data"/> and
/// validates it against the buffer.
/// </summary>
/// <param name="data">Raw packet bytes, beginning with the header.</param>
/// <param name="head">
/// The header read from <paramref name="data"/>. It is <c>default</c> when the buffer is
/// shorter than a header; otherwise it holds the raw header bytes even when validation fails.
/// </param>
/// <returns>
/// <c>true</c> when the magic value matches and the buffer is long enough to contain the
/// header, the route and the message; otherwise <c>false</c>.
/// </returns>
public static bool TryParse(ReadOnlySpan<byte> data, out NetPackHead head)
{
    head = default;
    if (data.Length < Size)
    {
        return false;
    }

    head = MemoryMarshal.Read<NetPackHead>(data);
    if (head._head != Head)
    {
        return false;
    }

    // The route and the message both follow the header, so both declared lengths
    // must fit in the buffer. Checking only MessageLength let truncated packets through.
    var requiredLength = Size + head.RouteLength + head.MessageLength;
    return data.Length >= requiredLength;
}
/// <summary>
/// Reads the UTF-8 route string located directly after the header.
/// </summary>
/// <param name="data">Raw packet bytes, beginning with the header.</param>
/// <param name="route">The decoded route, or <c>null</c> when the buffer is too short.</param>
/// <returns><c>true</c> when the buffer contains the full route section; otherwise <c>false</c>.</returns>
public bool TryGetRoute(ReadOnlySpan<byte> data, [MaybeNullWhen(false)] out string route)
{
    var routeEnd = Size + RouteLength;
    if (data.Length >= routeEnd)
    {
        route = Encoding.UTF8.GetString(data[Size..routeEnd]);
        return true;
    }

    route = null;
    return false;
}
/// <summary>
/// Tries to deserialize the JSON message section that follows the route.
/// </summary>
/// <typeparam name="T">Type to deserialize the JSON payload into.</typeparam>
/// <param name="data">Raw packet bytes, beginning with the header.</param>
/// <param name="message">
/// The deserialized payload, or <c>default</c> when the method returns <c>false</c>.
/// It can still be <c>null</c> on success if the payload is the JSON literal <c>null</c>.
/// </param>
/// <returns>
/// <c>true</c> when the buffer contains the full message section and it is valid JSON for
/// <typeparamref name="T"/>; otherwise <c>false</c>.
/// </returns>
public bool TryGetMessage<T>(ReadOnlySpan<byte> data, out T? message)
{
    message = default;
    var messageStart = Size + RouteLength;
    if (data.Length < messageStart + MessageLength)
    {
        return false;
    }

    try
    {
        message = JsonSerializer.Deserialize<T>(data.Slice(messageStart, MessageLength));
        return true;
    }
    catch (JsonException)
    {
        // A Try* method reports malformed (or empty) payloads through its return value
        // instead of letting the parser exception escape to the caller.
        message = default;
        return false;
    }
}
/// <summary>
/// Fills in this header from <paramref name="message"/> and writes header, route and
/// JSON payload, in that order, to <paramref name="writer"/>.
/// </summary>
/// <param name="writer">Destination buffer.</param>
/// <param name="route">Route string; encoded as UTF-8.</param>
/// <param name="message">Payload to serialize; its <c>Code</c> is copied into the header.</param>
/// <exception cref="ArgumentException">
/// The encoded route or the serialized payload is longer than 65535 bytes and therefore
/// cannot be described by the 16-bit length fields.
/// </exception>
public void WriteToPack(ArrayBufferWriter<byte> writer, string route, INetMessagePack message)
{
    var routeBytes = Encoding.UTF8.GetBytes(route);

    // Serialize using the runtime type. With the declared interface type,
    // System.Text.Json would only emit the members of INetMessagePack and
    // silently drop every property added by the concrete reply class.
    var messageBytes = JsonSerializer.SerializeToUtf8Bytes(message, message.GetType());

    // The length fields are 16 bits wide; refuse data that would be truncated by the cast.
    if (routeBytes.Length > ushort.MaxValue)
    {
        throw new ArgumentException(
            $"Encoded route is {routeBytes.Length} bytes; the maximum is {ushort.MaxValue}.", nameof(route));
    }

    if (messageBytes.Length > ushort.MaxValue)
    {
        throw new ArgumentException(
            $"Serialized message is {messageBytes.Length} bytes; the maximum is {ushort.MaxValue}.", nameof(message));
    }

    MessageCode = message.Code;
    RouteLength = (ushort)routeBytes.Length;
    MessageLength = (ushort)messageBytes.Length;

    writer.Write(this);
    writer.Write(routeBytes);
    writer.Write(messageBytes);
}
}namespace NetServerDemo.Models;
/// <summary>
/// Contract for message payloads that carry a status code.
/// </summary>
internal interface INetMessagePack
{
    /// <summary>
    /// Indicates whether the message succeeded.
    /// Any value below zero is a failure.
    /// </summary>
    short Code { get; set; }
}
internal class NetMessagePack : INetMessagePack
{
/// <inheritdoc />
/// <remarks>Auto-implemented property; no validation is applied to the assigned value.</remarks>
public short Code { get; set; }
}namespace NetServerDemo.Services.NetServices;
internal interface INetServerMessage
{
/// <summary>
/// Message content (the raw bytes of the received message).
/// </summary>
public ReadOnlyMemory<byte> Data { get; }
/// <summary>
/// Replies to the message.
/// </summary>
/// <param name="buffer">
/// Bytes to reply with. The implementations are not shown in this file;
/// presumably they send the bytes back to the originating peer — confirm.
/// </param>
public void Reply(ReadOnlySpan<byte> buffer);
}
定义控制器
using NetServerDemo.Models;
using NetServerDemo.Services.NetServices;
namespace NetServerDemo.Controllers.NetControllers;
internal abstract class NetBaseController
{
/// <summary>
/// The raw incoming message. Settable from outside; readable only by the
/// controller and its subclasses (protected getter).
/// </summary>
public INetServerMessage RawMessage { protected get; set; } = null!;
/// <summary>
/// The packet header of the incoming message. Settable from outside; readable
/// only by the controller and its subclasses (protected getter).
/// </summary>
public NetPackHead Head { protected get; set; }
/// <summary>
/// Handles the message.
/// </summary>
/// <returns>The reply payload for the message.</returns>
public abstract ValueTask<INetMessagePack> HandleMessageAsync();
}
实现服务器
using System.Buffers;
using System.Net;
using System.Net.Sockets;
using NetCoreServer;
namespace NetServerDemo.Services.NetServices.Udp;
internal class NcUdpServer : UdpServer
{
/// <summary>Forwards an IP address and port to the base <c>UdpServer</c> constructor.</summary>
public NcUdpServer(IPAddress address, int port) : base(address, port) { }

/// <summary>Forwards an address string and port to the base <c>UdpServer</c> constructor.</summary>
public NcUdpServer(string address, int port) : base(address, port) { }

/// <summary>Forwards a DNS endpoint to the base <c>UdpServer</c> constructor.</summary>
public NcUdpServer(DnsEndPoint endpoint) : base(endpoint) { }

/// <summary>Forwards an IP endpoint to the base <c>UdpServer</c> constructor.</summary>
public NcUdpServer(IPEndPoint endpoint) : base(endpoint) { }

/// <summary>
/// Callback invoked for every received message. Must be set at construction.
/// </summary>
public required Func<INetServerMessage, ValueTask> OnMessageReceivedAsync { get; init; }

/// <summary>
/// Optional logger; nothing is logged when it is <c>null</c>.
/// </summary>
public ILogger? Logger { get; init; }
/// <summary>
/// Logs the listening endpoint and starts the first asynchronous receive.
/// </summary>
protected override void OnStarted()
{
    // Constant message template with a named placeholder instead of an interpolated
    // string: the rendered text is the same, and the endpoint becomes a structured value.
    Logger?.LogInformation("UDP服务器已启动,监听 {Endpoint}", Endpoint);
    ReceiveAsync();
}
/// <summary>
/// Copies the received datagram into a pooled buffer, dispatches it to the
/// message callback on the thread pool, and re-arms receiving.
/// </summary>
/// <param name="endpoint">Endpoint the datagram was received from.</param>
/// <param name="buffer">Receive buffer supplied by the base server.</param>
/// <param name="offset">Offset of the datagram inside <paramref name="buffer"/>.</param>
/// <param name="size">Number of received bytes.</param>
protected override void OnReceived(EndPoint endpoint, byte[] buffer, long offset, long size)
{
    // The copy must happen here, synchronously. The buffer belongs to the base server
    // and can be overwritten by the next datagram as soon as ReceiveAsync() is called,
    // which previously raced with the copy running inside Task.Run.
    var length = (int)size;
    var owner = MemoryPool<byte>.Shared.Rent(length);
    buffer.AsSpan((int)offset, length).CopyTo(owner.Memory.Span);

    _ = Task.Run(() => HandleUdpMessage(endpoint, owner, length));
    ReceiveAsync();
}

/// <summary>
/// Runs the message callback for one datagram and returns the pooled buffer afterwards.
/// </summary>
/// <param name="endpoint">Endpoint the datagram was received from.</param>
/// <param name="owner">Pooled buffer holding the datagram; disposed by this method.</param>
/// <param name="length">Number of valid bytes at the start of the pooled buffer.</param>
private async Task HandleUdpMessage(EndPoint endpoint, IMemoryOwner<byte> owner, int length)
{
    using (owner)
    {
        try
        {
            // A rented buffer may be larger than requested; expose only the datagram bytes
            // so length checks downstream are not satisfied by stale pool content.
            var payload = owner.Memory.Slice(0, length);
            await OnMessageReceivedAsync(new UdpServerMessage(this, endpoint, payload)).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            Logger?.LogError(e, "UDP服务器处理消息时发生异常");
        }
    }
}
/// <summary>
/// Logs a socket error reported by the base server.
/// </summary>
/// <param name="error">The socket error code.</param>
protected override void OnError(SocketError error)
{
    // Constant message template instead of an interpolated string; the rendered text is unchanged.
    Logger?.LogError("UDP服务器发生错误: {Error}", error);
}
}using System.Buffers;
using NetCoreServer;
namespace NetServerDemo.Services.NetServices.Tcp;
internal class NcTcpSession(NcTcpServer server) : TcpSession(server)
{
/// <summary>
/// Callback invoked for every chunk of data received on this session.
/// Must be set at construction.
/// </summary>
public required Func<INetServerMessage, ValueTask> OnMessageReceivedAsync { get; init; }

/// <summary>Writes a debug log entry with the session id when the session connects.</summary>
protected override void OnConnected() => server.Logger?.LogDebug("{Guid} connected", Id);

/// <summary>Writes a debug log entry with the session id when the session disconnects.</summary>
protected override void OnDisconnected() => server.Logger?.LogDebug("{Guid} disconnected", Id);
/// <summary>
/// Copies the received bytes into a pooled buffer and dispatches them to the
/// message callback on the thread pool.
/// </summary>
/// <param name="buffer">Receive buffer supplied by the base session.</param>
/// <param name="offset">Offset of the received data inside <paramref name="buffer"/>.</param>
/// <param name="size">Number of received bytes.</param>
protected override void OnReceived(byte[] buffer, long offset, long size)
{
    // Copy before returning. The buffer belongs to the base session and can be reused
    // for the next receive once this method returns, which previously raced with the
    // copy running inside Task.Run.
    // NOTE(review): TCP is a byte stream, so one chunk is not guaranteed to be exactly
    // one packet; no framing/reassembly is done here — confirm this is acceptable.
    var length = (int)size;
    var owner = MemoryPool<byte>.Shared.Rent(length);
    buffer.AsSpan((int)offset, length).CopyTo(owner.Memory.Span);

    _ = Task.Run(() => HandleTcpMessage(owner, length));
}

/// <summary>
/// Runs the message callback for one received chunk and returns the pooled buffer afterwards.
/// </summary>
/// <param name="owner">Pooled buffer holding the received bytes; disposed by this method.</param>
/// <param name="length">Number of valid bytes at the start of the pooled buffer.</param>
private async Task HandleTcpMessage(IMemoryOwner<byte> owner, int length)
{
    using (owner)
    {
        try
        {
            // A rented buffer may be larger than requested; expose only the received bytes.
            var payload = owner.Memory.Slice(0, length);
            await OnMessageReceivedAsync(new TcpServerMessage(server, this, payload)).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            // The original text said "UDP" here, which mislabels failures of the TCP session.
            server.Logger?.LogError(e, "TCP服务器处理消息时发生异常");
        }
    }
}
}using System.Net;
using NetCoreServer;
namespace NetServerDemo.Services.NetServices.Tcp;
internal class NcTcpServer : TcpServer
{
/// <summary>Forwards an IP address and port to the base <c>TcpServer</c> constructor.</summary>
public NcTcpServer(IPAddress address, int port) : base(address, port) { }

/// <summary>Forwards an address string and port to the base <c>TcpServer</c> constructor.</summary>
public NcTcpServer(string address, int port) : base(address, port) { }

/// <summary>Forwards a DNS endpoint to the base <c>TcpServer</c> constructor.</summary>
public NcTcpServer(DnsEndPoint endpoint) : base(endpoint) { }

/// <summary>Forwards an IP endpoint to the base <c>TcpServer</c> constructor.</summary>
public NcTcpServer(IPEndPoint endpoint) : base(endpoint) { }

/// <summary>
/// Callback handed to every session created by this server. Must be set at construction.
/// </summary>
public required Func<INetServerMessage, ValueTask> OnMessageReceivedAsync { get; init; }

/// <summary>
/// Optional logger; sessions read it through the server reference.
/// </summary>
public ILogger? Logger { get; init; }

/// <summary>
/// Creates a session that shares this server's message callback.
/// </summary>
/// <returns>A new <c>NcTcpSession</c> bound to this server.</returns>
protected override TcpSession CreateSession() =>
    new NcTcpSession(this) { OnMessageReceivedAsync = OnMessageReceivedAsync };
}
定义 Service
using System.Buffers;
using CommunityToolkit.HighPerformance;
using NetServerDemo.Controllers.NetControllers;
using NetServerDemo.Models;
namespace NetServerDemo.Services.NetServices;
internal abstract class NetServerService : IHostedService, IDisposable
{
/// <summary>Route table mapping a route string to its controller type.</summary>
protected Dictionary<string, Type> Controllers;

/// <summary>Root service provider; a scope is created from it for each handled message.</summary>
protected IServiceProvider ServiceProvider;

/// <summary>Logger supplied by the derived service.</summary>
protected readonly ILogger Logger;

/// <summary>
/// Stores the dependencies shared by the concrete transport services.
/// </summary>
/// <param name="serviceProvider">Provider used to create per-message scopes.</param>
/// <param name="controllers">Route table mapping routes to controller types.</param>
/// <param name="logger">Logger for the derived service.</param>
protected NetServerService(IServiceProvider serviceProvider, Dictionary<string, Type> controllers, ILogger logger)
{
    // Every field is assigned here, so no placeholder initializers are needed on the declarations.
    (ServiceProvider, Controllers, Logger) = (serviceProvider, controllers, logger);
}

/// <inheritdoc />
public abstract void Dispose();

/// <inheritdoc />
public abstract Task StartAsync(CancellationToken cancellationToken);

/// <inheritdoc />
public abstract Task StopAsync(CancellationToken cancellationToken);
/// <summary>
/// Handles one received message: parses the header and route, resolves the matching
/// controller from a fresh DI scope, runs it, and sends the reply.
/// </summary>
/// <param name="message">The received message; also used to send the reply.</param>
/// <remarks>
/// Messages shorter than a header are dropped without a reply. A message whose header or
/// route cannot be parsed gets a header-only reply with status code -1. An unknown route,
/// or a controller type that cannot be resolved, gets a reply with status code 404.
/// Exceptions thrown by a controller are not caught here.
/// </remarks>
protected async ValueTask OnMessageReceivedAsync(INetServerMessage message)
{
    if (message.Data.Length < NetPackHead.Size)
    {
        return;
    }

    ArrayBufferWriter<byte> replyWriter = new();

    if (NetPackHead.TryParse(message.Data.Span, out var head)
        && head.TryGetRoute(message.Data.Span, out var route))
    {
        INetMessagePack? replyMessage = null;
        if (!string.IsNullOrEmpty(route) && Controllers.TryGetValue(route, out var controllerType))
        {
            // The scope lives only for the duration of the controller call.
            await using var scope = ServiceProvider.CreateAsyncScope();
            if (scope.ServiceProvider.GetService(controllerType) is NetBaseController controller)
            {
                controller.RawMessage = message;
                controller.Head = head;
                replyMessage = await controller.HandleMessageAsync().ConfigureAwait(false);
            }
        }

        // NOTE(review): INetMessagePack documents negative codes as failures, yet this
        // "not found" reply uses a positive 404 — confirm what clients expect.
        replyMessage ??= new NetMessagePack
        {
            Code = 404
        };
        head.WriteToPack(replyWriter, route, replyMessage);
        message.Reply(replyWriter.WrittenSpan);
        return;
    }

    // Parsing failed. Build a fresh header rather than echoing the received one:
    // when the failure was a magic mismatch, the received header carries an invalid
    // magic value and the reply would be rejected by a conforming client.
    var errorHead = new NetPackHead
    {
        MessageId = head.MessageId,
        MessageCode = -1,
        RouteLength = 0,
        MessageLength = 0
    };
    replyWriter.Write(errorHead);
    message.Reply(replyWriter.WrittenSpan);
}
}using System.Net;
using NetServerDemo.Models;
namespace NetServerDemo.Services.NetServices.Udp;
internal class UdpServerService : NetServerService
{
// The wrapped UDP server; created once in the constructor.
private readonly NcUdpServer _server;

/// <summary>
/// Builds the UDP server on all interfaces using the port from <c>AppSettings</c>
/// and wires it to the shared message handler of the base class.
/// </summary>
/// <param name="serviceProvider">Provider used to resolve settings and loggers.</param>
/// <param name="controllers">Route table mapping routes to controller types.</param>
private UdpServerService(IServiceProvider serviceProvider, Dictionary<string, Type> controllers)
    : base(serviceProvider, controllers, serviceProvider.GetRequiredService<ILogger<UdpServerService>>())
{
    var port = serviceProvider.GetRequiredService<AppSettings>().ServerPort;
    _server = new NcUdpServer(IPAddress.Any, port)
    {
        Logger = serviceProvider.GetRequiredService<ILogger<NcUdpServer>>(),
        OnMessageReceivedAsync = OnMessageReceivedAsync
    };
}

/// <summary>
/// Factory used by the DI registration, since the constructor is private.
/// </summary>
/// <param name="serviceProvider">Provider used to resolve settings and loggers.</param>
/// <param name="controllers">Route table mapping routes to controller types.</param>
/// <returns>A new service instance.</returns>
public static UdpServerService Create(IServiceProvider serviceProvider, Dictionary<string, Type> controllers)
    => new(serviceProvider, controllers);

/// <summary>Disposes the wrapped server.</summary>
public override void Dispose() => _server.Dispose();

/// <summary>Starts the wrapped server and completes immediately.</summary>
/// <param name="cancellationToken">Not used.</param>
/// <returns>A completed task.</returns>
public override Task StartAsync(CancellationToken cancellationToken)
{
    _server.Start();
    return Task.CompletedTask;
}

/// <summary>Stops the wrapped server and completes immediately.</summary>
/// <param name="cancellationToken">Not used.</param>
/// <returns>A completed task.</returns>
public override Task StopAsync(CancellationToken cancellationToken)
{
    _server.Stop();
    return Task.CompletedTask;
}
}using System.Net;
using NetServerDemo.Models;
namespace NetServerDemo.Services.NetServices.Tcp;
internal class TcpServerService : NetServerService
{
// The wrapped TCP server; created once in the constructor.
private readonly NcTcpServer _server;

/// <summary>
/// Builds the TCP server on all interfaces using the port from <c>AppSettings</c>
/// and wires it to the shared message handler of the base class.
/// </summary>
/// <param name="serviceProvider">Provider used to resolve settings and loggers.</param>
/// <param name="controllers">Route table mapping routes to controller types.</param>
private TcpServerService(IServiceProvider serviceProvider, Dictionary<string, Type> controllers)
    : base(serviceProvider, controllers, serviceProvider.GetRequiredService<ILogger<TcpServerService>>())
{
    var port = serviceProvider.GetRequiredService<AppSettings>().ServerPort;
    _server = new NcTcpServer(IPAddress.Any, port)
    {
        Logger = serviceProvider.GetRequiredService<ILogger<NcTcpServer>>(),
        OnMessageReceivedAsync = OnMessageReceivedAsync
    };
}

/// <summary>
/// Factory used by the DI registration, since the constructor is private.
/// </summary>
/// <param name="serviceProvider">Provider used to resolve settings and loggers.</param>
/// <param name="controllers">Route table mapping routes to controller types.</param>
/// <returns>A new service instance.</returns>
public static TcpServerService Create(IServiceProvider serviceProvider, Dictionary<string, Type> controllers)
    => new(serviceProvider, controllers);

/// <summary>Disposes the wrapped server.</summary>
public override void Dispose() => _server.Dispose();

/// <summary>Starts the wrapped server and completes immediately.</summary>
/// <param name="cancellationToken">Not used.</param>
/// <returns>A completed task.</returns>
public override Task StartAsync(CancellationToken cancellationToken)
{
    _server.Start();
    return Task.CompletedTask;
}

/// <summary>Stops the wrapped server and completes immediately.</summary>
/// <param name="cancellationToken">Not used.</param>
/// <returns>A completed task.</returns>
public override Task StopAsync(CancellationToken cancellationToken)
{
    _server.Stop();
    return Task.CompletedTask;
}
}
定义辅助函数
using System.Reflection;
using Microsoft.AspNetCore.Mvc;
using NetServerDemo.Controllers.NetControllers;
using NetServerDemo.Services.NetServices.Tcp;
using NetServerDemo.Services.NetServices.Udp;
namespace NetServerDemo.Services.NetServices;
internal static class Utils
{
/// <summary>
/// Discovers all routed controllers, registers each as a scoped service, and adds
/// the UDP and TCP hosted services that share the resulting route table.
/// </summary>
/// <param name="services">The service collection to add registrations to.</param>
/// <param name="assemblies">
/// Assemblies to scan; when <c>null</c>, the assembly containing <c>NetBaseController</c> is scanned.
/// </param>
/// <returns>The same <paramref name="services"/> instance, for chaining.</returns>
public static IServiceCollection AddNetServerService(this IServiceCollection services,
    Assembly[]? assemblies = null)
{
    var routeTable = GetControllers(typeof(NetBaseController), assemblies);

    foreach (var controllerType in routeTable.Values)
    {
        services.AddScoped(controllerType);
    }

    services.AddHostedService(provider => UdpServerService.Create(provider, routeTable));
    services.AddHostedService(provider => TcpServerService.Create(provider, routeTable));
    return services;
}
/// <summary>
/// Scans assemblies for concrete, non-generic classes derived from
/// <paramref name="baseType"/> and builds a case-insensitive route table from
/// their <c>RouteAttribute</c> templates.
/// </summary>
/// <param name="baseType">Base type every controller must derive from.</param>
/// <param name="assemblies">
/// Assemblies to scan; when <c>null</c>, only the assembly of <paramref name="baseType"/> is scanned.
/// </param>
/// <returns>A dictionary mapping each route template to its controller type.</returns>
/// <exception cref="InvalidOperationException">
/// A controller declares an empty route, or two declarations use the same route.
/// </exception>
private static Dictionary<string, Type> GetControllers(Type baseType, Assembly[]? assemblies = null)
{
    var controllers = new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase);
    assemblies ??= [baseType.Assembly];

    foreach (var assembly in assemblies)
    {
        var derivedTypes = assembly.GetTypes()
            .Where(t => t is { IsClass: true, IsAbstract: false, IsGenericType: false }
                        && baseType.IsAssignableFrom(t)
                        && t != baseType);

        foreach (var type in derivedTypes)
        {
            foreach (var routeAttr in type.GetCustomAttributes<RouteAttribute>())
            {
                var template = routeAttr.Template;

                // Both failures abort registration, so the messages no longer claim the
                // declaration was "ignored", and no longer say "UDP": the table is shared
                // by the UDP and TCP services.
                if (string.IsNullOrEmpty(template))
                {
                    throw new InvalidOperationException($"控制器 {type.FullName} 的路由不能为空");
                }

                // TryAdd performs the duplicate check and the insert in a single lookup.
                if (!controllers.TryAdd(template, type))
                {
                    throw new InvalidOperationException(
                        $"路由冲突: {template} 已被 {controllers[template].FullName} 注册, {type.FullName} 无法重复注册");
                }
            }
        }
    }

    return controllers;
}
}
注册服务
using NetServerDemo.Models;
using NetServerDemo.Services.NetServices;
// Create the web application builder from the command-line arguments.
var builder = WebApplication.CreateSlimBuilder(args);
// Bind configuration to AppSettings; fall back to a default instance when binding returns null.
var appSettings = builder.Configuration.Get<AppSettings>() ?? new AppSettings();
// Register the settings instance as a singleton; the UDP/TCP services resolve it to read the port.
builder.Services.AddSingleton(appSettings);
// Register the routed controllers and the UDP and TCP hosted services.
builder.Services.AddNetServerService();
var app = builder.Build();
app.Run();
至此,一个完整的服务器就实现完成了。