Fawkes API Fawkes Development Version
fuse_client.cpp
1
/***************************************************************************
 * fuse_client.cpp - FUSE network transport client
 *
 * Created: Thu Mar 29 00:47:24 2007
 * Copyright 2005-2007 Tim Niemueller [www.niemueller.de]
 *
 ****************************************************************************/

/* This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version. A runtime exception applies to
 * this software (see LICENSE.GPL_WRE file mentioned below for details).
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Library General Public License for more details.
 *
 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
 */
23
24#include <core/exceptions/software.h>
25#include <core/threading/mutex.h>
26#include <core/threading/wait_condition.h>
27#include <fvutils/net/fuse_client.h>
28#include <fvutils/net/fuse_client_handler.h>
29#include <fvutils/net/fuse_message.h>
30#include <fvutils/net/fuse_message_queue.h>
31#include <fvutils/net/fuse_transceiver.h>
32#include <netcomm/socket/stream.h>
33#include <netcomm/utils/exceptions.h>
34#include <netinet/in.h>
35
36#include <cstdlib>
37#include <cstring>
38#include <unistd.h>
39
40using namespace fawkes;
41
42namespace firevision {
43
/** @class FuseClient <fvutils/net/fuse_client.h>
 * FUSE client.
 * FUSE is the FireVision protocol to retrieve information, images and lookup
 * tables from vision processes and to send control commands to these systems.
 * The client is used in the retrieving or controlling process.
 * @ingroup FUSE
 * @ingroup FireVision
 * @author Tim Niemueller
 */
53
54/** Constructor.
55 * @param hostname host to connect to
56 * @param port port to connect to
57 * @param handler client handler to handle incoming data
58 */
59FuseClient::FuseClient(const char *hostname, unsigned short int port, FuseClientHandler *handler)
60: Thread("FuseClient")
61{
62 hostname_ = strdup(hostname);
63 port_ = port;
64 handler_ = handler;
65
66 wait_timeout_ = 10;
67
68 inbound_msgq_ = new FuseNetworkMessageQueue();
69 outbound_msgq_ = new FuseNetworkMessageQueue();
70
71 mutex_ = new Mutex();
72 recv_mutex_ = new Mutex();
73 recv_waitcond_ = new WaitCondition(recv_mutex_);
74 socket_ = new StreamSocket();
75 greeting_mutex_ = new Mutex();
76 greeting_waitcond_ = new WaitCondition(greeting_mutex_);
77
78 alive_ = true;
79 greeting_received_ = false;
80}
81
82/** Destructor. */
84{
85 free(hostname_);
86
87 while (!inbound_msgq_->empty()) {
88 FuseNetworkMessage *m = inbound_msgq_->front();
89 m->unref();
90 inbound_msgq_->pop();
91 }
92 delete inbound_msgq_;
93
94 while (!outbound_msgq_->empty()) {
95 FuseNetworkMessage *m = outbound_msgq_->front();
96 m->unref();
97 outbound_msgq_->pop();
98 }
99 delete outbound_msgq_;
100
101 delete mutex_;
102 delete recv_mutex_;
103 delete recv_waitcond_;
104 delete socket_;
105 delete greeting_mutex_;
106 delete greeting_waitcond_;
107}
108
109/** Connect. */
110void
112{
113 socket_->connect(hostname_, port_);
114
115 FUSE_greeting_message_t *greetmsg =
117 greetmsg->version = htonl(FUSE_CURRENT_VERSION);
118 outbound_msgq_->push(
119 new FuseNetworkMessage(FUSE_MT_GREETING, greetmsg, sizeof(FUSE_greeting_message_t)));
120}
121
122/** Disconnect. */
123void
125{
126 mutex_->lock();
127 delete socket_;
128 socket_ = new StreamSocket();
129 alive_ = false;
130 mutex_->unlock();
131}
132
133/** Send queued messages. */
134void
135FuseClient::send()
136{
137 try {
138 FuseNetworkTransceiver::send(socket_, outbound_msgq_);
139 } catch (ConnectionDiedException &e) {
140 e.print_trace();
141 socket_->close();
142 alive_ = false;
143 handler_->fuse_connection_died();
144 recv_waitcond_->wake_all();
145 }
146}
147
148/** Receive messages. */
149void
150FuseClient::recv()
151{
152 recv_mutex_->lock();
153 try {
154 while (socket_->available()) {
155 FuseNetworkTransceiver::recv(socket_, inbound_msgq_);
156 }
157 } catch (ConnectionDiedException &e) {
158 e.print_trace();
159 socket_->close();
160 alive_ = false;
161 handler_->fuse_connection_died();
162 recv_waitcond_->wake_all();
163 }
164 recv_mutex_->unlock();
165}
166
167/** Enqueue message.
168 * This method takes ownership of the passed message. You must explicitly
169 * reference it before enqueing if you want to use it afterwards.
170 * @param m message to enqueue
171 */
172void
174{
175 outbound_msgq_->push_locked(m);
176}
177
178/** Enqueue message.
179 * @param type type of message
180 * @param payload payload of message
181 * @param payload_size size of payload
182 */
183void
184FuseClient::enqueue(FUSE_message_type_t type, void *payload, size_t payload_size)
185{
186 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
187 outbound_msgq_->push_locked(m);
188}
189
190/** Enqueue message without payload.
191 * @param type type of message
192 */
193void
194FuseClient::enqueue(FUSE_message_type_t type)
195{
197 outbound_msgq_->push_locked(m);
198}
199
200/** Enqueue message and wait for reply.
201 * The wait happens atomically, use this to avoid race conditions. This method
202 * takes ownership of the passed message. You must explicitly reference it
203 * before enqueing if you want to use it afterwards.
204 * @param m message to enqueue
205 */
206void
208{
209 recv_mutex_->lock();
210 outbound_msgq_->push_locked(m);
211 recv_waitcond_->wait();
212 recv_mutex_->unlock();
213}
214
215/** Enqueue message and wait for reply.
216 * The wait happens atomically, use this to avoid race conditions.
217 * @param type type of message
218 * @param payload payload of message
219 * @param payload_size size of payload
220 */
221void
222FuseClient::enqueue_and_wait(FUSE_message_type_t type, void *payload, size_t payload_size)
223{
224 FuseNetworkMessage *m = new FuseNetworkMessage(type, payload, payload_size);
225 recv_mutex_->lock();
226 outbound_msgq_->push_locked(m);
227 recv_waitcond_->wait();
228 recv_mutex_->unlock();
229}
230
231/** Enqueue message without payload and wait for reply.
232 * The wait happens atomically, use this to avoid race conditions.
233 * @param type type of message
234 */
235void
236FuseClient::enqueue_and_wait(FUSE_message_type_t type)
237{
239 recv_mutex_->lock();
240 outbound_msgq_->push_locked(m);
241 recv_waitcond_->wait();
242 recv_mutex_->unlock();
243}
244
245/** Sleep for some time.
246 * Wait until inbound messages have been receive, the connection dies or the
247 * timeout has been reached, whatever comes first. So you sleep at most timeout ms,
248 * but short under some circumstances (incoming data or lost connection).
249 */
250void
251FuseClient::sleep()
252{
253 try {
254 socket_->poll(wait_timeout_ /* ms timeout */, Socket::POLL_IN);
255 } catch (Exception &e) {
256 }
257}
258
259/** Thread loop.
260 * Sends enqueued messages and reads incoming messages off the network.
261 */
262void
264{
265 mutex_->lock();
266
267 if (!alive_) {
268 mutex_->unlock();
269 usleep(10000);
270 return;
271 }
272
273 bool wake = false;
274
275 send();
276 sleep();
277 recv();
278
279 //process_inbound();
280
281 inbound_msgq_->lock();
282 while (!inbound_msgq_->empty()) {
283 FuseNetworkMessage *m = inbound_msgq_->front();
284
285 if (m->type() == FUSE_MT_GREETING) {
287 if (ntohl(gm->version) != FUSE_CURRENT_VERSION) {
288 handler_->fuse_invalid_server_version(FUSE_CURRENT_VERSION, ntohl(gm->version));
289 alive_ = false;
290 } else {
291 greeting_mutex_->lock();
292 greeting_received_ = true;
293 greeting_waitcond_->wake_all();
294 greeting_mutex_->unlock();
295 handler_->fuse_connection_established();
296 }
297 } else {
298 handler_->fuse_inbound_received(m);
299 wake = true;
300 }
301
302 m->unref();
303 inbound_msgq_->pop();
304 }
305 inbound_msgq_->unlock();
306
307 if (wake) {
308 recv_waitcond_->wake_all();
309 }
310 mutex_->unlock();
311}
312
313/** Wait for messages.
314 * This will wait for messages to arrive. The calling
315 * thread is blocked until messages are available.
316 */
317void
319{
320 recv_mutex_->lock();
321 recv_waitcond_->wait();
322 recv_mutex_->unlock();
323}
324
325/** Wait for greeting message.
326 * This method will wait for the greeting message to arrive. Make sure that you called
327 * connect() before waiting or call it concurrently in another thread. The calling thread
328 * will be blocked until the message has been received. If the message has already been
329 * received this method will return immediately. Thus it is safe to call this at any time
330 * without risking a race condition.
331 */
332void
334{
335 greeting_mutex_->lock();
336 while (!greeting_received_) {
337 greeting_waitcond_->wait();
338 }
339 greeting_mutex_->unlock();
340}
341
342} // end namespace firevision
Thrown if the connection died during an operation.
Definition: exceptions.h:32
Base class for exceptions in Fawkes.
Definition: exception.h:36
void print_trace() noexcept
Prints trace to stderr.
Definition: exception.cpp:601
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:135
void lock() const
Lock queue.
Definition: lock_queue.h:114
void unlock() const
Unlock list.
Definition: lock_queue.h:128
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
virtual bool available()
Check if data is available.
Definition: socket.cpp:644
virtual void connect(const char *hostname, const unsigned short int port)
Connect socket.
Definition: socket.cpp:376
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:685
virtual void close()
Close socket.
Definition: socket.cpp:311
TCP stream socket over IP.
Definition: stream.h:32
Thread class encapsulation of pthreads.
Definition: thread.h:46
Wait until a given condition holds.
void wait()
Wait for the condition forever.
void wake_all()
Wake up all waiting threads.
virtual void fuse_connection_died() noexcept=0
Connection died.
virtual void fuse_inbound_received(FuseNetworkMessage *m) noexcept=0
Message received.
virtual void fuse_invalid_server_version(uint32_t local_version, uint32_t remote_version) noexcept=0
Invalid version string received.
virtual void fuse_connection_established() noexcept=0
Connection has been established.
virtual void loop()
Thread loop.
FuseClient(const char *hostname, unsigned short int port, FuseClientHandler *handler)
Constructor.
Definition: fuse_client.cpp:59
void wait_greeting()
Wait for greeting message.
void enqueue_and_wait(FuseNetworkMessage *message)
Enqueue message and wait for reply.
void enqueue(FuseNetworkMessage *m)
Enqueue message.
void connect()
Connect.
void wait()
Wait for messages.
void disconnect()
Disconnect.
virtual ~FuseClient()
Destructor.
Definition: fuse_client.cpp:83
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
FUSE Network Message.
Definition: fuse_message.h:40
MT * msg() const
Get correctly casted payload.
Definition: fuse_message.h:67
uint32_t type() const
Get message type.
static void send(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq)
Send messages.
static void recv(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
Fawkes library namespace.
version packet, bi-directional
Definition: fuse.h:98
uint32_t version
version from FUSE_version_t
Definition: fuse.h:99