libpurple/protocols/facebook/mqtt.c

changeset 42187
fc241db9162d
parent 42186
637ba5491231
child 42188
04c0398f1046
--- a/libpurple/protocols/facebook/mqtt.c	Wed Mar 29 23:21:45 2023 -0500
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,1016 +0,0 @@
-/* purple
- *
- * Purple is the legal property of its developers, whose names are too numerous
- * to list here.  Please refer to the COPYRIGHT file distributed with this
- * source distribution.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02111-1301  USA
- */
-
-#include <glib/gi18n-lib.h>
-
-#include <glib/gprintf.h>
-#include <stdarg.h>
-#include <string.h>
-
-#include <purple.h>
-
-#include "mqtt.h"
-#include "util.h"
-
-/**
- * FbMqtt:
- *
- * Represents an MQTT connection.
- */
-struct _FbMqtt
-{
-	GObject parent;
-
-	PurpleConnection *gc;
-	GIOStream *conn;
-	GBufferedInputStream *input;
-	PurpleQueuedOutputStream *output;
-	GCancellable *cancellable;
-	gboolean connected;
-	guint16 mid;
-
-	GByteArray *rbuf;
-	gsize remz;
-
-	gint tev;
-};
-
-G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
-
-/**
- * FbMqttMessage:
- *
- * Represents a reader/writer for an MQTT message.
- */
-struct _FbMqttMessage
-{
-	GObject parent;
-
-	FbMqttMessageType type;
-	FbMqttMessageFlags flags;
-
-	GByteArray *bytes;
-	guint offset;
-	guint pos;
-
-	gboolean local;
-};
-
-G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
-
-static void fb_mqtt_read_packet(FbMqtt *mqtt);
-
-static void
-fb_mqtt_dispose(GObject *obj)
-{
-	FbMqtt *mqtt = FB_MQTT(obj);
-
-	fb_mqtt_close(mqtt);
-	g_byte_array_free(mqtt->rbuf, TRUE);
-}
-
-static void
-fb_mqtt_class_init(FbMqttClass *klass)
-{
-	GObjectClass *gklass = G_OBJECT_CLASS(klass);
-
-	gklass->dispose = fb_mqtt_dispose;
-	/**
-	 * FbMqtt::connect:
-	 * @mqtt: The #FbMqtt.
-	 *
-	 * Emitted upon the successful completion of the connection
-	 * process. This is emitted as a result of #fb_mqtt_connect().
-	 */
-	g_signal_new("connect",
-	             G_TYPE_FROM_CLASS(klass),
-	             G_SIGNAL_ACTION,
-	             0,
-	             NULL, NULL, NULL,
-	             G_TYPE_NONE,
-	             0);
-
-	/**
-	 * FbMqtt::error:
-	 * @mqtt: The #FbMqtt.
-	 * @error: The #GError.
-	 *
-	 * Emitted whenever an error is hit within the #FbMqtt. This
-	 * should close the #FbMqtt with #fb_mqtt_close().
-	 */
-	g_signal_new("error",
-	             G_TYPE_FROM_CLASS(klass),
-	             G_SIGNAL_ACTION,
-	             0,
-	             NULL, NULL, NULL,
-	             G_TYPE_NONE,
-	             1, G_TYPE_ERROR);
-
-	/**
-	 * FbMqtt::open:
-	 * @mqtt: The #FbMqtt.
-	 *
-	 * Emitted upon the successful opening of the remote socket.
-	 * This is emitted as a result of #fb_mqtt_open(). This should
-	 * call #fb_mqtt_connect().
-	 */
-	g_signal_new("open",
-	             G_TYPE_FROM_CLASS(klass),
-	             G_SIGNAL_ACTION,
-	             0,
-	             NULL, NULL, NULL,
-	             G_TYPE_NONE,
-	             0);
-
-	/**
-	 * FbMqtt::publish:
-	 * @mqtt: The #FbMqtt.
-	 * @topic: The topic.
-	 * @pload: The payload.
-	 *
-	 * Emitted upon an incoming message from the steam.
-	 */
-	g_signal_new("publish",
-	             G_TYPE_FROM_CLASS(klass),
-	             G_SIGNAL_ACTION,
-	             0,
-	             NULL, NULL, NULL,
-	             G_TYPE_NONE,
-	             2, G_TYPE_STRING, G_TYPE_BYTE_ARRAY);
-}
-
-static void
-fb_mqtt_init(FbMqtt *mqtt)
-{
-	mqtt->rbuf = g_byte_array_new();
-}
-
-static void
-fb_mqtt_message_dispose(GObject *obj)
-{
-	FbMqttMessage *msg = FB_MQTT_MESSAGE(obj);
-
-	if(msg->bytes != NULL && msg->local) {
-		g_byte_array_free(msg->bytes, TRUE);
-		msg->bytes = NULL;
-	}
-}
-
-static void
-fb_mqtt_message_class_init(FbMqttMessageClass *klass)
-{
-	GObjectClass *gklass = G_OBJECT_CLASS(klass);
-
-	gklass->dispose = fb_mqtt_message_dispose;
-}
-
-static void
-fb_mqtt_message_init(G_GNUC_UNUSED FbMqttMessage *msg)
-{
-}
-
-GQuark
-fb_mqtt_error_quark(void)
-{
-	static GQuark q = 0;
-
-	if (G_UNLIKELY(q == 0)) {
-		q = g_quark_from_static_string("fb-mqtt-error-quark");
-	}
-
-	return q;
-}
-
-FbMqtt *
-fb_mqtt_new(PurpleConnection *gc)
-{
-	FbMqtt *mqtt;
-
-	g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL);
-
-	mqtt = g_object_new(FB_TYPE_MQTT,  NULL);
-	mqtt->gc = gc;
-
-	return mqtt;
-};
-
-void
-fb_mqtt_close(FbMqtt *mqtt)
-{
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-
-	g_clear_handle_id(&mqtt->tev, g_source_remove);
-
-	if(mqtt->cancellable != NULL) {
-		g_cancellable_cancel(mqtt->cancellable);
-		g_clear_object(&mqtt->cancellable);
-	}
-
-	if(mqtt->conn != NULL) {
-		purple_gio_graceful_close(mqtt->conn,
-				G_INPUT_STREAM(mqtt->input),
-				G_OUTPUT_STREAM(mqtt->output));
-		g_clear_object(&mqtt->input);
-		g_clear_object(&mqtt->output);
-		g_clear_object(&mqtt->conn);
-	}
-
-	mqtt->connected = FALSE;
-	g_byte_array_set_size(mqtt->rbuf, 0);
-}
-
-static void
-fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
-{
-	if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-		/* Return as cancelled means the connection is closing */
-		g_error_free(err);
-		return;
-	}
-
-	/* Now we can check for programming errors */
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-
-	if (prefix != NULL) {
-		g_prefix_error(&err, "%s: ", prefix);
-	}
-
-	g_signal_emit_by_name(mqtt, "error", err);
-	g_error_free(err);
-}
-
-static void
-fb_mqtt_error_literal(FbMqtt *mqtt, FbMqttError error, const gchar *msg)
-{
-	GError *err;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-
-	err = g_error_new_literal(FB_MQTT_ERROR, error, msg);
-
-	g_signal_emit_by_name(mqtt, "error", err);
-	g_error_free(err);
-}
-
-void
-fb_mqtt_error(FbMqtt *mqtt, FbMqttError error, const gchar *format, ...)
-{
-	GError *err;
-	va_list ap;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-
-	va_start(ap, format);
-	err = g_error_new_valist(FB_MQTT_ERROR, error, format, ap);
-	va_end(ap);
-
-	g_signal_emit_by_name(mqtt, "error", err);
-	g_error_free(err);
-}
-
-static gboolean
-fb_mqtt_cb_timeout(gpointer data)
-{
-	FbMqtt *mqtt = data;
-
-	mqtt->tev = 0;
-	fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
-	                      _("Connection timed out"));
-	return FALSE;
-}
-
-static void
-fb_mqtt_timeout_clear(FbMqtt *mqtt)
-{
-	g_clear_handle_id(&mqtt->tev, g_source_remove);
-}
-
-static void
-fb_mqtt_timeout(FbMqtt *mqtt)
-{
-	fb_mqtt_timeout_clear(mqtt);
-	mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout,
-	                                  mqtt);
-}
-
-static gboolean
-fb_mqtt_cb_ping(gpointer data)
-{
-	FbMqtt *mqtt = data;
-	FbMqttMessage *msg;
-
-	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0);
-	fb_mqtt_write(mqtt, msg);
-	g_object_unref(msg);
-
-	mqtt->tev = 0;
-	fb_mqtt_timeout(mqtt);
-	return FALSE;
-}
-
-static void
-fb_mqtt_ping(FbMqtt *mqtt)
-{
-	fb_mqtt_timeout_clear(mqtt);
-	mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_ping,
-	                                  mqtt);
-}
-
-static void
-fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
-{
-	GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
-	FbMqtt *mqtt = data;
-	gssize ret;
-	GError *err = NULL;
-
-	ret = g_buffered_input_stream_fill_finish(input, res, &err);
-
-	if (ret < 1) {
-		if (ret == 0) {
-			err = g_error_new_literal(G_IO_ERROR,
-					G_IO_ERROR_CONNECTION_CLOSED,
-					_("Connection closed"));
-		}
-
-		fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
-		return;
-	}
-
-	fb_mqtt_read_packet(mqtt);
-}
-
-static void
-fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
-{
-	FbMqtt *mqtt = data;
-	gssize ret;
-	FbMqttMessage *msg;
-	GError *err = NULL;
-
-	ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
-
-	if (ret < 1) {
-		if (ret == 0) {
-			err = g_error_new_literal(G_IO_ERROR,
-					G_IO_ERROR_CONNECTION_CLOSED,
-					_("Connection closed"));
-		}
-
-		fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
-		return;
-	}
-
-	mqtt->remz -= ret;
-
-	if(mqtt->remz > 0) {
-		g_input_stream_read_async(G_INPUT_STREAM(source),
-		                          mqtt->rbuf->data + mqtt->rbuf->len - mqtt->remz,
-		                          mqtt->remz, G_PRIORITY_DEFAULT,
-		                          mqtt->cancellable, fb_mqtt_cb_read_packet,
-		                          mqtt);
-		return;
-	}
-
-	msg = fb_mqtt_message_new_bytes(mqtt->rbuf);
-
-	if (G_UNLIKELY(msg == NULL)) {
-		fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
-		                      _("Failed to parse message"));
-		return;
-	}
-
-	fb_mqtt_read(mqtt, msg);
-	g_object_unref(msg);
-
-	/* Read another packet if connection wasn't reset in fb_mqtt_read() */
-	if (fb_mqtt_connected(mqtt, FALSE)) {
-		fb_mqtt_read_packet(mqtt);
-	}
-}
-
-static void
-fb_mqtt_read_packet(FbMqtt *mqtt)
-{
-	const guint8 *buf;
-	gsize count = 0;
-	gsize pos;
-	guint mult = 1;
-	guint8 byte;
-	gsize size = 0;
-
-	buf = g_buffered_input_stream_peek_buffer(mqtt->input, &count);
-
-	/* Start at 1 to skip the first byte */
-	pos = 1;
-
-	do {
-		if (pos >= count) {
-			/* Not enough data yet, try again later */
-			g_buffered_input_stream_fill_async(mqtt->input, -1,
-			                                   G_PRIORITY_DEFAULT,
-			                                   mqtt->cancellable,
-			                                   fb_mqtt_cb_fill, mqtt);
-			return;
-		}
-
-		byte = *(buf + pos++);
-
-		size += (byte & 127) * mult;
-		mult *= 128;
-	} while ((byte & 128) != 0);
-
-	/* Add header to size */
-	size += pos;
-
-	g_byte_array_set_size(mqtt->rbuf, size);
-	mqtt->remz = size;
-
-	/* TODO: Use g_input_stream_read_all_async() when available. */
-	/* TODO: Alternately, it would be nice to let the
-	 * FbMqttMessage directly use the GBufferedInputStream
-	 * buffer instead of copying it, provided it's consumed
-	 * before the next read.
-	 */
-	g_input_stream_read_async(G_INPUT_STREAM(mqtt->input), mqtt->rbuf->data,
-	                          mqtt->rbuf->len, G_PRIORITY_DEFAULT,
-	                          mqtt->cancellable, fb_mqtt_cb_read_packet, mqtt);
-}
-
-void
-fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg)
-{
-	FbMqttMessage *nsg;
-	GByteArray *wytes;
-	gchar *str;
-	guint8 chr;
-	guint16 mid;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-
-	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes,
-	                      "Reading %d (flags: 0x%0X)",
-	                      msg->type, msg->flags);
-
-	switch (msg->type) {
-	case FB_MQTT_MESSAGE_TYPE_CONNACK:
-		if (!fb_mqtt_message_read_byte(msg, NULL) ||
-		    !fb_mqtt_message_read_byte(msg, &chr))
-		{
-			break;
-		}
-
-		if (chr != FB_MQTT_ERROR_SUCCESS) {
-			fb_mqtt_error(mqtt, chr, _("Connection failed (%u)"),
-			              chr);
-			return;
-		}
-
-		mqtt->connected = TRUE;
-		fb_mqtt_ping(mqtt);
-		g_signal_emit_by_name(mqtt, "connect");
-		return;
-
-	case FB_MQTT_MESSAGE_TYPE_PUBLISH:
-		if (!fb_mqtt_message_read_str(msg, &str)) {
-			break;
-		}
-
-		if((msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
-		   (msg->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
-		{
-			if(msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
-				chr = FB_MQTT_MESSAGE_TYPE_PUBACK;
-			} else {
-				chr = FB_MQTT_MESSAGE_TYPE_PUBREC;
-			}
-
-			if (!fb_mqtt_message_read_mid(msg, &mid)) {
-				g_free(str);
-				break;
-			}
-
-			nsg = fb_mqtt_message_new(chr, 0);
-			fb_mqtt_message_write_u16(nsg, mid);
-			fb_mqtt_write(mqtt, nsg);
-			g_object_unref(nsg);
-		}
-
-		wytes = g_byte_array_new();
-		fb_mqtt_message_read_r(msg, wytes);
-		g_signal_emit_by_name(mqtt, "publish", str, wytes);
-		g_byte_array_free(wytes, TRUE);
-		g_free(str);
-		return;
-
-	case FB_MQTT_MESSAGE_TYPE_PUBREL:
-		if (!fb_mqtt_message_read_mid(msg, &mid)) {
-			break;
-		}
-
-		nsg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP, 0);
-		fb_mqtt_message_write_u16(nsg, mid); /* Message identifier */
-		fb_mqtt_write(mqtt, nsg);
-		g_object_unref(nsg);
-		return;
-
-	case FB_MQTT_MESSAGE_TYPE_PINGRESP:
-		fb_mqtt_ping(mqtt);
-		return;
-
-	case FB_MQTT_MESSAGE_TYPE_PUBACK:
-	case FB_MQTT_MESSAGE_TYPE_PUBCOMP:
-	case FB_MQTT_MESSAGE_TYPE_SUBACK:
-	case FB_MQTT_MESSAGE_TYPE_UNSUBACK:
-		return;
-
-	default:
-		fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
-		              _("Unknown packet (%u)"), msg->type);
-		return;
-	}
-
-	/* Since no case returned, there was a parse error. */
-	fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
-	                      _("Failed to parse message"));
-}
-
-static void
-fb_mqtt_cb_push_bytes(GObject *source, GAsyncResult *res, gpointer data)
-{
-	PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(source);
-	FbMqtt *mqtt = data;
-	GError *err = NULL;
-
-	if (!purple_queued_output_stream_push_bytes_finish(stream,
-			res, &err)) {
-		purple_queued_output_stream_clear_queue(stream);
-
-		fb_mqtt_take_error(mqtt, err, _("Failed to write data"));
-		return;
-	}
-}
-
-void
-fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg)
-{
-	const GByteArray *bytes;
-	GBytes *gbytes;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
-
-	bytes = fb_mqtt_message_bytes(msg);
-
-	if (G_UNLIKELY(bytes == NULL)) {
-		fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
-		                      _("Failed to format data"));
-		return;
-	}
-
-	fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes,
-	                      "Writing %d (flags: 0x%0X)",
-	                      msg->type, msg->flags);
-
- 	/* TODO: Would be nice to refactor this to not require copying bytes */
-	gbytes = g_bytes_new(bytes->data, bytes->len);
-	purple_queued_output_stream_push_bytes_async(mqtt->output, gbytes,
-	                                             G_PRIORITY_DEFAULT,
-	                                             mqtt->cancellable,
-	                                             fb_mqtt_cb_push_bytes, mqtt);
-	g_bytes_unref(gbytes);
-}
-
-static void
-fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data)
-{
-	FbMqtt *mqtt = data;
-	GSocketConnection *conn;
-	GError *err = NULL;
-
-	conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
-			res, &err);
-
-	if (conn == NULL) {
-		fb_mqtt_take_error(mqtt, err, NULL);
-		return;
-	}
-
-	fb_mqtt_timeout_clear(mqtt);
-
-	mqtt->conn = G_IO_STREAM(conn);
-	mqtt->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
-			g_io_stream_get_input_stream(mqtt->conn)));
-	mqtt->output = purple_queued_output_stream_new(
-			g_io_stream_get_output_stream(mqtt->conn));
-
-	fb_mqtt_read_packet(mqtt);
-
-	g_signal_emit_by_name(mqtt, "open");
-}
-
-void
-fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port)
-{
-	PurpleAccount *acc;
-	GSocketClient *client;
-	GError *err = NULL;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-
-	acc = purple_connection_get_account(mqtt->gc);
-	fb_mqtt_close(mqtt);
-
-	client = purple_gio_socket_client_new(acc, &err);
-
-	if (client == NULL) {
-		fb_mqtt_take_error(mqtt, err, NULL);
-		return;
-	}
-
-	mqtt->cancellable = g_cancellable_new();
-
-	g_socket_client_set_tls(client, TRUE);
-	g_socket_client_connect_to_host_async(client, host, port,
-	                                      mqtt->cancellable, fb_mqtt_cb_open,
-	                                      mqtt);
-	g_object_unref(client);
-
-	fb_mqtt_timeout(mqtt);
-}
-
-void
-fb_mqtt_connect(FbMqtt *mqtt, guint8 flags, const GByteArray *pload)
-{
-	FbMqttMessage *msg;
-
-	g_return_if_fail(!fb_mqtt_connected(mqtt, FALSE));
-	g_return_if_fail(pload != NULL);
-
-	/* Facebook always sends a CONNACK, use QoS1 */
-	flags |= FB_MQTT_CONNECT_FLAG_QOS1;
-
-	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT, 0);
-	fb_mqtt_message_write_str(msg, FB_MQTT_NAME);   /* Protocol name */
-	fb_mqtt_message_write_byte(msg, FB_MQTT_LEVEL); /* Protocol level */
-	fb_mqtt_message_write_byte(msg, flags);         /* Flags */
-	fb_mqtt_message_write_u16(msg, FB_MQTT_KA);     /* Keep alive */
-
-	fb_mqtt_message_write(msg, pload->data, pload->len);
-	fb_mqtt_write(mqtt, msg);
-
-	fb_mqtt_timeout(mqtt);
-	g_object_unref(msg);
-}
-
-gboolean
-fb_mqtt_connected(FbMqtt *mqtt, gboolean error)
-{
-	gboolean connected;
-
-	g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
-	connected = (mqtt->conn != NULL) && mqtt->connected;
-
-	if (!connected && error) {
-		fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Not connected"));
-	}
-
-	return connected;
-}
-
-void
-fb_mqtt_disconnect(FbMqtt *mqtt)
-{
-	FbMqttMessage *msg;
-
-	if (G_UNLIKELY(!fb_mqtt_connected(mqtt, FALSE))) {
-		return;
-	}
-
-	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT, 0);
-	fb_mqtt_write(mqtt, msg);
-	g_object_unref(msg);
-	fb_mqtt_close(mqtt);
-}
-
-void
-fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload)
-{
-	FbMqttMessage *msg;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
-
-	/* Message identifier not required, but for consistency use QoS1 */
-	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH,
-	                          FB_MQTT_MESSAGE_FLAG_QOS1);
-
-	fb_mqtt_message_write_str(msg, topic);      /* Message topic */
-	fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
-
-	if (pload != NULL) {
-		fb_mqtt_message_write(msg, pload->data, pload->len);
-	}
-
-	fb_mqtt_write(mqtt, msg);
-	g_object_unref(msg);
-}
-
-void
-fb_mqtt_subscribe(FbMqtt *mqtt, ...)
-{
-	const gchar *topic;
-	FbMqttMessage *msg;
-	guint16 qos;
-	va_list ap;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
-
-	/* Facebook requires a message identifier, use QoS1 */
-	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE,
-	                          FB_MQTT_MESSAGE_FLAG_QOS1);
-
-	fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
-
-	va_start(ap, mqtt);
-
-	while ((topic = va_arg(ap, const gchar*)) != NULL) {
-		qos = va_arg(ap, guint);
-		fb_mqtt_message_write_str(msg, topic);
-		fb_mqtt_message_write_byte(msg, qos);
-	}
-
-	va_end(ap);
-
-	fb_mqtt_write(mqtt, msg);
-	g_object_unref(msg);
-}
-
-void
-fb_mqtt_unsubscribe(FbMqtt *mqtt, const gchar *topic1, ...)
-{
-	const gchar *topic;
-	FbMqttMessage *msg;
-	va_list ap;
-
-	g_return_if_fail(FB_IS_MQTT(mqtt));
-	g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
-
-	/* Facebook requires a message identifier, use QoS1 */
-	msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE,
-	                          FB_MQTT_MESSAGE_FLAG_QOS1);
-
-	fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
-	fb_mqtt_message_write_str(msg, topic1);     /* First topic */
-
-	va_start(ap, topic1);
-
-	while ((topic = va_arg(ap, const gchar*)) != NULL) {
-		fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
-	}
-
-	va_end(ap);
-
-	fb_mqtt_write(mqtt, msg);
-	g_object_unref(msg);
-}
-
-FbMqttMessage *
-fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags)
-{
-	FbMqttMessage *msg;
-
-	msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
-
-	msg->type = type;
-	msg->flags = flags;
-	msg->bytes = g_byte_array_new();
-	msg->local = TRUE;
-
-	return msg;
-}
-
-FbMqttMessage *
-fb_mqtt_message_new_bytes(GByteArray *bytes)
-{
-	FbMqttMessage *msg;
-	guint8 *byte;
-
-	g_return_val_if_fail(bytes != NULL, NULL);
-	g_return_val_if_fail(bytes->len >= 2, NULL);
-
-	msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
-
-	msg->bytes = bytes;
-	msg->local = FALSE;
-	msg->type = (*bytes->data & 0xF0) >> 4;
-	msg->flags = *bytes->data & 0x0F;
-
-	/* Skip the fixed header */
-	byte = msg->bytes->data + 1;
-	while((*byte & 128) != 0) {
-		byte++;
-	}
-	byte++;
-	msg->offset = byte - bytes->data;
-	msg->pos = msg->offset;
-
-	return msg;
-}
-
-void
-fb_mqtt_message_reset(FbMqttMessage *msg)
-{
-	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
-
-	if(msg->offset > 0) {
-		g_byte_array_remove_range(msg->bytes, 0, msg->offset);
-		msg->offset = 0;
-		msg->pos = 0;
-	}
-}
-
-const GByteArray *
-fb_mqtt_message_bytes(FbMqttMessage *msg)
-{
-	guint i;
-	guint8 byte;
-	guint8 sbuf[4];
-	guint32 size;
-
-	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL);
-
-	i = 0;
-	size = msg->bytes->len - msg->offset;
-
-	do {
-		if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) {
-			return NULL;
-		}
-
-		byte = size % 128;
-		size /= 128;
-
-		if (size > 0) {
-			byte |= 128;
-		}
-
-		sbuf[i++] = byte;
-	} while (size > 0);
-
-	fb_mqtt_message_reset(msg);
-	g_byte_array_prepend(msg->bytes, sbuf, i);
-
-	byte = ((msg->type & 0x0F) << 4) | (msg->flags & 0x0F);
-	g_byte_array_prepend(msg->bytes, &byte, sizeof byte);
-
-	msg->pos = (i + 1) * (sizeof byte);
-	return msg->bytes;
-}
-
-gboolean
-fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size)
-{
-	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
-
-	if((msg->pos + size) > msg->bytes->len) {
-		return FALSE;
-	}
-
-	if ((data != NULL) && (size > 0)) {
-		memcpy(data, msg->bytes->data + msg->pos, size);
-	}
-
-	msg->pos += size;
-	return TRUE;
-}
-
-gboolean
-fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes)
-{
-	guint size;
-
-	g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
-	size = msg->bytes->len - msg->pos;
-
-	if (G_LIKELY(size > 0)) {
-		g_byte_array_append(bytes, msg->bytes->data + msg->pos, size);
-	}
-
-	return TRUE;
-}
-
-gboolean
-fb_mqtt_message_read_byte(FbMqttMessage *msg, guint8 *value)
-{
-	return fb_mqtt_message_read(msg, value, sizeof *value);
-}
-
-gboolean
-fb_mqtt_message_read_mid(FbMqttMessage *msg, guint16 *value)
-{
-	return fb_mqtt_message_read_u16(msg, value);
-}
-
-gboolean
-fb_mqtt_message_read_u16(FbMqttMessage *msg, guint16 *value)
-{
-	if (!fb_mqtt_message_read(msg, value, sizeof *value)) {
-		return FALSE;
-	}
-
-	if (value != NULL) {
-		*value = g_ntohs(*value);
-	}
-
-	return TRUE;
-}
-
-gboolean
-fb_mqtt_message_read_str(FbMqttMessage *msg, gchar **value)
-{
-	guint8 *data;
-	guint16 size;
-
-	if (!fb_mqtt_message_read_u16(msg, &size)) {
-		return FALSE;
-	}
-
-	if (value != NULL) {
-		data = g_new(guint8, size + 1);
-		data[size] = 0;
-	} else {
-		data = NULL;
-	}
-
-	if (!fb_mqtt_message_read(msg, data, size)) {
-		g_free(data);
-		return FALSE;
-	}
-
-	if (value != NULL) {
-		*value = (gchar *) data;
-	}
-
-	return TRUE;
-}
-
-void
-fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size)
-{
-	g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
-
-	g_byte_array_append(msg->bytes, data, size);
-	msg->pos += size;
-}
-
-void
-fb_mqtt_message_write_byte(FbMqttMessage *msg, guint8 value)
-{
-	fb_mqtt_message_write(msg, &value, sizeof value);
-}
-
-void
-fb_mqtt_message_write_mid(FbMqttMessage *msg, guint16 *value)
-{
-	g_return_if_fail(value != NULL);
-	fb_mqtt_message_write_u16(msg, ++(*value));
-}
-
-void
-fb_mqtt_message_write_u16(FbMqttMessage *msg, guint16 value)
-{
-	value = g_htons(value);
-	fb_mqtt_message_write(msg, &value, sizeof value);
-}
-
-void
-fb_mqtt_message_write_str(FbMqttMessage *msg, const gchar *value)
-{
-	gint16 size;
-
-	g_return_if_fail(value != NULL);
-
-	size = strlen(value);
-	fb_mqtt_message_write_u16(msg, size);
-	fb_mqtt_message_write(msg, value, size);
-}

mercurial