making announcements more robust and intelligent

This commit is contained in:
Travis Glenn Hansen 2011-11-05 15:09:09 -06:00
parent 9ead45dc1c
commit 3d4a8b5f9d
3 changed files with 101 additions and 62 deletions

View file

@ -82,7 +82,7 @@ namespace SparkleLib {
listeners.Add (new SparkleListenerTcp (announce_uri, folder_identifier));
break;
}
SparkleHelpers.DebugInfo ("ListenerFactory", "Issued new listener for " + announce_uri);
return (SparkleListenerBase) listeners [listeners.Count - 1];
}
@ -113,10 +113,14 @@ namespace SparkleLib {
public abstract bool IsConnected { get; }
protected List<string> channels = new List<string> ();
protected Hashtable last_announce = new Hashtable ();
protected List<SparkleAnnouncement> queue_up = new List<SparkleAnnouncement> ();
protected List<SparkleAnnouncement> queue_down = new List<SparkleAnnouncement> ();
protected List<string> channels = new List<string> ();
protected Dictionary<string,List<SparkleAnnouncement>> recent_announcements = new Dictionary<string, List<SparkleAnnouncement>> ();
protected int max_recent_announcements = 10;
protected Dictionary<string, SparkleAnnouncement> queue_up = new Dictionary<string, SparkleAnnouncement> ();
protected Dictionary<string,SparkleAnnouncement> queue_down = new Dictionary<string, SparkleAnnouncement> ();
protected bool is_connecting;
protected Uri server;
protected Timer reconnect_timer = new Timer { Interval = 60 * 1000, Enabled = true };
@ -136,30 +140,22 @@ namespace SparkleLib {
public void AnnounceBase (SparkleAnnouncement announcement)
{
if (IsConnected) {
SparkleHelpers.DebugInfo ("Listener",
"Announcing to " + announcement.FolderIdentifier + " on " + this.server);
if (!this.IsRecentAnnounement (announcement)) {
if (IsConnected) {
SparkleHelpers.DebugInfo ("Listener",
"Announcing message " + announcement.Message + " to " + announcement.FolderIdentifier + " on " + this.server);
Announce (announcement);
} else {
SparkleHelpers.DebugInfo ("Listener", "Not connected to " + this.server + ". Queuing message");
this.queue_up.Add (announcement);
}
}
public string NextQueueDownMessage (string folder_identifier)
{
foreach (SparkleAnnouncement announcement in this.queue_down.GetRange (0, this.queue_down.Count)) {
if (announcement.FolderIdentifier.Equals (folder_identifier)) {
string message = announcement.Message;
this.queue_down.Remove (announcement);
return message;
Announce (announcement);
this.AddRecentAnnouncement (announcement);
} else {
SparkleHelpers.DebugInfo ("Listener", "Not connected to " + this.server + ". Queuing message");
this.queue_up [announcement.FolderIdentifier] = announcement;
}
} else {
SparkleHelpers.DebugInfo ("Listener",
"Already received or sent message " + announcement.Message + " to " + announcement.FolderIdentifier + " on " + this.server);
}
return null;
}
@ -180,10 +176,11 @@ namespace SparkleLib {
if (this.queue_up.Count > 0) {
SparkleHelpers.DebugInfo ("Listener", "Delivering " + this.queue_up.Count + " queued messages...");
foreach (SparkleAnnouncement announcement in this.queue_up.GetRange(0, this.queue_up.Count)) {
foreach (KeyValuePair<string, SparkleAnnouncement> item in this.queue_up) {
SparkleAnnouncement announcement = item.Value;
AnnounceBase (announcement);
this.queue_up.Remove (announcement);
}
this.queue_down.Clear ();
}
}
@ -201,26 +198,62 @@ namespace SparkleLib {
{
SparkleHelpers.DebugInfo ("Listener", "Got message " + announcement.Message + " from " + announcement.FolderIdentifier + " on " + this.server);
if (this.last_announce.ContainsKey (announcement.FolderIdentifier) ){
SparkleHelpers.DebugInfo ("Listener", "Received previous message from " + announcement.FolderIdentifier + " on " + this.server);
if (this.last_announce[announcement.FolderIdentifier].Equals(announcement.Message)) {
SparkleHelpers.DebugInfo ("Listener", "Ignoring already processed announcment " + announcement.Message + " from " + announcement.FolderIdentifier + " on " + this.server);
return;
}
if (this.IsRecentAnnounement(announcement) ){
SparkleHelpers.DebugInfo ("Listener", "Ignoring previously received message " + announcement.Message + " from " + announcement.FolderIdentifier + " on " + this.server);
return;
}
SparkleHelpers.DebugInfo ("Listener", "Processing message " + announcement.Message + " from " + announcement.FolderIdentifier + " on " + this.server);
if (this.last_announce.ContainsKey (announcement.FolderIdentifier) )
this.last_announce.Remove (announcement.FolderIdentifier);
this.last_announce.Add (announcement.FolderIdentifier, announcement.Message);
this.queue_down.Add (announcement);
this.AddRecentAnnouncement (announcement);
this.queue_down [announcement.FolderIdentifier] = announcement;
if (Announcement != null)
Announcement (announcement);
}
private bool IsRecentAnnounement (SparkleAnnouncement announcement)
{
if (!this.HasRecentAnnouncements (announcement.FolderIdentifier)) {
return false;
} else {
foreach (SparkleAnnouncement recent_announcement in this.GetRecentAnnouncements (announcement.FolderIdentifier)) {
if (recent_announcement.Message.Equals (announcement.Message))
return true;
}
return false;
}
}
private List<SparkleAnnouncement> GetRecentAnnouncements (string folder_identifier)
{
if (!this.recent_announcements.ContainsKey (folder_identifier)) {
this.recent_announcements [folder_identifier] = new List<SparkleAnnouncement> ();
}
return (List<SparkleAnnouncement>) this.recent_announcements [folder_identifier];
}
private void AddRecentAnnouncement (SparkleAnnouncement announcement)
{
List<SparkleAnnouncement> recent_announcements = this.GetRecentAnnouncements (announcement.FolderIdentifier);
if (!this.IsRecentAnnounement (announcement))
recent_announcements.Add (announcement);
if (recent_announcements.Count > this.max_recent_announcements)
recent_announcements.RemoveRange (0, (recent_announcements.Count - this.max_recent_announcements));
}
private bool HasRecentAnnouncements (string folder_identifier)
{
return this.recent_announcements.ContainsKey (folder_identifier);
}
public virtual void Dispose ()
{
this.reconnect_timer.Dispose ();

View file

@ -29,7 +29,7 @@ namespace SparkleLib {
public class SparkleListenerTcp : SparkleListenerBase {
private Thread thread;
// these are shared
private readonly Object mutex = new Object();
private Socket socket;
@ -94,9 +94,9 @@ namespace SparkleLib {
if (bytes_read > 0) {
string received = Encoding.UTF8.GetString (bytes);
string folder_identifier = received.Substring (0, received.IndexOf ("!"));
string message = received.Substring (received.IndexOf ("!") + 1);
OnAnnouncement (new SparkleAnnouncement (folder_identifier, message));
string message = this.CleanMessage (received.Substring (received.IndexOf ("!") + 1));
if (!message.Equals("connected..."))
OnAnnouncement (new SparkleAnnouncement (folder_identifier, message));
} else {
SparkleHelpers.DebugInfo ("ListenerTcp", "Error on socket");
@ -109,9 +109,9 @@ namespace SparkleLib {
}
}
}
SparkleHelpers.DebugInfo ("ListenerTcp", "Disconnected from " + Server.Host);
} catch (SocketException e) {
SparkleHelpers.DebugInfo ("ListenerTcp", "Could not connect to " + Server + ": " + e.Message);
@ -173,5 +173,10 @@ namespace SparkleLib {
this.thread.Join ();
base.Dispose ();
}
private string CleanMessage(string message)
{
return message.Trim ().Replace ("\n", "").Replace ("\0", "");
}
}
}

View file

@ -110,12 +110,6 @@ namespace SparkleLib {
if (CheckForRemoteChanges ())
SyncDownBase ();
string message;
while ((message = this.listener.NextQueueDownMessage (identifier)) != null) {
if (!message.Equals (CurrentRevision))
SyncDownBase ();
}
}
// In the unlikely case that we haven't synced up our
@ -265,19 +259,18 @@ namespace SparkleLib {
if (announcement.FolderIdentifier.Equals (identifier) &&
!announcement.Message.Equals (CurrentRevision)) {
if ((Status != SyncStatus.SyncUp) &&
(Status != SyncStatus.SyncDown) &&
!this.is_buffering) {
string message;
while ((message = this.listener.NextQueueDownMessage (identifier)) != null) {
if (!message.Equals (CurrentRevision))
SyncDownBase ();
}
while (this.IsSyncing ()) {
//nothing just wait
}
SparkleHelpers.DebugInfo ("Listener", "Syncing due to Announcement");
if (!announcement.Message.Equals (CurrentRevision))
SyncDownBase ();
} else {
if (announcement.FolderIdentifier.Equals (identifier))
SparkleHelpers.DebugInfo ("Listener", "Not syncing message is for current revision");
}
};
// Start listening
if (!this.listener.IsConnected && !this.listener.IsConnecting) {
this.listener.Connect ();
@ -285,13 +278,21 @@ namespace SparkleLib {
}
private bool IsSyncing ()
{
if (Status == SyncStatus.SyncUp || Status == SyncStatus.SyncDown || this.is_buffering)
return true;
return false;
}
private void CheckForChanges ()
{
lock (this.change_lock) {
if (this.has_changed) {
if (this.sizebuffer.Count >= 4)
this.sizebuffer.RemoveAt (0);
DirectoryInfo dir_info = new DirectoryInfo (LocalPath);
this.sizebuffer.Add (CalculateFolderSize (dir_info));
@ -303,7 +304,7 @@ namespace SparkleLib {
SparkleHelpers.DebugInfo ("Local", "[" + Name + "] Changes have settled.");
this.is_buffering = false;
this.has_changed = false;
DisableWatching ();
while (AnyDifferences)
SyncUpBase ();