vrpn 07.35
Virtual Reality Peripheral Network
Loading...
Searching...
No Matches
vrpn_RedundantTransmission.C
Go to the documentation of this file.
1#include <stdio.h> // for fprintf, stderr, fclose, etc
2#include <string.h> // for NULL, memcpy
3
5
6struct timeval;
7
9 : d_connection(c)
10 , d_messageList(NULL)
11 , d_numMessagesQueued(0)
12 , d_numTransmissions(0)
13 , d_isEnabled(VRPN_FALSE)
14{
15
16 d_transmissionInterval.tv_sec = 0L;
17 d_transmissionInterval.tv_usec = 0L;
18
19 if (d_connection) {
21 }
22}
23
25{
26
27 if (d_connection) {
29 }
30}
31
33{
34 return d_numTransmissions;
35}
36
38{
40}
41
43{
44 return d_isEnabled;
45}
46
47// virtual
49{
50
51 queuedMessage *qm;
52 queuedMessage **snitch;
53 timeval now;
54
55 if (!d_connection) {
56 return;
57 }
58
59 // fprintf(stderr, "mainloop: %d messages queued.\n", d_numMessagesQueued);
60
61 vrpn_gettimeofday(&now, NULL);
62 for (qm = d_messageList; qm; qm = qm->next) {
63 if ((qm->remainingTransmissions > 0) &&
66 qm->p.type, qm->p.sender, qm->p.buffer,
70 // fprintf(stderr, "Sending message; "
71 //"%d transmissions remaining at %d.%d int.\n",
72 // qm->remainingTransmissions, qm->transmissionInterval.tv_sec,
73 // qm->transmissionInterval.tv_usec);
74 }
75 }
76
77 snitch = &d_messageList;
78 qm = *snitch;
79
80 while (qm) {
81 if (!qm->remainingTransmissions) {
82 *snitch = qm->next;
83 delete[]qm -> p.buffer;
84 delete qm;
85 qm = *snitch;
87 }
88 else {
89 snitch = &qm->next;
90 qm = *snitch;
91 }
92 }
93
96 fprintf(stderr, "vrpn_RedundantTransmission::mainloop(): "
97 "serious internal error.\n");
99 d_messageList = NULL;
100 }
101}
102
104{
105 d_isEnabled = on;
106
107 // fprintf(stderr, "vrpn_RedundantTransmission::enable(%s)\n",
108 // on ? "on" : "off");
109}
110
111void vrpn_RedundantTransmission::setDefaults(vrpn_uint32 numTransmissions,
112 timeval transmissionInterval)
113{
114
115 // fprintf(stderr, "vrpn_RedundantTransmission::setDefaults: "
116 //"to %d, %d.%d\n", numTransmissions, transmissionInterval.tv_sec,
117 // transmissionInterval.tv_usec);
118
119 d_numTransmissions = numTransmissions;
120 d_transmissionInterval = transmissionInterval;
121}
122
123// virtual
125 vrpn_uint32 len, timeval time, vrpn_uint32 type, vrpn_uint32 sender,
126 const char *buffer, vrpn_uint32 class_of_service,
127 vrpn_int32 numTransmissions, timeval *transmissionInterval)
128{
129 queuedMessage *qm;
130 int ret;
131 int i;
132
133 if (!d_connection) {
134 fprintf(stderr, "vrpn_RedundantTransmission::pack_message: "
135 "Connection not defined!\n");
136 return -1;
137 }
138
139 if (!d_isEnabled) {
140 return d_connection->pack_message(len, time, type, sender, buffer,
141 class_of_service);
142 }
143
144 ret = d_connection->pack_message(len, time, type, sender, buffer,
146
147 // TODO: check ret
148
149 // use defaults?
150 if (numTransmissions < 0) {
151 numTransmissions = d_numTransmissions;
152 }
153 if (!transmissionInterval) {
154 transmissionInterval = &d_transmissionInterval;
155 }
156
157 // fprintf(stderr, "In pack message with %d xmit at %d.%d\n",
158 // numTransmissions, transmissionInterval->tv_sec,
159 // transmissionInterval->tv_usec);
160
161 if (!numTransmissions) {
162 return ret;
163 }
164
165 // Special case - if transmissionInterval is 0, we send them all right
166 // away, but force VRPN to use separate network packets.
167 if (!transmissionInterval->tv_sec && !transmissionInterval->tv_usec) {
168 for (i = 0; i < numTransmissions; i++) {
170 ret = d_connection->pack_message(len, time, type, sender, buffer,
172 // TODO: check ret
173 }
175 return 0;
176 }
177
178 try { qm = new queuedMessage; }
179 catch (...) {
180 fprintf(stderr,
181 "vrpn_RedundantTransmission::pack_message: "
182 "Out of memory; can't queue message for retransmission.\n");
183 return -1;
184 }
185
186 qm->p.payload_len = len;
187 qm->p.msg_time = time;
188 qm->p.type = type;
189 qm->p.sender = sender;
190 try { qm->p.buffer = new char[len]; }
191 catch (...) {
192 fprintf(stderr,
193 "vrpn_RedundantTransmission::pack_message: "
194 "Out of memory; can't queue message for retransmission.\n");
195 return -1;
196 }
197 memcpy(const_cast<char *>(qm->p.buffer), buffer, len);
198
199 qm->remainingTransmissions = numTransmissions;
200 qm->transmissionInterval = *transmissionInterval;
201 qm->nextValidTime = vrpn_TimevalSum(time, *transmissionInterval);
202 qm->next = d_messageList;
203
205
206 // timeval now;
207 // vrpn_gettimeofday(&now, NULL);
208 // fprintf(stderr, " Queued message to go at %d.%d (now is %d.%d)\n",
209 // qm->nextValidTime.tv_sec, qm->nextValidTime.tv_usec,
210 // now.tv_sec, now.tv_usec);
211
212 d_messageList = qm;
213
214 return ret;
215}
216
217char *vrpn_RedundantController_Protocol::encode_set(int *len, vrpn_uint32 num,
218 timeval interval)
219{
220 char *buffer;
221 char *bp;
222 int buflen;
223
224 buflen = sizeof(vrpn_uint32) + sizeof(timeval);
225 *len = buflen;
226 try { buffer = new char[buflen]; }
227 catch (...) {
228 fprintf(stderr, "vrpn_RedundantController_Protocol::encode_set: "
229 "Out of memory.\n");
230 return NULL;
231 }
232
233 bp = buffer;
234 vrpn_buffer(&bp, &buflen, num);
235 vrpn_buffer(&bp, &buflen, interval);
236
237 return buffer;
238}
239
241 vrpn_uint32 *num,
242 timeval *interval)
243{
244 vrpn_unbuffer(buf, num);
245 vrpn_unbuffer(buf, interval);
246}
247
249{
250 char *buffer;
251 char *bp;
252 int buflen;
253
254 buflen = sizeof(vrpn_bool);
255 *len = buflen;
256 try { buffer = new char[buflen]; }
257 catch (...) {
258 fprintf(stderr, "vrpn_RedundantController_Protocol::encode_enable: "
259 "Out of memory.\n");
260 return NULL;
261 }
262
263 bp = buffer;
264 vrpn_buffer(&bp, &buflen, on);
265
266 return buffer;
267}
268
270 vrpn_bool *on)
271{
272 vrpn_unbuffer(buf, on);
273}
274
276{
277 d_set_type = c->register_message_type("vrpn_Red_Xmit_Ctrl set");
278 d_enable_type = c->register_message_type("vrpn_Red_Xmit_Ctrl enable");
279}
280
283 : vrpn_BaseClass("vrpn Redundant Transmission Controller", c)
284 , d_object(r)
285{
286
288
289 // fprintf(stderr, "Registering set handler with type %d.\n",
290 // d_protocol.d_set_type);
291
294}
295
297
299{
301
302 // do nothing
303}
304
306{
308
309 return 0;
310}
311
312// static
314{
316 const char **bp = &p.buffer;
317 vrpn_uint32 num;
318 timeval interval;
319
320 me->d_protocol.decode_set(bp, &num, &interval);
321 // fprintf(stderr, "vrpn_RedundantController::handle_set: %d, %d.%d\n",
322 // num, interval.tv_sec, interval.tv_usec);
323 me->d_object->setDefaults(num, interval);
324
325 return 0;
326}
327
329{
331 const char **bp = &p.buffer;
332 vrpn_bool on;
333
334 me->d_protocol.decode_enable(bp, &on);
335 me->d_object->enable(on);
336
337 return 0;
338}
339
341 : vrpn_BaseClass("vrpn Redundant Transmission Controller", c)
342{
346}
347
349
351{
353
354 // do nothing
355}
356
357void vrpn_RedundantRemote::set(int num, timeval interval)
358{
359 char *buf;
360 int len = 0;
361 timeval now;
362
363 buf = d_protocol.encode_set(&len, num, interval);
364 if (!buf) {
365 return;
366 }
367 // fprintf(stderr, "vrpn_RedundantRemote::set: %d, %d.%d\n",
368 // num, interval.tv_sec, interval.tv_usec);
369
370 vrpn_gettimeofday(&now, NULL);
373}
374
376{
377 char *buf;
378 int len = 0;
379 timeval now;
380
381 buf = d_protocol.encode_enable(&len, on);
382 if (!buf) {
383 return;
384 }
385
386 vrpn_gettimeofday(&now, NULL);
389}
390
392{
394
395 return 0;
396}
397
399 : nextTimestampToReplace(0)
400 , cb(NULL)
401 , handlerIsRegistered(vrpn_FALSE)
402{
403 timeval zero;
404 int i;
405
406 zero.tv_sec = 0;
407 zero.tv_usec = 0;
408
409 for (i = 0; i < VRPN_RR_LENGTH; i++) {
410 timestampSeen[i] = zero;
411 numSeen[i] = 0;
412 }
413}
414
416 : d_connection(c)
417 , d_memory(NULL)
418 , d_lastMemory(NULL)
419 , d_record(VRPN_FALSE)
420{
421
422 if (d_connection) {
424 }
425}
426
428{
429 vrpnMsgCallbackEntry *pVMCB, *pVMCB_Del;
430 int i;
431
432 for (i = 0; i < vrpn_CONNECTION_MAX_TYPES; i++) {
433 pVMCB = d_records[i].cb;
434 while (pVMCB) {
435 pVMCB_Del = pVMCB;
436 pVMCB = pVMCB_Del->next;
437 try {
438 delete pVMCB_Del;
439 } catch (...) {
440 fprintf(stderr, "vrpn_RedundantReceiver::~vrpn_RedundantReceiver(): delete failed\n");
441 return;
442 }
443 }
444 }
445
446 pVMCB = d_generic.cb;
447
448 while (pVMCB) {
449 pVMCB_Del = pVMCB;
450 pVMCB = pVMCB_Del->next;
451 try {
452 delete pVMCB_Del;
453 } catch (...) {
454 fprintf(stderr, "vrpn_RedundantReceiver::~vrpn_RedundantReceiver(): delete failed\n");
455 return;
456 }
457 }
458
459 if (d_connection) {
461 }
462}
463
464// virtual
466 vrpn_MESSAGEHANDLER handler,
467 void *userdata, vrpn_int32 sender)
468{
469 vrpnMsgCallbackEntry *ce = NULL;
470 try { ce = new vrpnMsgCallbackEntry; }
471 catch (...) {
472 fprintf(stderr, "vrpn_RedundantReceiver::register_handler: "
473 "Out of memory.\n");
474 return -1;
475 }
476 ce->handler = handler;
477 ce->userdata = userdata;
478 ce->sender = sender;
479
480 if (type == vrpn_ANY_TYPE) {
481 ce->next = d_generic.cb;
482 d_generic.cb = ce;
483 return 0;
484 } else if (type < 0) {
485 fprintf(stderr, "vrpn_RedundantReceiver::register_handler: "
486 "Negative type passed in.\n");
487 try {
488 delete ce;
489 } catch (...) {
490 fprintf(stderr, "vrpn_RedundantReceiver::register_handler(): delete failed\n");
491 return -1;
492 }
493 return -1;
494 } else {
495 ce->next = d_records[type].cb;
496 d_records[type].cb = ce;
497 }
498
499 if (!d_records[type].handlerIsRegistered) {
501 this, sender);
502 d_records[type].handlerIsRegistered = VRPN_TRUE;
503 }
504
505 return 0;
506}
507
508// virtual
510 vrpn_MESSAGEHANDLER handler,
511 void *userdata,
512 vrpn_int32 sender)
513{
514 // The pointer at *snitch points to victim
515 vrpnMsgCallbackEntry *victim, **snitch;
516
517 // Find a handler with this registry in the list (any one will do,
518 // since all duplicates are the same).
519 if (type == vrpn_ANY_TYPE) {
520 snitch = &d_generic.cb;
521 }
522 else {
523 snitch = &(d_records[type].cb);
524 }
525 victim = *snitch;
526 while ((victim != NULL) &&
527 ((victim->handler != handler) || (victim->userdata != userdata) ||
528 (victim->sender != sender))) {
529 snitch = &((*snitch)->next);
530 victim = victim->next;
531 }
532
533 // Make sure we found one
534 if (victim == NULL) {
535 fprintf(stderr,
536 "vrpn_TypeDispatcher::removeHandler: No such handler\n");
537 return -1;
538 }
539
540 // Remove the entry from the list
541 *snitch = victim->next;
542 try {
543 delete victim;
544 } catch (...) {
545 fprintf(stderr, "vrpn_RedundantReceiver::unregister_handler(): delete failed\n");
546 return -1;
547 }
548
549 return 0;
550}
551
552void vrpn_RedundantReceiver::record(vrpn_bool on) { d_record = on; }
553
554void vrpn_RedundantReceiver::writeMemory(const char *filename)
555{
556 FILE *fp;
557 RRMemory *mp;
558
559 if (!d_memory) {
560 fprintf(stderr, "vrpn_RedundantReceiver::writeMemory: "
561 "Memory is empty.\n");
562 return;
563 }
564
565 // BUG: This doesn't write out the records of the things that
566 // are still in the active list! As long as our network tests
567 // are recording a few seconds beyond the end of the interesting
568 // period & trimming them back that's OK, but if this code is ever
569 // used for precise timings there will be slight undersampling.
570
571 fp = fopen(filename, "wb");
572 if (!fp) {
573 fprintf(stderr, "vrpn_RedundantReceiver::writeMemory: "
574 "Couldn't open %s for writing.\n",
575 filename);
576 return;
577 }
578
579 for (mp = d_memory; mp; mp = mp->next) {
580 fprintf(fp, "%ld.%ld %d\n", mp->timestamp.tv_sec,
581 static_cast<long>(mp->timestamp.tv_usec), mp->numSeen);
582 }
583
584 fclose(fp);
585}
586
588{
589 RRMemory *mp;
590
591 for (mp = d_memory; d_memory; mp = d_memory) {
592 d_memory = mp->next;
593 try {
594 delete mp;
595 } catch (...) {
596 fprintf(stderr, "vrpn_RedundantReceiver::clearMemory(): delete failed\n");
597 return;
598 }
599 }
600
601 d_lastMemory = NULL;
602}
603
604// static
607{
610 RRMemory *memory;
611 int ntr;
612 int i;
613
614 for (i = 0; i < VRPN_RR_LENGTH; i++) {
615 if ((p.msg_time.tv_sec ==
616 me->d_records[p.type].timestampSeen[i].tv_sec) &&
617 (p.msg_time.tv_usec ==
618 me->d_records[p.type].timestampSeen[i].tv_usec)) {
619
620 // We've seen this one (recently); it's a redundant send whose
621 // predecessor didn't get lost, so this one is unnecessary FEC.
622 // Discard it.
623
624 // fprintf(stderr, "Threw out redundant copy of type %d.\n",
625 // p.type);
626
627 me->d_records[p.type].numSeen[i]++;
628
629 return 0;
630 }
631 }
632
634
635 // Store a record of how many copies of this message we received so
636 // we can compute loss later (knowing what level of redundancy was being
637 // used).
638
639 if (me->d_record) {
640 if (me->d_records[p.type].numSeen[ntr]) {
641 try { memory = new RRMemory; }
642 catch (...) {
643 fprintf(stderr,
644 "vrpn_RedundantReceiver::"
645 "handle_possiblyRedundantMessage: Out of memory.\n");
646 return -1;
647 }
648
649 memory->timestamp = me->d_records[p.type].timestampSeen[ntr];
650 memory->numSeen = me->d_records[p.type].numSeen[ntr];
651 memory->next = NULL;
652
653 if (me->d_lastMemory) {
654 me->d_lastMemory->next = memory;
655 }
656 else {
657 me->d_memory = memory;
658 }
659 me->d_lastMemory = memory;
660 }
661 }
662
663 me->d_records[p.type].timestampSeen[ntr] = p.msg_time;
664 me->d_records[p.type].numSeen[ntr] = 1;
666
667 for (who = me->d_generic.cb; who; who = who->next) {
668 if ((who->sender == vrpn_ANY_SENDER) || (who->sender == p.sender)) {
669 if (who->handler(who->userdata, p)) {
670 fprintf(stderr, "vrpn_RedundantReceiver::"
671 "handle_possiblyRedundantMessage: "
672 "Nonzero user generic handler return.\n");
673 return -1;
674 }
675 }
676 }
677
678 for (who = me->d_records[p.type].cb; who; who = who->next) {
679 // Verify that the sender is ANY or matches
680 if ((who->sender == vrpn_ANY_SENDER) || (who->sender == p.sender)) {
681 if (who->handler(who->userdata, p)) {
682 fprintf(stderr, "vrpn_RedundantReceiver::"
683 "handle_possiblyRedundantMessage: "
684 "Nonzero user handler return.\n");
685 return -1;
686 }
687 }
688 }
689
690 return 0;
691}
int register_autodeleted_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
Registers a handler with the connection, and remembers to delete at destruction.
vrpn_Connection * d_connection
Connection that this object talks to.
void client_mainloop(void)
Handles functions that all clients should provide in their mainloop() (warning of no server,...
vrpn_int32 d_sender_id
Sender ID registered with the connection.
void server_mainloop(void)
Handles functions that all servers should provide in their mainloop() (ping/pong, for example) Should...
Class from which all user-level (and other) classes that communicate with vrpn_Connections should der...
virtual int init(void)
Initialize things that the constructor can't. Returns 0 on success, -1 on failure.
Generic connection class not specific to the transport mechanism.
void addReference()
Counting references to this connection.
virtual vrpn_int32 register_message_type(const char *name)
virtual int send_pending_reports(void)=0
send pending report, clear the buffer. This function was protected, now is public,...
virtual int pack_message(vrpn_uint32 len, struct timeval time, vrpn_int32 type, vrpn_int32 sender, const char *buffer, vrpn_uint32 class_of_service)
Pack a message that will be sent the next time mainloop() is called. Turn off the RELIABLE flag if yo...
virtual int register_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
Set up (or remove) a handler for a message of a given type. Optionally, specify which sender to handl...
Accepts commands over a connection to control a local vrpn_RedundantTransmission's default parameters...
virtual int register_types(void)
Register the types of messages this device sends/receives. Return 0 on success, -1 on fail.
void mainloop(void)
Called once through each main loop iteration to handle updates. Remote object mainloop() should call ...
static int VRPN_CALLBACK handle_enable(void *, vrpn_HANDLERPARAM)
vrpn_RedundantController(vrpn_RedundantTransmission *, vrpn_Connection *)
vrpn_RedundantTransmission * d_object
vrpn_RedundantController_Protocol d_protocol
static int VRPN_CALLBACK handle_set(void *, vrpn_HANDLERPARAM)
Helper class that eliminates duplicates; only the first instance of a message is delivered....
vrpn_RedundantReceiver(vrpn_Connection *)
void clearMemory(void)
Throws away / resets statistics.
RRRecord d_records[vrpn_CONNECTION_MAX_TYPES]
void writeMemory(const char *filename)
Writes statistics to the named file: timestamp of every message received and number of copies of that...
virtual int unregister_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
void record(vrpn_bool)
Turns "memory" (tracking statistics of redundant reception) on and off.
virtual int register_handler(vrpn_int32 type, vrpn_MESSAGEHANDLER handler, void *userdata, vrpn_int32 sender=vrpn_ANY_SENDER)
static int VRPN_CALLBACK handle_possiblyRedundantMessage(void *, vrpn_HANDLERPARAM)
void set(int numRetransmissions, timeval transmissionInterval)
vrpn_RedundantController_Protocol d_protocol
int register_types(void)
Register the types of messages this device sends/receives. Return 0 on success, -1 on fail.
void mainloop(void)
Called once through each main loop iteration to handle updates. Remote object mainloop() should call ...
vrpn_RedundantRemote(vrpn_Connection *)
Helper class for vrpn_Connection that automates redundant transmission for unreliable (low-latency) m...
vrpn_uint32 defaultRetransmissions(void) const
virtual int pack_message(vrpn_uint32 len, timeval time, vrpn_uint32 type, vrpn_uint32 sender, const char *buffer, vrpn_uint32 class_of_service, vrpn_int32 numRetransmissions=-1, timeval *transmissionInterval=NULL)
If !isEnabled(), does a normal pack_message(), but if isEnabled() ignores class_of_service and sends ...
virtual void mainloop(void)
Determines which messages need to be resent and queues them up on the connection for transmission.
vrpn_RedundantTransmission(vrpn_Connection *c)
vrpn_uint32 d_numMessagesQueued
For debugging, mostly.
virtual void setDefaults(vrpn_uint32 numRetransmissions, timeval transmissionInterval)
Set default values for future calls to pack_message().
Description of a callback entry for a user type.
vrpnMsgCallbackEntry * next
Next handler.
void * userdata
Passed along.
vrpn_int32 sender
Only if from sender.
vrpn_MESSAGEHANDLER handler
Routine to call.
This structure is what is passed to a vrpn_Connection message callback.
const char * buffer
struct timeval msg_time
vrpn_int32 payload_len
char * encode_set(int *len, vrpn_uint32 num, timeval interval)
void decode_enable(const char **buf, vrpn_bool *)
void decode_set(const char **buf, vrpn_uint32 *num, timeval *interval)
const vrpn_uint32 vrpn_CONNECTION_RELIABLE
Classes of service for messages, specify multiple by ORing them together Priority of satisfying these...
const int vrpn_ANY_SENDER
vrpn_ANY_SENDER can be used to register callbacks on a given message type from any sender.
const int vrpn_ANY_TYPE
vrpn_ANY_TYPE can be used to register callbacks for any USER type of message from a given sender....
int(VRPN_CALLBACK * vrpn_MESSAGEHANDLER)(void *userdata, vrpn_HANDLERPARAM p)
Type of a message handler for vrpn_Connection messages.
const int vrpn_CONNECTION_MAX_TYPES
const vrpn_uint32 vrpn_CONNECTION_LOW_LATENCY
#define VRPN_RR_LENGTH
VRPN_API int vrpn_unbuffer(const char **buffer, timeval *t)
Utility routine for taking a struct timeval from a buffer that was sent as a message.
Definition: vrpn_Shared.C:321
bool vrpn_TimevalGreater(const timeval &tv1, const timeval &tv2)
Definition: vrpn_Shared.C:122
VRPN_API int vrpn_buffer(char **insertPt, vrpn_int32 *buflen, const timeval t)
Utility routine for placing a timeval struct into a buffer that is to be sent as a message.
Definition: vrpn_Shared.C:250
timeval vrpn_TimevalSum(const timeval &tv1, const timeval &tv2)
Definition: vrpn_Shared.C:54
#define vrpn_gettimeofday
Definition: vrpn_Shared.h:99