libpurple/queuedoutputstream.c

changeset 37665
3eab84b9b88f
child 37975
18146230e402
equal deleted inserted replaced
37637:4d2299293fb3 37665:3eab84b9b88f
1 /*
2 *
3 * purple
4 *
5 * Purple is the legal property of its developers, whose names are too numerous
6 * to list here. Please refer to the COPYRIGHT file distributed with this
7 * source distribution.
8 *
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111-1301 USA
22 */
23
24 #include "internal.h"
25 #include "queuedoutputstream.h"
26
27 struct _PurpleQueuedOutputStreamPrivate {
28 GAsyncQueue *queue;
29 };
30
31 GObjectClass *parent_class = NULL;
32
33 #define PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(obj) \
34 (G_TYPE_INSTANCE_GET_PRIVATE((obj), \
35 PURPLE_TYPE_QUEUED_OUTPUT_STREAM, \
36 PurpleQueuedOutputStreamPrivate))
37
38 G_DEFINE_TYPE_WITH_CODE(PurpleQueuedOutputStream, purple_queued_output_stream,
39 G_TYPE_FILTER_OUTPUT_STREAM,
40 G_ADD_PRIVATE(PurpleQueuedOutputStream))
41
42 static void purple_queued_output_stream_dispose(GObject *object);
43 static gboolean purple_queued_output_stream_flush(GOutputStream *stream,
44 GCancellable *cancellable, GError **error);
45
46 static void
47 purple_queued_output_stream_class_init(PurpleQueuedOutputStreamClass *klass)
48 {
49 GObjectClass *object_class;
50 GOutputStreamClass *ostream_class;
51
52 parent_class = g_type_class_peek_parent(klass);
53
54 object_class = G_OBJECT_CLASS(klass);
55 object_class->dispose = purple_queued_output_stream_dispose;
56
57 ostream_class = G_OUTPUT_STREAM_CLASS(klass);
58 ostream_class->flush = purple_queued_output_stream_flush;
59 }
60
61 PurpleQueuedOutputStream *
62 purple_queued_output_stream_new(GOutputStream *base_stream)
63 {
64 PurpleQueuedOutputStream *stream;
65
66 g_return_val_if_fail(G_IS_OUTPUT_STREAM(base_stream), NULL);
67
68 stream = g_object_new(PURPLE_TYPE_QUEUED_OUTPUT_STREAM,
69 "base-stream", base_stream,
70 NULL);
71
72 return stream;
73 }
74
75 void
76 purple_queued_output_stream_push_bytes(PurpleQueuedOutputStream *stream,
77 GBytes *bytes)
78 {
79 g_return_if_fail(PURPLE_QUEUED_OUTPUT_STREAM(stream));
80 g_return_if_fail(bytes != NULL);
81
82 g_async_queue_push(stream->priv->queue, g_bytes_ref(bytes));
83 }
84
85 static void
86 purple_queued_output_stream_init(PurpleQueuedOutputStream *stream)
87 {
88 stream->priv = PURPLE_QUEUED_OUTPUT_STREAM_GET_PRIVATE(stream);
89 stream->priv->queue =
90 g_async_queue_new_full((GDestroyNotify)g_bytes_unref);
91 }
92
93 static void
94 purple_queued_output_stream_dispose(GObject *object)
95 {
96 PurpleQueuedOutputStream *stream = PURPLE_QUEUED_OUTPUT_STREAM(object);
97
98 /* Chain up first in case the stream is flushed */
99 G_OBJECT_CLASS(parent_class)->dispose(object);
100
101 g_clear_pointer(&stream->priv->queue, g_async_queue_unref);
102 }
103
104 static gboolean
105 purple_queued_output_stream_flush(GOutputStream *stream,
106 GCancellable *cancellable, GError **error)
107 {
108 GOutputStream *base_stream;
109 GAsyncQueue *queue;
110 GBytes *bytes;
111 const void *buffer;
112 gsize count;
113 gsize bytes_written = 0;
114 gboolean ret = TRUE;
115
116 base_stream = g_filter_output_stream_get_base_stream(
117 G_FILTER_OUTPUT_STREAM(stream));
118 queue = PURPLE_QUEUED_OUTPUT_STREAM(stream)->priv->queue;
119
120 do {
121 bytes = g_async_queue_try_pop(queue);
122
123 if (bytes == NULL) {
124 break;
125 }
126
127 buffer = g_bytes_get_data(bytes, &count);
128
129 ret = g_output_stream_write_all(base_stream, buffer, count,
130 &bytes_written, cancellable, error);
131
132 if (!ret) {
133 GBytes *queue_bytes;
134
135 if (bytes_written > 0) {
136 queue_bytes = g_bytes_new_from_bytes(bytes,
137 bytes_written,
138 count - bytes_written);
139 } else {
140 queue_bytes = g_bytes_ref(bytes);
141 }
142
143 g_async_queue_push_front(queue, queue_bytes);
144 }
145
146 g_bytes_unref(bytes);
147 } while (ret);
148
149 return ret;
150 }
151

mercurial