listener: use one connection with multiple channels per server

This commit is contained in:
Hylke Bons 2011-05-22 01:02:16 +01:00
parent 3c1c0ed4f3
commit 359ec616f9
7 changed files with 236 additions and 89 deletions

View file

@ -85,6 +85,17 @@ namespace SparkleLib {
writer.WriteLine (config); writer.WriteLine (config);
writer.Close (); writer.Close ();
string style_file_path = SparkleHelpers.CombineMore (base.target_folder, ".hg", "log.style");
string style = "changeset = \"{file_mods}{file_adds}{file_dels}\"" + n +
"file_add = \"A {file_add}\\n\"" + n +
"file_mod = \"M {file_mod}\\n\"" + n +
"file_del = \"D {file_del}\\n\"" + n;
writer = new StreamWriter (style_file_path);
writer.WriteLine (style);
writer.Close ();
SparkleHelpers.DebugInfo ("Config", "Added configuration to '" + repo_config_file_path + "'"); SparkleHelpers.DebugInfo ("Config", "Added configuration to '" + repo_config_file_path + "'");
} }

View file

@ -27,6 +27,52 @@ namespace SparkleLib {
} }
public class SparkleAnnouncement {
public readonly string FolderIdentifier;
public readonly string Message;
public SparkleAnnouncement (string folder_identifier, string message)
{
FolderIdentifier = folder_identifier;
Message = message;
}
}
public static class SparkleListenerFactory {
private static List<SparkleListenerBase> listeners;
public static SparkleListenerIrc CreateIrcListener (string server, string folder_identifier,
NotificationServerType type)
{
if (listeners == null)
listeners = new List<SparkleListenerBase> ();
// This is SparkleShare's centralized notification service.
// Don't worry, we only use this server as a backup if you
// don't have your own. All data needed to connect is hashed and
// we don't store any personal information ever
if (type == NotificationServerType.Central)
server = "204.62.14.135";
foreach (SparkleListenerBase listener in listeners) {
if (listener.Server.Equals (server)) {
SparkleHelpers.DebugInfo ("ListenerFactory", "Refered to existing listener for " + server);
listener.AlsoListenTo (folder_identifier);
return (SparkleListenerIrc) listener;
}
}
SparkleHelpers.DebugInfo ("ListenerFactory", "Issued new listener for " + server);
listeners.Add (new SparkleListenerIrc (server, folder_identifier, type));
return (SparkleListenerIrc) listeners [listeners.Count - 1];
}
}
// A persistent connection to the server that // A persistent connection to the server that
// listens for change notifications // listens for change notifications
public abstract class SparkleListenerBase { public abstract class SparkleListenerBase {
@ -42,33 +88,34 @@ namespace SparkleLib {
// We've been notified about a remote // We've been notified about a remote
// change by the channel // change by the channel
public event RemoteChangeEventHandler RemoteChange; public event RemoteChangeEventHandler RemoteChange;
public delegate void RemoteChangeEventHandler (string change_id); public delegate void RemoteChangeEventHandler (SparkleAnnouncement announcement);
// Starts listening for remote changes
public abstract void Connect (); public abstract void Connect ();
public abstract void Announce (SparkleAnnouncement announcent);
private void AnnounceBase (string message) { public abstract void AlsoListenTo (string folder_identifier);
if (IsConnected) {
SparkleHelpers.DebugInfo ("Listener", "Announcing to " + this.channel + " on " + this.server);
Announce (message);
} else {
SparkleHelpers.DebugInfo ("Listener", "Not connected. Queuing message");
this.announce_queue.Add (message);
}
}
// Announces to the channel that
// we've pushed changes to the server
public abstract void Announce (string message);
// Release all resources
public abstract void Dispose (); public abstract void Dispose ();
public abstract bool IsConnected { get; } public abstract bool IsConnected { get; }
// Announcements that weren't sent off // Announcements that weren't sent off
// because we were disconnected // because we were disconnected
protected List<string> announce_queue = new List<string> (); protected List<SparkleAnnouncement> announce_queue = new List<SparkleAnnouncement> ();
protected string server;
protected List<string> channels = new List<string> ();
protected int changes_queue = 0;
protected bool is_connecting;
public SparkleListenerBase (string server, string folder_identifier, NotificationServerType type) { }
public string Server {
get {
return this.server;
}
}
// Announcements of remote changes that we've received // Announcements of remote changes that we've received
public int ChangesQueue { public int ChangesQueue {
@ -77,33 +124,48 @@ namespace SparkleLib {
} }
} }
protected string server;
protected string channel;
protected int changes_queue = 0;
public SparkleListenerBase (string server, string folder_identifier, NotificationServerType type) { } public bool IsConnecting {
get {
return this.is_connecting;
}
}
public void AnnounceBase (SparkleAnnouncement announcement) {
if (IsConnected) {
SparkleHelpers.DebugInfo ("Listener", "Announcing to " + announcement.FolderIdentifier + " on " + this.server);
Announce (announcement);
} else {
SparkleHelpers.DebugInfo ("Listener", "Not connected to " + this.server + ". Queuing message");
this.announce_queue.Add (announcement);
}
}
public void DecrementChangesQueue () public void DecrementChangesQueue ()
{ {
this.changes_queue--; this.changes_queue--;
} }
public void OnConnected () public void OnConnected ()
{ {
SparkleHelpers.DebugInfo ("Listener", "Connected"); SparkleHelpers.DebugInfo ("Listener", "Connected to " + Server);
if (Connected != null) if (Connected != null)
Connected (); Connected ();
if (this.announce_queue.Count > 0) { if (this.announce_queue.Count > 0) {
string message = this.announce_queue [this.announce_queue.Count - 1];
this.announce_queue = new List<string> ();
SparkleHelpers.DebugInfo ("Listener", "Delivering queued messages..."); SparkleHelpers.DebugInfo ("Listener", "Delivering queued messages...");
AnnounceBase (message); foreach (SparkleAnnouncement announcement in this.announce_queue)
AnnounceBase (announcement);
this.announce_queue = new List<SparkleAnnouncement> ();
} }
} }
public void OnDisconnected () public void OnDisconnected ()
{ {
SparkleHelpers.DebugInfo ("Listener", "Disonnected"); SparkleHelpers.DebugInfo ("Listener", "Disonnected");
@ -112,14 +174,15 @@ namespace SparkleLib {
Disconnected (); Disconnected ();
} }
public void OnRemoteChange (string change_id)
public void OnRemoteChange (SparkleAnnouncement announcement)
{ {
SparkleHelpers.DebugInfo ("Listener", "Got message from " + this.channel + " on " + this.server); SparkleHelpers.DebugInfo ("Listener", "Got message from " + announcement.FolderIdentifier + " on " + this.server);
this.changes_queue++; this.changes_queue++;
if (RemoteChange != null) if (RemoteChange != null)
RemoteChange (change_id); RemoteChange (announcement);
} }
} }
} }

View file

@ -31,19 +31,10 @@ namespace SparkleLib {
private string nick; private string nick;
public SparkleListenerIrc (string server, string folder_identifier, public SparkleListenerIrc (string server, string folder_identifier, NotificationServerType type) :
NotificationServerType type) : base (server, folder_identifier, type) base (server, folder_identifier, type)
{ {
if (type == NotificationServerType.Own) { base.server = server;
base.server = server;
} else {
// This is SparkleShare's centralized notification service.
// Don't worry, we only use this server as a backup if you
// don't have your own. All data needed to connect is hashed and
// we don't store any personal information ever
base.server = "204.62.14.135";
}
// Try to get a uniqueish nickname // Try to get a uniqueish nickname
this.nick = SHA1 (DateTime.Now.ToString ("ffffff") + "sparkles"); this.nick = SHA1 (DateTime.Now.ToString ("ffffff") + "sparkles");
@ -52,9 +43,7 @@ namespace SparkleLib {
// with a number, so prefix an alphabetic character // with a number, so prefix an alphabetic character
this.nick = "s" + this.nick.Substring (0, 7); this.nick = "s" + this.nick.Substring (0, 7);
// Hash and salt the folder identifier, so base.channels.Add ("#" + folder_identifier);
// nobody knows any possible folder details
base.channel = "#" + SHA1 (folder_identifier + "sparkles");
this.client = new IrcClient () { this.client = new IrcClient () {
PingTimeout = 180, PingTimeout = 180,
@ -62,6 +51,7 @@ namespace SparkleLib {
}; };
this.client.OnConnected += delegate { this.client.OnConnected += delegate {
base.is_connecting = false;
OnConnected (); OnConnected ();
}; };
@ -71,16 +61,26 @@ namespace SparkleLib {
this.client.OnChannelMessage += delegate (object o, IrcEventArgs args) { this.client.OnChannelMessage += delegate (object o, IrcEventArgs args) {
string message = args.Data.Message.Trim (); string message = args.Data.Message.Trim ();
OnRemoteChange (message); string folder_id = args.Data.Channel.Substring (1); // remove the #
OnRemoteChange (new SparkleAnnouncement (folder_id, message));
}; };
} }
public override bool IsConnected {
get {
return this.client.IsConnected;
}
}
// Starts a new thread and listens to the channel // Starts a new thread and listens to the channel
public override void Connect () public override void Connect ()
{ {
SparkleHelpers.DebugInfo ("ListenerIrc", "Connecting to " + base.channel + " on " + base.server); SparkleHelpers.DebugInfo ("ListenerIrc", "Connecting to " + Server);
base.is_connecting = true;
this.thread = new Thread ( this.thread = new Thread (
new ThreadStart (delegate { new ThreadStart (delegate {
try { try {
@ -88,7 +88,9 @@ namespace SparkleLib {
// Connect, login, and join the channel // Connect, login, and join the channel
this.client.Connect (new string [] {base.server}, 6667); this.client.Connect (new string [] {base.server}, 6667);
this.client.Login (this.nick, this.nick); this.client.Login (this.nick, this.nick);
this.client.RfcJoin (base.channel);
foreach (string channel in base.channels)
this.client.RfcJoin (channel);
// List to the channel, this blocks the thread // List to the channel, this blocks the thread
this.client.Listen (); this.client.Listen ();
@ -96,7 +98,7 @@ namespace SparkleLib {
// Disconnect when we time out // Disconnect when we time out
this.client.Disconnect (); this.client.Disconnect ();
} catch (Meebey.SmartIrc4net.ConnectionException e) { } catch (Meebey.SmartIrc4net.ConnectionException e) {
SparkleHelpers.DebugInfo ("ListenerIrc", "Could not connect to " + base.channel + " on " + base.server + ": " + e.Message); SparkleHelpers.DebugInfo ("ListenerIrc", "Could not connect to " + Server + ": " + e.Message);
} }
}) })
); );
@ -105,16 +107,21 @@ namespace SparkleLib {
} }
public override void Announce (string message) public override void AlsoListenTo (string folder_identifier)
{ {
this.client.SendMessage (SendType.Message, base.channel, message); string channel = "#" + folder_identifier;
base.channels.Add (channel);
this.client.RfcJoin (channel);
} }
public override bool IsConnected { public override void Announce (SparkleAnnouncement announcement)
get { {
return this.client.IsConnected; string channel = "#" + announcement.FolderIdentifier;
} this.client.SendMessage (SendType.Message, channel, announcement.Message);
// Also announce to ourselves for debugging purposes
OnRemoteChange (announcement);
} }

View file

@ -132,7 +132,7 @@ namespace SparkleLib {
SyncDownBase (); SyncDownBase ();
} }
if (this.is_polling && !this.listener.IsConnected) if (this.is_polling && !this.listener.IsConnected && !this.listener.IsConnecting)
this.listener.Connect (); this.listener.Connect ();
if (HasUnsyncedChanges) if (HasUnsyncedChanges)
@ -242,7 +242,7 @@ namespace SparkleLib {
server_type = NotificationServerType.Central; server_type = NotificationServerType.Central;
else else
server_type = NotificationServerType.Own; server_type = NotificationServerType.Own;
this.listener = new SparkleListenerIrc (Domain, Identifier, server_type);//////////// this.listener = SparkleListenerFactory.CreateIrcListener (Domain, Identifier, server_type);
// Stop polling when the connection to the irc channel is succesful // Stop polling when the connection to the irc channel is succesful
this.listener.Connected += delegate { this.listener.Connected += delegate {
@ -263,9 +263,11 @@ namespace SparkleLib {
}; };
// Fetch changes when there is a message in the irc channel // Fetch changes when there is a message in the irc channel
this.listener.RemoteChange += delegate (string change_id) { this.listener.RemoteChange += delegate (SparkleAnnouncement announcement) {
if (!change_id.Equals (CurrentRevision) && change_id.Length == 40) { if (announcement.FolderIdentifier == Identifier &&
if ((Status != SyncStatus.SyncUp) && (Status != SyncStatus.SyncDown) && !announcement.Message.Equals (CurrentRevision)) {
if ((Status != SyncStatus.SyncUp) &&
(Status != SyncStatus.SyncDown) &&
!this.is_buffering) { !this.is_buffering) {
while (this.listener.ChangesQueue > 0) { while (this.listener.ChangesQueue > 0) {
@ -277,7 +279,8 @@ namespace SparkleLib {
}; };
// Start listening // Start listening
this.listener.Connect (); if (!this.listener.IsConnected && !this.listener.IsConnecting)
this.listener.Connect ();
} }
@ -359,7 +362,7 @@ namespace SparkleLib {
if (SyncStatusChanged != null) if (SyncStatusChanged != null)
SyncStatusChanged (SyncStatus.Idle); SyncStatusChanged (SyncStatus.Idle);
this.listener.Announce (CurrentRevision); this.listener.AnnounceBase (new SparkleAnnouncement (Identifier, CurrentRevision));
} else { } else {
SparkleHelpers.DebugInfo ("SyncUp", "[" + Name + "] Error"); SparkleHelpers.DebugInfo ("SyncUp", "[" + Name + "] Error");

View file

@ -134,7 +134,7 @@ namespace SparkleLib {
git.Start (); git.Start ();
git.WaitForExit (); git.WaitForExit ();
if (git.ExitCode == 0) { if (git.ExitCode == 0) {Console.WriteLine ("REBASING");
Rebase (); Rebase ();
return true; return true;
} else { } else {
@ -187,11 +187,6 @@ namespace SparkleLib {
} }
// Stages the made changes // Stages the made changes
private void Add () private void Add ()
{ {
@ -232,9 +227,6 @@ namespace SparkleLib {
} }
// Merges the fetched changes // Merges the fetched changes
private void Rebase () private void Rebase ()
{ {
@ -271,8 +263,6 @@ namespace SparkleLib {
} }
private void ResolveConflict () private void ResolveConflict ()
{ {
// This is al list of conflict status codes that Git uses, their // This is al list of conflict status codes that Git uses, their

View file

@ -202,22 +202,100 @@ namespace SparkleLib {
// TODO: Method needs to be made a lot faster // TODO: Method needs to be made a lot faster
public override List<SparkleChangeSet> GetChangeSets (int count) public override List<SparkleChangeSet> GetChangeSets (int count)
{ {
if (count < 1)
count = 30;
List <SparkleChangeSet> change_sets = new List <SparkleChangeSet> ();
string style_file_path = SparkleHelpers.CombineMore (LocalPath, ".hg", "log.style");
SparkleHg hg_log = new SparkleHg (LocalPath, "log --limit " + count + " --style " + style_file_path);
Console.OutputEncoding = System.Text.Encoding.Unicode;
hg_log.Start ();
// Reading the standard output HAS to go before
// WaitForExit, or it will hang forever on output > 4096 bytes
string output = hg_log.StandardOutput.ReadToEnd ();
hg_log.WaitForExit ();
string [] lines = output.Split ("\n".ToCharArray ());
List <string> entries = new List <string> ();
int j = 0;
string entry = "", last_entry = "";
foreach (string line in lines) {
if (line.StartsWith ("changeset:") && j > 0) {
entries.Add (entry);
entry = "";
}
entry += line + "\n";
j++;
last_entry = entry;
}
entries.Add (last_entry);
Regex regex = new Regex (@"changeset: ([a-z0-9]{40})\n" +
"(.+) <(.+)>\n" +
"([0-9]{4})-([0-9]{2})-([0-9]{2}) ([0-9]{2}):([0-9]{2}) .([0-9]{4})\n" +
"", RegexOptions.Compiled);
// TODO: Need to optimise for speed
foreach (string log_entry in entries) {
bool is_merge_commit = false;
Match match = regex.Match (log_entry);
if (match.Success) {
SparkleChangeSet change_set = new SparkleChangeSet (); SparkleChangeSet change_set = new SparkleChangeSet ();
change_set.Revision = "test"; change_set.Revision = match.Groups [1].Value;
change_set.UserName = "test"; change_set.UserName = match.Groups [2].Value;
change_set.UserEmail = "test"; change_set.UserEmail = match.Groups [3].Value;
change_set.IsMerge = false; change_set.IsMerge = is_merge_commit;
change_set.Timestamp = new DateTime (int.Parse (match.Groups [4].Value),
int.Parse (match.Groups [5].Value), int.Parse (match.Groups [6].Value),
int.Parse (match.Groups [7].Value), int.Parse (match.Groups [8].Value), 0);
string [] entry_lines = log_entry.Split ("\n".ToCharArray ());
foreach (string entry_line in entry_lines) {
if (entry_line.StartsWith (":")) {
string change_type = entry_line [37].ToString ();
string file_path = entry_line.Substring (39);
string to_file_path;
if (change_type.Equals ("A")) {
change_set.Added.Add (file_path);
} else if (change_type.Equals ("M")) {
change_set.Edited.Add (file_path);
} else if (change_type.Equals ("D")) {
change_set.Deleted.Add (file_path);
} else if (change_type.Equals ("R")) {
int tab_pos = entry_line.LastIndexOf ("\t");
file_path = entry_line.Substring (42, tab_pos - 42);
to_file_path = entry_line.Substring (tab_pos + 1);
change_set.MovedFrom.Add (file_path);
change_set.MovedTo.Add (to_file_path);
}
}
}
change_sets.Add (change_set);
}
}
change_set.Timestamp = DateTime.Now;
List<SparkleChangeSet> change_sets = new List<SparkleChangeSet> ();
change_sets.Add (change_set);
return change_sets; return change_sets;
} }
// Creates a pretty commit message based on what has changed // Creates a pretty commit message based on what has changed
private string FormatCommitMessage () private string FormatCommitMessage () // TODO
{ {
return "SparkleShare Hg"; return "SparkleShare Hg";
} }

View file

@ -514,15 +514,10 @@ namespace SparkleShare {
if (folder_path.Equals (SparklePaths.SparkleTmpPath)) if (folder_path.Equals (SparklePaths.SparkleTmpPath))
return; return;
Console.WriteLine (folder_path);
SparkleRepoBase repo = null; SparkleRepoBase repo = null;
if (Directory.Exists (Path.Combine (folder_path, ".git"))) { if (Directory.Exists (Path.Combine (folder_path, ".git"))) {
Console.WriteLine (folder_path + " == Git");
repo = new SparkleRepoGit (folder_path, SparkleBackend.DefaultBackend); repo = new SparkleRepoGit (folder_path, SparkleBackend.DefaultBackend);
} else if (Directory.Exists (Path.Combine (folder_path, ".hg"))) { } else if (Directory.Exists (Path.Combine (folder_path, ".hg"))) {
Console.WriteLine (folder_path + " == Hg");
SparkleBackend hg_backend = new SparkleBackend ("Hg", new string [] {"/opt/local/bin/hg"}); SparkleBackend hg_backend = new SparkleBackend ("Hg", new string [] {"/opt/local/bin/hg"});
repo = new SparkleRepoMercurial (folder_path, hg_backend); repo = new SparkleRepoMercurial (folder_path, hg_backend);
} }