Tue, 30 Aug 2016 03:01:20 -0500
purplequeuedoutputstream: Implement unthreaded asynchronous flush
This patch implements an unthreaded asynchronous flush implementation
for PurpleQueuedOutputStream. This fixes a potential race condition
with the default, threaded flush implementation, the way it's
currently used. Prior to this patch, the PurpleQueuedOutputStream
could finish flushing in a thread, more data could be queued while
the flushing operation is pending, then the flushing operation could
finish. This would otherwise cause the queued data to sit there,
idle, without the consumer realizing there's further data to send.
The asynchronous flush implementation solves this by flushing in the
default context and, provided there is data to flush, returns without
the possibility for such a race condition.
| libpurple/queuedoutputstream.c | file | annotate | diff | comparison | revisions |
--- a/libpurple/queuedoutputstream.c Thu Sep 01 23:12:51 2016 -0500 +++ b/libpurple/queuedoutputstream.c Tue Aug 30 03:01:20 2016 -0500 @@ -43,6 +43,13 @@ static void purple_queued_output_stream_dispose(GObject *object); static gboolean purple_queued_output_stream_flush(GOutputStream *stream, GCancellable *cancellable, GError **error); +static void purple_queued_output_stream_flush_async(GOutputStream *stream, + int io_priority, GCancellable *cancellable, + GAsyncReadyCallback callback, gpointer user_data); +static gboolean purple_queued_output_stream_flush_finish(GOutputStream *stream, + GAsyncResult *result, GError **error); + +static void purple_queued_output_stream_start_flush_async(GTask *task); static void purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass) @@ -57,6 +64,8 @@ ostream_class = G_OUTPUT_STREAM_CLASS(klass); ostream_class->flush = purple_queued_output_stream_flush; + ostream_class->flush_async = purple_queued_output_stream_flush_async; + ostream_class->flush_finish = purple_queued_output_stream_flush_finish; } PurpleQueuedOutputStream * @@ -156,3 +165,86 @@ return ret; } +static void +purple_queued_output_stream_flush_async_cb(GObject *source, + GAsyncResult *res, gpointer user_data) +{ + GTask *task = user_data; + PurpleQueuedOutputStream *stream; + gssize written; + gsize size; + GBytes *old_bytes; + GError *error = NULL; + + written = g_output_stream_write_bytes_finish(G_OUTPUT_STREAM(source), + res, &error); + + if (written < 0) { + g_task_return_error(task, error); + return; + } + + stream = PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task)); + size = g_bytes_get_size(stream->priv->next); + + old_bytes = stream->priv->next; + stream->priv->next = NULL; + + if (size > (gsize)written) { + stream->priv->next = g_bytes_new_from_bytes(old_bytes, + written, size - written); + } + + g_bytes_unref(old_bytes); + + purple_queued_output_stream_start_flush_async(task); +} + +static void +purple_queued_output_stream_start_flush_async(GTask *task) +{ + PurpleQueuedOutputStream *stream; + GOutputStream *base_stream; + + stream = PURPLE_QUEUED_OUTPUT_STREAM(g_task_get_source_object(task)); + base_stream = g_filter_output_stream_get_base_stream( + G_FILTER_OUTPUT_STREAM(stream)); + + if (stream->priv->next == NULL) { + stream->priv->next = + g_async_queue_try_pop(stream->priv->queue); + + if (stream->priv->next == NULL) { + g_task_return_boolean(task, TRUE); + return; + } + } + + g_output_stream_write_bytes_async(base_stream, stream->priv->next, + g_task_get_priority(task), + g_task_get_cancellable(task), + purple_queued_output_stream_flush_async_cb, task); +} + +static void +purple_queued_output_stream_flush_async(GOutputStream *stream, + int io_priority, GCancellable *cancellable, + GAsyncReadyCallback callback, gpointer user_data) +{ + GTask *task; + + task = g_task_new(stream, cancellable, callback, user_data); + g_task_set_priority(task, io_priority); + + purple_queued_output_stream_start_flush_async(task); +} + +static gboolean +purple_queued_output_stream_flush_finish(GOutputStream *stream, + GAsyncResult *result, GError **error) +{ + g_return_val_if_fail(g_task_is_valid(result, stream), FALSE); + + return g_task_propagate_boolean(G_TASK(result), error); +} +