Fri, 30 Sep 2016 09:50:21 -0500
Merged in CMaiku/pidgin (pull request #129)
facebook: Port from sslconn to Gio
--- a/libpurple/protocols/facebook/facebook.c Thu Sep 29 10:42:41 2016 -0500 +++ b/libpurple/protocols/facebook/facebook.c Fri Sep 30 09:50:21 2016 -0500 @@ -332,8 +332,8 @@ gc = fb_data_get_connection(fata); - if (error->domain == FB_MQTT_SSL_ERROR) { - purple_connection_ssl_error(gc, error->code); + if (error->domain == G_IO_ERROR) { + purple_connection_g_error(gc, error); return; }
--- a/libpurple/protocols/facebook/mqtt.c Thu Sep 29 10:42:41 2016 -0500 +++ b/libpurple/protocols/facebook/mqtt.c Fri Sep 30 09:50:21 2016 -0500 @@ -28,7 +28,8 @@ #include "account.h" #include "eventloop.h" #include "glibcompat.h" -#include "sslconn.h" +#include "purple-gio.h" +#include "queuedoutputstream.h" #include "marshal.h" #include "mqtt.h" @@ -37,17 +38,17 @@ struct _FbMqttPrivate { PurpleConnection *gc; - PurpleSslConnection *gsc; + GIOStream *conn; + GBufferedInputStream *input; + PurpleQueuedOutputStream *output; + GCancellable *cancellable; gboolean connected; guint16 mid; GByteArray *rbuf; - GByteArray *wbuf; gsize remz; gint tev; - gint rev; - gint wev; }; struct _FbMqttMessagePrivate @@ -65,6 +66,8 @@ G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT); G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT); +static void fb_mqtt_read_packet(FbMqtt *mqtt); + static void fb_mqtt_dispose(GObject *obj) { @@ -73,7 +76,6 @@ fb_mqtt_close(mqtt); g_byte_array_free(priv->rbuf, TRUE); - g_byte_array_free(priv->wbuf, TRUE); } static void @@ -161,7 +163,6 @@ mqtt->priv = priv; priv->rbuf = g_byte_array_new(); - priv->wbuf = g_byte_array_new(); } static void @@ -205,18 +206,6 @@ return q; } -GQuark -fb_mqtt_ssl_error_quark(void) -{ - static GQuark q = 0; - - if (G_UNLIKELY(q == 0)) { - q = g_quark_from_static_string("fb-mqtt-ssl-error-quark"); - } - - return q; -} - FbMqtt * fb_mqtt_new(PurpleConnection *gc) { @@ -240,33 +229,47 @@ g_return_if_fail(FB_IS_MQTT(mqtt)); priv = mqtt->priv; - if (priv->wev > 0) { - purple_input_remove(priv->wev); - priv->wev = 0; - } - - if (priv->rev > 0) { - purple_input_remove(priv->rev); - priv->rev = 0; - } - if (priv->tev > 0) { purple_timeout_remove(priv->tev); priv->tev = 0; } - if (priv->gsc != NULL) { - purple_ssl_close(priv->gsc); - priv->gsc = NULL; + if (priv->cancellable != NULL) { + g_cancellable_cancel(priv->cancellable); + g_clear_object(&priv->cancellable); } - if (priv->wbuf->len > 0) { - fb_util_debug_warning("Closing with unwritten data"); + if (priv->conn != NULL) { + purple_gio_graceful_close(priv->conn, + G_INPUT_STREAM(priv->input), + G_OUTPUT_STREAM(priv->output)); + g_clear_object(&priv->input); + g_clear_object(&priv->output); + g_clear_object(&priv->conn); } priv->connected = FALSE; g_byte_array_set_size(priv->rbuf, 0); - g_byte_array_set_size(priv->wbuf, 0); +} + +static void +fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix) +{ + if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + /* Return as cancelled means the connection is closing */ + g_error_free(err); + return; + } + + /* Now we can check for programming errors */ + g_return_if_fail(FB_IS_MQTT(mqtt)); + + if (prefix != NULL) { + g_prefix_error(&err, "%s: ", prefix); + } + + g_signal_emit_by_name(mqtt, "error", err); + g_error_free(err); } void @@ -344,74 +347,130 @@ } static void -fb_mqtt_cb_read(gpointer data, gint fd, PurpleInputCondition cond) +fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data) +{ + GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source); + FbMqtt *mqtt = data; + gssize ret; + GError *err = NULL; + + ret = g_buffered_input_stream_fill_finish(input, res, &err); + + if (ret < 1) { + if (ret == 0) { + err = g_error_new_literal(G_IO_ERROR, + G_IO_ERROR_CONNECTION_CLOSED, + _("Connection closed")); + } + + fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header")); + return; + } + + fb_mqtt_read_packet(mqtt); +} + +static void +fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data) { FbMqtt *mqtt = data; + FbMqttPrivate *priv; + gssize ret; FbMqttMessage *msg; - FbMqttPrivate *priv = mqtt->priv; - gint res; - guint mult; - guint8 buf[1024]; - guint8 byte; - gsize size; - gssize rize; + GError *err = NULL; - if (priv->remz < 1) { - /* Reset the read buffer */ - g_byte_array_set_size(priv->rbuf, 0); + ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err); - res = purple_ssl_read(priv->gsc, &byte, sizeof byte); - g_byte_array_append(priv->rbuf, &byte, sizeof byte); - - if (res != sizeof byte) { - fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, - _("Failed to read fixed header")); - return; + if (ret < 1) { + if (ret == 0) { + err = g_error_new_literal(G_IO_ERROR, + G_IO_ERROR_CONNECTION_CLOSED, + _("Connection closed")); } - mult = 1; - - do { - res = purple_ssl_read(priv->gsc, &byte, sizeof byte); - g_byte_array_append(priv->rbuf, &byte, sizeof byte); + fb_mqtt_take_error(mqtt, err, _("Failed to read packet data")); + return; + } - if (res != sizeof byte) { - fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, - _("Failed to read packet size")); - return; - } - - priv->remz += (byte & 127) * mult; - mult *= 128; - } while ((byte & 128) != 0); - } + priv = mqtt->priv; + priv->remz -= ret; if (priv->remz > 0) { - size = MIN(priv->remz, sizeof buf); - rize = purple_ssl_read(priv->gsc, buf, size); + g_input_stream_read_async(G_INPUT_STREAM(source), + priv->rbuf->data + + priv->rbuf->len - priv->remz, priv->remz, + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_read_packet, mqtt); + return; + } - if (rize < 1) { - fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, - _("Failed to read packet data")); - return; - } + msg = fb_mqtt_message_new_bytes(priv->rbuf); - g_byte_array_append(priv->rbuf, buf, rize); - priv->remz -= rize; + if (G_UNLIKELY(msg == NULL)) { + fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, + _("Failed to parse message")); + return; } - if (priv->remz < 1) { - msg = fb_mqtt_message_new_bytes(priv->rbuf); - priv->remz = 0; + fb_mqtt_read(mqtt, msg); + g_object_unref(msg); + + /* Read another packet if connection wasn't reset in fb_mqtt_read() */ + if (fb_mqtt_connected(mqtt, FALSE)) { + fb_mqtt_read_packet(mqtt); + } +} - if (G_UNLIKELY(msg == NULL)) { - fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, - _("Failed to parse message")); - return; +static void +fb_mqtt_read_packet(FbMqtt *mqtt) +{ + FbMqttPrivate *priv = mqtt->priv; + const guint8 const *buf; + gsize count = 0; + gsize pos; + guint mult = 1; + guint8 byte; + gsize size = 0; + + buf = g_buffered_input_stream_peek_buffer(priv->input, &count); + + /* Start at 1 to skip the first byte */ + pos = 1; + + do { + if (pos >= count) { + /* Not enough data yet, try again later */ + size = 0; + break; } - fb_mqtt_read(mqtt, msg); - g_object_unref(msg); + byte = *(buf + pos++); + + size += (byte & 127) * mult; + mult *= 128; + } while ((byte & 128) != 0); + + if (size > 0) { + /* Add header to size */ + size += pos; + + g_byte_array_set_size(priv->rbuf, size); + priv->remz = size; + + /* TODO: Use g_input_stream_read_all_async() when available. */ + /* TODO: Alternately, it would be nice to let the + * FbMqttMessage directly use the GBufferedInputStream + * buffer instead of copying it, provided it's consumed + * before the next read. + */ + g_input_stream_read_async(G_INPUT_STREAM(priv->input), + priv->rbuf->data, priv->rbuf->len, + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_read_packet, mqtt); + } else { + g_buffered_input_stream_fill_async(priv->input, -1, + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_fill, mqtt); } } @@ -519,27 +578,16 @@ } static void -fb_mqtt_cb_write(gpointer data, gint fd, PurpleInputCondition cond) +fb_mqtt_cb_flush(GObject *source, GAsyncResult *res, gpointer data) { FbMqtt *mqtt = data; - FbMqttPrivate *priv = mqtt->priv; - gssize wize; + GError *err = NULL; - wize = purple_ssl_write(priv->gsc, priv->wbuf->data, priv->wbuf->len); - - if (wize < 0) { - fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, - _("Failed to write data")); + if (!g_output_stream_flush_finish(G_OUTPUT_STREAM(source), + res, &err)) { + fb_mqtt_take_error(mqtt, err, _("Failed to write data")); return; } - - if (wize > 0) { - g_byte_array_remove_range(priv->wbuf, 0, wize); - } - - if (priv->wbuf->len < 1) { - priv->wev = 0; - } } void @@ -548,6 +596,7 @@ const GByteArray *bytes; FbMqttMessagePrivate *mriv; FbMqttPrivate *priv; + GBytes *gbytes; g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); @@ -566,46 +615,46 @@ "Writing %d (flags: 0x%0X)", mriv->type, mriv->flags); - g_byte_array_append(priv->wbuf, bytes->data, bytes->len); - fb_mqtt_cb_write(mqtt, priv->gsc->fd, PURPLE_INPUT_WRITE); + /* TODO: Would be nice to refactor this to not require copying bytes */ + gbytes = g_bytes_new(bytes->data, bytes->len); + purple_queued_output_stream_push_bytes(priv->output, gbytes); + g_bytes_unref(gbytes); - if (priv->wev > 0) { - priv->wev = purple_input_add(priv->gsc->fd, - PURPLE_INPUT_WRITE, - fb_mqtt_cb_write, mqtt); + if (!g_output_stream_has_pending(G_OUTPUT_STREAM(priv->output))) { + g_output_stream_flush_async(G_OUTPUT_STREAM(priv->output), + G_PRIORITY_DEFAULT, priv->cancellable, + fb_mqtt_cb_flush, mqtt); } } static void -fb_mqtt_cb_open(gpointer data, PurpleSslConnection *ssl, - PurpleInputCondition cond) +fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data) { FbMqtt *mqtt = data; - FbMqttPrivate *priv = mqtt->priv; + FbMqttPrivate *priv; + GSocketConnection *conn; + GError *err = NULL; + + conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source), + res, &err); + + if (conn == NULL) { + fb_mqtt_take_error(mqtt, err, NULL); + return; + } fb_mqtt_timeout_clear(mqtt); - priv->rev = purple_input_add(priv->gsc->fd, PURPLE_INPUT_READ, - fb_mqtt_cb_read, mqtt); - g_signal_emit_by_name(mqtt, "open"); -} -static void -fb_mqtt_cb_open_error(PurpleSslConnection *ssl, PurpleSslErrorType error, - gpointer data) -{ - const gchar *str; - FbMqtt *mqtt = data; - FbMqttPrivate *priv = mqtt->priv; - GError *err; + priv = mqtt->priv; + priv->conn = G_IO_STREAM(conn); + priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new( + g_io_stream_get_input_stream(priv->conn))); + priv->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(priv->conn)); - str = purple_ssl_strerror(error); - err = g_error_new_literal(FB_MQTT_SSL_ERROR, error, str); + fb_mqtt_read_packet(mqtt); - /* Do not call purple_ssl_close() from the error_func */ - priv->gsc = NULL; - - g_signal_emit_by_name(mqtt, "error", err); - g_error_free(err); + g_signal_emit_by_name(mqtt, "open"); } void @@ -613,20 +662,29 @@ { FbMqttPrivate *priv; PurpleAccount *acc; + GSocketClient *client; + GError *err = NULL; g_return_if_fail(FB_IS_MQTT(mqtt)); priv = mqtt->priv; acc = purple_connection_get_account(priv->gc); fb_mqtt_close(mqtt); - priv->gsc = purple_ssl_connect(acc, host, port, fb_mqtt_cb_open, - fb_mqtt_cb_open_error, mqtt); + + client = purple_gio_socket_client_new(acc, &err); - if (priv->gsc == NULL) { - fb_mqtt_cb_open_error(NULL, 0, mqtt); + if (client == NULL) { + fb_mqtt_take_error(mqtt, err, NULL); return; } + priv->cancellable = g_cancellable_new(); + + g_socket_client_set_tls(client, TRUE); + g_socket_client_connect_to_host_async(client, host, port, + priv->cancellable, fb_mqtt_cb_open, mqtt); + g_object_unref(client); + fb_mqtt_timeout(mqtt); } @@ -662,7 +720,7 @@ g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE); priv = mqtt->priv; - connected = (priv->gsc != NULL) && priv->connected; + connected = (priv->conn != NULL) && priv->connected; if (!connected && error) { fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
--- a/libpurple/protocols/facebook/mqtt.h Thu Sep 29 10:42:41 2016 -0500 +++ b/libpurple/protocols/facebook/mqtt.h Fri Sep 30 09:50:21 2016 -0500 @@ -107,13 +107,6 @@ */ #define FB_MQTT_ERROR fb_mqtt_error_quark() -/** - * FB_MQTT_SSL_ERROR: - * - * The #GQuark of the domain of MQTT SSL errors. - */ -#define FB_MQTT_SSL_ERROR fb_mqtt_ssl_error_quark() - typedef struct _FbMqtt FbMqtt; typedef struct _FbMqttClass FbMqttClass; typedef struct _FbMqttPrivate FbMqttPrivate; @@ -298,16 +291,6 @@ fb_mqtt_error_quark(void); /** - * fb_mqtt_ssl_error_quark: - * - * Gets the #GQuark of the domain of MQTT SSL errors. - * - * Returns: The #GQuark of the domain. - */ -GQuark -fb_mqtt_ssl_error_quark(void); - -/** * fb_mqtt_new: * @gc: The #PurpleConnection. *