Implement XEP-0198: Stream management xep-0198

Wed, 09 May 2018 10:01:34 +0300

author
defanor <defanor@uberspace.net>
date
Wed, 09 May 2018 10:01:34 +0300
branch
xep-0198
changeset 39094
5d298b8113ef
parent 38826
8a80075b2dc6
child 39095
d9fb456d78ae
child 39096
4b6b3cd601ed

Implement XEP-0198: Stream management

Only the acks part (including resending on reconnect) for now, no
resumption, no XEP-0203 (Delayed Delivery), and no user notifications
about unacknowledged stanzas.

libpurple/protocols/jabber/Makefile.am file | annotate | diff | comparison | revisions
libpurple/protocols/jabber/jabber.c file | annotate | diff | comparison | revisions
libpurple/protocols/jabber/jabber.h file | annotate | diff | comparison | revisions
libpurple/protocols/jabber/namespaces.h file | annotate | diff | comparison | revisions
libpurple/protocols/jabber/stream_management.c file | annotate | diff | comparison | revisions
libpurple/protocols/jabber/stream_management.h file | annotate | diff | comparison | revisions
--- a/libpurple/protocols/jabber/Makefile.am	Thu Dec 28 21:08:56 2017 -0600
+++ b/libpurple/protocols/jabber/Makefile.am	Wed May 09 10:01:34 2018 +0300
@@ -80,6 +80,8 @@
 			  roster.h \
 			  si.c \
 			  si.h \
+			  stream_management.c \
+			  stream_management.h \
 			  useravatar.c \
 			  useravatar.h \
 			  usermood.c \
--- a/libpurple/protocols/jabber/jabber.c	Thu Dec 28 21:08:56 2017 -0600
+++ b/libpurple/protocols/jabber/jabber.c	Wed May 09 10:01:34 2018 +0300
@@ -68,6 +68,7 @@
 #include "xdata.h"
 #include "pep.h"
 #include "adhoccommands.h"
+#include "stream_management.h"
 
 #include "jingle/jingle.h"
 #include "jingle/rtp.h"
@@ -312,6 +313,13 @@
 		jabber_stream_set_state(js, JABBER_STREAM_AUTHENTICATING);
 		jabber_auth_start_old(js);
 	}
+
+	/* Stream management */
+	if (xmlnode_get_child_with_namespace(packet, "sm", NS_STREAM_MANAGEMENT)
+	    && (js->sm_state == SM_DISABLED) )
+	{
+		jabber_sm_enable(js);
+	}
 }
 
 static void jabber_stream_handle_error(JabberStream *js, xmlnode *packet)
@@ -340,6 +348,8 @@
 	name = (*packet)->name;
 	xmlns = xmlnode_get_namespace(*packet);
 
+	jabber_sm_inbound(js, *packet);
+
 	if(purple_strequal((*packet)->name, "iq")) {
 		jabber_iq_parse(js, *packet);
 	} else if(purple_strequal((*packet)->name, "presence")) {
@@ -370,6 +380,8 @@
 				tls_init(js);
 			/* TODO: Handle <failure/>, I guess? */
 		}
+	} else if (purple_strequal(xmlns, NS_STREAM_MANAGEMENT)) {
+		jabber_sm_process_packet(js, *packet);
 	} else {
 		purple_debug_warning("jabber", "Unknown packet: %s\n", (*packet)->name);
 	}
@@ -583,6 +595,28 @@
 	return (len < 0 ? (int)strlen(buf) : len);
 }
 
+/* Checks whether a packet may be a stanza (not strictly: returns TRUE
+   if there's no namespace, assuming that the default namespace is
+   jabber:client or jabber:server). */
+gboolean
+jabber_is_stanza(xmlnode *packet) {
+	const char *name;
+	const char *xmlns;
+
+	g_return_val_if_fail(packet != NULL, FALSE);
+	g_return_val_if_fail(packet->name != NULL, FALSE);
+
+	name = packet->name;
+	xmlns = xmlnode_get_namespace(packet);
+
+	return ((purple_strequal(name, "message")
+	         || purple_strequal(name, "iq")
+	         || purple_strequal(name, "presence"))
+	        && ((xmlns == NULL)
+	            || purple_strequal(xmlns, NS_XMPP_CLIENT)
+	            || purple_strequal(xmlns, NS_XMPP_SERVER)));
+}
+
 void jabber_send_signal_cb(PurpleConnection *pc, xmlnode **packet,
                            gpointer unused)
 {
@@ -601,13 +635,13 @@
 		return;
 
 	if (js->bosh)
-		if (purple_strequal((*packet)->name, "message") ||
-				purple_strequal((*packet)->name, "iq") ||
-				purple_strequal((*packet)->name, "presence"))
+		if (jabber_is_stanza(*packet))
 			xmlnode_set_namespace(*packet, NS_XMPP_CLIENT);
 	txt = xmlnode_to_str(*packet, &len);
 	jabber_send_raw(js, txt, len);
 	g_free(txt);
+
+	jabber_sm_outbound(js, *packet);
 }
 
 void jabber_send(JabberStream *js, xmlnode *packet)
@@ -1015,6 +1049,8 @@
 	if (purple_presence_is_idle(presence))
 		js->idle = purple_presence_get_idle_time(presence);
 
+	js->sm_state = SM_DISABLED;
+
 	return js;
 }
 
@@ -1597,8 +1633,10 @@
 
 	if (js->bosh)
 		jabber_bosh_connection_close(js->bosh);
-	else if ((js->gsc && js->gsc->fd > 0) || js->fd > 0)
+	else if ((js->gsc && js->gsc->fd > 0) || js->fd > 0) {
+		jabber_sm_ack_send(js);
 		jabber_send_raw(js, "</stream:stream>", -1);
+	}
 
 	if (js->srv_query_data)
 		purple_srv_cancel(js->srv_query_data);
@@ -3896,12 +3934,15 @@
 	jabber_si_init();
 
 	jabber_auth_init();
+
+	jabber_sm_init();
 }
 
 static void
 jabber_do_uninit(void)
 {
 	/* reverse order of jabber_do_init */
+	jabber_sm_uninit();
 	jabber_bosh_uninit();
 	jabber_data_uninit();
 	jabber_si_uninit();
--- a/libpurple/protocols/jabber/jabber.h	Thu Dec 28 21:08:56 2017 -0600
+++ b/libpurple/protocols/jabber/jabber.h	Wed May 09 10:01:34 2018 +0300
@@ -49,6 +49,8 @@
 	JABBER_CAP_ITEMS          = 1 << 14,
 	JABBER_CAP_ROSTER_VERSIONING = 1 << 15,
 
+	JABBER_CAP_STREAM_MANAGEMENT = 1 << 16,
+
 	JABBER_CAP_RETRIEVED      = 1 << 31
 } JabberCapabilities;
 
@@ -96,6 +98,12 @@
 	JABBER_STREAM_CONNECTED
 } JabberStreamState;
 
+typedef enum {
+	SM_DISABLED,
+	SM_REQUESTED,
+	SM_ENABLED
+} JabberStreamManagementState;
+
 struct _JabberStream
 {
 	int fd;
@@ -282,6 +290,12 @@
 	gchar *google_relay_host;
 	GList *google_relay_requests; /* the HTTP requests to get */
 												/* relay info */
+
+	/* XEP-0198 (stream management) state */
+	guint32 sm_outbound_count;
+	guint32 sm_inbound_count;
+	guint32 sm_outbound_confirmed;
+	JabberStreamManagementState sm_state;
 };
 
 typedef gboolean (JabberFeatureEnabled)(JabberStream *js, const gchar *namespace);
@@ -314,6 +328,8 @@
  */
 extern GList *jabber_identities;
 
+gboolean jabber_is_stanza(xmlnode *packet);
+
 void jabber_stream_features_parse(JabberStream *js, xmlnode *packet);
 void jabber_process_packet(JabberStream *js, xmlnode **packet);
 void jabber_send(JabberStream *js, xmlnode *data);
--- a/libpurple/protocols/jabber/namespaces.h	Thu Dec 28 21:08:56 2017 -0600
+++ b/libpurple/protocols/jabber/namespaces.h	Wed May 09 10:01:34 2018 +0300
@@ -26,6 +26,7 @@
 
 #define NS_XMPP_BIND "urn:ietf:params:xml:ns:xmpp-bind"
 #define NS_XMPP_CLIENT "jabber:client"
+#define NS_XMPP_SERVER "jabber:server"
 #define NS_XMPP_SASL "urn:ietf:params:xml:ns:xmpp-sasl"
 #define NS_XMPP_SESSION "urn:ietf:params:xml:ns:xmpp-session"
 #define NS_XMPP_STANZAS "urn:ietf:params:xml:ns:xmpp-stanzas"
@@ -70,6 +71,9 @@
 /* XEP-0191 Simple Communications Blocking */
 #define NS_SIMPLE_BLOCKING "urn:xmpp:blocking"
 
+/* XEP-0198 Stream Management */
+#define NS_STREAM_MANAGEMENT "urn:xmpp:sm:3"
+
 /* XEP-0199 Ping */
 #define NS_PING "urn:xmpp:ping"
 
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libpurple/protocols/jabber/stream_management.c	Wed May 09 10:01:34 2018 +0300
@@ -0,0 +1,241 @@
+/*
+ * purple - Jabber Protocol Plugin
+ *
+ * Purple is the legal property of its developers, whose names are too numerous
+ * to list here.  Please refer to the COPYRIGHT file distributed with this
+ * source distribution.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111-1301  USA
+ *
+ */
+
+#include "internal.h"
+
+#include <glib.h>
+#include "namespaces.h"
+#include "xmlnode.h"
+#include "jabber.h"
+#include "debug.h"
+#include "notify.h"
+#include "stream_management.h"
+
+#define MAX_QUEUE_LENGTH 10000
+
+GHashTable *jabber_sm_accounts;
+
+static void
+jabber_sm_accounts_queue_free(gpointer q)
+{
+	g_queue_free_full(q, (GDestroyNotify)xmlnode_free);
+}
+
+/* Returns a queue for a JabberStream's account (based on JID),
+   creates it if there's none. */
+static GQueue *
+jabber_sm_accounts_queue_get(JabberStream *js)
+{
+	GQueue *queue;
+	gchar *jid = jabber_id_get_bare_jid(js->user);
+	if (g_hash_table_contains(jabber_sm_accounts, jid) == TRUE) {
+		queue = g_hash_table_lookup(jabber_sm_accounts, jid);
+		g_free(jid);
+	} else {
+		queue = g_queue_new();
+		g_hash_table_insert(jabber_sm_accounts, jid, queue);
+	}
+	return queue;
+}
+
+void
+jabber_sm_init(void)
+{
+	jabber_sm_accounts = g_hash_table_new_full(g_str_hash, g_str_equal, free,
+	                                           jabber_sm_accounts_queue_free);
+}
+
+void
+jabber_sm_uninit(void)
+{
+	g_hash_table_destroy(jabber_sm_accounts);
+}
+
+/* Processes incoming NS_STREAM_MANAGEMENT packets. */
+void
+jabber_sm_process_packet(JabberStream *js, xmlnode *packet) {
+	gchar *jid;
+	const char *name = packet->name;
+	if (purple_strequal(name, "enabled")) {
+		purple_debug_info("XEP-0198", "Stream management is enabled\n");
+		js->sm_inbound_count = 0;
+		js->sm_state = SM_ENABLED;
+	} else if (purple_strequal(name, "failed")) {
+		purple_debug_error("XEP-0198", "Failed to enable stream management\n");
+		js->sm_state = SM_DISABLED;
+		jid = jabber_id_get_bare_jid(js->user);
+		g_hash_table_remove(jabber_sm_accounts, jid);
+		g_free(jid);
+	} else if (purple_strequal(name, "r")) {
+		jabber_sm_ack_send(js);
+	} else if (purple_strequal(name, "a")) {
+		jabber_sm_ack_read(js, packet);
+	} else {
+		purple_debug_error("XEP-0198", "Unknown packet: %s\n", name);
+	}
+}
+
+/* Sends an acknowledgement. */
+void
+jabber_sm_ack_send(JabberStream *js)
+{
+	xmlnode *ack;
+	char *ack_h;
+	if (js->sm_state != SM_ENABLED) {
+		return;
+	}
+	ack = xmlnode_new("a");
+	ack_h = g_strdup_printf("%u", js->sm_inbound_count);
+	xmlnode_set_namespace(ack, NS_STREAM_MANAGEMENT);
+	xmlnode_set_attrib(ack, "h", ack_h);
+	jabber_send(js, ack);
+	xmlnode_free(ack);
+	g_free(ack_h);
+}
+
+/* Reads acknowledgements, removes queued stanzas. */
+void
+jabber_sm_ack_read(JabberStream *js, xmlnode *packet)
+{
+	guint32 i;
+	guint32 h;
+	GQueue *queue;
+	xmlnode *stanza;
+	const char *ack_h = xmlnode_get_attrib(packet, "h");
+	if (ack_h == NULL) {
+		purple_debug_error("XEP-0198",
+		                   "The 'h' attribute is not defined for an answer.");
+		return;
+	}
+	h = strtoul(ack_h, NULL, 10);
+
+	/* Remove stanzas from the queue */
+	queue = jabber_sm_accounts_queue_get(js);
+	for (i = js->sm_outbound_confirmed; i < h; i++) {
+		stanza = g_queue_pop_head(queue);
+		if (stanza == NULL) {
+			purple_debug_error("XEP-0198", "The queue is empty\n");
+			break;
+		}
+		xmlnode_free(stanza);
+	}
+
+	js->sm_outbound_confirmed = h;
+	purple_debug_info("XEP-0198",
+	                  "Acknowledged %u out of %u outbound stanzas\n",
+	                  js->sm_outbound_confirmed, js->sm_outbound_count);
+}
+
+/* Asks a server to enable stream management, resends queued
+   stanzas. */
+void
+jabber_sm_enable(JabberStream *js)
+{
+	xmlnode *enable;
+	xmlnode *stanza;
+	GQueue *queue;
+	guint queue_len;
+	guint i;
+	js->server_caps |= JABBER_CAP_STREAM_MANAGEMENT;
+	purple_debug_info("XEP-0198", "Enabling stream management\n");
+	enable = xmlnode_new("enable");
+	xmlnode_set_namespace(enable, NS_STREAM_MANAGEMENT);
+	jabber_send(js, enable);
+	xmlnode_free(enable);
+	js->sm_outbound_count = 0;
+	js->sm_outbound_confirmed = 0;
+	js->sm_state = SM_REQUESTED;
+
+	/* Resend unacknowledged stanzas from the queue. */
+	queue = jabber_sm_accounts_queue_get(js);
+	queue_len = g_queue_get_length(queue);
+	if (queue_len > 0) {
+		purple_debug_info("XEP-0198", "Resending %u stanzas\n", queue_len);
+	}
+	for (i = 0; i < queue_len; i++) {
+		stanza = g_queue_pop_head(queue);
+		jabber_send(js, stanza);
+		xmlnode_free(stanza);
+	}
+}
+
+/* Tracks outbound stanzas, stores those into a queue, requests
+   acknowledgements. */
+void
+jabber_sm_outbound(JabberStream *js, xmlnode *packet)
+{
+	if (jabber_is_stanza(packet) && js->sm_state != SM_DISABLED) {
+		/* Counting stanzas even if there's no confirmation that SM is
+		   enabled yet, so that we won't miss any. */
+
+		/* Add this stanza to the queue, unless the queue is full. */
+		xmlnode *stanza;
+		xmlnode *req;
+		GQueue *queue = jabber_sm_accounts_queue_get(js);
+		if (g_queue_get_length(queue) < MAX_QUEUE_LENGTH) {
+			stanza = xmlnode_copy(packet);
+			g_queue_push_tail(queue, stanza);
+			if (g_queue_get_length(queue) == MAX_QUEUE_LENGTH) {
+				gchar *jid;
+				gchar *queue_is_full_message;
+				jid = jabber_id_get_bare_jid(js->user);
+				queue_is_full_message =
+					g_strdup_printf(
+						_("The queue for %s has reached its maximum length of %u."),
+						jid, MAX_QUEUE_LENGTH);
+				purple_debug_warning("XEP-0198",
+				                     "Stanza queue for %s is full (%u stanzas).\n",
+				                     jid, MAX_QUEUE_LENGTH);
+				g_free(jid);
+				purple_notify_formatted(js->gc, _("XMPP stream management"),
+				                        _("Stanza queue is full"),
+				                        _("No further messages will be queued"),
+				                        queue_is_full_message,
+				                        NULL, NULL);
+				g_free(queue_is_full_message);
+			}
+		}
+
+		/* Count the stanza */
+		js->sm_outbound_count++;
+
+		/* Requesting acknowledgements with either SM_REQUESTED or
+		   SM_ENABLED state as well, so that it would be harder to lose
+		   stanzas. */
+		req = xmlnode_new("r");
+		xmlnode_set_namespace(req, NS_STREAM_MANAGEMENT);
+		jabber_send(js, req);
+		xmlnode_free(req);
+	}
+}
+
+/* Counts inbound stanzas. */
+void
+jabber_sm_inbound(JabberStream *js, xmlnode *packet)
+{
+	/* Count stanzas for XEP-0198, excluding stream management
+	   packets. */
+	if (jabber_is_stanza(packet)) {
+		js->sm_inbound_count++;
+	}
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libpurple/protocols/jabber/stream_management.h	Wed May 09 10:01:34 2018 +0300
@@ -0,0 +1,36 @@
+/**
+ * @file stream_management.h XEP-0198
+ *
+ * purple
+ *
+ * Purple is the legal property of its developers, whose names are too numerous
+ * to list here.  Please refer to the COPYRIGHT file distributed with this
+ * source distribution.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111-1301  USA
+ */
+
+
+void jabber_sm_init(void);
+void jabber_sm_uninit(void);
+
+void jabber_sm_enable(JabberStream *js);
+void jabber_sm_process_packet(JabberStream *js, xmlnode *packet);
+
+void jabber_sm_ack_send(JabberStream *js);
+void jabber_sm_ack_read(JabberStream *js, xmlnode *packet);
+
+void jabber_sm_outbound(JabberStream *js, xmlnode *packet);
+void jabber_sm_inbound(JabberStream *js, xmlnode *packet);

mercurial