src/protocols/oscar/txqueue.c

branch
gaim
changeset 20470
77693555855f
parent 13071
b98e72d4089a
parent 20469
b2836a24d81e
child 20471
1966704b3e42
equal deleted inserted replaced
13071:b98e72d4089a 20470:77693555855f
1 /*
2 * txqueue.c
3 *
4 * Herein lies all the management routines for the transmit (Tx) queue.
5 *
6 */
7
8 #define FAIM_INTERNAL
9 #include <aim.h>
10
11 #ifndef _WIN32
12 #include <sys/socket.h>
13 #else
14 #include "win32dep.h"
15 #endif
16
17 /*
18 * Allocate a new tx frame.
19 *
20 * This is more for looks than anything else.
21 *
22 * Right now, that is. If/when we implement a pool of transmit
23 * frames, this will become the request-an-unused-frame part.
24 *
25 * framing = AIM_FRAMETYPE_OFT/FLAP
26 * chan = channel for FLAP, hdrtype for OFT
27 *
28 */
29 faim_internal aim_frame_t *aim_tx_new(aim_session_t *sess, aim_conn_t *conn, fu8_t framing, fu16_t chan, int datalen)
30 {
31 aim_frame_t *fr;
32
33 if (!sess || !conn) {
34 gaim_debug_misc("oscar", "aim_tx_new: No session or no connection specified!\n");
35 return NULL;
36 }
37
38 /* For sanity... */
39 if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) || (conn->type == AIM_CONN_TYPE_LISTENER)) {
40 if (framing != AIM_FRAMETYPE_OFT) {
41 gaim_debug_misc("oscar", "aim_tx_new: attempted to allocate inappropriate frame type for rendezvous connection\n");
42 return NULL;
43 }
44 } else {
45 if (framing != AIM_FRAMETYPE_FLAP) {
46 gaim_debug_misc("oscar", "aim_tx_new: attempted to allocate inappropriate frame type for FLAP connection\n");
47 return NULL;
48 }
49 }
50
51 if (!(fr = (aim_frame_t *)calloc(1, sizeof(aim_frame_t))))
52 return NULL;
53
54 fr->conn = conn;
55 fr->hdrtype = framing;
56 if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
57 fr->hdr.flap.channel = chan;
58 else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
59 fr->hdr.rend.type = chan;
60 else
61 gaim_debug_misc("oscar", "tx_new: unknown framing\n");
62
63 if (datalen > 0) {
64 fu8_t *data;
65
66 if (!(data = (unsigned char *)malloc(datalen))) {
67 aim_frame_destroy(fr);
68 return NULL;
69 }
70
71 aim_bstream_init(&fr->data, data, datalen);
72 }
73
74 return fr;
75 }
76
77 /*
78 * This increments the tx command count, and returns the seqnum
79 * that should be stamped on the next FLAP packet sent. This is
80 * normally called during the final step of packet preparation
81 * before enqueuement (in aim_tx_enqueue()).
82 */
83 static flap_seqnum_t aim_get_next_txseqnum(aim_conn_t *conn)
84 {
85 flap_seqnum_t ret;
86
87 ret = ++conn->seqnum;
88
89 return ret;
90 }
91
92 /*
93 * The overall purpose here is to enqueue the passed in command struct
94 * into the outgoing (tx) queue. Basically...
95 * 1) Make a scope-irrelevant copy of the struct
96 * 3) Mark as not-sent-yet
97 * 4) Enqueue the struct into the list
98 * 6) Return
99 *
100 * Note that this is only used when doing queue-based transmitting;
101 * that is, when sess->tx_enqueue is set to &aim_tx_enqueue__queuebased.
102 *
103 */
104 static int aim_tx_enqueue__queuebased(aim_session_t *sess, aim_frame_t *fr)
105 {
106
107 if (!fr->conn) {
108 gaim_debug_warning("oscar", "aim_tx_enqueue: enqueueing packet with no connecetion\n");
109 fr->conn = aim_getconn_type(sess, AIM_CONN_TYPE_BOS);
110 }
111
112 if (fr->hdrtype == AIM_FRAMETYPE_FLAP) {
113 /* assign seqnum -- XXX should really not assign until hardxmit */
114 fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
115 }
116
117 fr->handled = 0; /* not sent yet */
118
119 /* see overhead note in aim_rxqueue counterpart */
120 if (!sess->queue_outgoing)
121 sess->queue_outgoing = fr;
122 else {
123 aim_frame_t *cur;
124 for (cur = sess->queue_outgoing; cur->next; cur = cur->next);
125 cur->next = fr;
126 }
127
128 return 0;
129 }
130
131 /*
132 * Parallel to aim_tx_enqueue__queuebased, however, this bypasses
133 * the whole queue mess when you want immediate writes to happen.
134 *
135 * Basically the same as its __queuebased couterpart, however
136 * instead of doing a list append, it just calls aim_tx_sendframe()
137 * right here.
138 *
139 */
140 static int aim_tx_enqueue__immediate(aim_session_t *sess, aim_frame_t *fr)
141 {
142 int ret;
143
144 if (!fr->conn) {
145 gaim_debug_error("oscar", "aim_tx_enqueue: packet has no connection\n");
146 aim_frame_destroy(fr);
147 return 0;
148 }
149
150 if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
151 fr->hdr.flap.seqnum = aim_get_next_txseqnum(fr->conn);
152
153 fr->handled = 0; /* not sent yet */
154
155 ret = aim_tx_sendframe(sess, fr);
156
157 aim_frame_destroy(fr);
158
159 return ret;
160 }
161
162 faim_export int aim_tx_setenqueue(aim_session_t *sess, int what, int (*func)(aim_session_t *, aim_frame_t *))
163 {
164
165 if (what == AIM_TX_QUEUED)
166 sess->tx_enqueue = &aim_tx_enqueue__queuebased;
167 else if (what == AIM_TX_IMMEDIATE)
168 sess->tx_enqueue = &aim_tx_enqueue__immediate;
169 else if (what == AIM_TX_USER) {
170 if (!func)
171 return -EINVAL;
172 sess->tx_enqueue = func;
173 } else
174 return -EINVAL; /* unknown action */
175
176 return 0;
177 }
178
179 faim_internal int aim_tx_enqueue(aim_session_t *sess, aim_frame_t *fr)
180 {
181
182 /*
183 * If we want to send on a connection that is in progress, we have to force
184 * them to use the queue based version. Otherwise, use whatever they
185 * want.
186 */
187 if (fr && fr->conn &&
188 (fr->conn->status & AIM_CONN_STATUS_INPROGRESS)) {
189 return aim_tx_enqueue__queuebased(sess, fr);
190 }
191
192 return (*sess->tx_enqueue)(sess, fr);
193 }
194
195 static int aim_send(int fd, const void *buf, size_t count)
196 {
197 int left, cur;
198
199 for (cur = 0, left = count; left; ) {
200 int ret;
201
202 ret = send(fd, ((unsigned char *)buf)+cur, left, 0);
203
204 if (ret == -1)
205 return -1;
206 else if (ret == 0)
207 return cur;
208
209 cur += ret;
210 left -= ret;
211 }
212
213 return cur;
214 }
215
216 faim_internal int aim_bstream_send(aim_bstream_t *bs, aim_conn_t *conn, size_t count)
217 {
218 int wrote = 0;
219
220 if (!bs || !conn)
221 return -EINVAL;
222
223 /* Make sure we don't send past the end of the bs */
224 if (count > aim_bstream_empty(bs))
225 count = aim_bstream_empty(bs); /* truncate to remaining space */
226
227 if (count) {
228 /*
229 * I need to rewrite this. "Updating the UI" doesn't make sense. The program is
230 * blocked and the UI can't redraw. We're blocking all of Gaim. We need to set
231 * up an actual txqueue and a GAIM_INPUT_WRITE callback and only write when we
232 * can. Why is this file called txqueue anyway? Lets rename it to txblock.
233 */
234 if ((conn->type == AIM_CONN_TYPE_RENDEZVOUS) &&
235 (conn->subtype == AIM_CONN_SUBTYPE_OFT_DIRECTIM)) {
236 const char *sn = aim_odc_getsn(conn);
237 aim_rxcallback_t userfunc;
238
239 while (count - wrote > 1024) {
240 int ret;
241
242 ret = aim_send(conn->fd, bs->data + bs->offset + wrote, 1024);
243 if (ret > 0)
244 wrote += ret;
245 if (ret < 0)
246 return -1;
247 if ((userfunc=aim_callhandler(conn->sessv, conn, AIM_CB_FAM_SPECIAL, AIM_CB_SPECIAL_IMAGETRANSFER)))
248 userfunc(conn->sessv, NULL, sn, count-wrote>1024 ? ((double)wrote / count) : 1);
249 }
250 }
251
252 if (count - wrote) {
253 wrote = wrote + aim_send(conn->fd, bs->data + bs->offset + wrote, count - wrote);
254 }
255 }
256
257 bs->offset += wrote;
258
259 return wrote;
260 }
261
262 static int sendframe_flap(aim_session_t *sess, aim_frame_t *fr)
263 {
264 aim_bstream_t bs;
265 fu8_t *bs_raw;
266 int payloadlen, err = 0, bslen;
267
268 payloadlen = aim_bstream_curpos(&fr->data);
269
270 if (!(bs_raw = malloc(6 + payloadlen)))
271 return -ENOMEM;
272
273 aim_bstream_init(&bs, bs_raw, 6 + payloadlen);
274
275 /* FLAP header */
276 aimbs_put8(&bs, 0x2a);
277 aimbs_put8(&bs, fr->hdr.flap.channel);
278 aimbs_put16(&bs, fr->hdr.flap.seqnum);
279 aimbs_put16(&bs, payloadlen);
280
281 /* payload */
282 aim_bstream_rewind(&fr->data);
283 aimbs_putbs(&bs, &fr->data, payloadlen);
284
285 bslen = aim_bstream_curpos(&bs);
286 aim_bstream_rewind(&bs);
287 if (aim_bstream_send(&bs, fr->conn, bslen) != bslen)
288 err = -errno;
289
290 free(bs_raw); /* XXX aim_bstream_free */
291
292 fr->handled = 1;
293 fr->conn->lastactivity = time(NULL);
294
295 return err;
296 }
297
298 static int sendframe_rendezvous(aim_session_t *sess, aim_frame_t *fr)
299 {
300 aim_bstream_t bs;
301 fu8_t *bs_raw;
302 int payloadlen, err = 0, bslen;
303
304 payloadlen = aim_bstream_curpos(&fr->data);
305
306 if (!(bs_raw = malloc(8 + payloadlen)))
307 return -ENOMEM;
308
309 aim_bstream_init(&bs, bs_raw, 8 + payloadlen);
310
311 /* Rendezvous header */
312 aimbs_putraw(&bs, fr->hdr.rend.magic, 4);
313 aimbs_put16(&bs, fr->hdr.rend.hdrlen);
314 aimbs_put16(&bs, fr->hdr.rend.type);
315
316 /* payload */
317 aim_bstream_rewind(&fr->data);
318 aimbs_putbs(&bs, &fr->data, payloadlen);
319
320 bslen = aim_bstream_curpos(&bs);
321 aim_bstream_rewind(&bs);
322 if (aim_bstream_send(&bs, fr->conn, bslen) != bslen)
323 err = -errno;
324
325 free(bs_raw); /* XXX aim_bstream_free */
326
327 fr->handled = 1;
328 fr->conn->lastactivity = time(NULL);
329
330 return err;
331 }
332
333 faim_internal int aim_tx_sendframe(aim_session_t *sess, aim_frame_t *fr)
334 {
335 if (fr->hdrtype == AIM_FRAMETYPE_FLAP)
336 return sendframe_flap(sess, fr);
337 else if (fr->hdrtype == AIM_FRAMETYPE_OFT)
338 return sendframe_rendezvous(sess, fr);
339
340 return -1;
341 }
342
343 faim_export int aim_tx_flushqueue(aim_session_t *sess)
344 {
345 aim_frame_t *cur;
346
347 for (cur = sess->queue_outgoing; cur; cur = cur->next) {
348
349 if (cur->handled)
350 continue; /* already been sent */
351
352 if (cur->conn && (cur->conn->status & AIM_CONN_STATUS_INPROGRESS))
353 continue;
354
355 /*
356 * And now for the meager attempt to force transmit
357 * latency and avoid missed messages.
358 */
359 if ((cur->conn->lastactivity + cur->conn->forcedlatency) >= time(NULL)) {
360 /*
361 * XXX should be a break! we don't want to block the
362 * upper layers
363 *
364 * XXX or better, just do this right.
365 *
366 */
367 sleep((cur->conn->lastactivity + cur->conn->forcedlatency) - time(NULL));
368 }
369
370 /* XXX this should call the custom "queuing" function!! */
371 aim_tx_sendframe(sess, cur);
372 }
373
374 /* purge sent commands from queue */
375 aim_tx_purgequeue(sess);
376
377 return 0;
378 }
379
380 /*
381 * This is responsible for removing sent commands from the transmit
382 * queue. This is not a required operation, but it of course helps
383 * reduce memory footprint at run time!
384 */
385 faim_export void aim_tx_purgequeue(aim_session_t *sess)
386 {
387 aim_frame_t *cur, **prev;
388
389 for (prev = &sess->queue_outgoing; (cur = *prev); ) {
390 if (cur->handled) {
391 *prev = cur->next;
392 aim_frame_destroy(cur);
393 } else
394 prev = &cur->next;
395 }
396
397 return;
398 }
399
400 /**
401 * Get rid of packets waiting for tx on a dying conn. For now this
402 * simply marks all packets as sent and lets them disappear without
403 * warning.
404 *
405 * @param sess A session.
406 * @param conn Connection that's dying.
407 */
408 faim_internal void aim_tx_cleanqueue(aim_session_t *sess, aim_conn_t *conn)
409 {
410 aim_frame_t *cur;
411
412 for (cur = sess->queue_outgoing; cur; cur = cur->next) {
413 if (cur->conn == conn)
414 cur->handled = 1;
415 }
416
417 return;
418 }

mercurial