libpurple/queuedoutputstream.c

changeset 39540
8a5927c69f02
parent 39169
35d3f5c5ffad
child 39556
622bf98df0ac
equal deleted inserted replaced
39539:f221f88ff0b0 39540:8a5927c69f02
22 */ 22 */
23 23
24 #include "internal.h" 24 #include "internal.h"
25 #include "queuedoutputstream.h" 25 #include "queuedoutputstream.h"
26 26
27 struct _PurpleQueuedOutputStreamPrivate { 27 /**
28 * PurpleQueuedOutputStream:
29 *
30 * An implementation of #GFilterOutputStream which allows queuing data for
31 * output. This allows data to be queued while other data is being output.
32 * Therefore, data doesn't have to be manually stored while waiting for
33 * stream operations to finish.
34 *
35 * To create a queued output stream, use #purple_queued_output_stream_new().
36 *
37 * To queue data, use #purple_queued_output_stream_push_bytes_async().
38 *
39 * If there's a fatal stream error, it's suggested to clear the remaining
40 * bytes queued with #purple_queued_output_stream_clear_queue() to avoid
41 * excessive errors returned in
42 * #purple_queued_output_stream_push_bytes_async()'s async callback.
43 */
44 struct _PurpleQueuedOutputStream
45 {
46 GFilterOutputStream parent; /* Parent instance: the type is registered with G_TYPE_FILTER_OUTPUT_STREAM as its parent type. */
47 };
48
49 typedef struct _PurpleQueuedOutputStreamPrivate
50 {
28 GAsyncQueue *queue; 51 GAsyncQueue *queue; /* Pending GTask objects waiting for the current write to finish. */
29 gboolean pending_queued; 52 gboolean pending_queued; /* TRUE from the moment a push is accepted until no queued work is left. */
30 }; 53 } PurpleQueuedOutputStreamPrivate;
31 54
32 static GObjectClass *parent_class = NULL; 55 G_DEFINE_TYPE_WITH_PRIVATE(PurpleQueuedOutputStream,
33 56 purple_queued_output_stream, G_TYPE_FILTER_OUTPUT_STREAM)
34 #define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \ 57
35 (G_TYPE_INSTANCE_GET_PRIVATE((obj), \ 58 /******************************************************************************
36 PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \ 59 * Helpers
37 PurpleQueuedOutputStreamPrivate)) 60 *****************************************************************************/
38 61
39 G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream,
40 G_TYPE_FILTER_OUTPUT_STREAM,
41 G_ADD_PRIVATE(PurpleQueuedOutputStream))
42
43 static void purple_queued_output_stream_dispose(GObject *object);
44 static void purple_queued_output_stream_start_push_bytes_async(GTask *task); 62 static void purple_queued_output_stream_start_push_bytes_async(GTask *task);
45
46 static void
47 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
48 {
49 GObjectClass *object_class;
50
51 parent_class = g_type_class_peek_parent(klass);
52
53 object_class = G_OBJECT_CLASS(klass);
54 object_class->dispose = purple_queued_output_stream_dispose;
55 }
56
57 PurpleQueuedOutputStream *
58 purple_queued_output_stream_new(GOutputStream *base_stream)
59 {
60 PurpleQueuedOutputStream *stream;
61
62 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
63
64 stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
65 "base-stream", base_stream,
66 NULL);
67
68 return stream;
69 }
70
71 static void
72 purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
73 {
74 stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
75 stream->priv->queue =
76 g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
77 stream->priv->pending_queued = FALSE;
78 }
79
80 static void
81 purple_queued_output_stream_dispose(GObject *object)
82 {
83 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
84
85 g_clear_pointer(&stream->priv->queue, g_async_queue_unref);
86
87 G_OBJECT_CLASS(parent_class)->dispose(object);
88 }
89 63
90 static void 64 static void
91 purple_queued_output_stream_push_bytes_async_cb(GObject *source, 65 purple_queued_output_stream_push_bytes_async_cb(GObject *source,
92 GAsyncResult *res, gpointer user_data) 66 GAsyncResult *res, gpointer user_data)
93 { 67 {
94 GTask *task = G_TASK(user_data); 68 GTask *task = G_TASK(user_data);
95 PurpleQueuedOutputStream *stream = g_task_get_source_object(task); 69 PurpleQueuedOutputStream *stream = g_task_get_source_object(task);
70 PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream);
96 gssize written; 71 gssize written;
97 GBytes *bytes; 72 GBytes *bytes;
98 gsize size; 73 gsize size;
99 GError *error = NULL; 74 GError *error = NULL;
100 75
124 * tasks to process here. 99 * tasks to process here.
125 */ 100 */
126 101
127 if (task == NULL) { 102 if (task == NULL) {
128 /* Any queued data left? */ 103 /* Any queued data left? */
129 task = g_async_queue_try_pop(stream->priv->queue); 104 task = g_async_queue_try_pop(priv->queue);
130 } 105 }
131 106
132 if (task != NULL) { 107 if (task != NULL) {
133 /* More to process */ 108 /* More to process */
134 purple_queued_output_stream_start_push_bytes_async(task); 109 purple_queued_output_stream_start_push_bytes_async(task);
135 } else { 110 } else {
136 /* All done */ 111 /* All done */
137 stream->priv->pending_queued = FALSE; 112 priv->pending_queued = FALSE;
138 g_output_stream_clear_pending(G_OUTPUT_STREAM(stream)); 113 g_output_stream_clear_pending(G_OUTPUT_STREAM(stream));
139 } 114 }
140 } 115 }
141 116
142 static void 117 static void
154 g_task_get_cancellable(task), 129 g_task_get_cancellable(task),
155 purple_queued_output_stream_push_bytes_async_cb, 130 purple_queued_output_stream_push_bytes_async_cb,
156 task); 131 task);
157 } 132 }
158 133
134 /******************************************************************************
135 * GObject Implementation
136 *****************************************************************************/
137
138 static void
139 purple_queued_output_stream_dispose(GObject *object)
140 {
141 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
142 PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream);
143
144 g_clear_pointer(&priv->queue, g_async_queue_unref);
145
146 G_OBJECT_CLASS(purple_queued_output_stream_parent_class)->dispose(object);
147 }
148
149 static void
150 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
151 {
152 GObjectClass *obj_class = G_OBJECT_CLASS(klass);
153
154 obj_class->dispose = purple_queued_output_stream_dispose;
155 }
156
157 static void
158 purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
159 {
160 PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream);
161 priv->queue = g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
162 priv->pending_queued = FALSE;
163 }
164
165 /******************************************************************************
166 * Public API
167 *****************************************************************************/
168
169 PurpleQueuedOutputStream *
170 purple_queued_output_stream_new(GOutputStream *base_stream)
171 {
172 PurpleQueuedOutputStream *stream;
173
174 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
175
176 stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
177 "base-stream", base_stream,
178 NULL);
179
180 return stream;
181 }
182
159 void 183 void
160 purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream, 184 purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream,
161 GBytes *bytes, int io_priority, GCancellable *cancellable, 185 GBytes *bytes, int io_priority, GCancellable *cancellable,
162 GAsyncReadyCallback callback, gpointer user_data) 186 GAsyncReadyCallback callback, gpointer user_data)
163 { 187 {
164 GTask *task; 188 GTask *task;
165 gboolean set_pending; 189 gboolean set_pending;
166 GError *error = NULL; 190 GError *error = NULL;
167 191 PurpleQueuedOutputStreamPrivate *priv = NULL;
168 g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream)); 192
193 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream));
169 g_return_if_fail(bytes != NULL); 194 g_return_if_fail(bytes != NULL);
195
196 priv = purple_queued_output_stream_get_instance_private(stream);
170 197
171 task = g_task_new(stream, cancellable, callback, user_data); 198 task = g_task_new(stream, cancellable, callback, user_data);
172 g_task_set_task_data(task, g_bytes_ref(bytes), 199 g_task_set_task_data(task, g_bytes_ref(bytes),
173 (GDestroyNotify)g_bytes_unref); 200 (GDestroyNotify)g_bytes_unref);
174 g_task_set_source_tag(task, 201 g_task_set_source_tag(task,
181 /* Since we're allowing queuing requests without blocking, 208 /* Since we're allowing queuing requests without blocking,
182 * it's not an error to be pending while processing queued operations. 209 * it's not an error to be pending while processing queued operations.
183 */ 210 */
184 if (!set_pending && (!g_error_matches(error, 211 if (!set_pending && (!g_error_matches(error,
185 G_IO_ERROR, G_IO_ERROR_PENDING) || 212 G_IO_ERROR, G_IO_ERROR_PENDING) ||
186 !stream->priv->pending_queued)) { 213 !priv->pending_queued)) {
187 g_task_return_error(task, error); 214 g_task_return_error(task, error);
188 g_object_unref(task); 215 g_object_unref(task);
189 return; 216 return;
190 } 217 }
191 218
192 stream->priv->pending_queued = TRUE; 219 priv->pending_queued = TRUE;
193 220
194 if (set_pending) { 221 if (set_pending) {
195 /* Start processing if there were no pending operations */ 222 /* Start processing if there were no pending operations */
196 purple_queued_output_stream_start_push_bytes_async(task); 223 purple_queued_output_stream_start_push_bytes_async(task);
197 } else { 224 } else {
198 /* Otherwise queue the data */ 225 /* Otherwise queue the data */
199 g_async_queue_push(stream->priv->queue, task); 226 g_async_queue_push(priv->queue, task);
200 } 227 }
201 } 228 }
202 229
203 gboolean 230 gboolean
204 purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream, 231 purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream,
214 241
215 void 242 void
216 purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream) 243 purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream)
217 { 244 {
218 GTask *task; 245 GTask *task;
246 PurpleQueuedOutputStreamPrivate *priv = NULL;
219 247
220 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream)); 248 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream));
221 249
222 while ((task = g_async_queue_try_pop(stream->priv->queue)) != NULL) { 250 priv = purple_queued_output_stream_get_instance_private(stream);
251
252 while ((task = g_async_queue_try_pop(priv->queue)) != NULL) {
223 g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED, 253 g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED,
224 "PurpleQueuedOutputStream queue cleared"); 254 "PurpleQueuedOutputStream queue cleared");
225 g_object_unref(task); 255 g_object_unref(task);
226 } 256 }
227 } 257 }

mercurial