libpurple/protocols/facebook/mqtt.c

changeset 42187
fc241db9162d
parent 42186
637ba5491231
child 42188
04c0398f1046
equal deleted inserted replaced
42186:637ba5491231 42187:fc241db9162d
1 /* purple
2 *
3 * Purple is the legal property of its developers, whose names are too numerous
4 * to list here. Please refer to the COPYRIGHT file distributed with this
5 * source distribution.
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
20 */
21
22 #include <glib/gi18n-lib.h>
23
24 #include <glib/gprintf.h>
25 #include <stdarg.h>
26 #include <string.h>
27
28 #include <purple.h>
29
30 #include "mqtt.h"
31 #include "util.h"
32
33 /**
34 * FbMqtt:
35 *
36 * Represents an MQTT connection.
37 */
38 struct _FbMqtt
39 {
40 GObject parent;
41
42 PurpleConnection *gc;
43 GIOStream *conn;
44 GBufferedInputStream *input;
45 PurpleQueuedOutputStream *output;
46 GCancellable *cancellable;
47 gboolean connected;
48 guint16 mid;
49
50 GByteArray *rbuf;
51 gsize remz;
52
53 gint tev;
54 };
55
56 G_DEFINE_TYPE(FbMqtt, fb_mqtt, G_TYPE_OBJECT);
57
58 /**
59 * FbMqttMessage:
60 *
61 * Represents a reader/writer for an MQTT message.
62 */
63 struct _FbMqttMessage
64 {
65 GObject parent;
66
67 FbMqttMessageType type;
68 FbMqttMessageFlags flags;
69
70 GByteArray *bytes;
71 guint offset;
72 guint pos;
73
74 gboolean local;
75 };
76
77 G_DEFINE_TYPE(FbMqttMessage, fb_mqtt_message, G_TYPE_OBJECT);
78
79 static void fb_mqtt_read_packet(FbMqtt *mqtt);
80
81 static void
82 fb_mqtt_dispose(GObject *obj)
83 {
84 FbMqtt *mqtt = FB_MQTT(obj);
85
86 fb_mqtt_close(mqtt);
87 g_byte_array_free(mqtt->rbuf, TRUE);
88 }
89
90 static void
91 fb_mqtt_class_init(FbMqttClass *klass)
92 {
93 GObjectClass *gklass = G_OBJECT_CLASS(klass);
94
95 gklass->dispose = fb_mqtt_dispose;
96 /**
97 * FbMqtt::connect:
98 * @mqtt: The #FbMqtt.
99 *
100 * Emitted upon the successful completion of the connection
101 * process. This is emitted as a result of #fb_mqtt_connect().
102 */
103 g_signal_new("connect",
104 G_TYPE_FROM_CLASS(klass),
105 G_SIGNAL_ACTION,
106 0,
107 NULL, NULL, NULL,
108 G_TYPE_NONE,
109 0);
110
111 /**
112 * FbMqtt::error:
113 * @mqtt: The #FbMqtt.
114 * @error: The #GError.
115 *
116 * Emitted whenever an error is hit within the #FbMqtt. This
117 * should close the #FbMqtt with #fb_mqtt_close().
118 */
119 g_signal_new("error",
120 G_TYPE_FROM_CLASS(klass),
121 G_SIGNAL_ACTION,
122 0,
123 NULL, NULL, NULL,
124 G_TYPE_NONE,
125 1, G_TYPE_ERROR);
126
127 /**
128 * FbMqtt::open:
129 * @mqtt: The #FbMqtt.
130 *
131 * Emitted upon the successful opening of the remote socket.
132 * This is emitted as a result of #fb_mqtt_open(). This should
133 * call #fb_mqtt_connect().
134 */
135 g_signal_new("open",
136 G_TYPE_FROM_CLASS(klass),
137 G_SIGNAL_ACTION,
138 0,
139 NULL, NULL, NULL,
140 G_TYPE_NONE,
141 0);
142
143 /**
144 * FbMqtt::publish:
145 * @mqtt: The #FbMqtt.
146 * @topic: The topic.
147 * @pload: The payload.
148 *
149 * Emitted upon an incoming message from the steam.
150 */
151 g_signal_new("publish",
152 G_TYPE_FROM_CLASS(klass),
153 G_SIGNAL_ACTION,
154 0,
155 NULL, NULL, NULL,
156 G_TYPE_NONE,
157 2, G_TYPE_STRING, G_TYPE_BYTE_ARRAY);
158 }
159
160 static void
161 fb_mqtt_init(FbMqtt *mqtt)
162 {
163 mqtt->rbuf = g_byte_array_new();
164 }
165
166 static void
167 fb_mqtt_message_dispose(GObject *obj)
168 {
169 FbMqttMessage *msg = FB_MQTT_MESSAGE(obj);
170
171 if(msg->bytes != NULL && msg->local) {
172 g_byte_array_free(msg->bytes, TRUE);
173 msg->bytes = NULL;
174 }
175 }
176
177 static void
178 fb_mqtt_message_class_init(FbMqttMessageClass *klass)
179 {
180 GObjectClass *gklass = G_OBJECT_CLASS(klass);
181
182 gklass->dispose = fb_mqtt_message_dispose;
183 }
184
185 static void
186 fb_mqtt_message_init(G_GNUC_UNUSED FbMqttMessage *msg)
187 {
188 }
189
190 GQuark
191 fb_mqtt_error_quark(void)
192 {
193 static GQuark q = 0;
194
195 if (G_UNLIKELY(q == 0)) {
196 q = g_quark_from_static_string("fb-mqtt-error-quark");
197 }
198
199 return q;
200 }
201
202 FbMqtt *
203 fb_mqtt_new(PurpleConnection *gc)
204 {
205 FbMqtt *mqtt;
206
207 g_return_val_if_fail(PURPLE_IS_CONNECTION(gc), NULL);
208
209 mqtt = g_object_new(FB_TYPE_MQTT, NULL);
210 mqtt->gc = gc;
211
212 return mqtt;
213 };
214
215 void
216 fb_mqtt_close(FbMqtt *mqtt)
217 {
218 g_return_if_fail(FB_IS_MQTT(mqtt));
219
220 g_clear_handle_id(&mqtt->tev, g_source_remove);
221
222 if(mqtt->cancellable != NULL) {
223 g_cancellable_cancel(mqtt->cancellable);
224 g_clear_object(&mqtt->cancellable);
225 }
226
227 if(mqtt->conn != NULL) {
228 purple_gio_graceful_close(mqtt->conn,
229 G_INPUT_STREAM(mqtt->input),
230 G_OUTPUT_STREAM(mqtt->output));
231 g_clear_object(&mqtt->input);
232 g_clear_object(&mqtt->output);
233 g_clear_object(&mqtt->conn);
234 }
235
236 mqtt->connected = FALSE;
237 g_byte_array_set_size(mqtt->rbuf, 0);
238 }
239
240 static void
241 fb_mqtt_take_error(FbMqtt *mqtt, GError *err, const gchar *prefix)
242 {
243 if (g_error_matches(err, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
244 /* Return as cancelled means the connection is closing */
245 g_error_free(err);
246 return;
247 }
248
249 /* Now we can check for programming errors */
250 g_return_if_fail(FB_IS_MQTT(mqtt));
251
252 if (prefix != NULL) {
253 g_prefix_error(&err, "%s: ", prefix);
254 }
255
256 g_signal_emit_by_name(mqtt, "error", err);
257 g_error_free(err);
258 }
259
260 static void
261 fb_mqtt_error_literal(FbMqtt *mqtt, FbMqttError error, const gchar *msg)
262 {
263 GError *err;
264
265 g_return_if_fail(FB_IS_MQTT(mqtt));
266
267 err = g_error_new_literal(FB_MQTT_ERROR, error, msg);
268
269 g_signal_emit_by_name(mqtt, "error", err);
270 g_error_free(err);
271 }
272
273 void
274 fb_mqtt_error(FbMqtt *mqtt, FbMqttError error, const gchar *format, ...)
275 {
276 GError *err;
277 va_list ap;
278
279 g_return_if_fail(FB_IS_MQTT(mqtt));
280
281 va_start(ap, format);
282 err = g_error_new_valist(FB_MQTT_ERROR, error, format, ap);
283 va_end(ap);
284
285 g_signal_emit_by_name(mqtt, "error", err);
286 g_error_free(err);
287 }
288
289 static gboolean
290 fb_mqtt_cb_timeout(gpointer data)
291 {
292 FbMqtt *mqtt = data;
293
294 mqtt->tev = 0;
295 fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
296 _("Connection timed out"));
297 return FALSE;
298 }
299
300 static void
301 fb_mqtt_timeout_clear(FbMqtt *mqtt)
302 {
303 g_clear_handle_id(&mqtt->tev, g_source_remove);
304 }
305
306 static void
307 fb_mqtt_timeout(FbMqtt *mqtt)
308 {
309 fb_mqtt_timeout_clear(mqtt);
310 mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_CONN, fb_mqtt_cb_timeout,
311 mqtt);
312 }
313
314 static gboolean
315 fb_mqtt_cb_ping(gpointer data)
316 {
317 FbMqtt *mqtt = data;
318 FbMqttMessage *msg;
319
320 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PINGREQ, 0);
321 fb_mqtt_write(mqtt, msg);
322 g_object_unref(msg);
323
324 mqtt->tev = 0;
325 fb_mqtt_timeout(mqtt);
326 return FALSE;
327 }
328
329 static void
330 fb_mqtt_ping(FbMqtt *mqtt)
331 {
332 fb_mqtt_timeout_clear(mqtt);
333 mqtt->tev = g_timeout_add_seconds(FB_MQTT_TIMEOUT_PING, fb_mqtt_cb_ping,
334 mqtt);
335 }
336
337 static void
338 fb_mqtt_cb_fill(GObject *source, GAsyncResult *res, gpointer data)
339 {
340 GBufferedInputStream *input = G_BUFFERED_INPUT_STREAM(source);
341 FbMqtt *mqtt = data;
342 gssize ret;
343 GError *err = NULL;
344
345 ret = g_buffered_input_stream_fill_finish(input, res, &err);
346
347 if (ret < 1) {
348 if (ret == 0) {
349 err = g_error_new_literal(G_IO_ERROR,
350 G_IO_ERROR_CONNECTION_CLOSED,
351 _("Connection closed"));
352 }
353
354 fb_mqtt_take_error(mqtt, err, _("Failed to read fixed header"));
355 return;
356 }
357
358 fb_mqtt_read_packet(mqtt);
359 }
360
361 static void
362 fb_mqtt_cb_read_packet(GObject *source, GAsyncResult *res, gpointer data)
363 {
364 FbMqtt *mqtt = data;
365 gssize ret;
366 FbMqttMessage *msg;
367 GError *err = NULL;
368
369 ret = g_input_stream_read_finish(G_INPUT_STREAM(source), res, &err);
370
371 if (ret < 1) {
372 if (ret == 0) {
373 err = g_error_new_literal(G_IO_ERROR,
374 G_IO_ERROR_CONNECTION_CLOSED,
375 _("Connection closed"));
376 }
377
378 fb_mqtt_take_error(mqtt, err, _("Failed to read packet data"));
379 return;
380 }
381
382 mqtt->remz -= ret;
383
384 if(mqtt->remz > 0) {
385 g_input_stream_read_async(G_INPUT_STREAM(source),
386 mqtt->rbuf->data + mqtt->rbuf->len - mqtt->remz,
387 mqtt->remz, G_PRIORITY_DEFAULT,
388 mqtt->cancellable, fb_mqtt_cb_read_packet,
389 mqtt);
390 return;
391 }
392
393 msg = fb_mqtt_message_new_bytes(mqtt->rbuf);
394
395 if (G_UNLIKELY(msg == NULL)) {
396 fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
397 _("Failed to parse message"));
398 return;
399 }
400
401 fb_mqtt_read(mqtt, msg);
402 g_object_unref(msg);
403
404 /* Read another packet if connection wasn't reset in fb_mqtt_read() */
405 if (fb_mqtt_connected(mqtt, FALSE)) {
406 fb_mqtt_read_packet(mqtt);
407 }
408 }
409
410 static void
411 fb_mqtt_read_packet(FbMqtt *mqtt)
412 {
413 const guint8 *buf;
414 gsize count = 0;
415 gsize pos;
416 guint mult = 1;
417 guint8 byte;
418 gsize size = 0;
419
420 buf = g_buffered_input_stream_peek_buffer(mqtt->input, &count);
421
422 /* Start at 1 to skip the first byte */
423 pos = 1;
424
425 do {
426 if (pos >= count) {
427 /* Not enough data yet, try again later */
428 g_buffered_input_stream_fill_async(mqtt->input, -1,
429 G_PRIORITY_DEFAULT,
430 mqtt->cancellable,
431 fb_mqtt_cb_fill, mqtt);
432 return;
433 }
434
435 byte = *(buf + pos++);
436
437 size += (byte & 127) * mult;
438 mult *= 128;
439 } while ((byte & 128) != 0);
440
441 /* Add header to size */
442 size += pos;
443
444 g_byte_array_set_size(mqtt->rbuf, size);
445 mqtt->remz = size;
446
447 /* TODO: Use g_input_stream_read_all_async() when available. */
448 /* TODO: Alternately, it would be nice to let the
449 * FbMqttMessage directly use the GBufferedInputStream
450 * buffer instead of copying it, provided it's consumed
451 * before the next read.
452 */
453 g_input_stream_read_async(G_INPUT_STREAM(mqtt->input), mqtt->rbuf->data,
454 mqtt->rbuf->len, G_PRIORITY_DEFAULT,
455 mqtt->cancellable, fb_mqtt_cb_read_packet, mqtt);
456 }
457
458 void
459 fb_mqtt_read(FbMqtt *mqtt, FbMqttMessage *msg)
460 {
461 FbMqttMessage *nsg;
462 GByteArray *wytes;
463 gchar *str;
464 guint8 chr;
465 guint16 mid;
466
467 g_return_if_fail(FB_IS_MQTT(mqtt));
468
469 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes,
470 "Reading %d (flags: 0x%0X)",
471 msg->type, msg->flags);
472
473 switch (msg->type) {
474 case FB_MQTT_MESSAGE_TYPE_CONNACK:
475 if (!fb_mqtt_message_read_byte(msg, NULL) ||
476 !fb_mqtt_message_read_byte(msg, &chr))
477 {
478 break;
479 }
480
481 if (chr != FB_MQTT_ERROR_SUCCESS) {
482 fb_mqtt_error(mqtt, chr, _("Connection failed (%u)"),
483 chr);
484 return;
485 }
486
487 mqtt->connected = TRUE;
488 fb_mqtt_ping(mqtt);
489 g_signal_emit_by_name(mqtt, "connect");
490 return;
491
492 case FB_MQTT_MESSAGE_TYPE_PUBLISH:
493 if (!fb_mqtt_message_read_str(msg, &str)) {
494 break;
495 }
496
497 if((msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) ||
498 (msg->flags & FB_MQTT_MESSAGE_FLAG_QOS2))
499 {
500 if(msg->flags & FB_MQTT_MESSAGE_FLAG_QOS1) {
501 chr = FB_MQTT_MESSAGE_TYPE_PUBACK;
502 } else {
503 chr = FB_MQTT_MESSAGE_TYPE_PUBREC;
504 }
505
506 if (!fb_mqtt_message_read_mid(msg, &mid)) {
507 g_free(str);
508 break;
509 }
510
511 nsg = fb_mqtt_message_new(chr, 0);
512 fb_mqtt_message_write_u16(nsg, mid);
513 fb_mqtt_write(mqtt, nsg);
514 g_object_unref(nsg);
515 }
516
517 wytes = g_byte_array_new();
518 fb_mqtt_message_read_r(msg, wytes);
519 g_signal_emit_by_name(mqtt, "publish", str, wytes);
520 g_byte_array_free(wytes, TRUE);
521 g_free(str);
522 return;
523
524 case FB_MQTT_MESSAGE_TYPE_PUBREL:
525 if (!fb_mqtt_message_read_mid(msg, &mid)) {
526 break;
527 }
528
529 nsg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBCOMP, 0);
530 fb_mqtt_message_write_u16(nsg, mid); /* Message identifier */
531 fb_mqtt_write(mqtt, nsg);
532 g_object_unref(nsg);
533 return;
534
535 case FB_MQTT_MESSAGE_TYPE_PINGRESP:
536 fb_mqtt_ping(mqtt);
537 return;
538
539 case FB_MQTT_MESSAGE_TYPE_PUBACK:
540 case FB_MQTT_MESSAGE_TYPE_PUBCOMP:
541 case FB_MQTT_MESSAGE_TYPE_SUBACK:
542 case FB_MQTT_MESSAGE_TYPE_UNSUBACK:
543 return;
544
545 default:
546 fb_mqtt_error(mqtt, FB_MQTT_ERROR_GENERAL,
547 _("Unknown packet (%u)"), msg->type);
548 return;
549 }
550
551 /* Since no case returned, there was a parse error. */
552 fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
553 _("Failed to parse message"));
554 }
555
556 static void
557 fb_mqtt_cb_push_bytes(GObject *source, GAsyncResult *res, gpointer data)
558 {
559 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(source);
560 FbMqtt *mqtt = data;
561 GError *err = NULL;
562
563 if (!purple_queued_output_stream_push_bytes_finish(stream,
564 res, &err)) {
565 purple_queued_output_stream_clear_queue(stream);
566
567 fb_mqtt_take_error(mqtt, err, _("Failed to write data"));
568 return;
569 }
570 }
571
572 void
573 fb_mqtt_write(FbMqtt *mqtt, FbMqttMessage *msg)
574 {
575 const GByteArray *bytes;
576 GBytes *gbytes;
577
578 g_return_if_fail(FB_IS_MQTT(mqtt));
579 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
580
581 bytes = fb_mqtt_message_bytes(msg);
582
583 if (G_UNLIKELY(bytes == NULL)) {
584 fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL,
585 _("Failed to format data"));
586 return;
587 }
588
589 fb_util_debug_hexdump(FB_UTIL_DEBUG_INFO, msg->bytes,
590 "Writing %d (flags: 0x%0X)",
591 msg->type, msg->flags);
592
593 /* TODO: Would be nice to refactor this to not require copying bytes */
594 gbytes = g_bytes_new(bytes->data, bytes->len);
595 purple_queued_output_stream_push_bytes_async(mqtt->output, gbytes,
596 G_PRIORITY_DEFAULT,
597 mqtt->cancellable,
598 fb_mqtt_cb_push_bytes, mqtt);
599 g_bytes_unref(gbytes);
600 }
601
602 static void
603 fb_mqtt_cb_open(GObject *source, GAsyncResult *res, gpointer data)
604 {
605 FbMqtt *mqtt = data;
606 GSocketConnection *conn;
607 GError *err = NULL;
608
609 conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
610 res, &err);
611
612 if (conn == NULL) {
613 fb_mqtt_take_error(mqtt, err, NULL);
614 return;
615 }
616
617 fb_mqtt_timeout_clear(mqtt);
618
619 mqtt->conn = G_IO_STREAM(conn);
620 mqtt->input = G_BUFFERED_INPUT_STREAM(g_buffered_input_stream_new(
621 g_io_stream_get_input_stream(mqtt->conn)));
622 mqtt->output = purple_queued_output_stream_new(
623 g_io_stream_get_output_stream(mqtt->conn));
624
625 fb_mqtt_read_packet(mqtt);
626
627 g_signal_emit_by_name(mqtt, "open");
628 }
629
630 void
631 fb_mqtt_open(FbMqtt *mqtt, const gchar *host, gint port)
632 {
633 PurpleAccount *acc;
634 GSocketClient *client;
635 GError *err = NULL;
636
637 g_return_if_fail(FB_IS_MQTT(mqtt));
638
639 acc = purple_connection_get_account(mqtt->gc);
640 fb_mqtt_close(mqtt);
641
642 client = purple_gio_socket_client_new(acc, &err);
643
644 if (client == NULL) {
645 fb_mqtt_take_error(mqtt, err, NULL);
646 return;
647 }
648
649 mqtt->cancellable = g_cancellable_new();
650
651 g_socket_client_set_tls(client, TRUE);
652 g_socket_client_connect_to_host_async(client, host, port,
653 mqtt->cancellable, fb_mqtt_cb_open,
654 mqtt);
655 g_object_unref(client);
656
657 fb_mqtt_timeout(mqtt);
658 }
659
660 void
661 fb_mqtt_connect(FbMqtt *mqtt, guint8 flags, const GByteArray *pload)
662 {
663 FbMqttMessage *msg;
664
665 g_return_if_fail(!fb_mqtt_connected(mqtt, FALSE));
666 g_return_if_fail(pload != NULL);
667
668 /* Facebook always sends a CONNACK, use QoS1 */
669 flags |= FB_MQTT_CONNECT_FLAG_QOS1;
670
671 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_CONNECT, 0);
672 fb_mqtt_message_write_str(msg, FB_MQTT_NAME); /* Protocol name */
673 fb_mqtt_message_write_byte(msg, FB_MQTT_LEVEL); /* Protocol level */
674 fb_mqtt_message_write_byte(msg, flags); /* Flags */
675 fb_mqtt_message_write_u16(msg, FB_MQTT_KA); /* Keep alive */
676
677 fb_mqtt_message_write(msg, pload->data, pload->len);
678 fb_mqtt_write(mqtt, msg);
679
680 fb_mqtt_timeout(mqtt);
681 g_object_unref(msg);
682 }
683
684 gboolean
685 fb_mqtt_connected(FbMqtt *mqtt, gboolean error)
686 {
687 gboolean connected;
688
689 g_return_val_if_fail(FB_IS_MQTT(mqtt), FALSE);
690 connected = (mqtt->conn != NULL) && mqtt->connected;
691
692 if (!connected && error) {
693 fb_mqtt_error_literal(mqtt, FB_MQTT_ERROR_GENERAL, _("Not connected"));
694 }
695
696 return connected;
697 }
698
699 void
700 fb_mqtt_disconnect(FbMqtt *mqtt)
701 {
702 FbMqttMessage *msg;
703
704 if (G_UNLIKELY(!fb_mqtt_connected(mqtt, FALSE))) {
705 return;
706 }
707
708 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_DISCONNECT, 0);
709 fb_mqtt_write(mqtt, msg);
710 g_object_unref(msg);
711 fb_mqtt_close(mqtt);
712 }
713
714 void
715 fb_mqtt_publish(FbMqtt *mqtt, const gchar *topic, const GByteArray *pload)
716 {
717 FbMqttMessage *msg;
718
719 g_return_if_fail(FB_IS_MQTT(mqtt));
720 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
721
722 /* Message identifier not required, but for consistency use QoS1 */
723 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_PUBLISH,
724 FB_MQTT_MESSAGE_FLAG_QOS1);
725
726 fb_mqtt_message_write_str(msg, topic); /* Message topic */
727 fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
728
729 if (pload != NULL) {
730 fb_mqtt_message_write(msg, pload->data, pload->len);
731 }
732
733 fb_mqtt_write(mqtt, msg);
734 g_object_unref(msg);
735 }
736
737 void
738 fb_mqtt_subscribe(FbMqtt *mqtt, ...)
739 {
740 const gchar *topic;
741 FbMqttMessage *msg;
742 guint16 qos;
743 va_list ap;
744
745 g_return_if_fail(FB_IS_MQTT(mqtt));
746 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
747
748 /* Facebook requires a message identifier, use QoS1 */
749 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_SUBSCRIBE,
750 FB_MQTT_MESSAGE_FLAG_QOS1);
751
752 fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
753
754 va_start(ap, mqtt);
755
756 while ((topic = va_arg(ap, const gchar*)) != NULL) {
757 qos = va_arg(ap, guint);
758 fb_mqtt_message_write_str(msg, topic);
759 fb_mqtt_message_write_byte(msg, qos);
760 }
761
762 va_end(ap);
763
764 fb_mqtt_write(mqtt, msg);
765 g_object_unref(msg);
766 }
767
768 void
769 fb_mqtt_unsubscribe(FbMqtt *mqtt, const gchar *topic1, ...)
770 {
771 const gchar *topic;
772 FbMqttMessage *msg;
773 va_list ap;
774
775 g_return_if_fail(FB_IS_MQTT(mqtt));
776 g_return_if_fail(fb_mqtt_connected(mqtt, FALSE));
777
778 /* Facebook requires a message identifier, use QoS1 */
779 msg = fb_mqtt_message_new(FB_MQTT_MESSAGE_TYPE_UNSUBSCRIBE,
780 FB_MQTT_MESSAGE_FLAG_QOS1);
781
782 fb_mqtt_message_write_mid(msg, &mqtt->mid); /* Message identifier */
783 fb_mqtt_message_write_str(msg, topic1); /* First topic */
784
785 va_start(ap, topic1);
786
787 while ((topic = va_arg(ap, const gchar*)) != NULL) {
788 fb_mqtt_message_write_str(msg, topic); /* Remaining topics */
789 }
790
791 va_end(ap);
792
793 fb_mqtt_write(mqtt, msg);
794 g_object_unref(msg);
795 }
796
797 FbMqttMessage *
798 fb_mqtt_message_new(FbMqttMessageType type, FbMqttMessageFlags flags)
799 {
800 FbMqttMessage *msg;
801
802 msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
803
804 msg->type = type;
805 msg->flags = flags;
806 msg->bytes = g_byte_array_new();
807 msg->local = TRUE;
808
809 return msg;
810 }
811
812 FbMqttMessage *
813 fb_mqtt_message_new_bytes(GByteArray *bytes)
814 {
815 FbMqttMessage *msg;
816 guint8 *byte;
817
818 g_return_val_if_fail(bytes != NULL, NULL);
819 g_return_val_if_fail(bytes->len >= 2, NULL);
820
821 msg = g_object_new(FB_TYPE_MQTT_MESSAGE, NULL);
822
823 msg->bytes = bytes;
824 msg->local = FALSE;
825 msg->type = (*bytes->data & 0xF0) >> 4;
826 msg->flags = *bytes->data & 0x0F;
827
828 /* Skip the fixed header */
829 byte = msg->bytes->data + 1;
830 while((*byte & 128) != 0) {
831 byte++;
832 }
833 byte++;
834 msg->offset = byte - bytes->data;
835 msg->pos = msg->offset;
836
837 return msg;
838 }
839
840 void
841 fb_mqtt_message_reset(FbMqttMessage *msg)
842 {
843 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
844
845 if(msg->offset > 0) {
846 g_byte_array_remove_range(msg->bytes, 0, msg->offset);
847 msg->offset = 0;
848 msg->pos = 0;
849 }
850 }
851
852 const GByteArray *
853 fb_mqtt_message_bytes(FbMqttMessage *msg)
854 {
855 guint i;
856 guint8 byte;
857 guint8 sbuf[4];
858 guint32 size;
859
860 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), NULL);
861
862 i = 0;
863 size = msg->bytes->len - msg->offset;
864
865 do {
866 if (G_UNLIKELY(i >= G_N_ELEMENTS(sbuf))) {
867 return NULL;
868 }
869
870 byte = size % 128;
871 size /= 128;
872
873 if (size > 0) {
874 byte |= 128;
875 }
876
877 sbuf[i++] = byte;
878 } while (size > 0);
879
880 fb_mqtt_message_reset(msg);
881 g_byte_array_prepend(msg->bytes, sbuf, i);
882
883 byte = ((msg->type & 0x0F) << 4) | (msg->flags & 0x0F);
884 g_byte_array_prepend(msg->bytes, &byte, sizeof byte);
885
886 msg->pos = (i + 1) * (sizeof byte);
887 return msg->bytes;
888 }
889
890 gboolean
891 fb_mqtt_message_read(FbMqttMessage *msg, gpointer data, guint size)
892 {
893 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
894
895 if((msg->pos + size) > msg->bytes->len) {
896 return FALSE;
897 }
898
899 if ((data != NULL) && (size > 0)) {
900 memcpy(data, msg->bytes->data + msg->pos, size);
901 }
902
903 msg->pos += size;
904 return TRUE;
905 }
906
907 gboolean
908 fb_mqtt_message_read_r(FbMqttMessage *msg, GByteArray *bytes)
909 {
910 guint size;
911
912 g_return_val_if_fail(FB_IS_MQTT_MESSAGE(msg), FALSE);
913 size = msg->bytes->len - msg->pos;
914
915 if (G_LIKELY(size > 0)) {
916 g_byte_array_append(bytes, msg->bytes->data + msg->pos, size);
917 }
918
919 return TRUE;
920 }
921
922 gboolean
923 fb_mqtt_message_read_byte(FbMqttMessage *msg, guint8 *value)
924 {
925 return fb_mqtt_message_read(msg, value, sizeof *value);
926 }
927
928 gboolean
929 fb_mqtt_message_read_mid(FbMqttMessage *msg, guint16 *value)
930 {
931 return fb_mqtt_message_read_u16(msg, value);
932 }
933
934 gboolean
935 fb_mqtt_message_read_u16(FbMqttMessage *msg, guint16 *value)
936 {
937 if (!fb_mqtt_message_read(msg, value, sizeof *value)) {
938 return FALSE;
939 }
940
941 if (value != NULL) {
942 *value = g_ntohs(*value);
943 }
944
945 return TRUE;
946 }
947
948 gboolean
949 fb_mqtt_message_read_str(FbMqttMessage *msg, gchar **value)
950 {
951 guint8 *data;
952 guint16 size;
953
954 if (!fb_mqtt_message_read_u16(msg, &size)) {
955 return FALSE;
956 }
957
958 if (value != NULL) {
959 data = g_new(guint8, size + 1);
960 data[size] = 0;
961 } else {
962 data = NULL;
963 }
964
965 if (!fb_mqtt_message_read(msg, data, size)) {
966 g_free(data);
967 return FALSE;
968 }
969
970 if (value != NULL) {
971 *value = (gchar *) data;
972 }
973
974 return TRUE;
975 }
976
977 void
978 fb_mqtt_message_write(FbMqttMessage *msg, gconstpointer data, guint size)
979 {
980 g_return_if_fail(FB_IS_MQTT_MESSAGE(msg));
981
982 g_byte_array_append(msg->bytes, data, size);
983 msg->pos += size;
984 }
985
986 void
987 fb_mqtt_message_write_byte(FbMqttMessage *msg, guint8 value)
988 {
989 fb_mqtt_message_write(msg, &value, sizeof value);
990 }
991
992 void
993 fb_mqtt_message_write_mid(FbMqttMessage *msg, guint16 *value)
994 {
995 g_return_if_fail(value != NULL);
996 fb_mqtt_message_write_u16(msg, ++(*value));
997 }
998
999 void
1000 fb_mqtt_message_write_u16(FbMqttMessage *msg, guint16 value)
1001 {
1002 value = g_htons(value);
1003 fb_mqtt_message_write(msg, &value, sizeof value);
1004 }
1005
1006 void
1007 fb_mqtt_message_write_str(FbMqttMessage *msg, const gchar *value)
1008 {
1009 gint16 size;
1010
1011 g_return_if_fail(value != NULL);
1012
1013 size = strlen(value);
1014 fb_mqtt_message_write_u16(msg, size);
1015 fb_mqtt_message_write(msg, value, size);
1016 }

mercurial