Merged in CMaiku/pidgin (pull request #129)

Fri, 30 Sep 2016 09:50:21 -0500

author
Gary Kramlich <grim@reaperworld.com>
date
Fri, 30 Sep 2016 09:50:21 -0500
changeset 38070
4663f9da17aa
parent 38068
fd6805c0df15 (current diff)
parent 38069
56d191003b34 (diff)
child 38114
324b3e1b30c5
child 38149
d092152c84c9
child 38277
061e91dd78d4
child 38304
6be873b79cba

Merged in CMaiku/pidgin (pull request #129)

facebook: Port from sslconn to Gio

--- a/libpurple/protocols/facebook/facebook.c	Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/facebook.c	Fri Sep 30 09:50:21 2016 -0500
@@ -332,8 +332,8 @@
 
 	gc = fb_data_get_connection(fata);
 
-	if (error->domain == FB_MQTT_SSL_ERROR) {
-		purple_connection_ssl_error(gc, error->code);
+	if (error->domain == G_IO_ERROR) {
+		purple_connection_g_error(gc, error);
 		return;
 	}
 
--- a/libpurple/protocols/facebook/mqtt.c	Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/mqtt.c	Fri Sep 30 09:50:21 2016 -0500
@@ -28,7 +28,8 @@
 #include "account.h"
 #include "eventloop.h"
 #include "glibcompat.h"
-#include "sslconn.h"
+#include "purple-gio.h"
+#include "queuedoutputstream.h"
 
 #include "marshal.h"
 #include "mqtt.h"
@@ -37,17 +38,17 @@
 struct _FbMqttPrivate
 {
 	PurpleConnection *gc;
-	PurpleSslConnection *gsc;
+	GIOStream *conn;
+	GBufferedInputStream *input;
+	PurpleQueuedOutputStream *output;
+	GCancellable *cancellable;
 	gboolean connected;
 	guint16 mid;
 
 	GByteArray *rbuf;
-	GByteArray *wbuf;
 	gsize remz;
 
 	gint tev;
-	gint rev;
-	gint wev;
 };
 
 struct _FbMqttMessagePrivate
@@ -65,6 +66,8 @@
 G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
 G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
 
+static void fb_mqtt_read_packet(FbMqtt *mqtt);
+
 static void
 fb_mqtt_dispose(GObject *obj)
 {
@@ -73,7 +76,6 @@
 
 	fb_mqtt_close(mqtt);
 	g_byte_array_free(priv->rbuf, TRUE);
-	g_byte_array_free(priv->wbuf, TRUE);
 }
 
 static void
@@ -161,7 +163,6 @@
 	mqtt->priv = priv;
 
 	priv->rbuf = g_byte_array_new();
-	priv->wbuf = g_byte_array_new();
 }
 
 static void
@@ -205,18 +206,6 @@
 	return q;
 }
 
-GQuark
-fb_mqtt_ssl_error_quark(void)
-{
-	static GQuark q = 0;
-
-	if (G_UNLIKELY(q == 0)) {
-		q = g_quark_from_static_string("fb-mqtt-ssl-error-quark");
-	}
-
-	return q;
-}
-
 FbMqtt *
 fb_mqtt_new(PurpleConnection *gc)
 {
@@ -240,33 +229,47 @@
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	priv = mqtt->priv;
 
-	if (priv->wev > 0) {
-		purple_input_remove(priv->wev);
-		priv->wev = 0;
-	}
-
-	if (priv->rev > 0) {
-		purple_input_remove(priv->rev);
-		priv->rev = 0;
-	}
-
 	if (priv->tev > 0) {
 		purple_timeout_remove(priv->tev);
 		priv->tev = 0;
 	}
 
-	if (priv->gsc != NULL) {
-		purple_ssl_close(priv->gsc);
-		priv->gsc = NULL;
+	if (priv->cancellable != NULL) {
+		g_cancellable_cancel(priv->cancellable);
+		g_clear_object(&priv->cancellable);
 	}
 
-	if (priv->wbuf->len > 0) {
-		fb_util_debug_warning("Closing with unwritten data");
+	if (priv->conn != NULL) {
+		purple_gio_graceful_close(priv->conn,
+				G_INPUT_STREAM(priv->input),
+				G_OUTPUT_STREAM(priv->output));
+		g_clear_object(&priv->input);
+		g_clear_object(&priv->output);
+		g_clear_object(&priv->conn);
 	}
 
 	priv->connected = FALSE;
 	g_byte_array_set_size(priv->rbuf, 0);
-	g_byte_array_set_size(priv->wbuf, 0);
+}
+
+static void
+fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
+{
+	if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+		/* Return as cancelled means the connection is closing */
+		g_error_free(err);
+		return;
+	}
+
+	/* Now we can check for programming errors */
+	g_return_if_fail(FB_IS_MQTT(mqtt));
+
+	if (prefix != NULL) {
+		g_prefix_error(&err, "%s: ", prefix);
+	}
+
+	g_signal_emit_by_name(mqtt, "error", err);
+	g_error_free(err);
 }
 
 void
@@ -344,74 +347,130 @@
 }
 
 static void
-fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
+{
+	GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
+	FbMqtt *mqtt = data;
+	gssize ret;
+	GError *err = NULL;
+
+	ret = g_buffered_input_stream_fill_finish(input, res, &err);
+
+	if (ret < 1) {
+		if (ret == 0) {
+			err = g_error_new_literal(G_IO_ERROR,
+					G_IO_ERROR_CONNECTION_CLOSED,
+					_("Connection closed"));
+		}
+
+		fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
+		return;
+	}
+
+	fb_mqtt_read_packet(mqtt);
+}
+
+static void
+fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
 {
 	FbMqtt *mqtt = data;
+	FbMqttPrivate *priv;
+	gssize ret;
 	FbMqttMessage *msg;
-	FbMqttPrivate *priv = mqtt->priv;
-	gint res;
-	guint mult;
-	guint8 buf[1024];
-	guint8 byte;
-	gsize size;
-	gssize rize;
+	GError *err = NULL;
 
-	if (priv->remz < 1) {
-		/* Reset the read buffer */
-		g_byte_array_set_size(priv->rbuf, 0);
+	ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
 
-		res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
-		g_byte_array_append(priv->rbuf, &byte, sizeof byte);
-
-		if (res != sizeof byte) {
-			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-			              _("Failed to read fixed header"));
-			return;
+	if (ret < 1) {
+		if (ret == 0) {
+			err = g_error_new_literal(G_IO_ERROR,
+					G_IO_ERROR_CONNECTION_CLOSED,
+					_("Connection closed"));
 		}
 
-		mult = 1;
-
-		do {
-			res = purple_ssl_read(priv->gsc, &byte, sizeof byte);
-			g_byte_array_append(priv->rbuf, &byte, sizeof byte);
+		fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
+		return;
+	}
 
-			if (res != sizeof byte) {
-				fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-				              _("Failed to read packet size"));
-				return;
-			}
-
-			priv->remz += (byte & 127) * mult;
-			mult *= 128;
-		} while ((byte & 128) != 0);
-	}
+	priv = mqtt->priv;
+	priv->remz -= ret;
 
 	if (priv->remz > 0) {
-		size = MIN(priv->remz, sizeof buf);
-		rize = purple_ssl_read(priv->gsc, buf, size);
+		g_input_stream_read_async(G_INPUT_STREAM(source),
+				priv->rbuf->data +
+				priv->rbuf->len - priv->remz, priv->remz,
+				G_PRIORITY_DEFAULT, priv->cancellable,
+				fb_mqtt_cb_read_packet, mqtt);
+		return;
+	}
 
-		if (rize < 1) {
-			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-			              _("Failed to read packet data"));
-			return;
-		}
+	msg = fb_mqtt_message_new_bytes(priv->rbuf);
 
-		g_byte_array_append(priv->rbuf, buf, rize);
-		priv->remz -= rize;
+	if (G_UNLIKELY(msg == NULL)) {
+		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
+		              _("Failed to parse message"));
+		return;
 	}
 
-	if (priv->remz < 1) {
-		msg = fb_mqtt_message_new_bytes(priv->rbuf);
-		priv->remz = 0;
+	fb_mqtt_read(mqtt, msg);
+	g_object_unref(msg);
+
+	/* Read another packet if connection wasn't reset in fb_mqtt_read() */
+	if (fb_mqtt_connected(mqtt, FALSE)) {
+		fb_mqtt_read_packet(mqtt);
+	}
+}
 
-		if (G_UNLIKELY(msg == NULL)) {
-			fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-			              _("Failed to parse message"));
-			return;
+static void
+fb_mqtt_read_packet(FbMqtt *mqtt)
+{
+	FbMqttPrivate *priv = mqtt->priv;
+	const guint8 const *buf;
+	gsize count = 0;
+	gsize pos;
+	guint mult = 1;
+	guint8 byte;
+	gsize size = 0;
+
+	buf = g_buffered_input_stream_peek_buffer(priv->input, &count);
+
+	/* Start at 1 to skip the first byte */
+	pos = 1;
+
+	do {
+		if (pos >= count) {
+			/* Not enough data yet, try again later */
+			size = 0;
+			break;
 		}
 
-		fb_mqtt_read(mqtt, msg);
-		g_object_unref(msg);
+		byte = *(buf + pos++);
+
+		size += (byte & 127) * mult;
+		mult *= 128;
+	} while ((byte & 128) != 0);
+
+	if (size > 0) {
+		/* Add header to size */
+		size += pos;
+
+		g_byte_array_set_size(priv->rbuf, size);
+		priv->remz = size;
+
+		/* TODO: Use g_input_stream_read_all_async() when available. */
+		/* TODO: Alternately, it would be nice to let the
+		 * FbMqttMessage directly use the GBufferedInputStream
+		 * buffer instead of copying it, provided it's consumed
+		 * before the next read.
+		 */
+		g_input_stream_read_async(G_INPUT_STREAM(priv->input),
+				priv->rbuf->data, priv->rbuf->len,
+				G_PRIORITY_DEFAULT, priv->cancellable,
+				fb_mqtt_cb_read_packet, mqtt);
+	} else {
+		g_buffered_input_stream_fill_async(priv->input, -1,
+				G_PRIORITY_DEFAULT, priv->cancellable,
+				fb_mqtt_cb_fill, mqtt);
 	}
 }
 
@@ -519,27 +578,16 @@
 }
 
 static void
-fb_mqtt_cb_write(gpointer data, gint fd, PurpleInputCondition cond)
+fb_mqtt_cb_flush(GObject *source, GAsyncResult *res, gpointer data)
 {
 	FbMqtt *mqtt = data;
-	FbMqttPrivate *priv = mqtt->priv;
-	gssize wize;
+	GError *err = NULL;
 
-	wize = purple_ssl_write(priv->gsc, priv->wbuf->data, priv->wbuf->len);
-
-	if (wize < 0) {
-		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-		              _("Failed to write data"));
+	if (!g_output_stream_flush_finish(G_OUTPUT_STREAM(source),
+			res, &err)) {
+		fb_mqtt_take_error(mqtt, err, _("Failed to write data"));
 		return;
 	}
-
-	if (wize > 0) {
-		g_byte_array_remove_range(priv->wbuf, 0, wize);
-	}
-
-	if (priv->wbuf->len < 1) {
-		priv->wev = 0;
-	}
 }
 
 void
@@ -548,6 +596,7 @@
 	const GByteArray *bytes;
 	FbMqttMessagePrivate *mriv;
 	FbMqttPrivate *priv;
+	GBytes *gbytes;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
@@ -566,46 +615,46 @@
 	                      "Writing %d (flags: 0x%0X)",
 		              mriv->type, mriv->flags);
 
-	g_byte_array_append(priv->wbuf, bytes->data, bytes->len);
-	fb_mqtt_cb_write(mqtt, priv->gsc->fd, PURPLE_INPUT_WRITE);
+ 	/* TODO: Would be nice to refactor this to not require copying bytes */
+	gbytes = g_bytes_new(bytes->data, bytes->len);
+	purple_queued_output_stream_push_bytes(priv->output, gbytes);
+	g_bytes_unref(gbytes);
 
-	if (priv->wev > 0) {
-		priv->wev = purple_input_add(priv->gsc->fd,
-		                             PURPLE_INPUT_WRITE,
-		                             fb_mqtt_cb_write, mqtt);
+	if (!g_output_stream_has_pending(G_OUTPUT_STREAM(priv->output))) {
+		g_output_stream_flush_async(G_OUTPUT_STREAM(priv->output),
+				G_PRIORITY_DEFAULT, priv->cancellable,
+				fb_mqtt_cb_flush, mqtt);
 	}
 }
 
 static void
-fb_mqtt_cb_open(gpointer data, PurpleSslConnection *ssl,
-                PurpleInputCondition cond)
+fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data)
 {
 	FbMqtt *mqtt = data;
-	FbMqttPrivate *priv = mqtt->priv;
+	FbMqttPrivate *priv;
+	GSocketConnection *conn;
+	GError *err = NULL;
+
+	conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
+			res, &err);
+
+	if (conn == NULL) {
+		fb_mqtt_take_error(mqtt, err, NULL);
+		return;
+	}
 
 	fb_mqtt_timeout_clear(mqtt);
-	priv->rev = purple_input_add(priv->gsc->fd, PURPLE_INPUT_READ,
-	                             fb_mqtt_cb_read, mqtt);
-	g_signal_emit_by_name(mqtt, "open");
-}
 
-static void
-fb_mqtt_cb_open_error(PurpleSslConnection *ssl, PurpleSslErrorType error,
-                      gpointer data)
-{
-	const gchar *str;
-	FbMqtt *mqtt = data;
-	FbMqttPrivate *priv = mqtt->priv;
-	GError *err;
+	priv = mqtt->priv;
+	priv->conn = G_IO_STREAM(conn);
+	priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
+			g_io_stream_get_input_stream(priv->conn)));
+	priv->output = purple_queued_output_stream_new(
+			g_io_stream_get_output_stream(priv->conn));
 
-	str = purple_ssl_strerror(error);
-	err = g_error_new_literal(FB_MQTT_SSL_ERROR, error, str);
+	fb_mqtt_read_packet(mqtt);
 
-	/* Do not call purple_ssl_close() from the error_func */
-	priv->gsc = NULL;
-
-	g_signal_emit_by_name(mqtt, "error", err);
-	g_error_free(err);
+	g_signal_emit_by_name(mqtt, "open");
 }
 
 void
@@ -613,20 +662,29 @@
 {
 	FbMqttPrivate *priv;
 	PurpleAccount *acc;
+	GSocketClient *client;
+	GError *err = NULL;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	priv = mqtt->priv;
 
 	acc = purple_connection_get_account(priv->gc);
 	fb_mqtt_close(mqtt);
-	priv->gsc = purple_ssl_connect(acc, host, port, fb_mqtt_cb_open,
-	                               fb_mqtt_cb_open_error, mqtt);
+
+	client = purple_gio_socket_client_new(acc, &err);
 
-	if (priv->gsc == NULL) {
-		fb_mqtt_cb_open_error(NULL, 0, mqtt);
+	if (client == NULL) {
+		fb_mqtt_take_error(mqtt, err, NULL);
 		return;
 	}
 
+	priv->cancellable = g_cancellable_new();
+
+	g_socket_client_set_tls(client, TRUE);
+	g_socket_client_connect_to_host_async(client, host, port,
+			priv->cancellable, fb_mqtt_cb_open, mqtt);
+	g_object_unref(client);
+
 	fb_mqtt_timeout(mqtt);
 }
 
@@ -662,7 +720,7 @@
 
 	g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
 	priv = mqtt->priv;
-	connected = (priv->gsc != NULL) && priv->connected;
+	connected = (priv->conn != NULL) && priv->connected;
 
 	if (!connected && error) {
 		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
--- a/libpurple/protocols/facebook/mqtt.h	Thu Sep 29 10:42:41 2016 -0500
+++ b/libpurple/protocols/facebook/mqtt.h	Fri Sep 30 09:50:21 2016 -0500
@@ -107,13 +107,6 @@
  */
 #define FB_MQTT_ERROR  fb_mqtt_error_quark()
 
-/**
- * FB_MQTT_SSL_ERROR:
- *
- * The #GQuark of the domain of MQTT SSL errors.
- */
-#define FB_MQTT_SSL_ERROR  fb_mqtt_ssl_error_quark()
-
 typedef struct _FbMqtt FbMqtt;
 typedef struct _FbMqttClass FbMqttClass;
 typedef struct _FbMqttPrivate FbMqttPrivate;
@@ -298,16 +291,6 @@
 fb_mqtt_error_quark(void);
 
 /**
- * fb_mqtt_ssl_error_quark:
- *
- * Gets the #GQuark of the domain of MQTT SSL errors.
- *
- * Returns: The #GQuark of the domain.
- */
-GQuark
-fb_mqtt_ssl_error_quark(void);
-
-/**
  * fb_mqtt_new:
  * @gc: The #PurpleConnection.
  *

mercurial