libpurple/protocols/facebook/mqtt.c

changeset 41879
6009bf441ef5
parent 40443
025772e2b454
child 41969
86518b429f50
--- a/libpurple/protocols/facebook/mqtt.c	Thu Nov 03 22:51:47 2022 -0500
+++ b/libpurple/protocols/facebook/mqtt.c	Thu Nov 03 22:52:46 2022 -0500
@@ -30,8 +30,15 @@
 #include "mqtt.h"
 #include "util.h"
 
-typedef struct
+/**
+ * FbMqtt:
+ *
+ * Represents an MQTT connection.
+ */
+struct _FbMqtt
 {
+	GObject parent;
+
 	PurpleConnection *gc;
 	GIOStream *conn;
 	GBufferedInputStream *input;
@@ -44,32 +51,9 @@
 	gsize remz;
 
 	gint tev;
-} FbMqttPrivate;
-
-/**
- * FbMqtt:
- *
- * Represents an MQTT connection.
- */
-struct _FbMqtt
-{
-	GObject parent;
-	FbMqttPrivate *priv;
 };
 
-G_DEFINE_TYPE_WITH_PRIVATE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
-
-typedef struct
-{
-	FbMqttMessageType type;
-	FbMqttMessageFlags flags;
-
-	GByteArray *bytes;
-	guint offset;
-	guint pos;
-
-	gboolean local;
-} FbMqttMessagePrivate;
+G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
 
 /**
  * FbMqttMessage:
@@ -79,10 +63,18 @@
 struct _FbMqttMessage
 {
 	GObject parent;
-	FbMqttMessagePrivate *priv;
+
+	FbMqttMessageType type;
+	FbMqttMessageFlags flags;
+
+	GByteArray *bytes;
+	guint offset;
+	guint pos;
+
+	gboolean local;
 };
 
-G_DEFINE_TYPE_WITH_PRIVATE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
+G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
 
 static void fb_mqtt_read_packet(FbMqtt *mqtt);
 
@@ -90,10 +82,9 @@
 fb_mqtt_dispose(GObject *obj)
 {
 	FbMqtt *mqtt = FB_MQTT(obj);
-	FbMqttPrivate *priv = mqtt->priv;
 
 	fb_mqtt_close(mqtt);
-	g_byte_array_free(priv->rbuf, TRUE);
+	g_byte_array_free(mqtt->rbuf, TRUE);
 }
 
 static void
@@ -169,20 +160,17 @@
 static void
 fb_mqtt_init(FbMqtt *mqtt)
 {
-	FbMqttPrivate *priv = fb_mqtt_get_instance_private(mqtt);
-
-	mqtt->priv = priv;
-
-	priv->rbuf = g_byte_array_new();
+	mqtt->rbuf = g_byte_array_new();
 }
 
 static void
 fb_mqtt_message_dispose(GObject *obj)
 {
-	FbMqttMessagePrivate *priv = FB_MQTT_MESSAGE(obj)->priv;
+	FbMqttMessage *msg = FB_MQTT_MESSAGE(obj);
 
-	if ((priv->bytes != NULL) && priv->local) {
-		g_byte_array_free(priv->bytes, TRUE);
+	if(msg->bytes != NULL && msg->local) {
+		g_byte_array_free(msg->bytes, TRUE);
+		msg->bytes = NULL;
 	}
 }
 
@@ -197,9 +185,6 @@
 static void
 fb_mqtt_message_init(FbMqttMessage *msg)
 {
-	FbMqttMessagePrivate *priv = fb_mqtt_message_get_instance_private(msg);
-
-	msg->priv = priv;
 }
 
 GQuark
@@ -218,13 +203,11 @@
 fb_mqtt_new(PurpleConnection *gc)
 {
 	FbMqtt *mqtt;
-	FbMqttPrivate *priv;
 
 	g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL);
 
 	mqtt = g_object_new(FB_TYPE_MQTT,  NULL);
-	priv = mqtt->priv;
-	priv->gc = gc;
+	mqtt->gc = gc;
 
 	return mqtt;
 };
@@ -232,32 +215,29 @@
 void
 fb_mqtt_close(FbMqtt *mqtt)
 {
-	FbMqttPrivate *priv;
+	g_return_if_fail(FB_IS_MQTT(mqtt));
 
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-	priv = mqtt->priv;
+	if(mqtt->tev > 0) {
+		g_source_remove(mqtt->tev);
+		mqtt->tev = 0;
+	}
 
-	if (priv->tev > 0) {
-		g_source_remove(priv->tev);
-		priv->tev = 0;
+	if(mqtt->cancellable != NULL) {
+		g_cancellable_cancel(mqtt->cancellable);
+		g_clear_object(&mqtt->cancellable);
 	}
 
-	if (priv->cancellable != NULL) {
-		g_cancellable_cancel(priv->cancellable);
-		g_clear_object(&priv->cancellable);
+	if(mqtt->conn != NULL) {
+		purple_gio_graceful_close(mqtt->conn,
+				G_INPUT_STREAM(mqtt->input),
+				G_OUTPUT_STREAM(mqtt->output));
+		g_clear_object(&mqtt->input);
+		g_clear_object(&mqtt->output);
+		g_clear_object(&mqtt->conn);
 	}
 
-	if (priv->conn != NULL) {
-		purple_gio_graceful_close(priv->conn,
-				G_INPUT_STREAM(priv->input),
-				G_OUTPUT_STREAM(priv->output));
-		g_clear_object(&priv->input);
-		g_clear_object(&priv->output);
-		g_clear_object(&priv->conn);
-	}
-
-	priv->connected = FALSE;
-	g_byte_array_set_size(priv->rbuf, 0);
+	mqtt->connected = FALSE;
+	g_byte_array_set_size(mqtt->rbuf, 0);
 }
 
 static void
@@ -313,9 +293,8 @@
 fb_mqtt_cb_timeout(gpointer data)
 {
 	FbMqtt *mqtt = data;
-	FbMqttPrivate *priv = mqtt->priv;
 
-	priv->tev = 0;
+	mqtt->tev = 0;
 	fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
 	                      _("Connection timed out"));
 	return FALSE;
@@ -324,21 +303,17 @@
 static void
 fb_mqtt_timeout_clear(FbMqtt *mqtt)
 {
-	FbMqttPrivate *priv = mqtt->priv;
-
-	if (priv->tev > 0) {
-		g_source_remove(priv->tev);
-		priv->tev = 0;
+	if(mqtt->tev > 0) {
+		g_source_remove(mqtt->tev);
+		mqtt->tev = 0;
 	}
 }
 
 static void
 fb_mqtt_timeout(FbMqtt *mqtt)
 {
-	FbMqttPrivate *priv = mqtt->priv;
-
 	fb_mqtt_timeout_clear(mqtt);
-	priv->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout,
+	mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout,
 	                                  mqtt);
 }
 
@@ -347,13 +322,12 @@
 {
 	FbMqtt *mqtt = data;
 	FbMqttMessage *msg;
-	FbMqttPrivate *priv = mqtt->priv;
 
 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0);
 	fb_mqtt_write(mqtt, msg);
 	g_object_unref(msg);
 
-	priv->tev = 0;
+	mqtt->tev = 0;
 	fb_mqtt_timeout(mqtt);
 	return FALSE;
 }
@@ -361,11 +335,9 @@
 static void
 fb_mqtt_ping(FbMqtt *mqtt)
 {
-	FbMqttPrivate *priv = mqtt->priv;
-
 	fb_mqtt_timeout_clear(mqtt);
-	priv->tev =
-	        g_timeout_add_seconds(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_ping, mqtt);
+	mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_ping,
+	                                  mqtt);
 }
 
 static void
@@ -396,7 +368,6 @@
 fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
 {
 	FbMqtt *mqtt = data;
-	FbMqttPrivate *priv;
 	gssize ret;
 	FbMqttMessage *msg;
 	GError *err = NULL;
@@ -414,19 +385,18 @@
 		return;
 	}
 
-	priv = mqtt->priv;
-	priv->remz -= ret;
+	mqtt->remz -= ret;
 
-	if (priv->remz > 0) {
+	if(mqtt->remz > 0) {
 		g_input_stream_read_async(G_INPUT_STREAM(source),
-				priv->rbuf->data +
-				priv->rbuf->len - priv->remz, priv->remz,
-				G_PRIORITY_DEFAULT, priv->cancellable,
-				fb_mqtt_cb_read_packet, mqtt);
+		                          mqtt->rbuf->data + mqtt->rbuf->len - mqtt->remz,
+		                          mqtt->remz, G_PRIORITY_DEFAULT,
+		                          mqtt->cancellable, fb_mqtt_cb_read_packet,
+		                          mqtt);
 		return;
 	}
 
-	msg = fb_mqtt_message_new_bytes(priv->rbuf);
+	msg = fb_mqtt_message_new_bytes(mqtt->rbuf);
 
 	if (G_UNLIKELY(msg == NULL)) {
 		fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
@@ -446,7 +416,6 @@
 static void
 fb_mqtt_read_packet(FbMqtt *mqtt)
 {
-	FbMqttPrivate *priv = mqtt->priv;
 	const guint8 *buf;
 	gsize count = 0;
 	gsize pos;
@@ -454,7 +423,7 @@
 	guint8 byte;
 	gsize size = 0;
 
-	buf = g_buffered_input_stream_peek_buffer(priv->input, &count);
+	buf = g_buffered_input_stream_peek_buffer(mqtt->input, &count);
 
 	/* Start at 1 to skip the first byte */
 	pos = 1;
@@ -462,9 +431,10 @@
 	do {
 		if (pos >= count) {
 			/* Not enough data yet, try again later */
-			g_buffered_input_stream_fill_async(priv->input, -1,
-					G_PRIORITY_DEFAULT, priv->cancellable,
-					fb_mqtt_cb_fill, mqtt);
+			g_buffered_input_stream_fill_async(mqtt->input, -1,
+			                                   G_PRIORITY_DEFAULT,
+			                                   mqtt->cancellable,
+			                                   fb_mqtt_cb_fill, mqtt);
 			return;
 		}
 
@@ -477,8 +447,8 @@
 	/* Add header to size */
 	size += pos;
 
-	g_byte_array_set_size(priv->rbuf, size);
-	priv->remz = size;
+	g_byte_array_set_size(mqtt->rbuf, size);
+	mqtt->remz = size;
 
 	/* TODO: Use g_input_stream_read_all_async() when available. */
 	/* TODO: Alternately, it would be nice to let the
@@ -486,33 +456,27 @@
 	 * buffer instead of copying it, provided it's consumed
 	 * before the next read.
 	 */
-	g_input_stream_read_async(G_INPUT_STREAM(priv->input),
-			priv->rbuf->data, priv->rbuf->len,
-			G_PRIORITY_DEFAULT, priv->cancellable,
-			fb_mqtt_cb_read_packet, mqtt);
+	g_input_stream_read_async(G_INPUT_STREAM(mqtt->input), mqtt->rbuf->data,
+	                          mqtt->rbuf->len, G_PRIORITY_DEFAULT,
+	                          mqtt->cancellable, fb_mqtt_cb_read_packet, mqtt);
 }
 
 void
 fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg)
 {
 	FbMqttMessage *nsg;
-	FbMqttPrivate *priv;
-	FbMqttMessagePrivate *mriv;
 	GByteArray *wytes;
 	gchar *str;
 	guint8 chr;
 	guint16 mid;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
-	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
-	priv = mqtt->priv;
-	mriv = msg->priv;
 
-	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
+	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes,
 	                      "Reading %d (flags: 0x%0X)",
-			      mriv->type, mriv->flags);
+	                      msg->type, msg->flags);
 
-	switch (mriv->type) {
+	switch (msg->type) {
 	case FB_MQTT_MESSAGE_TYPE_CONNACK:
 		if (!fb_mqtt_message_read_byte(msg, NULL) ||
 		    !fb_mqtt_message_read_byte(msg, &chr))
@@ -526,7 +490,7 @@
 			return;
 		}
 
-		priv->connected = TRUE;
+		mqtt->connected = TRUE;
 		fb_mqtt_ping(mqtt);
 		g_signal_emit_by_name(mqtt, "connect");
 		return;
@@ -536,10 +500,10 @@
 			break;
 		}
 
-		if ((mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
-		    (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
+		if((msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
+		   (msg->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
 		{
-			if (mriv->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
+			if(msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
 				chr = FB_MQTT_MESSAGE_TYPE_PUBACK;
 			} else {
 				chr = FB_MQTT_MESSAGE_TYPE_PUBREC;
@@ -586,7 +550,7 @@
 
 	default:
 		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-		              _("Unknown packet (%u)"), mriv->type);
+		              _("Unknown packet (%u)"), msg->type);
 		return;
 	}
 
@@ -615,14 +579,10 @@
 fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg)
 {
 	const GByteArray *bytes;
-	FbMqttMessagePrivate *mriv;
-	FbMqttPrivate *priv;
 	GBytes *gbytes;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
-	priv = mqtt->priv;
-	mriv = msg->priv;
 
 	bytes = fb_mqtt_message_bytes(msg);
 
@@ -632,15 +592,16 @@
 		return;
 	}
 
-	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, mriv->bytes,
+	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes,
 	                      "Writing %d (flags: 0x%0X)",
-		              mriv->type, mriv->flags);
+	                      msg->type, msg->flags);
 
  	/* TODO: Would be nice to refactor this to not require copying bytes */
 	gbytes = g_bytes_new(bytes->data, bytes->len);
-	purple_queued_output_stream_push_bytes_async(priv->output, gbytes,
-			G_PRIORITY_DEFAULT, priv->cancellable,
-			fb_mqtt_cb_push_bytes, mqtt);
+	purple_queued_output_stream_push_bytes_async(mqtt->output, gbytes,
+	                                             G_PRIORITY_DEFAULT,
+	                                             mqtt->cancellable,
+	                                             fb_mqtt_cb_push_bytes, mqtt);
 	g_bytes_unref(gbytes);
 }
 
@@ -648,7 +609,6 @@
 fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data)
 {
 	FbMqtt *mqtt = data;
-	FbMqttPrivate *priv;
 	GSocketConnection *conn;
 	GError *err = NULL;
 
@@ -662,12 +622,11 @@
 
 	fb_mqtt_timeout_clear(mqtt);
 
-	priv = mqtt->priv;
-	priv->conn = G_IO_STREAM(conn);
-	priv->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
-			g_io_stream_get_input_stream(priv->conn)));
-	priv->output = purple_queued_output_stream_new(
-			g_io_stream_get_output_stream(priv->conn));
+	mqtt->conn = G_IO_STREAM(conn);
+	mqtt->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
+			g_io_stream_get_input_stream(mqtt->conn)));
+	mqtt->output = purple_queued_output_stream_new(
+			g_io_stream_get_output_stream(mqtt->conn));
 
 	fb_mqtt_read_packet(mqtt);
 
@@ -677,15 +636,13 @@
 void
 fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port)
 {
-	FbMqttPrivate *priv;
 	PurpleAccount *acc;
 	GSocketClient *client;
 	GError *err = NULL;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
-	priv = mqtt->priv;
 
-	acc = purple_connection_get_account(priv->gc);
+	acc = purple_connection_get_account(mqtt->gc);
 	fb_mqtt_close(mqtt);
 
 	client = purple_gio_socket_client_new(acc, &err);
@@ -695,11 +652,12 @@
 		return;
 	}
 
-	priv->cancellable = g_cancellable_new();
+	mqtt->cancellable = g_cancellable_new();
 
 	g_socket_client_set_tls(client, TRUE);
 	g_socket_client_connect_to_host_async(client, host, port,
-			priv->cancellable, fb_mqtt_cb_open, mqtt);
+	                                      mqtt->cancellable, fb_mqtt_cb_open,
+	                                      mqtt);
 	g_object_unref(client);
 
 	fb_mqtt_timeout(mqtt);
@@ -732,12 +690,10 @@
 gboolean
 fb_mqtt_connected(FbMqtt *mqtt, gboolean error)
 {
-	FbMqttPrivate *priv;
 	gboolean connected;
 
 	g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
-	priv = mqtt->priv;
-	connected = (priv->conn != NULL) && priv->connected;
+	connected = (mqtt->conn != NULL) && mqtt->connected;
 
 	if (!connected && error) {
 		fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Not connected"));
@@ -765,18 +721,16 @@
 fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload)
 {
 	FbMqttMessage *msg;
-	FbMqttPrivate *priv;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
-	priv = mqtt->priv;
 
 	/* Message identifier not required, but for consistency use QoS1 */
 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH,
 	                          FB_MQTT_MESSAGE_FLAG_QOS1);
 
 	fb_mqtt_message_write_str(msg, topic);      /* Message topic */
-	fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
+	fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
 
 	if (pload != NULL) {
 		fb_mqtt_message_write(msg, pload->data, pload->len);
@@ -791,19 +745,17 @@
 {
 	const gchar *topic;
 	FbMqttMessage *msg;
-	FbMqttPrivate *priv;
 	guint16 qos;
 	va_list ap;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
-	priv = mqtt->priv;
 
 	/* Facebook requires a message identifier, use QoS1 */
 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE,
 	                          FB_MQTT_MESSAGE_FLAG_QOS1);
 
-	fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
+	fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
 
 	va_start(ap, mqtt);
 
@@ -824,18 +776,16 @@
 {
 	const gchar *topic;
 	FbMqttMessage *msg;
-	FbMqttPrivate *priv;
 	va_list ap;
 
 	g_return_if_fail(FB_IS_MQTT(mqtt));
 	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
-	priv = mqtt->priv;
 
 	/* Facebook requires a message identifier, use QoS1 */
 	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE,
 	                          FB_MQTT_MESSAGE_FLAG_QOS1);
 
-	fb_mqtt_message_write_mid(msg, &priv->mid); /* Message identifier */
+	fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
 	fb_mqtt_message_write_str(msg, topic1);     /* First topic */
 
 	va_start(ap, topic1);
@@ -854,15 +804,13 @@
 fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags)
 {
 	FbMqttMessage *msg;
-	FbMqttMessagePrivate *priv;
 
 	msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
-	priv = msg->priv;
 
-	priv->type = type;
-	priv->flags = flags;
-	priv->bytes = g_byte_array_new();
-	priv->local = TRUE;
+	msg->type = type;
+	msg->flags = flags;
+	msg->bytes = g_byte_array_new();
+	msg->local = TRUE;
 
 	return msg;
 }
@@ -871,24 +819,26 @@
 fb_mqtt_message_new_bytes(GByteArray *bytes)
 {
 	FbMqttMessage *msg;
-	FbMqttMessagePrivate *priv;
 	guint8 *byte;
 
 	g_return_val_if_fail(bytes != NULL, NULL);
 	g_return_val_if_fail(bytes->len >= 2, NULL);
 
 	msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
-	priv = msg->priv;
 
-	priv->bytes = bytes;
-	priv->local = FALSE;
-	priv->type = (*bytes->data & 0xF0) >> 4;
-	priv->flags = *bytes->data & 0x0F;
+	msg->bytes = bytes;
+	msg->local = FALSE;
+	msg->type = (*bytes->data & 0xF0) >> 4;
+	msg->flags = *bytes->data & 0x0F;
 
 	/* Skip the fixed header */
-	for (byte = priv->bytes->data + 1; (*(byte++) & 128) != 0; );
-	priv->offset = byte - bytes->data;
-	priv->pos = priv->offset;
+	byte = msg->bytes->data + 1;
+	while((*byte & 128) != 0) {
+		byte++;
+	}
+	byte++;
+	msg->offset = byte - bytes->data;
+	msg->pos = msg->offset;
 
 	return msg;
 }
@@ -896,32 +846,27 @@
 void
 fb_mqtt_message_reset(FbMqttMessage *msg)
 {
-	FbMqttMessagePrivate *priv;
-
 	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
-	priv = msg->priv;
 
-	if (priv->offset > 0) {
-		g_byte_array_remove_range(priv->bytes, 0, priv->offset);
-		priv->offset = 0;
-		priv->pos = 0;
+	if(msg->offset > 0) {
+		g_byte_array_remove_range(msg->bytes, 0, msg->offset);
+		msg->offset = 0;
+		msg->pos = 0;
 	}
 }
 
 const GByteArray *
 fb_mqtt_message_bytes(FbMqttMessage *msg)
 {
-	FbMqttMessagePrivate *priv;
 	guint i;
 	guint8 byte;
 	guint8 sbuf[4];
 	guint32 size;
 
 	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL);
-	priv = msg->priv;
 
 	i = 0;
-	size = priv->bytes->len - priv->offset;
+	size = msg->bytes->len - msg->offset;
 
 	do {
 		if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) {
@@ -939,48 +884,42 @@
 	} while (size > 0);
 
 	fb_mqtt_message_reset(msg);
-	g_byte_array_prepend(priv->bytes, sbuf, i);
+	g_byte_array_prepend(msg->bytes, sbuf, i);
 
-	byte = ((priv->type & 0x0F) << 4) | (priv->flags & 0x0F);
-	g_byte_array_prepend(priv->bytes, &byte, sizeof byte);
+	byte = ((msg->type & 0x0F) << 4) | (msg->flags & 0x0F);
+	g_byte_array_prepend(msg->bytes, &byte, sizeof byte);
 
-	priv->pos = (i + 1) * (sizeof byte);
-	return priv->bytes;
+	msg->pos = (i + 1) * (sizeof byte);
+	return msg->bytes;
 }
 
 gboolean
 fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size)
 {
-	FbMqttMessagePrivate *priv;
+	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
 
-	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
-	priv = msg->priv;
-
-	if ((priv->pos + size) > priv->bytes->len) {
+	if((msg->pos + size) > msg->bytes->len) {
 		return FALSE;
 	}
 
 	if ((data != NULL) && (size > 0)) {
-		memcpy(data, priv->bytes->data + priv->pos, size);
+		memcpy(data, msg->bytes->data + msg->pos, size);
 	}
 
-	priv->pos += size;
+	msg->pos += size;
 	return TRUE;
 }
 
 gboolean
 fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes)
 {
-	FbMqttMessagePrivate *priv;
 	guint size;
 
 	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
-	priv = msg->priv;
-	size = priv->bytes->len - priv->pos;
+	size = msg->bytes->len - msg->pos;
 
 	if (G_LIKELY(size > 0)) {
-		g_byte_array_append(bytes, priv->bytes->data + priv->pos,
-		                    size);
+		g_byte_array_append(bytes, msg->bytes->data + msg->pos, size);
 	}
 
 	return TRUE;
@@ -1044,13 +983,10 @@
 void
 fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size)
 {
-	FbMqttMessagePrivate *priv;
+	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
 
-	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
-	priv = msg->priv;
-
-	g_byte_array_append(priv->bytes, data, size);
-	priv->pos += size;
+	g_byte_array_append(msg->bytes, data, size);
+	msg->pos += size;
 }
 
 void

mercurial