simple: Convert TCP reading to gio.

Mon, 18 May 2020 23:34:20 -0400

author
Elliott Sales de Andrade <qulogic@pidgin.im>
date
Mon, 18 May 2020 23:34:20 -0400
changeset 40423
00f2c8fb0453
parent 40422
2625e01cec7e
child 40424
7b1c60353345

simple: Convert TCP reading to gio.

libpurple/protocols/simple/simple.c file | annotate | diff | comparison | revisions
libpurple/protocols/simple/simple.h file | annotate | diff | comparison | revisions
--- a/libpurple/protocols/simple/simple.c	Mon May 18 19:58:19 2020 -0400
+++ b/libpurple/protocols/simple/simple.c	Mon May 18 23:34:20 2020 -0400
@@ -164,17 +164,6 @@
 	}
 }
 
-static struct sip_connection *connection_find(struct simple_account_data *sip, int fd) {
-	struct sip_connection *ret = NULL;
-	GSList *entry = sip->openconns;
-	while(entry) {
-		ret = entry->data;
-		if(ret->fd == fd) return ret;
-		entry = entry->next;
-	}
-	return NULL;
-}
-
 static struct simple_watcher *watcher_find(struct simple_account_data *sip,
 		const gchar *name) {
 	struct simple_watcher *watcher;
@@ -211,12 +200,10 @@
 }
 
 static struct sip_connection *
-connection_create(struct simple_account_data *sip, GSocketConnection *sockconn,
-                  int fd)
+connection_create(struct simple_account_data *sip, GSocketConnection *sockconn)
 {
 	struct sip_connection *ret = g_new0(struct sip_connection, 1);
 	ret->sockconn = sockconn;
-	ret->fd = fd;
 	sip->openconns = g_slist_append(sip->openconns, ret);
 	return ret;
 }
@@ -225,17 +212,32 @@
 connection_destroy(struct sip_connection *conn)
 {
 	if (conn->inputhandler) {
-		purple_input_remove(conn->inputhandler);
+		g_source_remove(conn->inputhandler);
 	}
 	g_clear_pointer(&conn->inbuf, g_free);
 	g_clear_object(&conn->sockconn);
 	g_free(conn);
 }
 
+static struct sip_connection *
+connection_find(struct simple_account_data *sip, GInputStream *input)
+{
+	struct sip_connection *ret = NULL;
+	GSList *entry = sip->openconns;
+	while (entry) {
+		ret = entry->data;
+		if (g_io_stream_get_input_stream(G_IO_STREAM(ret->sockconn)) == input) {
+			return ret;
+		}
+		entry = entry->next;
+	}
+	return NULL;
+}
+
 static void
-connection_remove(struct simple_account_data *sip, int fd)
+connection_remove(struct simple_account_data *sip, GInputStream *input)
 {
-	struct sip_connection *conn = connection_find(sip, fd);
+	struct sip_connection *conn = connection_find(sip, input);
 	sip->openconns = g_slist_remove(sip->openconns, conn);
 	connection_destroy(conn);
 }
@@ -496,7 +498,7 @@
 	}
 }
 
-static void simple_input_cb(gpointer data, gint source, PurpleInputCondition cond);
+static gboolean simple_input_cb(GObject *stream, gpointer data);
 
 static void
 send_later_cb(GObject *sender, GAsyncResult *res, gpointer data)
@@ -505,9 +507,8 @@
 	struct simple_account_data *sip;
 	struct sip_connection *conn;
 	GSocketConnection *sockconn;
-	GSocket *socket;
-	gint fd;
 	gsize writelen;
+	GSource *source;
 	GError *error = NULL;
 
 	sockconn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(sender),
@@ -521,12 +522,8 @@
 		return;
 	}
 
-	socket = g_socket_connection_get_socket(sockconn);
-	g_assert(socket != NULL);
-	fd = g_socket_get_fd(socket);
-
 	sip = purple_connection_get_protocol_data(gc);
-	sip->fd = fd;
+	sip->input = g_io_stream_get_input_stream(G_IO_STREAM(sockconn));
 	sip->output = purple_queued_output_stream_new(
 	        g_io_stream_get_output_stream(G_IO_STREAM(sockconn)));
 	sip->connecting = FALSE;
@@ -547,8 +544,13 @@
 		purple_circular_buffer_mark_read(sip->txbuf, writelen);
 	}
 
-	conn = connection_create(sip, sockconn, fd);
-	conn->inputhandler = purple_input_add(sip->fd, PURPLE_INPUT_READ, simple_input_cb, gc);
+	conn = connection_create(sip, sockconn);
+
+	source = g_pollable_input_stream_create_source(
+	        G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable);
+	g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL);
+	conn->inputhandler = g_source_attach(source, NULL);
+	g_source_unref(source);
 }
 
 static void sendlater(PurpleConnection *gc, const char *buf) {
@@ -1749,15 +1751,19 @@
 	}
 }
 
-static void simple_input_cb(gpointer data, gint source, PurpleInputCondition cond)
+static gboolean
+simple_input_cb(GObject *stream, gpointer data)
 {
+	GInputStream *input = G_INPUT_STREAM(stream);
 	PurpleConnection *gc = data;
 	struct simple_account_data *sip = purple_connection_get_protocol_data(gc);
-	int len;
-	struct sip_connection *conn = connection_find(sip, source);
-	if(!conn) {
+	gssize len;
+	struct sip_connection *conn = connection_find(sip, input);
+	GError *error = NULL;
+
+	if (!conn) {
 		purple_debug_error("simple", "Connection not found!\n");
-		return;
+		return G_SOURCE_REMOVE;
 	}
 
 	if(conn->inbuflen < conn->inbufused + SIMPLE_BUF_INC) {
@@ -1765,24 +1771,41 @@
 		conn->inbuf = g_realloc(conn->inbuf, conn->inbuflen);
 	}
 
-	len = read(source, conn->inbuf + conn->inbufused, SIMPLE_BUF_INC - 1);
-
-	if(len < 0 && errno == EAGAIN)
-		return;
-	else if(len <= 0) {
-		purple_debug_info("simple", "simple_input_cb: read error\n");
-		if (sip->fd == source) {
-			sip->fd = -1;
+	len = g_pollable_input_stream_read_nonblocking(
+	        G_POLLABLE_INPUT_STREAM(stream), conn->inbuf + conn->inbufused,
+	        SIMPLE_BUF_INC - 1, sip->cancellable, &error);
+	if (len < 0) {
+		if (error->code == G_IO_ERROR_WOULD_BLOCK) {
+			g_error_free(error);
+			return G_SOURCE_CONTINUE;
+		} else if (error->code != G_IO_ERROR_CANCELLED) {
+			/* There has been an error reading from the socket */
+			purple_debug_info("simple", "simple_input_cb: read error");
+			if (sip->input == input) {
+				g_clear_object(&sip->input);
+				g_clear_object(&sip->output);
+			}
+			connection_remove(sip, input);
+		}
+		g_clear_error(&error);
+		return G_SOURCE_REMOVE;
+	} else if (len == 0) { /* The other end has closed the socket */
+		purple_debug_warning("simple", "simple_input_cb: connection closed");
+		if (sip->input == input) {
+			g_clear_object(&sip->input);
 			g_clear_object(&sip->output);
 		}
-		connection_remove(sip, source);
-		return;
+		connection_remove(sip, input);
+		return G_SOURCE_REMOVE;
 	}
+
 	purple_connection_update_last_received(gc);
 	conn->inbufused += len;
 	conn->inbuf[conn->inbufused] = '\0';
 
 	process_input(sip, conn);
+
+	return G_SOURCE_CONTINUE;
 }
 
 /* Callback for new connections on incoming TCP port */
@@ -1794,19 +1817,15 @@
 	PurpleConnection *gc = PURPLE_CONNECTION(source_object);
 	struct simple_account_data *sip = purple_connection_get_protocol_data(gc);
 	struct sip_connection *conn;
-	GSocket *socket;
-	gint fd;
+	GSource *source;
 
-	socket = g_socket_connection_get_socket(connection);
-	g_assert(socket != NULL);
-	fd = g_socket_get_fd(socket);
+	conn = connection_create(sip, g_object_ref(connection));
 
-	_purple_network_set_common_socket_flags(fd);
-
-	conn = connection_create(sip, g_object_ref(connection), fd);
-
-	conn->inputhandler =
-	        purple_input_add(fd, PURPLE_INPUT_READ, simple_input_cb, gc);
+	source = g_pollable_input_stream_create_source(
+	        G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable);
+	g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL);
+	conn->inputhandler = g_source_attach(source, NULL);
+	g_source_unref(source);
 }
 
 static void
@@ -1816,8 +1835,7 @@
 	struct simple_account_data *sip;
 	struct sip_connection *conn;
 	GSocketConnection *sockconn;
-	GSocket *socket;
-	gint fd;
+	GSource *source;
 	GError *error = NULL;
 
 	sockconn = g_socket_client_connect_to_host_finish(G_SOCKET_CLIENT(sender),
@@ -1831,22 +1849,22 @@
 		return;
 	}
 
-	socket = g_socket_connection_get_socket(sockconn);
-	g_assert(socket != NULL);
-	fd = g_socket_get_fd(socket);
-
 	sip = purple_connection_get_protocol_data(gc);
-	sip->fd = fd;
+	sip->input = g_io_stream_get_input_stream(G_IO_STREAM(sockconn));
 	sip->output = purple_queued_output_stream_new(
 	        g_io_stream_get_output_stream(G_IO_STREAM(sockconn)));
 
-	conn = connection_create(sip, sockconn, fd);
+	conn = connection_create(sip, sockconn);
 
 	sip->registertimeout = g_timeout_add(g_random_int_range(10000, 100000), (GSourceFunc)subscribe_timeout, sip);
 
 	do_register(sip);
 
-	conn->inputhandler = purple_input_add(sip->fd, PURPLE_INPUT_READ, simple_input_cb, gc);
+	source = g_pollable_input_stream_create_source(
+	        G_POLLABLE_INPUT_STREAM(sip->input), sip->cancellable);
+	g_source_set_callback(source, (GSourceFunc)simple_input_cb, gc, NULL);
+	conn->inputhandler = g_source_attach(source, NULL);
+	g_source_unref(source);
 }
 
 static guint simple_ht_hash_nick(const char *nick) {
@@ -2128,6 +2146,7 @@
 	if (sip->listen_data != NULL)
 		purple_network_listen_cancel(sip->listen_data);
 
+	g_clear_object(&sip->input);
 	g_clear_object(&sip->output);
 	if (sip->fd >= 0)
 		close(sip->fd);
--- a/libpurple/protocols/simple/simple.h	Mon May 18 19:58:19 2020 -0400
+++ b/libpurple/protocols/simple/simple.h	Mon May 18 23:34:20 2020 -0400
@@ -94,6 +94,7 @@
 	gchar *username;
 	gchar *password;
 	GCancellable *cancellable;
+	GInputStream *input;
 	PurpleQueuedOutputStream *output;
 	GSocketService *service;
 	PurpleNetworkListenData *listen_data;

mercurial