--- a/libpurple/protocols/bonjour/jabber.c Sun Jun 30 03:04:55 2019 -0400 +++ b/libpurple/protocols/bonjour/jabber.c Tue Jul 02 02:12:08 2019 -0400 @@ -46,15 +46,15 @@ #include <ifaddrs.h> #endif - -#include "network.h" -#include "eventloop.h" +#include "buddylist.h" #include "connection.h" -#include "buddylist.h" +#include "debug.h" +#include "eventloop.h" +#include "network.h" +#include "notify.h" +#include "purple-gio.h" +#include "util.h" #include "xmlnode.h" -#include "debug.h" -#include "notify.h" -#include "util.h" #include "jabber.h" #include "parser.h" @@ -86,7 +86,7 @@ bonjour_jabber_conv_new(PurpleBuddy *pb, PurpleAccount *account, const char *ip) { BonjourJabberConversation *bconv = g_new0(BonjourJabberConversation, 1); - bconv->socket = -1; + bconv->cancellable = g_cancellable_new(); bconv->tx_buf = purple_circular_buffer_new(512); bconv->tx_handler = 0; bconv->rx_handler = 0; @@ -267,32 +267,40 @@ } static void -_send_data_write_cb(gpointer data, gint source, PurpleInputCondition cond) +_send_data_write_cb(GObject *stream, gpointer data) { PurpleBuddy *pb = data; BonjourBuddy *bb = purple_buddy_get_protocol_data(pb); BonjourJabberConversation *bconv = bb->conversation; - int ret, writelen; + gsize writelen; + gssize ret; + GError *error = NULL; writelen = purple_circular_buffer_get_max_read(bconv->tx_buf); if (writelen == 0) { - purple_input_remove(bconv->tx_handler); + g_source_remove(bconv->tx_handler); bconv->tx_handler = 0; return; } - ret = send(bconv->socket, purple_circular_buffer_get_output(bconv->tx_buf), writelen, 0); + ret = g_pollable_output_stream_write_nonblocking( + G_POLLABLE_OUTPUT_STREAM(stream), + purple_circular_buffer_get_output(bconv->tx_buf), writelen, + bconv->cancellable, &error); - if (ret < 0 && errno == EAGAIN) + if (ret < 0 && error->code == G_IO_ERROR_WOULD_BLOCK) { + g_clear_error(&error); return; - else if (ret <= 0) { + } else if (ret <= 0) { PurpleConversation *conv = NULL; PurpleAccount *account = NULL; - const char *error = g_strerror(errno); - purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n", - purple_buddy_get_name(pb), error ? error : "(null)"); + purple_debug_error( + "bonjour", + "Error sending message to buddy %s error: %s", + purple_buddy_get_name(pb), + error ? error->message : "(null)"); account = purple_buddy_get_account(pb); @@ -304,6 +312,7 @@ bonjour_jabber_close_conversation(bb->conversation); bb->conversation = NULL; + g_clear_error(&error); return; } @@ -313,10 +322,11 @@ static gint _send_data(PurpleBuddy *pb, char *message) { - gint ret; - int len = strlen(message); BonjourBuddy *bb = purple_buddy_get_protocol_data(pb); BonjourJabberConversation *bconv = bb->conversation; + gsize len = strlen(message); + gssize ret; + GError *error = NULL; /* If we're not ready to actually send, append it to the buffer */ if (bconv->tx_handler != 0 @@ -325,20 +335,26 @@ || !bconv->recv_stream_start || purple_circular_buffer_get_max_read(bconv->tx_buf) > 0) { ret = -1; - errno = EAGAIN; + g_set_error_literal(&error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, + "Not yet ready to send."); } else { - ret = send(bconv->socket, message, len, 0); + ret = g_pollable_output_stream_write_nonblocking( + G_POLLABLE_OUTPUT_STREAM(bconv->output), message, len, + bconv->cancellable, &error); } - if (ret == -1 && errno == EAGAIN) + if (ret == -1 && error->code == G_IO_ERROR_WOULD_BLOCK) { ret = 0; - else if (ret <= 0) { + g_clear_error(&error); + } else if (ret <= 0) { PurpleConversation *conv; PurpleAccount *account; - const char *error = g_strerror(errno); - purple_debug_error("bonjour", "Error sending message to buddy %s error: %s\n", - purple_buddy_get_name(pb), error ? error : "(null)"); + purple_debug_error( + "bonjour", + "Error sending message to buddy %s error: %s", + purple_buddy_get_name(pb), + error ? error->message : "(null)"); account = purple_buddy_get_account(pb); @@ -350,14 +366,23 @@ bonjour_jabber_close_conversation(bb->conversation); bb->conversation = NULL; + g_clear_error(&error); return -1; } if (ret < len) { /* Don't interfere with the stream starting */ - if (bconv->sent_stream_start == FULLY_SENT && bconv->recv_stream_start && bconv->tx_handler == 0) - bconv->tx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_WRITE, - _send_data_write_cb, pb); + if (bconv->sent_stream_start == FULLY_SENT && + bconv->recv_stream_start && bconv->tx_handler == 0) { + GSource *source = + g_pollable_output_stream_create_source( + G_POLLABLE_OUTPUT_STREAM(bconv->output), + bconv->cancellable); + g_source_set_callback(source, + (GSourceFunc)_send_data_write_cb, + pb, NULL); + bconv->tx_handler = g_source_attach(source, NULL); + } purple_circular_buffer_append(bconv->tx_buf, message + ret, len - ret); } @@ -396,22 +421,26 @@ bb->conversation = NULL; } -static void -_client_socket_handler(gpointer data, gint socket, PurpleInputCondition condition) +static gboolean +_client_socket_handler(GObject *stream, gpointer data) { BonjourJabberConversation *bconv = data; + GError *error = NULL; gssize len; static char message[4096]; /* Read the data from the socket */ - if ((len = recv(socket, message, sizeof(message) - 1, 0)) < 0) { - /* There have been an error reading from the socket */ - if (len != -1 || errno != EAGAIN) { - const char *err = g_strerror(errno); - - purple_debug_warning("bonjour", - "receive of %" G_GSSIZE_FORMAT " error: %s\n", - len, err ? err : "(null)"); + len = g_pollable_input_stream_read_nonblocking( + G_POLLABLE_INPUT_STREAM(stream), message, sizeof(message) - 1, + bconv->cancellable, &error); + if (len == -1) { + /* There has been an error reading from the socket */ + if (error == NULL || (error->code != G_IO_ERROR_WOULD_BLOCK && + error->code != G_IO_ERROR_CANCELLED)) { + purple_debug_warning( + "bonjour", + "receive of %" G_GSSIZE_FORMAT " error: %s", + len, error ? error->message : "(null)"); bonjour_jabber_close_conversation(bconv); if (bconv->pb != NULL) { @@ -424,41 +453,47 @@ /* I guess we really don't need to notify the user. * If they try to send another message it'll reconnect */ } - return; + g_clear_error(&error); + return FALSE; } else if (len == 0) { /* The other end has closed the socket */ const gchar *name = purple_buddy_get_name(bconv->pb); purple_debug_warning("bonjour", "Connection closed (without stream end) by %s.\n", (name) ? name : "(unknown)"); bonjour_jabber_stream_ended(bconv); - return; + return FALSE; } message[len] = '\0'; purple_debug_info("bonjour", "Receive: -%s- %" G_GSSIZE_FORMAT " bytes\n", message, len); bonjour_parser_process(bconv, message, len); + + return TRUE; } struct _stream_start_data { char *msg; }; - static void -_start_stream(gpointer data, gint source, PurpleInputCondition condition) +_start_stream(GObject *stream, gpointer data) { BonjourJabberConversation *bconv = data; struct _stream_start_data *ss = bconv->stream_data; - int len, ret; + GError *error = NULL; + gsize len; + gssize ret; len = strlen(ss->msg); /* Start Stream */ - ret = send(source, ss->msg, len, 0); + ret = g_pollable_output_stream_write_nonblocking( + G_POLLABLE_OUTPUT_STREAM(stream), ss->msg, len, + bconv->cancellable, &error); - if (ret == -1 && errno == EAGAIN) + if (ret == -1 && error->code == G_IO_ERROR_WOULD_BLOCK) { + g_clear_error(&error); return; - else if (ret <= 0) { - const char *err = g_strerror(errno); + } else if (ret <= 0) { PurpleConversation *conv; const char *bname = bconv->buddy_name; BonjourBuddy *bb = NULL; @@ -468,8 +503,11 @@ bname = purple_buddy_get_name(bconv->pb); } - purple_debug_error("bonjour", "Error starting stream with buddy %s at %s error: %s\n", - bname ? bname : "(unknown)", bconv->ip, err ? err : "(null)"); + purple_debug_error( + "bonjour", + "Error starting stream with buddy %s at %s error: %s", + bname ? bname : "(unknown)", bconv->ip, + error ? error->message : "(null)"); conv = PURPLE_CONVERSATION(purple_conversations_find_im_with_account(bname, bconv->account)); if (conv != NULL) @@ -481,6 +519,7 @@ if(bb != NULL) bb->conversation = NULL; + g_clear_error(&error); return; } @@ -497,17 +536,20 @@ bconv->stream_data = NULL; /* Stream started; process the send buffer if there is one */ - purple_input_remove(bconv->tx_handler); + g_source_remove(bconv->tx_handler); bconv->tx_handler = 0; bconv->sent_stream_start = FULLY_SENT; bonjour_jabber_stream_started(bconv); } -static gboolean bonjour_jabber_send_stream_init(BonjourJabberConversation *bconv, int client_socket) +static gboolean +bonjour_jabber_send_stream_init(BonjourJabberConversation *bconv, + GError **error) { - int ret, len; - char *stream_start; + gchar *stream_start; + gsize len; + gssize ret; const char *bname = bconv->buddy_name; if (bconv->pb != NULL) @@ -524,15 +566,18 @@ bconv->sent_stream_start = PARTIALLY_SENT; /* Start the stream */ - ret = send(client_socket, stream_start, len, 0); - - if (ret == -1 && errno == EAGAIN) + ret = g_pollable_output_stream_write_nonblocking( + G_POLLABLE_OUTPUT_STREAM(bconv->output), stream_start, len, + bconv->cancellable, error); + if (ret == -1 && (*error)->code == G_IO_ERROR_WOULD_BLOCK) { ret = 0; - else if (ret <= 0) { - const char *err = g_strerror(errno); - - purple_debug_error("bonjour", "Error starting stream with buddy %s at %s error: %s\n", - (*bname) ? bname : "(unknown)", bconv->ip, err ? err : "(null)"); + g_clear_error(error); + } else if (ret <= 0) { + purple_debug_error( + "bonjour", + "Error starting stream with buddy %s at %s error: %s", + (*bname) ? bname : "(unknown)", bconv->ip, + *error ? (*error)->message : "(null)"); if (bconv->pb) { PurpleConversation *conv; @@ -543,7 +588,12 @@ PURPLE_MESSAGE_ERROR); } - close(client_socket); + purple_gio_graceful_close(G_IO_STREAM(bconv->socket), + G_INPUT_STREAM(bconv->input), + G_OUTPUT_STREAM(bconv->output)); + g_clear_object(&bconv->socket); + bconv->input = NULL; + bconv->output = NULL; g_free(stream_start); return FALSE; @@ -551,14 +601,20 @@ /* This is unlikely to happen */ if (ret < len) { + GSource *source; struct _stream_start_data *ss = g_new(struct _stream_start_data, 1); ss->msg = g_strdup(stream_start + ret); bconv->stream_data = ss; /* Finish sending the stream start */ - bconv->tx_handler = purple_input_add(client_socket, - PURPLE_INPUT_WRITE, _start_stream, bconv); - } else + source = g_pollable_output_stream_create_source( + G_POLLABLE_OUTPUT_STREAM(bconv->output), + bconv->cancellable); + g_source_set_callback(source, (GSourceFunc)_start_stream, bconv, + NULL); + bconv->tx_handler = g_source_attach(source, NULL); + } else { bconv->sent_stream_start = FULLY_SENT; + } g_free(stream_start); @@ -567,17 +623,23 @@ /* This gets called when we've successfully sent our <stream:stream /> * AND when we've received a <stream:stream /> */ -void bonjour_jabber_stream_started(BonjourJabberConversation *bconv) { +void +bonjour_jabber_stream_started(BonjourJabberConversation *bconv) +{ + GError *error = NULL; - if (bconv->sent_stream_start == NOT_SENT && !bonjour_jabber_send_stream_init(bconv, bconv->socket)) { - const char *err = g_strerror(errno); + if (bconv->sent_stream_start == NOT_SENT && + !bonjour_jabber_send_stream_init(bconv, &error)) { const char *bname = bconv->buddy_name; if (bconv->pb) bname = purple_buddy_get_name(bconv->pb); - purple_debug_error("bonjour", "Error starting stream with buddy %s at %s error: %s\n", - bname ? bname : "(unknown)", bconv->ip, err ? err : "(null)"); + purple_debug_error( + "bonjour", + "Error starting stream with buddy %s at %s error: %s", + bname ? bname : "(unknown)", bconv->ip, + error ? error->message : "(null)"); if (bconv->pb) { PurpleConversation *conv; @@ -589,13 +651,18 @@ } /* We don't want to recieve anything else */ - close(bconv->socket); - bconv->socket = -1; + purple_gio_graceful_close(G_IO_STREAM(bconv->socket), + G_INPUT_STREAM(bconv->input), + G_OUTPUT_STREAM(bconv->output)); + g_clear_object(&bconv->socket); + bconv->input = NULL; + bconv->output = NULL; /* This must be asynchronous because it destroys the parser and we * may be in the middle of parsing. */ async_bonjour_jabber_close_conversation(bconv); + g_clear_error(&error); return; } @@ -604,12 +671,15 @@ if (bconv->sent_stream_start == FULLY_SENT && bconv->recv_stream_start && bconv->pb && purple_circular_buffer_get_max_read(bconv->tx_buf) > 0) { /* Watch for when we can write the buffered messages */ - bconv->tx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_WRITE, - _send_data_write_cb, bconv->pb); + GSource *source = g_pollable_output_stream_create_source( + G_POLLABLE_OUTPUT_STREAM(bconv->output), + bconv->cancellable); + g_source_set_callback(source, (GSourceFunc)_send_data_write_cb, + bconv->pb, NULL); + bconv->tx_handler = g_source_attach(source, NULL); /* We can probably write the data right now. */ - _send_data_write_cb(bconv->pb, bconv->socket, PURPLE_INPUT_WRITE); + _send_data_write_cb(G_OBJECT(bconv->output), bconv->pb); } - } #ifndef INET6_ADDRSTRLEN @@ -627,7 +697,7 @@ struct _match_buddies_by_address_t *mbba; BonjourJabberConversation *bconv; GSList *buddies; - GSocket *socket; + GSource *source; their_addr = g_socket_connection_get_remote_address(connection, NULL); if (their_addr == NULL) { @@ -674,11 +744,15 @@ bconv = bonjour_jabber_conv_new(NULL, jdata->account, address_text); /* We wait for the stream start before doing anything else */ - socket = g_socket_connection_get_socket(connection); /* Temporary until fully async. */ - bconv->socket = g_socket_get_fd(socket); - _purple_network_set_common_socket_flags(bconv->socket); - bconv->rx_handler = purple_input_add(bconv->socket, PURPLE_INPUT_READ, - _client_socket_handler, bconv); + bconv->socket = g_object_ref(connection); + bconv->input = g_io_stream_get_input_stream(G_IO_STREAM(bconv->socket)); + bconv->output = + g_io_stream_get_output_stream(G_IO_STREAM(bconv->socket)); + source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(bconv->input), bconv->cancellable); + g_source_set_callback(source, (GSourceFunc)_client_socket_handler, + bconv, NULL); + bconv->rx_handler = g_source_attach(source, NULL); g_free(address_text); } @@ -721,20 +795,35 @@ } static void -_connected_to_buddy(gpointer data, gint source, const gchar *error) +_connected_to_buddy(GObject *source, GAsyncResult *res, gpointer user_data) { - PurpleBuddy *pb = data; + PurpleBuddy *pb = user_data; BonjourBuddy *bb = purple_buddy_get_protocol_data(pb); + GSocketConnection *conn; + GSource *rx_source; + GError *error = NULL; - bb->conversation->connect_data = NULL; + conn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(source), + res, &error); - if (source < 0) { + if (conn == NULL) { PurpleConversation *conv = NULL; PurpleAccount *account = NULL; GSList *tmp = bb->ips; - purple_debug_error("bonjour", "Error connecting to buddy %s at %s:%d (%s); Trying next IP address\n", - purple_buddy_get_name(pb), bb->conversation->ip, bb->port_p2pj, error); + if (error && error->code == G_IO_ERROR_CANCELLED) { + /* This conversation was closed before it started. */ + g_error_free(error); + return; + } + + purple_debug_error("bonjour", + "Error connecting to buddy %s at %s:%d " + "(%s); Trying next IP address", + purple_buddy_get_name(pb), + bb->conversation->ip, bb->port_p2pj, + error ? error->message : "(unknown)"); + g_clear_error(&error); /* There may be multiple entries for the same IP - one per * presence recieved (e.g. multiple interfaces). @@ -749,21 +838,23 @@ if (tmp != NULL) { const gchar *ip; - PurpleProxyConnectData *connect_data; + GSocketClient *client; bb->conversation->ip_link = ip = tmp->data; purple_debug_info("bonjour", "Starting conversation with %s at %s:%d\n", purple_buddy_get_name(pb), ip, bb->port_p2pj); - connect_data = purple_proxy_connect(purple_account_get_connection(account), - account, ip, bb->port_p2pj, _connected_to_buddy, pb); - - if (connect_data != NULL) { + /* Make sure to connect without a proxy. */ + client = g_socket_client_new(); + if (client != NULL) { g_free(bb->conversation->ip); bb->conversation->ip = g_strdup(ip); - bb->conversation->connect_data = connect_data; - + g_socket_client_connect_to_host_async( + client, ip, bb->port_p2pj, + bb->conversation->cancellable, + _connected_to_buddy, pb); + g_object_unref(client); return; } } @@ -781,13 +872,22 @@ return; } - if (!bonjour_jabber_send_stream_init(bb->conversation, source)) { - const char *err = g_strerror(errno); + bb->conversation->socket = conn; + bb->conversation->input = + g_io_stream_get_input_stream(G_IO_STREAM(conn)); + bb->conversation->output = + g_io_stream_get_output_stream(G_IO_STREAM(conn)); + + if (!bonjour_jabber_send_stream_init(bb->conversation, &error)) { PurpleConversation *conv = NULL; PurpleAccount *account = NULL; - purple_debug_error("bonjour", "Error starting stream with buddy %s at %s:%d error: %s\n", - purple_buddy_get_name(pb), bb->conversation->ip, bb->port_p2pj, err ? err : "(null)"); + purple_debug_error("bonjour", + "Error starting stream with buddy %s at " + "%s:%d error: %s", + purple_buddy_get_name(pb), + bb->conversation->ip, bb->port_p2pj, + error ? error->message : "(null)"); account = purple_buddy_get_account(pb); @@ -797,16 +897,19 @@ _("Unable to send the message, the conversation couldn't be started."), PURPLE_MESSAGE_ERROR); - close(source); bonjour_jabber_close_conversation(bb->conversation); bb->conversation = NULL; + g_clear_error(&error); return; } /* Start listening for the stream acknowledgement */ - bb->conversation->socket = source; - bb->conversation->rx_handler = purple_input_add(source, - PURPLE_INPUT_READ, _client_socket_handler, bb->conversation); + rx_source = g_pollable_input_stream_create_source( + G_POLLABLE_INPUT_STREAM(bb->conversation->input), + bb->conversation->cancellable); + g_source_set_callback(rx_source, (GSourceFunc)_client_socket_handler, + bb->conversation, NULL); + bb->conversation->rx_handler = g_source_attach(rx_source, NULL); } void @@ -928,39 +1031,31 @@ return NULL; /* Check if there is a previously open conversation */ - if (bb->conversation == NULL) - { - PurpleProxyConnectData *connect_data; - PurpleProxyInfo *proxy_info; - const char *ip = bb->ips->data; /* Start with the first IP address. */ - - purple_debug_info("bonjour", "Starting conversation with %s at %s:%d\n", to, ip, bb->port_p2pj); + if (bb->conversation == NULL) { + GSocketClient *client; + /* Start with the first IP address. */ + const gchar *ip = bb->ips->data; - /* Make sure that the account always has a proxy of "none". - * This is kind of dirty, but proxy_connect_none() isn't exposed. */ - proxy_info = purple_account_get_proxy_info(jdata->account); - if (proxy_info == NULL) { - proxy_info = purple_proxy_info_new(); - purple_account_set_proxy_info(jdata->account, proxy_info); - } - purple_proxy_info_set_proxy_type(proxy_info, PURPLE_PROXY_NONE); + purple_debug_info("bonjour", + "Starting conversation with %s at %s:%d", to, + ip, bb->port_p2pj); - connect_data = purple_proxy_connect( - purple_account_get_connection(jdata->account), - jdata->account, - ip, bb->port_p2pj, _connected_to_buddy, pb); - - if (connect_data == NULL) { - purple_debug_error("bonjour", "Unable to connect to buddy (%s).\n", to); + /* Make sure to connect without a proxy. */ + client = g_socket_client_new(); + if (client == NULL) { + purple_debug_error("bonjour", + "Unable to connect to buddy (%s).", + to); return NULL; } bb->conversation = bonjour_jabber_conv_new(pb, jdata->account, ip); - bb->conversation->connect_data = connect_data; bb->conversation->ip_link = ip; - /* We don't want _send_data() to register the tx_handler; - * that neeeds to wait until we're actually connected. */ - bb->conversation->tx_handler = 0; + + g_socket_client_connect_to_host_async( + client, ip, bb->port_p2pj, + bb->conversation->cancellable, _connected_to_buddy, pb); + g_object_unref(client); } return pb; } @@ -1081,27 +1176,44 @@ } /* Close the socket and remove the watcher */ - if (bconv->socket >= 0) { + if (bconv->socket != NULL) { /* Send the end of the stream to the other end of the conversation */ if (bconv->sent_stream_start == FULLY_SENT) { size_t len = strlen(STREAM_END); - if (send(bconv->socket, STREAM_END, len, 0) != (gssize)len) { + if (g_pollable_output_stream_write_nonblocking( + G_POLLABLE_OUTPUT_STREAM(bconv->output), + STREAM_END, len, bconv->cancellable, + NULL) != (gssize)len) { purple_debug_error("bonjour", "bonjour_jabber_close_conversation: " "couldn't send data\n"); } } /* TODO: We're really supposed to wait for "</stream:stream>" before closing the socket */ - close(bconv->socket); + purple_gio_graceful_close(G_IO_STREAM(bconv->socket), + G_INPUT_STREAM(bconv->input), + G_OUTPUT_STREAM(bconv->output)); } if (bconv->rx_handler != 0) { - purple_input_remove(bconv->rx_handler); + g_source_remove(bconv->rx_handler); + bconv->rx_handler = 0; } - if (bconv->tx_handler > 0) { - purple_input_remove(bconv->tx_handler); + if (bconv->tx_handler != 0) { + g_source_remove(bconv->tx_handler); + bconv->tx_handler = 0; + } + + /* Cancel any pending operations. */ + if (bconv->cancellable != NULL) { + g_cancellable_cancel(bconv->cancellable); + g_clear_object(&bconv->cancellable); } /* Free all the data related to the conversation */ + g_clear_object(&bconv->socket); + bconv->input = NULL; + bconv->output = NULL; + g_object_unref(G_OBJECT(bconv->tx_buf)); if (bconv->connect_data != NULL) { purple_proxy_connect_cancel(bconv->connect_data);