--- a/libpurple/protocols/facebook/mqtt.c Thu Nov 03 22:51:47 2022 -0500 +++ b/libpurple/protocols/facebook/mqtt.c Thu Nov 03 22:52:46 2022 -0500 @@ -30,8 +30,15 @@ #include "mqtt.h" #include "util.h" -typedef struct +/** + * FbMqtt: + * + * Represents an MQTT connection. + */ +struct _FbMqtt { + GObject parent; + PurpleConnection *gc; GIOStream *conn; GBufferedInputStream *input; @@ -44,32 +51,9 @@ gsize remz; gint tev; -} FbMqttPrivate; - -/** - * FbMqtt: - * - * Represents an MQTT connection. - */ -struct _FbMqtt -{ - GObject parent; - FbMqttPrivate *priv; }; -G_DEFINE_TYPE_WITH_PRIVATE(FbMqtt, fb_mqtt, G_TYPE_OBJECT); - -typedef struct -{ - FbMqttMessageType type; - FbMqttMessageFlags flags; - - GByteArray *bytes; - guint offset; - guint pos; - - gboolean local; -} FbMqttMessagePrivate; +G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT); /** * FbMqttMessage: @@ -79,10 +63,18 @@ struct _FbMqttMessage { GObject parent; - FbMqttMessagePrivate *priv; + + FbMqttMessageType type; + FbMqttMessageFlags flags; + + GByteArray *bytes; + guint offset; + guint pos; + + gboolean local; }; -G_DEFINE_TYPE_WITH_PRIVATE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT); +G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT); static void fb_mqtt_read_packet(FbMqtt *mqtt); @@ -90,10 +82,9 @@ fb_mqtt_dispose(GObject *obj) { FbMqtt *mqtt = FB_MQTT(obj); - FbMqttPrivate *priv = mqtt->priv; fb_mqtt_close(mqtt); - g_byte_array_free(priv->rbuf, TRUE); + g_byte_array_free(mqtt->rbuf, TRUE); } static void @@ -169,20 +160,17 @@ static void fb_mqtt_init(FbMqtt *mqtt) { - FbMqttPrivate *priv = fb_mqtt_get_instance_private(mqtt); - - mqtt->priv = priv; - - priv->rbuf = g_byte_array_new(); + mqtt->rbuf = g_byte_array_new(); } static void fb_mqtt_message_dispose(GObject *obj) { - FbMqttMessagePrivate *priv = FB_MQTT_MESSAGE(obj)->priv; + FbMqttMessage *msg = FB_MQTT_MESSAGE(obj); - if ((priv->bytes != NULL) && priv->local) { - g_byte_array_free(priv->bytes, TRUE); + if(msg->bytes != NULL && msg->local) { + g_byte_array_free(msg->bytes, TRUE); + msg->bytes = NULL; } } @@ -197,9 +185,6 @@ static void fb_mqtt_message_init(FbMqttMessage *msg) { - FbMqttMessagePrivate *priv = fb_mqtt_message_get_instance_private(msg); - - msg->priv = priv; } GQuark @@ -218,13 +203,11 @@ fb_mqtt_new(PurpleConnection *gc) { FbMqtt *mqtt; - FbMqttPrivate *priv; g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL); mqtt = g_object_new(FB_TYPE_MQTT, NULL); - priv = mqtt->priv; - priv->gc = gc; + mqtt->gc = gc; return mqtt; }; @@ -232,32 +215,29 @@ void fb_mqtt_close(FbMqtt *mqtt) { - FbMqttPrivate *priv; + g_return_if_fail(FB_IS_MQTT(mqtt)); - g_return_if_fail(FB_IS_MQTT(mqtt)); - priv = mqtt->priv; + if(mqtt->tev > 0) { + g_source_remove(mqtt->tev); + mqtt->tev = 0; + } - if (priv->tev > 0) { - g_source_remove(priv->tev); - priv->tev = 0; + if(mqtt->cancellable != NULL) { + g_cancellable_cancel(mqtt->cancellable); + g_clear_object(&mqtt->cancellable); } - if (priv->cancellable != NULL) { - g_cancellable_cancel(priv->cancellable); - g_clear_object(&priv->cancellable); + if(mqtt->conn != NULL) { + purple_gio_graceful_close(mqtt->conn, + G_INPUT_STREAM(mqtt->input), + G_OUTPUT_STREAM(mqtt->output)); + g_clear_object(&mqtt->input); + g_clear_object(&mqtt->output); + g_clear_object(&mqtt->conn); } - if (priv->conn != NULL) { - purple_gio_graceful_close(priv->conn, - G_INPUT_STREAM(priv->input), - G_OUTPUT_STREAM(priv->output)); - g_clear_object(&priv->input); - g_clear_object(&priv->output); - g_clear_object(&priv->conn); - } - - priv->connected = FALSE; - g_byte_array_set_size(priv->rbuf, 0); + mqtt->connected = FALSE; + g_byte_array_set_size(mqtt->rbuf, 0); } static void @@ -313,9 +293,8 @@ fb_mqtt_cb_timeout(gpointer data) { FbMqtt *mqtt = data; - FbMqttPrivate *priv = mqtt->priv; - priv->tev = 0; + mqtt->tev = 0; fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Connection timed out")); return FALSE; @@ -324,21 +303,17 @@ static void fb_mqtt_timeout_clear(FbMqtt *mqtt) { - FbMqttPrivate *priv = mqtt->priv; - - if (priv->tev > 0) { - g_source_remove(priv->tev); - priv->tev = 0; + if(mqtt->tev > 0) { + g_source_remove(mqtt->tev); + mqtt->tev = 0; } } static void fb_mqtt_timeout(FbMqtt *mqtt) { - FbMqttPrivate *priv = mqtt->priv; - fb_mqtt_timeout_clear(mqtt); - priv->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout, + mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout, mqtt); } @@ -347,13 +322,12 @@ { FbMqtt *mqtt = data; FbMqttMessage *msg; - FbMqttPrivate *priv = mqtt->priv; msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0); fb_mqtt_write(mqtt, msg); g_object_unref(msg); - priv->tev = 0; + mqtt->tev = 0; fb_mqtt_timeout(mqtt); return FALSE; } @@ -361,11 +335,9 @@ static void fb_mqtt_ping(FbMqtt *mqtt) { - FbMqttPrivate *priv = mqtt->priv; - fb_mqtt_timeout_clear(mqtt); - priv->tev = - g_timeout_add_seconds(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_ping, mqtt); + mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_ping, + mqtt); } static void @@ -396,7 +368,6 @@ fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data) { FbMqtt *mqtt = data; - FbMqttPrivate *priv; gssize ret; FbMqttMessage *msg; GError *err = NULL; @@ -414,19 +385,18 @@ return; } - priv = mqtt->priv; - priv->remz -= ret; + mqtt->remz -= ret; - if (priv->remz > 0) { + if(mqtt->remz > 0) { g_input_stream_read_async(G_INPUT_STREAM(source), - priv->rbuf->data + - priv->rbuf->len - priv->remz, priv->remz, - G_PRIORITY_DEFAULT, priv->cancellable, - fb_mqtt_cb_read_packet, mqtt); + mqtt->rbuf->data + mqtt->rbuf->len - mqtt->remz, + mqtt->remz, G_PRIORITY_DEFAULT, + mqtt->cancellable, fb_mqtt_cb_read_packet, + mqtt); return; } - msg = fb_mqtt_message_new_bytes(priv->rbuf); + msg = fb_mqtt_message_new_bytes(mqtt->rbuf); if (G_UNLIKELY(msg == NULL)) { fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, @@ -446,7 +416,6 @@ static void fb_mqtt_read_packet(FbMqtt *mqtt) { - FbMqttPrivate *priv = mqtt->priv; const guint8 *buf; gsize count = 0; gsize pos; @@ -454,7 +423,7 @@ guint8 byte; gsize size = 0; - buf = g_buffered_input_stream_peek_buffer(priv->input, &count); + buf = g_buffered_input_stream_peek_buffer(mqtt->input, &count); /* Start at 1 to skip the first byte */ pos = 1; @@ -462,9 +431,10 @@ do { if (pos >= count) { /* Not enough data yet, try again later */ - g_buffered_input_stream_fill_async(priv->input, -1, - G_PRIORITY_DEFAULT, priv->cancellable, - fb_mqtt_cb_fill, mqtt); + g_buffered_input_stream_fill_async(mqtt->input, -1, + G_PRIORITY_DEFAULT, + mqtt->cancellable, + fb_mqtt_cb_fill, mqtt); return; } @@ -477,8 +447,8 @@ /* Add header to size */ size += pos; - g_byte_array_set_size(priv->rbuf, size); - priv->remz = size; + g_byte_array_set_size(mqtt->rbuf, size); + mqtt->remz = size; /* TODO: Use g_input_stream_read_all_async() when available. */ /* TODO: Alternately, it would be nice to let the @@ -486,33 +456,27 @@ * buffer instead of copying it, provided it's consumed * before the next read. */ - g_input_stream_read_async(G_INPUT_STREAM(priv->input), - priv->rbuf->data, priv->rbuf->len, - G_PRIORITY_DEFAULT, priv->cancellable, - fb_mqtt_cb_read_packet, mqtt); + g_input_stream_read_async(G_INPUT_STREAM(mqtt->input), mqtt->rbuf->data, + mqtt->rbuf->len, G_PRIORITY_DEFAULT, + mqtt->cancellable, fb_mqtt_cb_read_packet, mqtt); } void fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg) { FbMqttMessage *nsg; - FbMqttPrivate *priv; - FbMqttMessagePrivate *mriv; GByteArray *wytes; gchar *str; guint8 chr; guint16 mid; g_return_if_fail(FB_IS_MQTT(mqtt)); - g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); - priv = mqtt->priv; - mriv = msg->priv; - fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes, + fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes, "Reading %d (flags: 0x%0X)", - mriv->type, mriv->flags); + msg->type, msg->flags); - switch (mriv->type) { + switch (msg->type) { case FB_MQTT_MESSAGE_TYPE_CONNACK: if (!fb_mqtt_message_read_byte(msg, NULL) || !fb_mqtt_message_read_byte(msg, &chr)) @@ -526,7 +490,7 @@ return; } - priv->connected = TRUE; + mqtt->connected = TRUE; fb_mqtt_ping(mqtt); g_signal_emit_by_name(mqtt, "connect"); return; @@ -536,10 +500,10 @@ break; } - if ((mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) || - (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS2)) + if((msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) || + (msg->flags & FB_MQTT_MESSAGE_FLAG_QOS2)) { - if (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) { + if(msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) { chr = FB_MQTT_MESSAGE_TYPE_PUBACK; } else { chr = FB_MQTT_MESSAGE_TYPE_PUBREC; @@ -586,7 +550,7 @@ default: fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL, - _("Unknown packet (%u)"), mriv->type); + _("Unknown packet (%u)"), msg->type); return; } @@ -615,14 +579,10 @@ fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg) { const GByteArray *bytes; - FbMqttMessagePrivate *mriv; - FbMqttPrivate *priv; GBytes *gbytes; g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); - priv = mqtt->priv; - mriv = msg->priv; bytes = fb_mqtt_message_bytes(msg); @@ -632,15 +592,16 @@ return; } - fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes, + fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes, "Writing %d (flags: 0x%0X)", - mriv->type, mriv->flags); + msg->type, msg->flags); /* TODO: Would be nice to refactor this to not require copying bytes */ gbytes = g_bytes_new(bytes->data, bytes->len); - purple_queued_output_stream_push_bytes_async(priv->output, gbytes, - G_PRIORITY_DEFAULT, priv->cancellable, - fb_mqtt_cb_push_bytes, mqtt); + purple_queued_output_stream_push_bytes_async(mqtt->output, gbytes, + G_PRIORITY_DEFAULT, + mqtt->cancellable, + fb_mqtt_cb_push_bytes, mqtt); g_bytes_unref(gbytes); } @@ -648,7 +609,6 @@ fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data) { FbMqtt *mqtt = data; - FbMqttPrivate *priv; GSocketConnection *conn; GError *err = NULL; @@ -662,12 +622,11 @@ fb_mqtt_timeout_clear(mqtt); - priv = mqtt->priv; - priv->conn = G_IO_STREAM(conn); - priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new( - g_io_stream_get_input_stream(priv->conn))); - priv->output = purple_queued_output_stream_new( - g_io_stream_get_output_stream(priv->conn)); + mqtt->conn = G_IO_STREAM(conn); + mqtt->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new( + g_io_stream_get_input_stream(mqtt->conn))); + mqtt->output = purple_queued_output_stream_new( + g_io_stream_get_output_stream(mqtt->conn)); fb_mqtt_read_packet(mqtt); @@ -677,15 +636,13 @@ void fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port) { - FbMqttPrivate *priv; PurpleAccount *acc; GSocketClient *client; GError *err = NULL; g_return_if_fail(FB_IS_MQTT(mqtt)); - priv = mqtt->priv; - acc = purple_connection_get_account(priv->gc); + acc = purple_connection_get_account(mqtt->gc); fb_mqtt_close(mqtt); client = purple_gio_socket_client_new(acc, &err); @@ -695,11 +652,12 @@ return; } - priv->cancellable = g_cancellable_new(); + mqtt->cancellable = g_cancellable_new(); g_socket_client_set_tls(client, TRUE); g_socket_client_connect_to_host_async(client, host, port, - priv->cancellable, fb_mqtt_cb_open, mqtt); + mqtt->cancellable, fb_mqtt_cb_open, + mqtt); g_object_unref(client); fb_mqtt_timeout(mqtt); @@ -732,12 +690,10 @@ gboolean fb_mqtt_connected(FbMqtt *mqtt, gboolean error) { - FbMqttPrivate *priv; gboolean connected; g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE); - priv = mqtt->priv; - connected = (priv->conn != NULL) && priv->connected; + connected = (mqtt->conn != NULL) && mqtt->connected; if (!connected && error) { fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Not connected")); @@ -765,18 +721,16 @@ fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload) { FbMqttMessage *msg; - FbMqttPrivate *priv; g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(fb_mqtt_connected(mqtt, FALSE)); - priv = mqtt->priv; /* Message identifier not required, but for consistency use QoS1 */ msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH, FB_MQTT_MESSAGE_FLAG_QOS1); fb_mqtt_message_write_str(msg, topic); /* Message topic */ - fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */ + fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */ if (pload != NULL) { fb_mqtt_message_write(msg, pload->data, pload->len); @@ -791,19 +745,17 @@ { const gchar *topic; FbMqttMessage *msg; - FbMqttPrivate *priv; guint16 qos; va_list ap; g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(fb_mqtt_connected(mqtt, FALSE)); - priv = mqtt->priv; /* Facebook requires a message identifier, use QoS1 */ msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE, FB_MQTT_MESSAGE_FLAG_QOS1); - fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */ + fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */ va_start(ap, mqtt); @@ -824,18 +776,16 @@ { const gchar *topic; FbMqttMessage *msg; - FbMqttPrivate *priv; va_list ap; g_return_if_fail(FB_IS_MQTT(mqtt)); g_return_if_fail(fb_mqtt_connected(mqtt, FALSE)); - priv = mqtt->priv; /* Facebook requires a message identifier, use QoS1 */ msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE, FB_MQTT_MESSAGE_FLAG_QOS1); - fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */ + fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */ fb_mqtt_message_write_str(msg, topic1); /* First topic */ va_start(ap, topic1); @@ -854,15 +804,13 @@ fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags) { FbMqttMessage *msg; - FbMqttMessagePrivate *priv; msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL); - priv = msg->priv; - priv->type = type; - priv->flags = flags; - priv->bytes = g_byte_array_new(); - priv->local = TRUE; + msg->type = type; + msg->flags = flags; + msg->bytes = g_byte_array_new(); + msg->local = TRUE; return msg; } @@ -871,24 +819,26 @@ fb_mqtt_message_new_bytes(GByteArray *bytes) { FbMqttMessage *msg; - FbMqttMessagePrivate *priv; guint8 *byte; g_return_val_if_fail(bytes != NULL, NULL); g_return_val_if_fail(bytes->len >= 2, NULL); msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL); - priv = msg->priv; - priv->bytes = bytes; - priv->local = FALSE; - priv->type = (*bytes->data & 0xF0) >> 4; - priv->flags = *bytes->data & 0x0F; + msg->bytes = bytes; + msg->local = FALSE; + msg->type = (*bytes->data & 0xF0) >> 4; + msg->flags = *bytes->data & 0x0F; /* Skip the fixed header */ - for (byte = priv->bytes->data + 1; (*(byte++) & 128) != 0; ); - priv->offset = byte - bytes->data; - priv->pos = priv->offset; + byte = msg->bytes->data + 1; + while((*byte & 128) != 0) { + byte++; + } + byte++; + msg->offset = byte - bytes->data; + msg->pos = msg->offset; return msg; } @@ -896,32 +846,27 @@ void fb_mqtt_message_reset(FbMqttMessage *msg) { - FbMqttMessagePrivate *priv; - g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); - priv = msg->priv; - if (priv->offset > 0) { - g_byte_array_remove_range(priv->bytes, 0, priv->offset); - priv->offset = 0; - priv->pos = 0; + if(msg->offset > 0) { + g_byte_array_remove_range(msg->bytes, 0, msg->offset); + msg->offset = 0; + msg->pos = 0; } } const GByteArray * fb_mqtt_message_bytes(FbMqttMessage *msg) { - FbMqttMessagePrivate *priv; guint i; guint8 byte; guint8 sbuf[4]; guint32 size; g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL); - priv = msg->priv; i = 0; - size = priv->bytes->len - priv->offset; + size = msg->bytes->len - msg->offset; do { if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) { @@ -939,48 +884,42 @@ } while (size > 0); fb_mqtt_message_reset(msg); - g_byte_array_prepend(priv->bytes, sbuf, i); + g_byte_array_prepend(msg->bytes, sbuf, i); - byte = ((priv->type & 0x0F) << 4) | (priv->flags & 0x0F); - g_byte_array_prepend(priv->bytes, &byte, sizeof byte); + byte = ((msg->type & 0x0F) << 4) | (msg->flags & 0x0F); + g_byte_array_prepend(msg->bytes, &byte, sizeof byte); - priv->pos = (i + 1) * (sizeof byte); - return priv->bytes; + msg->pos = (i + 1) * (sizeof byte); + return msg->bytes; } gboolean fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size) { - FbMqttMessagePrivate *priv; + g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE); - g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE); - priv = msg->priv; - - if ((priv->pos + size) > priv->bytes->len) { + if((msg->pos + size) > msg->bytes->len) { return FALSE; } if ((data != NULL) && (size > 0)) { - memcpy(data, priv->bytes->data + priv->pos, size); + memcpy(data, msg->bytes->data + msg->pos, size); } - priv->pos += size; + msg->pos += size; return TRUE; } gboolean fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes) { - FbMqttMessagePrivate *priv; guint size; g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE); - priv = msg->priv; - size = priv->bytes->len - priv->pos; + size = msg->bytes->len - msg->pos; if (G_LIKELY(size > 0)) { - g_byte_array_append(bytes, priv->bytes->data + priv->pos, - size); + g_byte_array_append(bytes, msg->bytes->data + msg->pos, size); } return TRUE; @@ -1044,13 +983,10 @@ void fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size) { - FbMqttMessagePrivate *priv; + g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); - g_return_if_fail(FB_IS_MQTT_MESSAGE(msg)); - priv = msg->priv; - - g_byte_array_append(priv->bytes, data, size); - priv->pos += size; + g_byte_array_append(msg->bytes, data, size); + msg->pos += size; } void