adjust tcp listener to work with nodejs server (subscribing doesn't work yet)

This commit is contained in:
Hylke Bons 2011-07-02 02:13:33 +01:00
parent b76ea3a82b
commit cf1a7a5e2b

View file

@ -17,7 +17,6 @@
using System;
using System.IO;
using System.Xml;
using System.Text;
using System.Threading;
using System.Net.Sockets;
@ -56,33 +55,6 @@ namespace SparkleLib {
}
}
private void SendCommand(TcpMessagePacket pkt)
{
XmlSerializer serializer = new XmlSerializer(typeof(TcpMessagePacket));
XmlSerializerNamespaces emptyNamespace = new XmlSerializerNamespaces();
emptyNamespace.Add(String.Empty, String.Empty);
StringBuilder output = new StringBuilder();
XmlWriter writer = XmlWriter.Create(output,
new XmlWriterSettings { OmitXmlDeclaration = true });
serializer.Serialize(writer, pkt, emptyNamespace);
lock (this.mutex) {
this.socket.Send(Encoding.UTF8.GetBytes(output.ToString()));
}
}
private TcpMessagePacket ReceiveCommand(byte[] input)
{
XmlSerializer serializer = new XmlSerializer(typeof(TcpMessagePacket));
MemoryStream ms = new MemoryStream(input);
TcpMessagePacket ret = (TcpMessagePacket) serializer.Deserialize(ms);
return ret;
}
// Starts a new thread and listens to the channel
public override void Connect ()
@ -101,12 +73,13 @@ namespace SparkleLib {
lock (this.mutex) {
base.is_connecting = false;
this.connected = true;
foreach (string channel in base.channels) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Subscribing to channel " + channel);
this.socket.Send (Encoding.UTF8.GetBytes ("subscribe " + channel));
}
}
foreach (string channel in base.channels) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Subscribing to channel " + channel);
this.SendCommand(new TcpMessagePacket(channel, "register"));
}
byte [] bytes = new byte [4096];
@ -114,9 +87,11 @@ namespace SparkleLib {
while (this.socket.Connected) {
int bytes_read = this.socket.Receive (bytes);
if (bytes_read > 0) {
TcpMessagePacket message = this.ReceiveCommand(bytes);
SparkleHelpers.DebugInfo ("ListenerTcp", "Update for folder " + message.repo);
OnAnnouncement (new SparkleAnnouncement (message.repo, message.readable));
string received = Encoding.UTF8.GetString (bytes);
string folder_identifier = received.Substring (0, received.IndexOf ("!"));
string message = received.Substring (received.IndexOf ("!") + 1);
OnAnnouncement (new SparkleAnnouncement (folder_identifier, message));
} else {
SparkleHelpers.DebugInfo ("ListenerTcp", "Error on socket");
lock (this.mutex) {
@ -148,7 +123,11 @@ namespace SparkleLib {
if (IsConnected) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Subscribing to channel " + channel);
this.SendCommand(new TcpMessagePacket(channel, "register"));
string to_send = "subscribe " + folder_identifier;
lock (this.mutex) {
this.socket.Send (Encoding.UTF8.GetBytes (to_send));
}
}
}
}
@ -156,10 +135,11 @@ namespace SparkleLib {
public override void Announce (SparkleAnnouncement announcement)
{
this.SendCommand(new TcpMessagePacket(announcement.FolderIdentifier, "new_version"));
string message = "announce " + announcement.FolderIdentifier;
// Also announce to ourselves for debugging purposes
// base.OnAnnouncement (announcement);
lock (this.mutex) {
this.socket.Send (Encoding.UTF8.GetBytes (message));
}
}
@ -170,22 +150,4 @@ namespace SparkleLib {
base.Dispose ();
}
}
[Serializable,XmlRoot("packet")]
public class TcpMessagePacket
{
public string repo { get; set; }
public string command { get; set; }
public string readable { get; set; }
public TcpMessagePacket(string repo, string command) {
this.repo = repo;
this.command = command;
}
public TcpMessagePacket() {
this.repo = "none";
this.command = "invalid";
}
}
}