Fawkes API Fawkes Development Version
qa_bb_remote.cpp
1
2/***************************************************************************
3 * qa_bb_remote.cpp - BlackBoard remote access QA
4 *
5 * Created: Mon Mar 03 17:31:18 2008
6 * Copyright 2006-2008 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version. A runtime exception applies to
14 * this software (see LICENSE.GPL_WRE file mentioned below for details).
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22 */
23
24/// @cond QA
25
26#include <blackboard/bbconfig.h>
27#include <blackboard/exceptions.h>
28#include <blackboard/interface_listener.h>
29#include <blackboard/local.h>
30#include <blackboard/remote.h>
31#include <core/exceptions/system.h>
32#include <interface/interface_info.h>
33#include <interfaces/TestInterface.h>
34#include <netcomm/fawkes/client.h>
35#include <netcomm/fawkes/server_thread.h>
36#include <utils/time/time.h>
37
38#include <cstdio>
39#include <cstdlib>
40#include <cstring>
41#include <iostream>
42#include <signal.h>
43#include <vector>
44
45using namespace std;
46using namespace fawkes;
47
48bool quit = false;
49
50void
51signal_handler(int signum)
52{
53 quit = true;
54}
55
56#define NUM_CHUNKS 5
57
58void
59test_messaging(TestInterface *ti_reader, TestInterface *ti_writer)
60{
61 while (!quit) {
62 int expval = ti_reader->test_int() + 1;
64 unsigned int msgid = ti_reader->msgq_enqueue(m);
65 printf("Sent with message ID %u\n", msgid);
66
67 if (ti_writer->msgq_size() > 1) {
68 cout << "Error, more than one message! flushing." << endl;
69 ti_writer->msgq_flush();
70 }
71
72 usleep(100000);
73
74 if (ti_writer->msgq_first() != NULL) {
77 printf(
78 "Received message of ID %u, Message improperly detected to be a SetTestStringMessage\n",
79 msg->id());
80 }
84 printf("Received message with ID %u (enqueue time: %s)\n",
85 m2->id(),
86 m2->time_enqueued()->str());
87 ti_writer->set_test_int(m2->test_int());
88 try {
89 ti_writer->write();
91 cout << "BUG: caught write denied exception" << endl;
92 e.print_trace();
93 }
94 ti_writer->msgq_pop();
95 } else {
96 cout << "Illegal message '" << ti_writer->msgq_first()->type() << "' type received" << endl;
97 }
98
99 usleep(100000);
100
101 //cout << "Reading value from reader interface.. " << flush;
102 ti_reader->read();
103 int val = ti_reader->test_int();
104 if (val == expval) {
105 //cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
106 } else {
107 cout << " failure, value is " << ti_reader->test_int() << ", expected " << expval << endl;
108 }
109 } else {
110 printf("No message in queue, if network test this means the message was dropped\n");
111 }
112
113 usleep(10);
114 }
115}
116
118{
119public:
121 fawkes::Interface * writer,
122 fawkes::BlackBoard *reader_bb,
123 fawkes::BlackBoard *writer_bb)
124 : BlackBoardInterfaceListener("SyncInterfaceListener(%s-%s)", writer->uid(), reader->id())
125 {
126 reader_ = reader;
127 writer_ = writer;
128 reader_bb_ = reader_bb;
129 writer_bb_ = writer_bb;
130
133
134 reader_bb_->register_listener(this);
135 writer_bb_->register_listener(this);
136 }
137
138 /** Destructor. */
140 {
141 reader_bb_->unregister_listener(this);
142 writer_bb_->unregister_listener(this);
143 }
144
145 bool
146 bb_interface_message_received(Interface *interface, Message *message) noexcept
147 {
148 try {
149 if (interface == writer_) {
150 printf("%s: Forwarding message\n", bbil_name());
151 Message *m = message->clone();
152 m->set_hops(message->hops());
153 m->ref();
154 reader_->msgq_enqueue(m);
155 message->set_id(m->id());
156 m->unref();
157 return false;
158 } else {
159 // Don't know why we were called, let 'em enqueue
160 printf("%s: Message received for unknown interface\n", bbil_name());
161 return true;
162 }
163 } catch (Exception &e) {
164 printf("%s: Exception when message received\n", bbil_name());
165 e.print_trace();
166 return false;
167 }
168 }
169
170 void
171 bb_interface_data_refreshed(Interface *interface) noexcept
172 {
173 try {
174 if (interface == reader_) {
175 //logger_->log_debug(bbil_name(), "Copying data");
176 reader_->read();
177 writer_->copy_values(reader_);
178 writer_->write();
179 } else {
180 // Don't know why we were called, let 'em enqueue
181 printf("%s: Data changed for unknown interface", bbil_name());
182 }
183 } catch (Exception &e) {
184 printf("%s: Exception when data changed\n", bbil_name());
185 e.print_trace();
186 }
187 }
188
189private:
190 fawkes::Interface *writer_;
191 fawkes::Interface *reader_;
192
193 fawkes::BlackBoard *writer_bb_;
194 fawkes::BlackBoard *reader_bb_;
195};
196
197int
198main(int argc, char **argv)
199{
200 signal(SIGINT, signal_handler);
201
202 LocalBlackBoard *llbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
203 BlackBoard * lbb = llbb;
204
206 fns->start();
207
208 llbb->start_nethandler(fns);
209
210 BlackBoard *rbb = new RemoteBlackBoard("localhost", 1910);
211
212 InterfaceInfoList *infl = rbb->list_all();
213 for (InterfaceInfoList::iterator i = infl->begin(); i != infl->end(); ++i) {
214 const unsigned char *hash = (*i).hash();
215 char phash[INTERFACE_HASH_SIZE_ * 2 + 1];
216 memset(phash, 0, sizeof(phash));
217 for (unsigned int j = 0; j < INTERFACE_HASH_SIZE_; ++j) {
218 sprintf(&phash[j * 2], "%02x", hash[j]);
219 }
220 printf("%s::%s (%s), w:%i r:%u s:%u\n",
221 (*i).type(),
222 (*i).id(),
223 phash,
224 (*i).has_writer(),
225 (*i).num_readers(),
226 (*i).serial());
227 }
228 delete infl;
229
230 //TestInterface *ti_writer;
231 TestInterface *ti_reader;
232 TestInterface *ti_writer;
233 try {
234 cout << "Opening interfaces.. " << flush;
235 ti_writer = rbb->open_for_writing<TestInterface>("SomeID");
236 ti_reader = rbb->open_for_reading<TestInterface>("SomeID");
237 cout << "success, "
238 << "writer hash=" << ti_writer->hash_printable()
239 << " reader hash=" << ti_reader->hash_printable() << endl;
240 } catch (Exception &e) {
241 cout << "failed! Aborting" << endl;
242 e.print_trace();
243 exit(1);
244 }
245
246 try {
247 cout << "Trying to open second writer.. " << flush;
248 TestInterface *ti_writer_two;
249 ti_writer_two = rbb->open_for_writing<TestInterface>("SomeID");
250 rbb->close(ti_writer_two);
251 cout << "BUG: Detection of second writer did NOT work!" << endl;
252 exit(2);
254 cout << "exception caught as expected, detected and prevented second writer!" << endl;
255 }
256
257 try {
258 cout << "Trying to open third writer.. " << flush;
259 TestInterface *ti_writer_three;
260 ti_writer_three = rbb->open_for_writing<TestInterface>("AnotherID");
261 cout << "No exception as expected, different ID ok!" << endl;
262 rbb->close(ti_writer_three);
264 cout << "BUG: Third writer with different ID detected as another writer!" << endl;
265 exit(3);
266 }
267
268 cout << endl
269 << endl
270 << "Running data tests ==================================================" << endl;
271
272 cout << "Writing initial value (" << TestInterface::TEST_CONSTANT << ") into interface as TestInt"
273 << endl;
274 ti_writer->set_test_int(TestInterface::TEST_CONSTANT);
275 try {
276 ti_writer->write();
277 } catch (InterfaceWriteDeniedException &e) {
278 cout << "BUG: caught write denied exception" << endl;
279 e.print_trace();
280 }
281
282 cout << "Giving some time to have value processed" << endl;
283 usleep(100000);
284
285 cout << "Reading value from reader interface.. " << flush;
286 ti_reader->read();
287 int val = ti_reader->test_int();
288 if (val == TestInterface::TEST_CONSTANT) {
289 cout << " success, value is " << ti_reader->test_int() << " as expected" << endl;
290 } else {
291 cout << " failure, value is " << ti_reader->test_int() << ", expected "
292 << TestInterface::TEST_CONSTANT << endl;
293 }
294
295 cout << "Closing interfaces.. " << flush;
296 try {
297 rbb->close(ti_reader);
298 rbb->close(ti_writer);
299 cout << "done" << endl;
300 } catch (Exception &e) {
301 cout << "failed" << endl;
302 e.print_trace();
303 }
304
305 cout << endl
306 << endl
307 << "Starting MESSAGING tests" << endl
308 << "Press Ctrl-C to continue with next test" << endl
309 << endl;
310
311 ti_writer = lbb->open_for_writing<TestInterface>("Messaging");
312 ti_reader = rbb->open_for_reading<TestInterface>("Messaging");
313
314 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
315 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
316
317 test_messaging(ti_reader, ti_writer);
318
319 rbb->close(ti_reader);
320 lbb->close(ti_writer);
321
322 cout << endl
323 << endl
324 << "Starting MESSAGING tests, doing repeater scenario" << endl
325 << "Press Ctrl-C to continue with next test" << endl
326 << endl;
327 quit = false;
328
329 delete rbb;
330
331 LocalBlackBoard *repllbb = new LocalBlackBoard(BLACKBOARD_MEMSIZE);
332
334 repfns->start();
335
336 repllbb->start_nethandler(repfns);
337
338 BlackBoard *rep_rbb = new RemoteBlackBoard("localhost", 1911);
339 rbb = new RemoteBlackBoard("localhost", 1911);
340
341 TestInterface *rep_reader;
342 TestInterface *rep_writer;
343
344 ti_writer = rbb->open_for_writing<TestInterface>("Messaging");
345 ti_reader = lbb->open_for_reading<TestInterface>("Messaging");
346
347 rep_reader = rep_rbb->open_for_reading<TestInterface>("Messaging");
348 rep_writer = lbb->open_for_writing<TestInterface>("Messaging");
349
350 printf("Writer serial: %u shifted: %u\n", ti_writer->serial(), ti_writer->serial() << 16);
351 printf("Reader serial: %u shifted: %u\n", ti_reader->serial(), ti_reader->serial() << 16);
352
353 SyncInterfaceListener *sil = new SyncInterfaceListener(rep_reader, rep_writer, rep_rbb, lbb);
354
355 test_messaging(ti_reader, ti_writer);
356
357 delete sil;
358 lbb->close(ti_reader);
359 rbb->close(ti_writer);
360 rep_rbb->close(rep_reader);
361 lbb->close(rep_writer);
362 delete repllbb;
363 delete rep_rbb;
364
365 cout << "Tests done" << endl;
366
367 delete rbb;
368 delete llbb;
369 delete fns;
370}
371
372/// @endcond
Synchronize two interfaces.
Definition: sync_listener.h:34
virtual ~SyncInterfaceListener()
Destructor.
virtual bool bb_interface_message_received(fawkes::Interface *interface, fawkes::Message *message) noexcept
BlackBoard message received notification.
virtual void bb_interface_data_refreshed(fawkes::Interface *interface) noexcept
BlackBoard data refreshed notification.
SyncInterfaceListener(fawkes::Logger *logger, fawkes::Interface *reader, fawkes::Interface *writer, fawkes::BlackBoard *reader_bb, fawkes::BlackBoard *writer_bb)
Constructor.
BlackBoard interface listener.
void bbil_add_message_interface(Interface *interface)
Add an interface to the message received watch list.
const char * bbil_name() const
Get BBIL name.
void bbil_add_data_interface(Interface *interface)
Add an interface to the data modification watch list.
Thrown if a writer is already active on an interface that writing has been requested for.
Definition: exceptions.h:125
The BlackBoard abstract class.
Definition: blackboard.h:46
virtual Interface * open_for_reading(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for reading.
virtual InterfaceInfoList * list_all()=0
Get list of all currently existing interfaces.
virtual void unregister_listener(BlackBoardInterfaceListener *listener)
Unregister BB interface listener.
Definition: blackboard.cpp:212
virtual Interface * open_for_writing(const char *interface_type, const char *identifier, const char *owner=NULL)=0
Open interface for writing.
virtual void register_listener(BlackBoardInterfaceListener *listener, ListenerRegisterFlag flag=BBIL_FLAG_ALL)
Register BB event listener.
Definition: blackboard.cpp:185
virtual void close(Interface *interface)=0
Close interface.
Base class for exceptions in Fawkes.
Definition: exception.h:36
void print_trace() noexcept
Prints trace to stderr.
Definition: exception.cpp:601
Fawkes Network Thread.
Definition: server_thread.h:49
Interface information list.
This exception is thrown if a write has been attempted on a read-only interface.
Definition: interface.h:56
Base class for all Fawkes BlackBoard interfaces.
Definition: interface.h:80
bool msgq_first_is()
Check if first message has desired type.
Definition: interface.h:351
const char * hash_printable() const
Get printable interface hash.
Definition: interface.cpp:314
void msgq_pop()
Erase first message from queue.
Definition: interface.cpp:1215
Message * msgq_first()
Get the first message from the message queue.
Definition: interface.cpp:1200
unsigned int msgq_enqueue(Message *message, bool proxy=false)
Enqueue message at end of queue.
Definition: interface.cpp:915
unsigned int msgq_size()
Get size of message queue.
Definition: interface.cpp:1045
void write()
Write from local copy into BlackBoard memory.
Definition: interface.cpp:501
Uuid serial() const
Get instance serial of interface.
Definition: interface.cpp:695
virtual void copy_values(const Interface *interface)=0
Copy values from another interface.
void msgq_flush()
Flush all messages.
Definition: interface.cpp:1079
void read()
Read from BlackBoard into local copy.
Definition: interface.cpp:479
Local BlackBoard.
Definition: local.h:45
virtual void start_nethandler(FawkesNetworkHub *hub)
Start network handler.
Definition: local.cpp:194
Base class for all messages passed through interfaces in Fawkes BlackBoard.
Definition: message.h:44
const char * type() const
Get message type.
Definition: message.cpp:381
const Time * time_enqueued() const
Get time when message was enqueued.
Definition: message.cpp:262
void set_hops(unsigned int hops)
Set number of hops.
Definition: message.cpp:227
unsigned int id() const
Get message ID.
Definition: message.cpp:181
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
void ref()
Increment reference count.
Definition: refcount.cpp:67
Remote BlackBoard.
Definition: remote.h:50
SetTestIntMessage Fawkes BlackBoard Interface Message.
Definition: TestInterface.h:69
virtual Message * clone() const
Clone this message.
int32_t test_int() const
Get test_int value.
SetTestStringMessage Fawkes BlackBoard Interface Message.
Definition: TestInterface.h:95
TestInterface Fawkes BlackBoard Interface.
Definition: TestInterface.h:34
void set_test_int(const int32_t new_test_int)
Set test_int value.
int32_t test_int() const
Get test_int value.
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
const char * str(bool utc=false) const
Output function.
Definition: time.cpp:790
Fawkes library namespace.