libpurple/protocols/bonjour/jabber.c

changeset 39639
fe7fa102e766
parent 39638
635c8b6ca9a9
child 39656
378ed2b4e242
--- a/libpurple/protocols/bonjour/jabber.c	Sun Jun 30 03:04:55 2019 -0400
+++ b/libpurple/protocols/bonjour/jabber.c	Tue Jul 02 02:12:08 2019 -0400
@@ -46,15 +46,15 @@
 #include <ifaddrs.h>
 #endif
 
-
-#include "network.h"
-#include "eventloop.h"
+#include "buddylist.h"
 #include "connection.h"
-#include "buddylist.h"
+#include "debug.h"
+#include "eventloop.h"
+#include "network.h"
+#include "notify.h"
+#include "purple-gio.h"
+#include "util.h"
 #include "xmlnode.h"
-#include "debug.h"
-#include "notify.h"
-#include "util.h"
 
 #include "jabber.h"
 #include "parser.h"
@@ -86,7 +86,7 @@
 bonjour_jabber_conv_new(PurpleBuddy *pb, PurpleAccount *account, const char *ip) {
 
 	BonjourJabberConversation *bconv = g_new0(BonjourJabberConversation, 1);
-	bconv->socket = -1;
+	bconv->cancellable = g_cancellable_new();
 	bconv->tx_buf = purple_circular_buffer_new(512);
 	bconv->tx_handler = 0;
 	bconv->rx_handler = 0;
@@ -267,32 +267,40 @@
 }
 
 static void
-_send_data_write_cb(gpointer data, gint source, PurpleInputCondition cond)
+_send_data_write_cb(GObject *stream, gpointer data)
 {
 	PurpleBuddy *pb = data;
 	BonjourBuddy *bb = purple_buddy_get_protocol_data(pb);
 	BonjourJabberConversation *bconv = bb->conversation;
-	int ret, writelen;
+	gsize writelen;
+	gssize ret;
+	GError *error = NULL;
 
 	writelen = purple_circular_buffer_get_max_read(bconv->tx_buf);
 
 	if (writelen == 0) {
-		purple_input_remove(bconv->tx_handler);
+		g_source_remove(bconv->tx_handler);
 		bconv->tx_handler = 0;
 		return;
 	}
 
-	ret = send(bconv->socket, purple_circular_buffer_get_output(bconv->tx_buf), writelen, 0);
+	ret = g_pollable_output_stream_write_nonblocking(
+	        G_POLLABLE_OUTPUT_STREAM(stream),
+	        purple_circular_buffer_get_output(bconv->tx_buf), writelen,
+	        bconv->cancellable, &error);
 
-	if (ret < 0 && errno == EAGAIN)
+	if (ret < 0 && error->code == G_IO_ERROR_WOULD_BLOCK) {
+		g_clear_error(&error);
 		return;
-	else if (ret <= 0) {
+	} else if (ret <= 0) {
 		PurpleConversation *conv = NULL;
 		PurpleAccount *account = NULL;
-		const char *error = g_strerror(errno);
 
-		purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n",
-				   purple_buddy_get_name(pb), error ? error : "(null)");
+		purple_debug_error(
+		        "bonjour",
+		        "Error sending message to buddy %s error: %s",
+		        purple_buddy_get_name(pb),
+		        error ? error->message : "(null)");
 
 		account = purple_buddy_get_account(pb);
 
@@ -304,6 +312,7 @@
 
 		bonjour_jabber_close_conversation(bb->conversation);
 		bb->conversation = NULL;
+		g_clear_error(&error);
 		return;
 	}
 
@@ -313,10 +322,11 @@
 static gint
 _send_data(PurpleBuddy *pb, char *message)
 {
-	gint ret;
-	int len = strlen(message);
 	BonjourBuddy *bb = purple_buddy_get_protocol_data(pb);
 	BonjourJabberConversation *bconv = bb->conversation;
+	gsize len = strlen(message);
+	gssize ret;
+	GError *error = NULL;
 
 	/* If we're not ready to actually send, append it to the buffer */
 	if (bconv->tx_handler != 0
@@ -325,20 +335,26 @@
 			|| !bconv->recv_stream_start
 			|| purple_circular_buffer_get_max_read(bconv->tx_buf) > 0) {
 		ret = -1;
-		errno = EAGAIN;
+		g_set_error_literal(&error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+		                    "Not yet ready to send.");
 	} else {
-		ret = send(bconv->socket, message, len, 0);
+		ret = g_pollable_output_stream_write_nonblocking(
+		        G_POLLABLE_OUTPUT_STREAM(bconv->output), message, len,
+		        bconv->cancellable, &error);
 	}
 
-	if (ret == -1 && errno == EAGAIN)
+	if (ret == -1 && error->code == G_IO_ERROR_WOULD_BLOCK) {
 		ret = 0;
-	else if (ret <= 0) {
+		g_clear_error(&error);
+	} else if (ret <= 0) {
 		PurpleConversation *conv;
 		PurpleAccount *account;
-		const char *error = g_strerror(errno);
 
-		purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n",
-				   purple_buddy_get_name(pb), error ? error : "(null)");
+		purple_debug_error(
+		        "bonjour",
+		        "Error sending message to buddy %s error: %s",
+		        purple_buddy_get_name(pb),
+		        error ? error->message : "(null)");
 
 		account = purple_buddy_get_account(pb);
 
@@ -350,14 +366,23 @@
 
 		bonjour_jabber_close_conversation(bb->conversation);
 		bb->conversation = NULL;
+		g_clear_error(&error);
 		return -1;
 	}
 
 	if (ret < len) {
 		/* Don't interfere with the stream starting */
-		if (bconv->sent_stream_start == FULLY_SENT && bconv->recv_stream_start && bconv->tx_handler == 0)
-			bconv->tx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_WRITE,
-				_send_data_write_cb, pb);
+		if (bconv->sent_stream_start == FULLY_SENT &&
+		    bconv->recv_stream_start && bconv->tx_handler == 0) {
+			GSource *source =
+			        g_pollable_output_stream_create_source(
+			                G_POLLABLE_OUTPUT_STREAM(bconv->output),
+			                bconv->cancellable);
+			g_source_set_callback(source,
+			                      (GSourceFunc)_send_data_write_cb,
+			                      pb, NULL);
+			bconv->tx_handler = g_source_attach(source, NULL);
+		}
 		purple_circular_buffer_append(bconv->tx_buf, message + ret, len - ret);
 	}
 
@@ -396,22 +421,26 @@
 		bb->conversation = NULL;
 }
 
-static void
-_client_socket_handler(gpointer data, gint socket, PurpleInputCondition condition)
+static gboolean
+_client_socket_handler(GObject *stream, gpointer data)
 {
 	BonjourJabberConversation *bconv = data;
+	GError *error = NULL;
 	gssize len;
 	static char message[4096];
 
 	/* Read the data from the socket */
-	if ((len = recv(socket, message, sizeof(message) - 1, 0)) < 0) {
-		/* There have been an error reading from the socket */
-		if (len != -1 || errno != EAGAIN) {
-			const char *err = g_strerror(errno);
-
-			purple_debug_warning("bonjour",
-					"receive of %" G_GSSIZE_FORMAT " error: %s\n",
-					len, err ? err : "(null)");
+	len = g_pollable_input_stream_read_nonblocking(
+	        G_POLLABLE_INPUT_STREAM(stream), message, sizeof(message) - 1,
+	        bconv->cancellable, &error);
+	if (len == -1) {
+		/* There has been an error reading from the socket */
+		if (error == NULL || (error->code != G_IO_ERROR_WOULD_BLOCK &&
+		                      error->code != G_IO_ERROR_CANCELLED)) {
+			purple_debug_warning(
+			        "bonjour",
+			        "receive of %" G_GSSIZE_FORMAT " error: %s",
+			        len, error ? error->message : "(null)");
 
 			bonjour_jabber_close_conversation(bconv);
 			if (bconv->pb != NULL) {
@@ -424,41 +453,47 @@
 			/* I guess we really don't need to notify the user.
 			 * If they try to send another message it'll reconnect */
 		}
-		return;
+		g_clear_error(&error);
+		return FALSE;
 	} else if (len == 0) { /* The other end has closed the socket */
 		const gchar *name = purple_buddy_get_name(bconv->pb);
 		purple_debug_warning("bonjour", "Connection closed (without stream end) by %s.\n", (name) ? name : "(unknown)");
 		bonjour_jabber_stream_ended(bconv);
-		return;
+		return FALSE;
 	}
 
 	message[len] = '\0';
 
 	purple_debug_info("bonjour", "Receive: -%s- %" G_GSSIZE_FORMAT " bytes\n", message, len);
 	bonjour_parser_process(bconv, message, len);
+
+	return TRUE;
 }
 
 struct _stream_start_data {
 	char *msg;
 };
 
-
 static void
-_start_stream(gpointer data, gint source, PurpleInputCondition condition)
+_start_stream(GObject *stream, gpointer data)
 {
 	BonjourJabberConversation *bconv = data;
 	struct _stream_start_data *ss = bconv->stream_data;
-	int len, ret;
+	GError *error = NULL;
+	gsize len;
+	gssize ret;
 
 	len = strlen(ss->msg);
 
 	/* Start Stream */
-	ret = send(source, ss->msg, len, 0);
+	ret = g_pollable_output_stream_write_nonblocking(
+	        G_POLLABLE_OUTPUT_STREAM(stream), ss->msg, len,
+	        bconv->cancellable, &error);
 
-	if (ret == -1 && errno == EAGAIN)
+	if (ret == -1 && error->code == G_IO_ERROR_WOULD_BLOCK) {
+		g_clear_error(&error);
 		return;
-	else if (ret <= 0) {
-		const char *err = g_strerror(errno);
+	} else if (ret <= 0) {
 		PurpleConversation *conv;
 		const char *bname = bconv->buddy_name;
 		BonjourBuddy *bb = NULL;
@@ -468,8 +503,11 @@
 			bname = purple_buddy_get_name(bconv->pb);
 		}
 
-		purple_debug_error("bonjour", "Error starting stream with buddy %s at %s error: %s\n",
-				   bname ? bname : "(unknown)", bconv->ip, err ? err : "(null)");
+		purple_debug_error(
+		        "bonjour",
+		        "Error starting stream with buddy %s at %s error: %s",
+		        bname ? bname : "(unknown)", bconv->ip,
+		        error ? error->message : "(null)");
 
 		conv = PURPLE_CONVERSATION(purple_conversations_find_im_with_account(bname, bconv->account));
 		if (conv != NULL)
@@ -481,6 +519,7 @@
 		if(bb != NULL)
 			bb->conversation = NULL;
 
+		g_clear_error(&error);
 		return;
 	}
 
@@ -497,17 +536,20 @@
 	bconv->stream_data = NULL;
 
 	/* Stream started; process the send buffer if there is one */
-	purple_input_remove(bconv->tx_handler);
+	g_source_remove(bconv->tx_handler);
 	bconv->tx_handler = 0;
 	bconv->sent_stream_start = FULLY_SENT;
 
 	bonjour_jabber_stream_started(bconv);
 }
 
-static gboolean bonjour_jabber_send_stream_init(BonjourJabberConversation *bconv, int client_socket)
+static gboolean
+bonjour_jabber_send_stream_init(BonjourJabberConversation *bconv,
+                                GError **error)
 {
-	int ret, len;
-	char *stream_start;
+	gchar *stream_start;
+	gsize len;
+	gssize ret;
 	const char *bname = bconv->buddy_name;
 
 	if (bconv->pb != NULL)
@@ -524,15 +566,18 @@
 	bconv->sent_stream_start = PARTIALLY_SENT;
 
 	/* Start the stream */
-	ret = send(client_socket, stream_start, len, 0);
-
-	if (ret == -1 && errno == EAGAIN)
+	ret = g_pollable_output_stream_write_nonblocking(
+	        G_POLLABLE_OUTPUT_STREAM(bconv->output), stream_start, len,
+	        bconv->cancellable, error);
+	if (ret == -1 && (*error)->code == G_IO_ERROR_WOULD_BLOCK) {
 		ret = 0;
-	else if (ret <= 0) {
-		const char *err = g_strerror(errno);
-
-		purple_debug_error("bonjour", "Error starting stream with buddy %s at %s error: %s\n",
-				   (*bname) ? bname : "(unknown)", bconv->ip, err ? err : "(null)");
+		g_clear_error(error);
+	} else if (ret <= 0) {
+		purple_debug_error(
+		        "bonjour",
+		        "Error starting stream with buddy %s at %s error: %s",
+		        (*bname) ? bname : "(unknown)", bconv->ip,
+		        *error ? (*error)->message : "(null)");
 
 		if (bconv->pb) {
 			PurpleConversation *conv;
@@ -543,7 +588,12 @@
 					PURPLE_MESSAGE_ERROR);
 		}
 
-		close(client_socket);
+		purple_gio_graceful_close(G_IO_STREAM(bconv->socket),
+		                          G_INPUT_STREAM(bconv->input),
+		                          G_OUTPUT_STREAM(bconv->output));
+		g_clear_object(&bconv->socket);
+		bconv->input = NULL;
+		bconv->output = NULL;
 		g_free(stream_start);
 
 		return FALSE;
@@ -551,14 +601,20 @@
 
 	/* This is unlikely to happen */
 	if (ret < len) {
+		GSource *source;
 		struct _stream_start_data *ss = g_new(struct _stream_start_data, 1);
 		ss->msg = g_strdup(stream_start + ret);
 		bconv->stream_data = ss;
 		/* Finish sending the stream start */
-		bconv->tx_handler = purple_input_add(client_socket,
-			PURPLE_INPUT_WRITE, _start_stream, bconv);
-	} else
+		source = g_pollable_output_stream_create_source(
+		        G_POLLABLE_OUTPUT_STREAM(bconv->output),
+		        bconv->cancellable);
+		g_source_set_callback(source, (GSourceFunc)_start_stream, bconv,
+		                      NULL);
+		bconv->tx_handler = g_source_attach(source, NULL);
+	} else {
 		bconv->sent_stream_start = FULLY_SENT;
+	}
 
 	g_free(stream_start);
 
@@ -567,17 +623,23 @@
 
 /* This gets called when we've successfully sent our <stream:stream />
  * AND when we've received a <stream:stream /> */
-void bonjour_jabber_stream_started(BonjourJabberConversation *bconv) {
+void
+bonjour_jabber_stream_started(BonjourJabberConversation *bconv)
+{
+	GError *error = NULL;
 
-	if (bconv->sent_stream_start == NOT_SENT && !bonjour_jabber_send_stream_init(bconv, bconv->socket)) {
-		const char *err = g_strerror(errno);
+	if (bconv->sent_stream_start == NOT_SENT &&
+	    !bonjour_jabber_send_stream_init(bconv, &error)) {
 		const char *bname = bconv->buddy_name;
 
 		if (bconv->pb)
 			bname = purple_buddy_get_name(bconv->pb);
 
-		purple_debug_error("bonjour", "Error starting stream with buddy %s at %s error: %s\n",
-				   bname ? bname : "(unknown)", bconv->ip, err ? err : "(null)");
+		purple_debug_error(
+		        "bonjour",
+		        "Error starting stream with buddy %s at %s error: %s",
+		        bname ? bname : "(unknown)", bconv->ip,
+		        error ? error->message : "(null)");
 
 		if (bconv->pb) {
 			PurpleConversation *conv;
@@ -589,13 +651,18 @@
 		}
 
 		/* We don't want to recieve anything else */
-		close(bconv->socket);
-		bconv->socket = -1;
+		purple_gio_graceful_close(G_IO_STREAM(bconv->socket),
+		                          G_INPUT_STREAM(bconv->input),
+		                          G_OUTPUT_STREAM(bconv->output));
+		g_clear_object(&bconv->socket);
+		bconv->input = NULL;
+		bconv->output = NULL;
 
 		/* This must be asynchronous because it destroys the parser and we
 		 * may be in the middle of parsing.
 		 */
 		async_bonjour_jabber_close_conversation(bconv);
+		g_clear_error(&error);
 		return;
 	}
 
@@ -604,12 +671,15 @@
 	if (bconv->sent_stream_start == FULLY_SENT && bconv->recv_stream_start
 			&& bconv->pb && purple_circular_buffer_get_max_read(bconv->tx_buf) > 0) {
 		/* Watch for when we can write the buffered messages */
-		bconv->tx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_WRITE,
-			_send_data_write_cb, bconv->pb);
+		GSource *source = g_pollable_output_stream_create_source(
+		        G_POLLABLE_OUTPUT_STREAM(bconv->output),
+		        bconv->cancellable);
+		g_source_set_callback(source, (GSourceFunc)_send_data_write_cb,
+		                      bconv->pb, NULL);
+		bconv->tx_handler = g_source_attach(source, NULL);
 		/* We can probably write the data right now. */
-		_send_data_write_cb(bconv->pb, bconv->socket, PURPLE_INPUT_WRITE);
+		_send_data_write_cb(G_OBJECT(bconv->output), bconv->pb);
 	}
-
 }
 
 #ifndef INET6_ADDRSTRLEN
@@ -627,7 +697,7 @@
 	struct _match_buddies_by_address_t *mbba;
 	BonjourJabberConversation *bconv;
 	GSList *buddies;
-	GSocket *socket;
+	GSource *source;
 
 	their_addr = g_socket_connection_get_remote_address(connection, NULL);
 	if (their_addr == NULL) {
@@ -674,11 +744,15 @@
 	bconv = bonjour_jabber_conv_new(NULL, jdata->account, address_text);
 
 	/* We wait for the stream start before doing anything else */
-	socket = g_socket_connection_get_socket(connection); /* Temporary until fully async. */
-	bconv->socket = g_socket_get_fd(socket);
-	_purple_network_set_common_socket_flags(bconv->socket);
-	bconv->rx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_READ,
-	                                     _client_socket_handler, bconv);
+	bconv->socket = g_object_ref(connection);
+	bconv->input = g_io_stream_get_input_stream(G_IO_STREAM(bconv->socket));
+	bconv->output =
+	        g_io_stream_get_output_stream(G_IO_STREAM(bconv->socket));
+	source = g_pollable_input_stream_create_source(
+	        G_POLLABLE_INPUT_STREAM(bconv->input), bconv->cancellable);
+	g_source_set_callback(source, (GSourceFunc)_client_socket_handler,
+	                      bconv, NULL);
+	bconv->rx_handler = g_source_attach(source, NULL);
 	g_free(address_text);
 }
 
@@ -721,20 +795,35 @@
 }
 
 static void
-_connected_to_buddy(gpointer data, gint source, const gchar *error)
+_connected_to_buddy(GObject *source, GAsyncResult *res, gpointer user_data)
 {
-	PurpleBuddy *pb = data;
+	PurpleBuddy *pb = user_data;
 	BonjourBuddy *bb = purple_buddy_get_protocol_data(pb);
+	GSocketConnection *conn;
+	GSource *rx_source;
+	GError *error = NULL;
 
-	bb->conversation->connect_data = NULL;
+	conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source),
+	                                              res, &error);
 
-	if (source < 0) {
+	if (conn == NULL) {
 		PurpleConversation *conv = NULL;
 		PurpleAccount *account = NULL;
 		GSList *tmp = bb->ips;
 
-		purple_debug_error("bonjour", "Error connecting to buddy %s at %s:%d (%s); Trying next IP address\n",
-				   purple_buddy_get_name(pb), bb->conversation->ip, bb->port_p2pj, error);
+		if (error && error->code == G_IO_ERROR_CANCELLED) {
+			/* This conversation was closed before it started. */
+			g_error_free(error);
+			return;
+		}
+
+		purple_debug_error("bonjour",
+		                   "Error connecting to buddy %s at %s:%d "
+		                   "(%s); Trying next IP address",
+		                   purple_buddy_get_name(pb),
+		                   bb->conversation->ip, bb->port_p2pj,
+		                   error ? error->message : "(unknown)");
+		g_clear_error(&error);
 
 		/* There may be multiple entries for the same IP - one per
 		 * presence recieved (e.g. multiple interfaces).
@@ -749,21 +838,23 @@
 
 		if (tmp != NULL) {
 			const gchar *ip;
-			PurpleProxyConnectData *connect_data;
+			GSocketClient *client;
 
 			bb->conversation->ip_link = ip = tmp->data;
 
 			purple_debug_info("bonjour", "Starting conversation with %s at %s:%d\n",
 					  purple_buddy_get_name(pb), ip, bb->port_p2pj);
 
-			connect_data = purple_proxy_connect(purple_account_get_connection(account),
-							    account, ip, bb->port_p2pj, _connected_to_buddy, pb);
-
-			if (connect_data != NULL) {
+			/* Make sure to connect without a proxy. */
+			client = g_socket_client_new();
+			if (client != NULL) {
 				g_free(bb->conversation->ip);
 				bb->conversation->ip = g_strdup(ip);
-				bb->conversation->connect_data = connect_data;
-
+				g_socket_client_connect_to_host_async(
+				        client, ip, bb->port_p2pj,
+				        bb->conversation->cancellable,
+				        _connected_to_buddy, pb);
+				g_object_unref(client);
 				return;
 			}
 		}
@@ -781,13 +872,22 @@
 		return;
 	}
 
-	if (!bonjour_jabber_send_stream_init(bb->conversation, source)) {
-		const char *err = g_strerror(errno);
+	bb->conversation->socket = conn;
+	bb->conversation->input =
+	        g_io_stream_get_input_stream(G_IO_STREAM(conn));
+	bb->conversation->output =
+	        g_io_stream_get_output_stream(G_IO_STREAM(conn));
+
+	if (!bonjour_jabber_send_stream_init(bb->conversation, &error)) {
 		PurpleConversation *conv = NULL;
 		PurpleAccount *account = NULL;
 
-		purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n",
-				   purple_buddy_get_name(pb), bb->conversation->ip, bb->port_p2pj, err ? err : "(null)");
+		purple_debug_error("bonjour",
+		                   "Error starting stream with buddy %s at "
+		                   "%s:%d error: %s",
+		                   purple_buddy_get_name(pb),
+		                   bb->conversation->ip, bb->port_p2pj,
+		                   error ? error->message : "(null)");
 
 		account = purple_buddy_get_account(pb);
 
@@ -797,16 +897,19 @@
 				_("Unable to send the message, the conversation couldn't be started."),
 				PURPLE_MESSAGE_ERROR);
 
-		close(source);
 		bonjour_jabber_close_conversation(bb->conversation);
 		bb->conversation = NULL;
+		g_clear_error(&error);
 		return;
 	}
 
 	/* Start listening for the stream acknowledgement */
-	bb->conversation->socket = source;
-	bb->conversation->rx_handler = purple_input_add(source,
-		PURPLE_INPUT_READ, _client_socket_handler, bb->conversation);
+	rx_source = g_pollable_input_stream_create_source(
+	        G_POLLABLE_INPUT_STREAM(bb->conversation->input),
+	        bb->conversation->cancellable);
+	g_source_set_callback(rx_source, (GSourceFunc)_client_socket_handler,
+	                      bb->conversation, NULL);
+	bb->conversation->rx_handler = g_source_attach(rx_source, NULL);
 }
 
 void
@@ -928,39 +1031,31 @@
 		return NULL;
 
 	/* Check if there is a previously open conversation */
-	if (bb->conversation == NULL)
-	{
-		PurpleProxyConnectData *connect_data;
-		PurpleProxyInfo *proxy_info;
-		const char *ip = bb->ips->data; /* Start with the first IP address. */
-
-		purple_debug_info("bonjour", "Starting conversation with %s at %s:%d\n", to, ip, bb->port_p2pj);
+	if (bb->conversation == NULL) {
+		GSocketClient *client;
+		/* Start with the first IP address. */
+		const gchar *ip = bb->ips->data;
 
-		/* Make sure that the account always has a proxy of "none".
-		 * This is kind of dirty, but proxy_connect_none() isn't exposed. */
-		proxy_info = purple_account_get_proxy_info(jdata->account);
-		if (proxy_info == NULL) {
-			proxy_info = purple_proxy_info_new();
-			purple_account_set_proxy_info(jdata->account, proxy_info);
-		}
-		purple_proxy_info_set_proxy_type(proxy_info, PURPLE_PROXY_NONE);
+		purple_debug_info("bonjour",
+		                  "Starting conversation with %s at %s:%d", to,
+		                  ip, bb->port_p2pj);
 
-		connect_data = purple_proxy_connect(
-						    purple_account_get_connection(jdata->account),
-						    jdata->account,
-						    ip, bb->port_p2pj, _connected_to_buddy, pb);
-
-		if (connect_data == NULL) {
-			purple_debug_error("bonjour", "Unable to connect to buddy (%s).\n", to);
+		/* Make sure to connect without a proxy. */
+		client = g_socket_client_new();
+		if (client == NULL) {
+			purple_debug_error("bonjour",
+			                   "Unable to connect to buddy (%s).",
+			                   to);
 			return NULL;
 		}
 
 		bb->conversation = bonjour_jabber_conv_new(pb, jdata->account, ip);
-		bb->conversation->connect_data = connect_data;
 		bb->conversation->ip_link = ip;
-		/* We don't want _send_data() to register the tx_handler;
-		 * that neeeds to wait until we're actually connected. */
-		bb->conversation->tx_handler = 0;
+
+		g_socket_client_connect_to_host_async(
+		        client, ip, bb->port_p2pj,
+		        bb->conversation->cancellable, _connected_to_buddy, pb);
+		g_object_unref(client);
 	}
 	return pb;
 }
@@ -1081,27 +1176,44 @@
 	}
 
 	/* Close the socket and remove the watcher */
-	if (bconv->socket >= 0) {
+	if (bconv->socket != NULL) {
 		/* Send the end of the stream to the other end of the conversation */
 		if (bconv->sent_stream_start == FULLY_SENT) {
 			size_t len = strlen(STREAM_END);
-			if (send(bconv->socket, STREAM_END, len, 0) != (gssize)len) {
+			if (g_pollable_output_stream_write_nonblocking(
+			            G_POLLABLE_OUTPUT_STREAM(bconv->output),
+			            STREAM_END, len, bconv->cancellable,
+			            NULL) != (gssize)len) {
 				purple_debug_error("bonjour",
 					"bonjour_jabber_close_conversation: "
 					"couldn't send data\n");
 			}
 		}
 		/* TODO: We're really supposed to wait for "</stream:stream>" before closing the socket */
-		close(bconv->socket);
+		purple_gio_graceful_close(G_IO_STREAM(bconv->socket),
+		                          G_INPUT_STREAM(bconv->input),
+		                          G_OUTPUT_STREAM(bconv->output));
 	}
 	if (bconv->rx_handler != 0) {
-		purple_input_remove(bconv->rx_handler);
+		g_source_remove(bconv->rx_handler);
+		bconv->rx_handler = 0;
 	}
-	if (bconv->tx_handler > 0) {
-		purple_input_remove(bconv->tx_handler);
+	if (bconv->tx_handler != 0) {
+		g_source_remove(bconv->tx_handler);
+		bconv->tx_handler = 0;
+	}
+
+	/* Cancel any pending operations. */
+	if (bconv->cancellable != NULL) {
+		g_cancellable_cancel(bconv->cancellable);
+		g_clear_object(&bconv->cancellable);
 	}
 
 	/* Free all the data related to the conversation */
+	g_clear_object(&bconv->socket);
+	bconv->input = NULL;
+	bconv->output = NULL;
+
 	g_object_unref(G_OBJECT(bconv->tx_buf));
 	if (bconv->connect_data != NULL) {
 		purple_proxy_connect_cancel(bconv->connect_data);

mercurial