| |
/*
 *
 * purple
 *
 * Purple is the legal property of its developers, whose names are too numerous
 * to list here. Please refer to the COPYRIGHT file distributed with this
 * source distribution.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
 */
| |
23 |
| |
24 #include "internal.h" |
| |
25 #include "queuedoutputstream.h" |
| |
26 |
| |
27 struct _PurpleQueuedOutputStreamPrivate { |
| |
28 GAsyncQueue *queue; |
| |
29 }; |
| |
30 |
| |
31 GObjectClass *parent_class = NULL; |
| |
32 |
| |
/* Look up the PurpleQueuedOutputStreamPrivate attached to instance `obj`. */
#define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \
	(G_TYPE_INSTANCE_GET_PRIVATE( \
		(obj), \
		PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \
		PurpleQueuedOutputStreamPrivate))
| |
37 |
| |
38 G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream, |
| |
39 G_TYPE_FILTER_OUTPUT_STREAM, |
| |
40 G_ADD_PRIVATE(PurpleQueuedOutputStream)) |
| |
41 |
| |
42 static void purple_queued_output_stream_dispose(GObject *object); |
| |
43 static gboolean purple_queued_output_stream_flush(GOutputStream *stream, |
| |
44 GCancellable *cancellable, GError **error); |
| |
45 |
| |
46 static void |
| |
47 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass) |
| |
48 { |
| |
49 GObjectClass *object_class; |
| |
50 GOutputStreamClass *ostream_class; |
| |
51 |
| |
52 parent_class = g_type_class_peek_parent(klass); |
| |
53 |
| |
54 object_class = G_OBJECT_CLASS(klass); |
| |
55 object_class->dispose = purple_queued_output_stream_dispose; |
| |
56 |
| |
57 ostream_class = G_OUTPUT_STREAM_CLASS(klass); |
| |
58 ostream_class->flush = purple_queued_output_stream_flush; |
| |
59 } |
| |
60 |
| |
61 PurpleQueuedOutputStream * |
| |
62 purple_queued_output_stream_new(GOutputStream *base_stream) |
| |
63 { |
| |
64 PurpleQueuedOutputStream *stream; |
| |
65 |
| |
66 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL); |
| |
67 |
| |
68 stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM, |
| |
69 "base-stream", base_stream, |
| |
70 NULL); |
| |
71 |
| |
72 return stream; |
| |
73 } |
| |
74 |
| |
75 void |
| |
76 purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream, |
| |
77 GBytes *bytes) |
| |
78 { |
| |
79 g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream)); |
| |
80 g_return_if_fail(bytes != NULL); |
| |
81 |
| |
82 g_async_queue_push(stream->priv->queue, g_bytes_ref(bytes)); |
| |
83 } |
| |
84 |
| |
85 static void |
| |
86 purple_queued_output_stream_init(PurpleQueuedOutputStream *stream) |
| |
87 { |
| |
88 stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream); |
| |
89 stream->priv->queue = |
| |
90 g_async_queue_new_full((GDestroyNotify)g_bytes_unref); |
| |
91 } |
| |
92 |
| |
93 static void |
| |
94 purple_queued_output_stream_dispose(GObject *object) |
| |
95 { |
| |
96 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object); |
| |
97 |
| |
98 /* Chain up first in case the stream is flushed */ |
| |
99 G_OBJECT_CLASS(parent_class)->dispose(object); |
| |
100 |
| |
101 g_clear_pointer(&stream->priv->queue, g_async_queue_unref); |
| |
102 } |
| |
103 |
| |
104 static gboolean |
| |
105 purple_queued_output_stream_flush(GOutputStream *stream, |
| |
106 GCancellable *cancellable, GError **error) |
| |
107 { |
| |
108 GOutputStream *base_stream; |
| |
109 GAsyncQueue *queue; |
| |
110 GBytes *bytes; |
| |
111 const void *buffer; |
| |
112 gsize count; |
| |
113 gsize bytes_written = 0; |
| |
114 gboolean ret = TRUE; |
| |
115 |
| |
116 base_stream = g_filter_output_stream_get_base_stream( |
| |
117 G_FILTER_OUTPUT_STREAM(stream)); |
| |
118 queue = PURPLE_QUEUED_OUTPUT_STREAM(stream)->priv->queue; |
| |
119 |
| |
120 do { |
| |
121 bytes = g_async_queue_try_pop(queue); |
| |
122 |
| |
123 if (bytes == NULL) { |
| |
124 break; |
| |
125 } |
| |
126 |
| |
127 buffer = g_bytes_get_data(bytes, &count); |
| |
128 |
| |
129 ret = g_output_stream_write_all(base_stream, buffer, count, |
| |
130 &bytes_written, cancellable, error); |
| |
131 |
| |
132 if (!ret) { |
| |
133 GBytes *queue_bytes; |
| |
134 |
| |
135 if (bytes_written > 0) { |
| |
136 queue_bytes = g_bytes_new_from_bytes(bytes, |
| |
137 bytes_written, |
| |
138 count - bytes_written); |
| |
139 } else { |
| |
140 queue_bytes = g_bytes_ref(bytes); |
| |
141 } |
| |
142 |
| |
143 g_async_queue_push_front(queue, queue_bytes); |
| |
144 } |
| |
145 |
| |
146 g_bytes_unref(bytes); |
| |
147 } while (ret); |
| |
148 |
| |
149 return ret; |
| |
150 } |
| |
151 |