purplequeuedoutputstream: Implement unthreaded asynchronous flush

Tue, 30 Aug 2016 03:01:20 -0500

author
Mike Ruprecht <cmaiku@gmail.com>
date
Tue, 30 Aug 2016 03:01:20 -0500
changeset 37978
266009ac8af6
parent 37977
2cc052ab7505
child 37982
bf06ecd3547b
child 37989
018063ec9d2e

purplequeuedoutputstream: Implement unthreaded asynchronous flush

This patch implements an unthreaded asynchronous flush implementation
for PurpleQueuedOutputStream. This fixes a potential race condition
with the default, threaded flush implementation, the way it's
currently used. Prior to this patch, the PurpleQueuedOutputStream
could finish flushing in a thread, more data could be queued while
the flushing operation is pending, then the flushing operation could
finish. This would otherwise cause the queued data to sit there,
idle, without the consumer realizing there's further data to send.
The asynchronous flush implementation solves this by flushing in the
default context and, provided there is data to flush, returns without
the possibility for such a race condition.

libpurple/queuedoutputstream.c file | annotate | diff | comparison | revisions
--- a/libpurple/queuedoutputstream.c	Thu Sep 01 23:12:51 2016 -0500
+++ b/libpurple/queuedoutputstream.c	Tue Aug 30 03:01:20 2016 -0500
@@ -43,6 +43,13 @@
 static void purple_queued_output_stream_dispose(GObject *object);
 static gboolean purple_queued_output_stream_flush(GOutputStream *stream,
 		GCancellable *cancellable, GError **error);
+static void purple_queued_output_stream_flush_async(GOutputStream *stream,
+		int io_priority, GCancellable *cancellable,
+		GAsyncReadyCallback callback, gpointer user_data);
+static gboolean purple_queued_output_stream_flush_finish(GOutputStream *stream,
+		GAsyncResult *result, GError **error);
+
+static void purple_queued_output_stream_start_flush_async(GTask *task);
 
 static void
 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
@@ -57,6 +64,8 @@
 
 	ostream_class = G_OUTPUT_STREAM_CLASS(klass);
 	ostream_class->flush = purple_queued_output_stream_flush;
+	ostream_class->flush_async = purple_queued_output_stream_flush_async;
+	ostream_class->flush_finish = purple_queued_output_stream_flush_finish;
 }
 
 PurpleQueuedOutputStream *
@@ -156,3 +165,86 @@
 	return ret;
 }
 
+static void
+purple_queued_output_stream_flush_async_cb(GObject *source,
+		GAsyncResult *res, gpointer user_data)
+{
+	GTask *task = user_data;
+	PurpleQueuedOutputStream *stream;
+	gssize written;
+	gsize size;
+	GBytes *old_bytes;
+	GError *error = NULL;
+
+	written = g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source),
+			res, &error);
+
+	if (written < 0) {
+		g_task_return_error(task, error);
+		return;
+	}
+
+	stream = PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task));
+	size = g_bytes_get_size(stream->priv->next);
+
+	old_bytes = stream->priv->next;
+	stream->priv->next = NULL;
+
+	if (size > (gsize)written) {
+		stream->priv->next = g_bytes_new_from_bytes(old_bytes,
+				written, size - written);
+	}
+
+	g_bytes_unref(old_bytes);
+
+	purple_queued_output_stream_start_flush_async(task);
+}
+
+static void
+purple_queued_output_stream_start_flush_async(GTask *task)
+{
+	PurpleQueuedOutputStream *stream;
+	GOutputStream *base_stream;
+
+	stream = PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task));
+	base_stream = g_filter_output_stream_get_base_stream(
+			G_FILTER_OUTPUT_STREAM(stream));
+
+	if (stream->priv->next == NULL) {
+		stream->priv->next =
+				g_async_queue_try_pop(stream->priv->queue);
+
+		if (stream->priv->next == NULL) {
+			g_task_return_boolean(task, TRUE);
+			return;
+		}
+	}
+
+	g_output_stream_write_bytes_async(base_stream, stream->priv->next,
+			g_task_get_priority(task),
+			g_task_get_cancellable(task),
+			purple_queued_output_stream_flush_async_cb, task);
+}
+
+static void
+purple_queued_output_stream_flush_async(GOutputStream *stream,
+		int io_priority, GCancellable *cancellable,
+		GAsyncReadyCallback callback, gpointer user_data)
+{
+	GTask *task;
+
+	task = g_task_new(stream, cancellable, callback, user_data);
+	g_task_set_priority(task, io_priority);
+
+	purple_queued_output_stream_start_flush_async(task);
+}
+
+static gboolean
+purple_queued_output_stream_flush_finish(GOutputStream *stream,
+		GAsyncResult *result, GError **error)
+{
+	g_return_val_if_fail(g_task_is_valid(result, stream), FALSE);
+
+	return g_task_propagate_boolean(G_TASK(result), error);
+}
+

mercurial