listener: Rework disconnect algorithm

This commit is contained in:
Hylke Bons 2012-02-01 16:23:49 +00:00
parent 5c32eb370f
commit 17fc8fa920
3 changed files with 195 additions and 140 deletions

View file

@ -64,7 +64,7 @@ namespace SparkleLib {
SparkleHelpers.DebugInfo ("ListenerFactory",
"Refered to existing listener for " + announce_uri);
listener.AlsoListenTo (folder_identifier);
listener.AlsoListenToBase (folder_identifier);
return (SparkleListenerBase) listener;
}
}
@ -90,40 +90,52 @@ namespace SparkleLib {
// listens for change notifications
public abstract class SparkleListenerBase {
// We've connected to the server
public event ConnectedEventHandler Connected;
public delegate void ConnectedEventHandler ();
// We've disconnected from the server
public event DisconnectedEventHandler Disconnected;
public delegate void DisconnectedEventHandler ();
// We've been notified about a remote
// change by the channel
public event AnnouncementEventHandler Announcement;
public delegate void AnnouncementEventHandler (SparkleAnnouncement announcement);
public event ReceivedEventHandler Received;
public delegate void ReceivedEventHandler (SparkleAnnouncement announcement);
public readonly Uri Server;
public abstract void Connect ();
public abstract void Announce (SparkleAnnouncement announcent);
public abstract void AlsoListenTo (string folder_identifier);
public abstract bool IsConnected { get; }
public abstract bool IsConnecting { get; }
protected abstract void Announce (SparkleAnnouncement announcent);
protected abstract void AlsoListenTo (string folder_identifier);
protected List<string> channels = new List<string> ();
protected Dictionary<string,List<SparkleAnnouncement>> recent_announcements = new Dictionary<string, List<SparkleAnnouncement>> ();
protected int max_recent_announcements = 10;
protected Dictionary<string, SparkleAnnouncement> queue_up = new Dictionary<string, SparkleAnnouncement> ();
protected Dictionary<string,SparkleAnnouncement> queue_down = new Dictionary<string, SparkleAnnouncement> ();
protected bool is_connecting;
protected Uri server;
protected Timer reconnect_timer = new Timer { Interval = 60 * 1000, Enabled = true };
private int max_recent_announcements = 10;
private Dictionary<string, List<SparkleAnnouncement>> recent_announcements =
new Dictionary<string, List<SparkleAnnouncement>> ();
private Dictionary<string, SparkleAnnouncement> queue_up =
new Dictionary<string, SparkleAnnouncement> ();
private Dictionary<string, SparkleAnnouncement> queue_down =
new Dictionary<string, SparkleAnnouncement> ();
private Timer reconnect_timer = new Timer {
Interval = 60 * 1000,
Enabled = true
};
public SparkleListenerBase (Uri server, string folder_identifier)
{
this.server = server;
Server = server;
this.channels.Add (folder_identifier);
this.reconnect_timer.Elapsed += delegate {
if (!IsConnected && !this.is_connecting)
if (!IsConnected && !IsConnecting)
Reconnect ();
};
@ -133,29 +145,46 @@ namespace SparkleLib {
public void AnnounceBase (SparkleAnnouncement announcement)
{
if (!this.IsRecentAnnounement (announcement)) {
if (!IsRecentAnnouncement (announcement)) {
if (IsConnected) {
SparkleHelpers.DebugInfo ("Listener",
"Announcing message " + announcement.Message + " to " + announcement.FolderIdentifier + " on " + this.server);
"Announcing message " + announcement.Message + " to " +
announcement.FolderIdentifier + " on " + Server);
Announce (announcement);
this.AddRecentAnnouncement (announcement);
AddRecentAnnouncement (announcement);
} else {
SparkleHelpers.DebugInfo ("Listener", "Can't send message to " + this.server + ". Queuing message");
SparkleHelpers.DebugInfo ("Listener",
"Can't send message to " +
Server + ". Queuing message");
this.queue_up [announcement.FolderIdentifier] = announcement;
}
} else {
SparkleHelpers.DebugInfo ("Listener",
"Already processed message " + announcement.Message + " to " + announcement.FolderIdentifier + " from " + this.server);
"Already processed message " + announcement.Message + " to " +
announcement.FolderIdentifier + " from " + Server);
}
}
public void AlsoListenToBase (string channel)
{
if (!this.channels.Contains (channel) && IsConnected) {
SparkleHelpers.DebugInfo ("Listener",
"Subscribing to channel " + channel);
this.channels.Add (channel);
AlsoListenTo (channel);
}
}
public void Reconnect ()
{
SparkleHelpers.DebugInfo ("Listener", "Trying to reconnect to " + this.server);
SparkleHelpers.DebugInfo ("Listener", "Trying to reconnect to " + Server);
Connect ();
}
@ -168,7 +197,8 @@ namespace SparkleLib {
Connected ();
if (this.queue_up.Count > 0) {
SparkleHelpers.DebugInfo ("Listener", "Delivering " + this.queue_up.Count + " queued messages...");
SparkleHelpers.DebugInfo ("Listener",
"Delivering " + this.queue_up.Count + " queued messages...");
foreach (KeyValuePair<string, SparkleAnnouncement> item in this.queue_up) {
SparkleAnnouncement announcement = item.Value;
@ -180,9 +210,9 @@ namespace SparkleLib {
}
public void OnDisconnected ()
public void OnDisconnected (string message)
{
SparkleHelpers.DebugInfo ("Listener", "Signal of " + Server + " lost");
SparkleHelpers.DebugInfo ("Listener", "Disconnected from " + Server + ": " + message);
if (Disconnected != null)
Disconnected ();
@ -192,34 +222,46 @@ namespace SparkleLib {
public void OnAnnouncement (SparkleAnnouncement announcement)
{
SparkleHelpers.DebugInfo ("Listener",
"Got message " + announcement.Message + " from " + announcement.FolderIdentifier + " on " + this.server);
"Got message " + announcement.Message + " from " +
announcement.FolderIdentifier + " on " + Server);
if (IsRecentAnnounement(announcement) ){
if (IsRecentAnnouncement (announcement)) {
SparkleHelpers.DebugInfo ("Listener",
"Ignoring previously processed message " + announcement.Message +
" from " + announcement.FolderIdentifier + " on " + this.server);
" from " + announcement.FolderIdentifier + " on " + Server);
return;
}
SparkleHelpers.DebugInfo ("Listener",
"Processing message " + announcement.Message + " from " + announcement.FolderIdentifier + " on " + this.server);
"Processing message " + announcement.Message + " from " +
announcement.FolderIdentifier + " on " + Server);
AddRecentAnnouncement (announcement);
this.queue_down [announcement.FolderIdentifier] = announcement;
if (Announcement != null)
Announcement (announcement);
if (Received != null)
Received (announcement);
}
private bool IsRecentAnnounement (SparkleAnnouncement announcement)
public virtual void Dispose ()
{
if (!HasRecentAnnouncements (announcement.FolderIdentifier)) {
this.reconnect_timer.Dispose ();
}
private bool IsRecentAnnouncement (SparkleAnnouncement announcement)
{
if (!this.recent_announcements
.ContainsKey (announcement.FolderIdentifier)) {
return false;
} else {
foreach (SparkleAnnouncement recent_announcement in GetRecentAnnouncements (announcement.FolderIdentifier)) {
foreach (SparkleAnnouncement recent_announcement in
GetRecentAnnouncements (announcement.FolderIdentifier)) {
if (recent_announcement.Message.Equals (announcement.Message))
return true;
}
@ -240,39 +282,15 @@ namespace SparkleLib {
private void AddRecentAnnouncement (SparkleAnnouncement announcement)
{
List<SparkleAnnouncement> recent_announcements = this.GetRecentAnnouncements (announcement.FolderIdentifier);
List<SparkleAnnouncement> recent_announcements =
GetRecentAnnouncements (announcement.FolderIdentifier);
if (!IsRecentAnnounement (announcement))
if (!IsRecentAnnouncement (announcement))
recent_announcements.Add (announcement);
if (recent_announcements.Count > this.max_recent_announcements)
recent_announcements.RemoveRange (0, (recent_announcements.Count - this.max_recent_announcements));
}
private bool HasRecentAnnouncements (string folder_identifier)
{
return this.recent_announcements.ContainsKey (folder_identifier);
}
public virtual void Dispose ()
{
this.reconnect_timer.Dispose ();
}
public Uri Server {
get {
return this.server;
}
}
public bool IsConnecting {
get {
return this.is_connecting;
}
recent_announcements.RemoveRange (0,
(recent_announcements.Count - this.max_recent_announcements));
}
}
}

View file

@ -19,6 +19,7 @@ using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Security.Cryptography;
using System.Collections.Generic;
@ -28,30 +29,31 @@ namespace SparkleLib {
public class SparkleListenerTcp : SparkleListenerBase {
private Thread thread;
// these are shared
private readonly Object mutex = new Object();
private Socket socket;
private bool connected;
private Object socket_lock = new Object ();
private Thread thread;
private bool is_connected = false;
private bool is_connecting = false;
public SparkleListenerTcp (Uri server, string folder_identifier) :
base (server, folder_identifier)
{
base.channels.Add (folder_identifier);
this.connected = false;
}
public override bool IsConnected {
get {
bool result = false;
lock (this.socket_lock)
return this.is_connected;
}
}
lock (this.mutex) {
result = this.connected;
}
return result;
public override bool IsConnecting {
get {
lock (this.socket_lock)
return this.is_connecting;
}
}
@ -61,65 +63,88 @@ namespace SparkleLib {
{
SparkleHelpers.DebugInfo ("ListenerTcp", "Connecting to " + Server.Host);
base.is_connecting = true;
this.is_connecting = true;
this.thread = new Thread (
new ThreadStart (delegate {
try {
// Connect and subscribe to the channel
int port = Server.Port;
if (port < 0) port = 9999;
int port = Server.Port;
lock (this.mutex) {
this.socket = new Socket (AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
if (port < 0)
port = 1986;
try {
lock (this.socket_lock) {
this.socket = new Socket (AddressFamily.InterNetwork,
SocketType.Stream, ProtocolType.Tcp);
// TODO: our own time comparison to account for system sleep?
this.socket.ReceiveTimeout = 30 * 1000;
this.socket.Blocking = true;
this.socket.Connect (Server.Host, port);
base.is_connecting = false;
this.connected = true;
this.is_connecting = false;
this.is_connected = true;
OnConnected ();
foreach (string channel in base.channels) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Subscribing to channel " + channel);
this.socket.Send (Encoding.UTF8.GetBytes ("subscribe " + channel + "\n"));
SparkleHelpers.DebugInfo ("ListenerTcp",
"Subscribing to channel " + channel);
byte [] subscribe_bytes =
Encoding.UTF8.GetBytes ("subscribe " + channel + "\n");
this.socket.Send (subscribe_bytes);
}
}
byte [] bytes = new byte [4096];
} catch (SocketException e) {
this.is_connected = false;
this.is_connecting = false;
// List to the channels, this blocks the thread
while (this.socket.Connected) {
int bytes_read = this.socket.Receive (bytes);
OnDisconnected (e.Message);
return;
}
if (bytes_read > 0) {
string received = Encoding.UTF8.GetString (bytes);
string line = received.Substring (0, received.IndexOf ("\n"));
if (!line.Contains ("!"))
continue;
string folder_identifier = line.Substring (0, line.IndexOf ("!"));
string message = this.CleanMessage (line.Substring (line.IndexOf ("!") + 1));
if (!folder_identifier.Equals("debug") &&
!String.IsNullOrEmpty(message))
OnAnnouncement (new SparkleAnnouncement (folder_identifier, message));
} else {
SparkleHelpers.DebugInfo ("ListenerTcp", "Error on socket");
lock (this.mutex) {
byte [] bytes = new byte [4096];
int bytes_read = 0;
// List to the channels, this blocks the thread
while (this.socket.Connected) {
try {
bytes_read = this.socket.Receive (bytes);
} catch (Exception e) {
if (!PingHost (Server.Host)) {
lock (this.socket_lock) {
this.socket.Close ();
this.connected = false;
this.is_connected = false;
OnDisconnected ();
OnDisconnected (e.Message);
}
}
}
SparkleHelpers.DebugInfo ("ListenerTcp", "Disconnected from " + Server.Host);
if (bytes_read > 0) {
string received = Encoding.UTF8.GetString (bytes);
string line = received.Substring (0, received.IndexOf ("\n"));
} catch (SocketException e) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Could not connect to " + Server + ": " + e.Message);
if (!line.Contains ("!"))
continue;
OnDisconnected ();
string folder_identifier = line.Substring (0, line.IndexOf ("!"));
string message = CleanMessage (line.Substring (line.IndexOf ("!") + 1));
if (!folder_identifier.Equals ("debug") &&
!String.IsNullOrEmpty (message)) {
OnAnnouncement (new SparkleAnnouncement (folder_identifier, message));
}
}
}
OnDisconnected ("");
})
);
@ -127,46 +152,38 @@ namespace SparkleLib {
}
public override void AlsoListenTo (string folder_identifier)
protected override void AlsoListenTo (string folder_identifier)
{
string channel = folder_identifier;
string to_send = "subscribe " + folder_identifier + "\n";
if (!base.channels.Contains (channel)) {
base.channels.Add (channel);
if (IsConnected) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Subscribing to channel " + channel);
string to_send = "subscribe " + folder_identifier + "\n";
try {
lock (this.mutex) {
this.socket.Send (Encoding.UTF8.GetBytes (to_send));
}
} catch (SocketException e) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Could not connect to " + Server + ": " + e.Message);
OnDisconnected ();
}
try {
lock (this.socket_lock) {
this.socket.Send (Encoding.UTF8.GetBytes (to_send));
}
} catch (SocketException e) {
this.is_connected = false;
this.is_connecting = false;
OnDisconnected (e.Message);
}
}
public override void Announce (SparkleAnnouncement announcement)
protected override void Announce (SparkleAnnouncement announcement)
{
string to_send = "announce " + announcement.FolderIdentifier
+ " " + announcement.Message + "\n";
try {
lock (this.mutex) {
lock (this.socket_lock)
this.socket.Send (Encoding.UTF8.GetBytes (to_send));
}
} catch (SocketException e) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Could not connect to " + Server + ": " + e.Message);
OnDisconnected ();
} catch (SocketException e) {
this.is_connected = false;
this.is_connecting = false;
OnDisconnected (e.Message);
}
}
@ -175,12 +192,32 @@ namespace SparkleLib {
{
this.thread.Abort ();
this.thread.Join ();
base.Dispose ();
}
private string CleanMessage(string message)
private string CleanMessage (string message)
{
return message.Trim ().Replace ("\n", "").Replace ("\0", "");
return message.Trim ()
.Replace ("\n", "")
.Replace ("\0", "");
}
private bool PingHost (string host)
{
Ping ping = new Ping ();
PingOptions options = new PingOptions () {
DontFragment = true
};
string data = "00000000000000000000000000000000";
byte [] buffer = Encoding.ASCII.GetBytes (data);
PingReply reply = ping.Send (host, 15, buffer, options);
return reply.Status == IPStatus.Success;
}
}
}

View file

@ -299,7 +299,7 @@ namespace SparkleLib {
};
// Fetch changes when there is a message in the irc channel
this.listener.Announcement += delegate (SparkleAnnouncement announcement) {
this.listener.Received += delegate (SparkleAnnouncement announcement) {
string identifier = Identifier;
if (announcement.FolderIdentifier.Equals (identifier) &&