Working on improved console streaming

This commit is contained in:
Marcel Baumgartner 2024-04-16 07:40:48 +02:00
parent e3f040c978
commit be173e1d48
4 changed files with 187 additions and 17 deletions

View file

@ -0,0 +1,169 @@
using System.Net.WebSockets;
using System.Text;
using MoonCore.Helpers;
using Newtonsoft.Json;
namespace Moonlight.Core.Helpers;
public class AdvancedWebsocketStream
{
private readonly WebSocket Socket;
private readonly Dictionary<int, Type> Packets = new();
public AdvancedWebsocketStream(WebSocket socket)
{
Socket = socket;
}
public void RegisterPacket<T>(int id) => RegisterPacket(id, typeof(T));
public void RegisterPacket(int id, Type type)
{
Packets.Add(id, type);
}
public async Task<object?> ReceivePacket()
{
if (Socket.State != WebSocketState.Open)
throw new ArgumentException("The websocket connection needs to be open in order to receive packets");
// Length
var lengthBuffer = new byte[4];
await Socket.ReceiveAsync(lengthBuffer, CancellationToken.None);
var length = BitConverter.ToInt32(lengthBuffer);
Logger.Debug($"Received length: {length}");
if (length <= 0)
throw new ArgumentException("The packet length cannot be less or equal than zero");
var packetBuffer = new byte[length];
var received = await Socket.ReceiveAsync(packetBuffer, CancellationToken.None);
Logger.Debug($"Lenght expected: {length}. Lenght got: {received.Count}");
return DecodePacket(packetBuffer);
}
public async Task<T?> ReceivePacket<T>()
{
var packet = await ReceivePacket();
if (packet == null)
return default;
if (packet is not T)
throw new ArgumentException($"Received packet {packet.GetType().Name} matches not the type {typeof(T).Name}");
return (T)packet;
}
public async Task SendPacket(object packet)
{
if (Socket.State != WebSocketState.Open)
throw new ArgumentException("The websocket connection needs to be open in order to send packets");
var buffer = EncodePacket(packet);
// Send length
var length = buffer.Length;
var lengthBuffer = BitConverter.GetBytes(length);
await Socket.SendAsync(lengthBuffer, WebSocketMessageType.Binary, WebSocketMessageFlags.None,
CancellationToken.None);
// Send packet
await Socket.SendAsync(buffer, WebSocketMessageType.Binary, WebSocketMessageFlags.None, CancellationToken.None);
}
public async Task WaitForClose()
{
var source = new TaskCompletionSource();
Task.Run(async () =>
{
while (Socket.State == WebSocketState.Open)
await Task.Delay(10);
source.SetResult();
});
await source.Task;
}
public async Task Close()
{
if(Socket.State == WebSocketState.Open)
await Socket.CloseOutputAsync(WebSocketCloseStatus.Empty, null, CancellationToken.None);
}
private byte[] EncodePacket(object packet)
{
var type = packet.GetType();
var packetId = Packets.Values.Contains(type) ? Packets.First(x => x.Value == type).Key : -1;
if (packetId == -1)
throw new ArgumentException($"Sending packet type which has not been registered: {packet.GetType().Name}");
// Header
var headerBuffer = BitConverter.GetBytes(packetId);
// Body
var jsonText = JsonConvert.SerializeObject(packet);
var bodyBuffer = Encoding.UTF8.GetBytes(jsonText);
return headerBuffer.Concat(bodyBuffer).ToArray();
}
private object? DecodePacket(byte[] buffer)
{
if (buffer.Length < 5) // 4 (header) + minimum 1 as body
{
Logger.Warn($"Received buffer is too small ({buffer.Length} bytes)");
return default;
}
var headerBuffer = new byte[4];
Array.Copy(buffer, 0, headerBuffer, 0, 4);
var packetId = BitConverter.ToInt32(headerBuffer);
Logger.Info($"Packet Id: {packetId}");
var packetType = Packets.TryGetValue(packetId, out var packet) ? packet : default;
if (packetType == null)
{
Logger.Warn($"Received packet id which has not been registered: {packetId}");
Logger.Info("Packet dumped: " + Encoding.UTF8.GetString(buffer));
return default;
}
var bodyBuffer = new byte[buffer.Length - 4];
Array.Copy(buffer, 4, bodyBuffer, 0, buffer.Length - 4);
var jsonText = Encoding.UTF8.GetString(bodyBuffer);
if (string.IsNullOrEmpty(jsonText))
{
Logger.Warn("Received empty json text");
return default;
}
object? result = default;
try
{
result = JsonConvert.DeserializeObject(jsonText, packetType);
}
catch (JsonReaderException e)
{
Logger.Warn($"An error occured while deserializating the json text of the packet {packetType.Name}");
Logger.Warn(e);
}
return result;
}
}

View file

@ -2,8 +2,8 @@
using MoonCore.Helpers;
using Moonlight.Features.Servers.Api.Packets;
using Moonlight.Features.Servers.Entities;
using Moonlight.Features.Servers.Models.Abstractions;
using Moonlight.Features.Servers.Models.Enums;
using AdvancedWebsocketStream = Moonlight.Core.Helpers.AdvancedWebsocketStream;
namespace Moonlight.Features.Servers.Helpers;
@ -23,7 +23,7 @@ public class ServerConsole
private readonly Server Server;
private ClientWebSocket WebSocket;
private WsPacketConnection PacketConnection;
private AdvancedWebsocketStream WebsocketStream;
private CancellationTokenSource Cancellation = new();
@ -50,11 +50,11 @@ public class ServerConsole
wsUrl = $"ws://{Server.Node.Fqdn}:{Server.Node.HttpPort}/servers/{Server.Id}/ws";
await WebSocket.ConnectAsync(new Uri(wsUrl), CancellationToken.None);
PacketConnection = new WsPacketConnection(WebSocket);
WebsocketStream = new AdvancedWebsocketStream(WebSocket);
await PacketConnection.RegisterPacket<string>("output");
await PacketConnection.RegisterPacket<ServerState>("state");
await PacketConnection.RegisterPacket<ServerStats>("stats");
WebsocketStream.RegisterPacket<string>(1);
WebsocketStream.RegisterPacket<ServerState>(2);
WebsocketStream.RegisterPacket<ServerStats>(3);
Task.Run(Worker);
}
@ -65,7 +65,7 @@ public class ServerConsole
{
try
{
var packet = await PacketConnection.Receive();
var packet = await WebsocketStream.ReceivePacket();
if (packet == null)
continue;
@ -111,7 +111,7 @@ public class ServerConsole
}
await OnDisconnected.Invoke();
await PacketConnection.Close();
await WebsocketStream.Close();
}
public async Task Close()
@ -119,8 +119,8 @@ public class ServerConsole
if(!Cancellation.IsCancellationRequested)
Cancellation.Cancel();
if(PacketConnection != null)
await PacketConnection.Close();
if(WebsocketStream != null)
await WebsocketStream.Close();
}
private string[] GetMessageCache()

View file

@ -8,6 +8,7 @@ using Moonlight.Features.Servers.Events;
using Moonlight.Features.Servers.Extensions;
using Moonlight.Features.Servers.Http.Requests;
using Moonlight.Features.Servers.Models.Abstractions;
using AdvancedWebsocketStream = Moonlight.Core.Helpers.AdvancedWebsocketStream;
namespace Moonlight.Features.Servers.Http.Controllers;
@ -36,9 +37,9 @@ public class ServersControllers : Controller
var websocket = await HttpContext.WebSockets.AcceptWebSocketAsync();
// Build connection wrapper
var wsPacketConnection = new WsPacketConnection(websocket);
await wsPacketConnection.RegisterPacket<int>("amount");
await wsPacketConnection.RegisterPacket<ServerConfiguration>("serverConfiguration");
var websocketStream = new AdvancedWebsocketStream(websocket);
websocketStream.RegisterPacket<int>(1);
websocketStream.RegisterPacket<ServerConfiguration>(2);
// Read server data for the node
var node = (HttpContext.Items["Node"] as ServerNode)!;
@ -62,13 +63,13 @@ public class ServersControllers : Controller
.ToArray();
// Send the amount of configs the node will receive
await wsPacketConnection.Send(servers.Length);
await websocketStream.SendPacket(servers.Length);
// Send the server configurations
foreach (var serverConfiguration in serverConfigurations)
await wsPacketConnection.Send(serverConfiguration);
await websocketStream.SendPacket(serverConfiguration);
await wsPacketConnection.WaitForClose();
await websocketStream.WaitForClose();
return Ok();
}

View file

@ -90,7 +90,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="MoonCore" Version="1.1.9" />
<PackageReference Include="MoonCore" Version="1.2.4" />
<PackageReference Include="MoonCoreUI" Version="1.1.5" />
<PackageReference Include="Otp.NET" Version="1.3.0" />
<PackageReference Include="QRCoder" Version="1.4.3" />