| 22 */ |
22 */ |
| 23 |
23 |
| 24 #include "internal.h" |
24 #include "internal.h" |
| 25 #include "queuedoutputstream.h" |
25 #include "queuedoutputstream.h" |
| 26 |
26 |
| 27 struct _PurpleQueuedOutputStreamPrivate { |
27 /** |
| |
28 * PurpleQueuedOutputStream: |
| |
29 * |
| |
30 * An implementation of #GFilterOutputStream which allows queuing data for |
| |
31 * output. This allows data to be queued while other data is being output. |
| |
32 * Therefore, data doesn't have to be manually stored while waiting for |
| |
33 * stream operations to finish. |
| |
34 * |
| |
35 * To create a queued output stream, use #purple_queued_output_stream_new(). |
| |
36 * |
| |
37 * To queue data, use #purple_queued_output_stream_push_bytes_async(). |
| |
38 * |
| |
39 * If there's a fatal stream error, it's suggested to clear the remaining |
| |
40 * bytes queued with #purple_queued_output_stream_clear_queue() to avoid |
| |
41 * excessive errors returned in |
| |
42 * #purple_queued_output_stream_push_bytes_async()'s async callback. |
| |
43 */ |
| |
44 struct _PurpleQueuedOutputStream |
| |
45 { |
| |
46 GFilterOutputStream parent; |
| |
47 }; |
| |
48 |
| |
49 typedef struct _PurpleQueuedOutputStreamPrivate |
| |
50 { |
| 28 GAsyncQueue *queue; |
51 GAsyncQueue *queue; |
| 29 gboolean pending_queued; |
52 gboolean pending_queued; |
| 30 }; |
53 } PurpleQueuedOutputStreamPrivate; |
| 31 |
54 |
| 32 static GObjectClass *parent_class = NULL; |
55 G_DEFINE_TYPE_WITH_PRIVATE(PurpleQueuedOutputStream, |
| 33 |
56 purple_queued_output_stream, G_TYPE_FILTER_OUTPUT_STREAM) |
| 34 #define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \ |
57 |
| 35 (G_TYPE_INSTANCE_GET_PRIVATE((obj), \ |
58 /****************************************************************************** |
| 36 PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \ |
59 * Helpers |
| 37 PurpleQueuedOutputStreamPrivate)) |
60 *****************************************************************************/ |
| 38 |
61 |
| 39 G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream, |
|
| 40 G_TYPE_FILTER_OUTPUT_STREAM, |
|
| 41 G_ADD_PRIVATE(PurpleQueuedOutputStream)) |
|
| 42 |
|
| 43 static void purple_queued_output_stream_dispose(GObject *object); |
|
| 44 static void purple_queued_output_stream_start_push_bytes_async(GTask *task); |
62 static void purple_queued_output_stream_start_push_bytes_async(GTask *task); |
| 45 |
|
| 46 static void |
|
| 47 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass) |
|
| 48 { |
|
| 49 GObjectClass *object_class; |
|
| 50 |
|
| 51 parent_class = g_type_class_peek_parent(klass); |
|
| 52 |
|
| 53 object_class = G_OBJECT_CLASS(klass); |
|
| 54 object_class->dispose = purple_queued_output_stream_dispose; |
|
| 55 } |
|
| 56 |
|
| 57 PurpleQueuedOutputStream * |
|
| 58 purple_queued_output_stream_new(GOutputStream *base_stream) |
|
| 59 { |
|
| 60 PurpleQueuedOutputStream *stream; |
|
| 61 |
|
| 62 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL); |
|
| 63 |
|
| 64 stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM, |
|
| 65 "base-stream", base_stream, |
|
| 66 NULL); |
|
| 67 |
|
| 68 return stream; |
|
| 69 } |
|
| 70 |
|
| 71 static void |
|
| 72 purple_queued_output_stream_init(PurpleQueuedOutputStream *stream) |
|
| 73 { |
|
| 74 stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream); |
|
| 75 stream->priv->queue = |
|
| 76 g_async_queue_new_full((GDestroyNotify)g_bytes_unref); |
|
| 77 stream->priv->pending_queued = FALSE; |
|
| 78 } |
|
| 79 |
|
| 80 static void |
|
| 81 purple_queued_output_stream_dispose(GObject *object) |
|
| 82 { |
|
| 83 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object); |
|
| 84 |
|
| 85 g_clear_pointer(&stream->priv->queue, g_async_queue_unref); |
|
| 86 |
|
| 87 G_OBJECT_CLASS(parent_class)->dispose(object); |
|
| 88 } |
|
| 89 |
63 |
| 90 static void |
64 static void |
| 91 purple_queued_output_stream_push_bytes_async_cb(GObject *source, |
65 purple_queued_output_stream_push_bytes_async_cb(GObject *source, |
| 92 GAsyncResult *res, gpointer user_data) |
66 GAsyncResult *res, gpointer user_data) |
| 93 { |
67 { |
| 94 GTask *task = G_TASK(user_data); |
68 GTask *task = G_TASK(user_data); |
| 95 PurpleQueuedOutputStream *stream = g_task_get_source_object(task); |
69 PurpleQueuedOutputStream *stream = g_task_get_source_object(task); |
| |
70 PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream); |
| 96 gssize written; |
71 gssize written; |
| 97 GBytes *bytes; |
72 GBytes *bytes; |
| 98 gsize size; |
73 gsize size; |
| 99 GError *error = NULL; |
74 GError *error = NULL; |
| 100 |
75 |
| 154 g_task_get_cancellable(task), |
129 g_task_get_cancellable(task), |
| 155 purple_queued_output_stream_push_bytes_async_cb, |
130 purple_queued_output_stream_push_bytes_async_cb, |
| 156 task); |
131 task); |
| 157 } |
132 } |
| 158 |
133 |
| |
134 /****************************************************************************** |
| |
135 * GObject Implementation |
| |
136 *****************************************************************************/ |
| |
137 |
| |
138 static void |
| |
139 purple_queued_output_stream_dispose(GObject *object) |
| |
140 { |
| |
141 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object); |
| |
142 PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream); |
| |
143 |
| |
144 g_clear_pointer(&priv->queue, g_async_queue_unref); |
| |
145 |
| |
146 G_OBJECT_CLASS(purple_queued_output_stream_parent_class)->dispose(object); |
| |
147 } |
| |
148 |
| |
149 static void |
| |
150 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass) |
| |
151 { |
| |
152 GObjectClass *obj_class = G_OBJECT_CLASS(klass); |
| |
153 |
| |
154 obj_class->dispose = purple_queued_output_stream_dispose; |
| |
155 } |
| |
156 |
| |
157 static void |
| |
158 purple_queued_output_stream_init(PurpleQueuedOutputStream *stream) |
| |
159 { |
| |
160 PurpleQueuedOutputStreamPrivate *priv = purple_queued_output_stream_get_instance_private(stream); |
| |
161 priv->queue = g_async_queue_new_full((GDestroyNotify)g_bytes_unref); |
| |
162 priv->pending_queued = FALSE; |
| |
163 } |
| |
164 |
| |
165 /****************************************************************************** |
| |
166 * Public API |
| |
167 *****************************************************************************/ |
| |
168 |
| |
169 PurpleQueuedOutputStream * |
| |
170 purple_queued_output_stream_new(GOutputStream *base_stream) |
| |
171 { |
| |
172 PurpleQueuedOutputStream *stream; |
| |
173 |
| |
174 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL); |
| |
175 |
| |
176 stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM, |
| |
177 "base-stream", base_stream, |
| |
178 NULL); |
| |
179 |
| |
180 return stream; |
| |
181 } |
| |
182 |
| 159 void |
183 void |
| 160 purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream, |
184 purple_queued_output_stream_push_bytes_async(PurpleQueuedOutputStream *stream, |
| 161 GBytes *bytes, int io_priority, GCancellable *cancellable, |
185 GBytes *bytes, int io_priority, GCancellable *cancellable, |
| 162 GAsyncReadyCallback callback, gpointer user_data) |
186 GAsyncReadyCallback callback, gpointer user_data) |
| 163 { |
187 { |
| 164 GTask *task; |
188 GTask *task; |
| 165 gboolean set_pending; |
189 gboolean set_pending; |
| 166 GError *error = NULL; |
190 GError *error = NULL; |
| 167 |
191 PurpleQueuedOutputStreamPrivate *priv = NULL; |
| 168 g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream)); |
192 |
| |
193 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream)); |
| 169 g_return_if_fail(bytes != NULL); |
194 g_return_if_fail(bytes != NULL); |
| |
195 |
| |
196 priv = purple_queued_output_stream_get_instance_private(stream); |
| 170 |
197 |
| 171 task = g_task_new(stream, cancellable, callback, user_data); |
198 task = g_task_new(stream, cancellable, callback, user_data); |
| 172 g_task_set_task_data(task, g_bytes_ref(bytes), |
199 g_task_set_task_data(task, g_bytes_ref(bytes), |
| 173 (GDestroyNotify)g_bytes_unref); |
200 (GDestroyNotify)g_bytes_unref); |
| 174 g_task_set_source_tag(task, |
201 g_task_set_source_tag(task, |
| 181 /* Since we're allowing queuing requests without blocking, |
208 /* Since we're allowing queuing requests without blocking, |
| 182 * it's not an error to be pending while processing queued operations. |
209 * it's not an error to be pending while processing queued operations. |
| 183 */ |
210 */ |
| 184 if (!set_pending && (!g_error_matches(error, |
211 if (!set_pending && (!g_error_matches(error, |
| 185 G_IO_ERROR, G_IO_ERROR_PENDING) || |
212 G_IO_ERROR, G_IO_ERROR_PENDING) || |
| 186 !stream->priv->pending_queued)) { |
213 !priv->pending_queued)) { |
| 187 g_task_return_error(task, error); |
214 g_task_return_error(task, error); |
| 188 g_object_unref(task); |
215 g_object_unref(task); |
| 189 return; |
216 return; |
| 190 } |
217 } |
| 191 |
218 |
| 192 stream->priv->pending_queued = TRUE; |
219 priv->pending_queued = TRUE; |
| 193 |
220 |
| 194 if (set_pending) { |
221 if (set_pending) { |
| 195 /* Start processing if there were no pending operations */ |
222 /* Start processing if there were no pending operations */ |
| 196 purple_queued_output_stream_start_push_bytes_async(task); |
223 purple_queued_output_stream_start_push_bytes_async(task); |
| 197 } else { |
224 } else { |
| 198 /* Otherwise queue the data */ |
225 /* Otherwise queue the data */ |
| 199 g_async_queue_push(stream->priv->queue, task); |
226 g_async_queue_push(priv->queue, task); |
| 200 } |
227 } |
| 201 } |
228 } |
| 202 |
229 |
| 203 gboolean |
230 gboolean |
| 204 purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream, |
231 purple_queued_output_stream_push_bytes_finish(PurpleQueuedOutputStream *stream, |
| 214 |
241 |
| 215 void |
242 void |
| 216 purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream) |
243 purple_queued_output_stream_clear_queue(PurpleQueuedOutputStream *stream) |
| 217 { |
244 { |
| 218 GTask *task; |
245 GTask *task; |
| |
246 PurpleQueuedOutputStreamPrivate *priv = NULL; |
| 219 |
247 |
| 220 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream)); |
248 g_return_if_fail(PURPLE_IS_QUEUED_OUTPUT_STREAM(stream)); |
| 221 |
249 |
| 222 while ((task = g_async_queue_try_pop(stream->priv->queue)) != NULL) { |
250 priv = purple_queued_output_stream_get_instance_private(stream); |
| |
251 |
| |
252 while ((task = g_async_queue_try_pop(priv->queue)) != NULL) { |
| 223 g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED, |
253 g_task_return_new_error(task, G_IO_ERROR, G_IO_ERROR_CANCELLED, |
| 224 "PurpleQueuedOutputStream queue cleared"); |
254 "PurpleQueuedOutputStream queue cleared"); |
| 225 g_object_unref(task); |
255 g_object_unref(task); |
| 226 } |
256 } |
| 227 } |
257 } |