Fawkes API Fawkes Development Version
server_client_thread.cpp
1
2/***************************************************************************
3 * server_client_thread.cpp - Thread handling Fawkes network client
4 *
5 * Created: Fri Nov 17 17:23:24 2006
6 * Copyright 2006-2007 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version. A runtime exception applies to
14 * this software (see LICENSE.GPL_WRE file mentioned below for details).
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22 */
23
24#include <core/exceptions/system.h>
25#include <core/threading/mutex.h>
26#include <core/threading/wait_condition.h>
27#include <netcomm/fawkes/message_queue.h>
28#include <netcomm/fawkes/server_client_thread.h>
29#include <netcomm/fawkes/server_thread.h>
30#include <netcomm/fawkes/transceiver.h>
31#include <netcomm/socket/stream.h>
32#include <netcomm/utils/exceptions.h>
33
34#include <unistd.h>
35
36namespace fawkes {
37
38/** @class FawkesNetworkServerClientSendThread <netcomm/fawkes/server_client_thread.h>
39 * Sending thread for a Fawkes client connected to the server.
40 * This thread is spawned for each client connected to the server to handle the
41 * server-side sending
42 * @ingroup NetComm
43 * @author Tim Niemueller
44 */
45
47{
48public:
49 /** Constructor.
50 * @param s client stream socket
51 * @param parent parent FawkesNetworkServerClientThread instance
52 */
54 : Thread("FawkesNetworkServerClientSendThread", Thread::OPMODE_WAITFORWAKEUP)
55 {
56 s_ = s;
57 parent_ = parent;
58 outbound_mutex_ = new Mutex();
59 outbound_msgqs_[0] = new FawkesNetworkMessageQueue();
60 outbound_msgqs_[1] = new FawkesNetworkMessageQueue();
61 outbound_active_ = 0;
62 outbound_msgq_ = outbound_msgqs_[0];
63 }
64
65 /** Destructor. */
67 {
68 for (unsigned int i = 0; i < 2; ++i) {
69 while (!outbound_msgqs_[i]->empty()) {
70 FawkesNetworkMessage *m = outbound_msgqs_[i]->front();
71 m->unref();
72 outbound_msgqs_[i]->pop();
73 }
74 }
75 delete outbound_msgqs_[0];
76 delete outbound_msgqs_[1];
77 delete outbound_mutex_;
78 }
79
80 virtual void
82 {
83 if (!parent_->alive())
84 return;
85
86 while (outbound_havemore_) {
87 outbound_mutex_->lock();
88 outbound_havemore_ = false;
89 FawkesNetworkMessageQueue *q = outbound_msgq_;
90 outbound_active_ = 1 - outbound_active_;
91 outbound_msgq_ = outbound_msgqs_[outbound_active_];
92 outbound_mutex_->unlock();
93
94 if (!q->empty()) {
95 try {
97 } catch (ConnectionDiedException &e) {
98 parent_->connection_died();
99 exit();
100 }
101 }
102 }
103 }
104
105 /** Enqueue message to outbound queue.
106 * This enqueues the given message to the outbound queue. The message will
107 * be sent in the next loop iteration. This method takes ownership of the
108 * transmitted message. If you want to use the message after enqueuing you
109 * must reference it explicitly.
110 * @param msg message to enqueue
111 */
112 void
114 {
115 outbound_mutex_->lock();
116 outbound_msgq_->push(msg);
117 outbound_havemore_ = true;
118 outbound_mutex_->unlock();
119 wakeup();
120 }
121
122 /** Wait until all data has been sent. */
123 void
125 {
126 loop_mutex->lock();
128 }
129
130 /** Stub to see name in backtrace for easier debugging. @see Thread::run() */
131protected:
132 virtual void
134 {
135 Thread::run();
136 }
137
138private:
139 StreamSocket * s_;
141
142 Mutex * outbound_mutex_;
143 unsigned int outbound_active_;
144 bool outbound_havemore_;
145 FawkesNetworkMessageQueue *outbound_msgq_;
146 FawkesNetworkMessageQueue *outbound_msgqs_[2];
147};
148
149/** @class FawkesNetworkServerClientThread netcomm/fawkes/server_client_thread.h
150 * Fawkes Network Client Thread for server.
151 * The FawkesNetworkServerThread spawns an instance of this class for every incoming
152 * connection. It is then used to handle the client.
153 * The thread will start another thread, an instance of
154 * FawkesNetworkServerClientSendThread. This will be used to handle all outgoing
155 * traffic.
156 *
157 * @ingroup NetComm
158 * @author Tim Niemueller
159 */
160
161/** Constructor.
162 * @param s socket to client
163 * @param parent parent network thread
164 */
167: Thread("FawkesNetworkServerClientThread")
168{
169 _s = s;
170 _parent = parent;
171 _alive = true;
172 _clid = 0;
173 _inbound_queue = new FawkesNetworkMessageQueue();
174
175 _send_slave = new FawkesNetworkServerClientSendThread(_s, this);
176
178}
179
180/** Destructor. */
182{
183 _send_slave->cancel();
184 _send_slave->join();
185 delete _send_slave;
186 delete _s;
187 delete _inbound_queue;
188}
189
190/** Get client ID.
191 * The client ID can be used to send replies.
192 * @return client ID
193 */
194unsigned int
196{
197 return _clid;
198}
199
200/** Set client ID.
201 * @param client_id new client ID
202 */
203void
205{
206 _clid = client_id;
207}
208
209/** Receive data.
210 * Receives data from the network if there is any and then dispatches all
211 * inbound messages via the parent FawkesNetworkThread::dispatch()
212 */
213void
214FawkesNetworkServerClientThread::recv()
215{
216 try {
217 FawkesNetworkTransceiver::recv(_s, _inbound_queue);
218
219 _inbound_queue->lock();
220 while (!_inbound_queue->empty()) {
221 FawkesNetworkMessage *m = _inbound_queue->front();
222 m->set_client_id(_clid);
223 _parent->dispatch(m);
224 m->unref();
225 _inbound_queue->pop();
226 }
227 _parent->wakeup();
228 _inbound_queue->unlock();
229
230 } catch (ConnectionDiedException &e) {
231 _alive = false;
232 _s->close();
233 _parent->wakeup();
234 }
235}
236
237void
239{
240 _send_slave->start();
241}
242
243/** Thread loop.
244 * The client thread loop polls on the socket for 10 ms (wait for events
245 * on the socket like closed connection or data that can be read). If any
246 * event occurs it is processed. If the connection died or any other
247 * error occured the thread is cancelled and the parent FawkesNetworkThread
248 * is woken up to carry out any action that is needed when a client dies.
249 * If data is available for reading thedata is received and dispatched
250 * via recv().
251 * Afterwards the outbound message queue is processed and alle messages are
252 * sent. This is also done if the operation could block (POLL_OUT is not
253 * honored).
254 */
255void
257{
258 if (!_alive) {
259 usleep(1000000);
260 return;
261 }
262
263 short p = 0;
264 try {
265 p = _s->poll(); // block until we got a message
266 } catch (InterruptedException &e) {
267 // we just ignore this and try it again
268 return;
269 }
270
271 if ((p & Socket::POLL_ERR) || (p & Socket::POLL_HUP) || (p & Socket::POLL_RDHUP)) {
272 _alive = false;
273 _parent->wakeup();
274 } else if (p & Socket::POLL_IN) {
275 // Data can be read
276 recv();
277 }
278}
279
280/** Enqueue message to outbound queue.
281 * This enqueues the given message to the outbound queue. The message will be send
282 * in the next loop iteration.
283 * @param msg message to enqueue
284 */
285void
287{
288 _send_slave->enqueue(msg);
289}
290
291/** Check aliveness of connection.
292 * @return true if connection is still alive, false otherwise.
293 */
294bool
296{
297 return _alive;
298}
299
300/** Force sending of all pending outbound messages.
301 * This is a blocking operation. The current poll will be interrupted by sending
302 * a signal to this thread (and ignoring it) and then wait for the sending to
303 * finish.
304 */
305void
307{
308 _send_slave->wait_for_all_sent();
309}
310
311/** Connection died notification.
312 * To be called only be the send slave thread.
313 */
314void
316{
317 _alive = false;
318 _parent->wakeup();
319}
320
321} // end namespace fawkes
Thrown if the connection died during an operation.
Definition: exceptions.h:32
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Definition: message_queue.h:33
Representation of a message that is sent over the network.
Definition: message.h:77
void set_client_id(unsigned int clid)
Set client ID.
Definition: message.cpp:330
Sending thread for a Fawkes client connected to the server.
virtual void loop()
Code to execute in the thread.
FawkesNetworkServerClientSendThread(StreamSocket *s, FawkesNetworkServerClientThread *parent)
Constructor.
void wait_for_all_sent()
Wait until all data has been sent.
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
virtual void run()
Stub to see name in backtrace for easier debugging.
Fawkes Network Client Thread for server.
unsigned int clid() const
Get client ID.
void force_send()
Force sending of all pending outbound messages.
bool alive() const
Check aliveness of connection.
void set_clid(unsigned int client_id)
Set client ID.
void connection_died()
Connection died notification.
void enqueue(FawkesNetworkMessage *msg)
Enqueue message to outbound queue.
FawkesNetworkServerClientThread(StreamSocket *s, FawkesNetworkServerThread *parent)
Constructor.
virtual void once()
Execute an action exactly once.
Fawkes Network Thread.
Definition: server_thread.h:49
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
static void send(StreamSocket *s, FawkesNetworkMessageQueue *msgq)
Send messages.
Definition: transceiver.cpp:51
static void recv(StreamSocket *s, FawkesNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
Definition: transceiver.cpp:85
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
void lock() const
Lock queue.
Definition: lock_queue.h:114
void unlock() const
Unlock list.
Definition: lock_queue.h:128
Mutex mutual exclusion lock.
Definition: mutex.h:33
void lock()
Lock this mutex.
Definition: mutex.cpp:87
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
static const short POLL_RDHUP
Stream socket peer closed connection, or shut down writing half of connection.
Definition: socket.h:69
static const short POLL_HUP
Hang up.
Definition: socket.h:71
static const short POLL_IN
Data can be read.
Definition: socket.h:66
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:685
static const short POLL_ERR
Error condition.
Definition: socket.h:70
virtual void close()
Close socket.
Definition: socket.cpp:311
TCP stream socket over IP.
Definition: stream.h:32
Thread class encapsulation of pthreads.
Definition: thread.h:46
void set_prepfin_conc_loop(bool concurrent=true)
Set concurrent execution of prepare_finalize() and loop().
Definition: thread.cpp:716
Mutex * loop_mutex
Mutex that is used to protect a call to loop().
Definition: thread.h:152
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
void join()
Join the thread.
Definition: thread.cpp:597
void exit()
Exit the thread.
Definition: thread.cpp:582
void wakeup()
Wake up thread.
Definition: thread.cpp:995
void cancel()
Cancel a thread.
Definition: thread.cpp:646
virtual void run()
Code to execute in the thread.
Definition: thread.cpp:918
@ OPMODE_WAITFORWAKEUP
operate in wait-for-wakeup mode
Definition: thread.h:58
Fawkes library namespace.