Fawkes API  Fawkes Development Version
server_thread.cpp
1 
2 /***************************************************************************
3  * server_thread.cpp - Fawkes Network Protocol (server part)
4  *
5  * Created: Sun Nov 19 15:08:30 2006
6  * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7  *
8  ****************************************************************************/
9 
10 /* This program is free software; you can redistribute it and/or modify
11  * it under the terms of the GNU General Public License as published by
12  * the Free Software Foundation; either version 2 of the License, or
13  * (at your option) any later version. A runtime exception applies to
14  * this software (see LICENSE.GPL_WRE file mentioned below for details).
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19  * GNU Library General Public License for more details.
20  *
21  * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22  */
23 
24 #include <core/exception.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/mutex_locker.h>
27 #include <core/threading/thread_collector.h>
28 #include <netcomm/fawkes/handler.h>
29 #include <netcomm/fawkes/message.h>
30 #include <netcomm/fawkes/message_content.h>
31 #include <netcomm/fawkes/message_queue.h>
32 #include <netcomm/fawkes/server_client_thread.h>
33 #include <netcomm/fawkes/server_thread.h>
34 #include <netcomm/utils/acceptor_thread.h>
35 
36 #include <unistd.h>
37 
38 namespace fawkes {
39 
40 /** @class FawkesNetworkServerThread <netcomm/fawkes/server_thread.h>
41  * Fawkes Network Thread.
42  * Maintains a list of clients and reacts on events triggered by the clients.
43  * Also runs the acceptor thread.
44  *
45  * @ingroup NetComm
46  * @author Tim Niemueller
47  */
48 
49 /** Constructor.
50  * @param enable_ipv4 true to listen on the IPv4 TCP port
51  * @param enable_ipv6 true to listen on the IPv6 TCP port
52  * @param listen_ipv4 IPv4 address to listen on for incoming connections,
53  * 0.0.0.0 to listen on any local address
54  * @param listen_ipv6 IPv6 address to listen on for incoming connections,
55  * :: to listen on any local address
56  * @param fawkes_port port for Fawkes network protocol
57  * @param thread_collector thread collector to register new threads with
58  */
60  bool enable_ipv6,
61  const std::string &listen_ipv4,
62  const std::string &listen_ipv6,
63  unsigned int fawkes_port,
64  ThreadCollector * thread_collector)
65 : Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
66 {
67  this->thread_collector = thread_collector;
68  clients.clear();
69  next_client_id = 1;
70  inbound_messages = new FawkesNetworkMessageQueue();
71 
72  if (enable_ipv4) {
73  acceptor_threads.push_back(new NetworkAcceptorThread(
74  this, Socket::IPv4, listen_ipv4, fawkes_port, "FawkesNetworkAcceptorThread"));
75  }
76  if (enable_ipv6) {
77  acceptor_threads.push_back(new NetworkAcceptorThread(
78  this, Socket::IPv6, listen_ipv6, fawkes_port, "FawkesNetworkAcceptorThread"));
79  }
80 
81  if (thread_collector) {
82  for (size_t i = 0; i < acceptor_threads.size(); ++i) {
83  thread_collector->add(acceptor_threads[i]);
84  }
85  } else {
86  for (size_t i = 0; i < acceptor_threads.size(); ++i) {
87  acceptor_threads[i]->start();
88  }
89  }
90 }
91 
92 /** Destructor. */
94 {
95  for (cit = clients.begin(); cit != clients.end(); ++cit) {
96  if (thread_collector) {
97  thread_collector->remove((*cit).second);
98  } else {
99  (*cit).second->cancel();
100  (*cit).second->join();
101  }
102  delete (*cit).second;
103  }
104  for (size_t i = 0; i < acceptor_threads.size(); ++i) {
105  if (thread_collector) {
106  thread_collector->remove(acceptor_threads[i]);
107  } else {
108  acceptor_threads[i]->cancel();
109  acceptor_threads[i]->join();
110  }
111  delete acceptor_threads[i];
112  }
113  acceptor_threads.clear();
114 
115  delete inbound_messages;
116 }
117 
118 /** Add a new connection.
119  * Called by the NetworkAcceptorThread if a new client connected.
120  * @param s socket for new client
121  */
122 void
124 {
126 
127  clients.lock();
128  client->set_clid(next_client_id);
129  if (thread_collector) {
130  thread_collector->add(client);
131  } else {
132  client->start();
133  }
134  unsigned int cid = next_client_id++;
135  clients[cid] = client;
136  clients.unlock();
137 
138  MutexLocker handlers_lock(handlers.mutex());
139  for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
140  (*hit).second->client_connected(cid);
141  }
142  handlers_lock.unlock();
143 
144  wakeup();
145 }
146 
147 /** Add a handler.
148  * @param handler to add.
149  */
150 void
152 {
153  MutexLocker handlers_lock(handlers.mutex());
154  if (handlers.find(handler->id()) != handlers.end()) {
155  throw Exception("Handler already registered");
156  }
157  handlers[handler->id()] = handler;
158 }
159 
160 /** Remove handler.
161  * @param handler handler to remove
162  */
163 void
165 {
166  MutexLocker handlers_lock(handlers.mutex());
167  if (handlers.find(handler->id()) != handlers.end()) {
168  handlers.erase(handler->id());
169  }
170 }
171 
172 /** Fawkes network thread loop.
173  * The thread loop will check all clients for their alivness and dead
174  * clients are removed. Then inbound messages are processed and dispatched
175  * properly to registered handlers. Then the thread waits for a new event
176  * to happen (event emitting threads need to wakeup this thread!).
177  */
178 void
180 {
181  std::list<unsigned int> dead_clients;
182  clients.lock();
183  // check for dead clients
184  for (cit = clients.begin(); cit != clients.end(); ++cit) {
185  if (!cit->second->alive()) {
186  dead_clients.push_back(cit->first);
187  }
188  }
189  clients.unlock();
190 
191  std::list<unsigned int>::iterator dci;
192  for (dci = dead_clients.begin(); dci != dead_clients.end(); ++dci) {
193  const unsigned int clid = *dci;
194 
195  {
196  MutexLocker handlers_lock(handlers.mutex());
197  for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
198  (*hit).second->client_disconnected(clid);
199  }
200  }
201 
202  {
203  MutexLocker clients_lock(clients.mutex());
204  if (thread_collector) {
205  thread_collector->remove(clients[clid]);
206  } else {
207  clients[clid]->cancel();
208  clients[clid]->join();
209  }
210  usleep(5000);
211  delete clients[clid];
212  clients.erase(clid);
213  }
214  }
215 
216  // dispatch messages
217  inbound_messages->lock();
218  while (!inbound_messages->empty()) {
219  FawkesNetworkMessage *m = inbound_messages->front();
220  {
221  MutexLocker handlers_lock(handlers.mutex());
222  if (handlers.find(m->cid()) != handlers.end()) {
223  handlers[m->cid()]->handle_network_message(m);
224  }
225  }
226  m->unref();
227  inbound_messages->pop();
228  }
229  inbound_messages->unlock();
230 }
231 
232 /** Force sending of all pending messages. */
233 void
235 {
236  clients.lock();
237  for (cit = clients.begin(); cit != clients.end(); ++cit) {
238  (*cit).second->force_send();
239  }
240  clients.unlock();
241 }
242 
243 /** Broadcast a message.
244  * Method to broadcast a message to all connected clients. This method will take
245  * ownership of the passed message. If you want to use if after enqueing it you
246  * must reference it explicitly before calling this method.
247  * @param msg Message to broadcast
248  */
249 void
251 {
252  clients.lock();
253  for (cit = clients.begin(); cit != clients.end(); ++cit) {
254  if ((*cit).second->alive()) {
255  msg->ref();
256  (*cit).second->enqueue(msg);
257  }
258  }
259  clients.unlock();
260  msg->unref();
261 }
262 
263 /** Broadcast a message.
264  * A FawkesNetworkMessage is created and broacasted via the emitter.
265  * @param component_id component ID
266  * @param msg_id message type id
267  * @param payload payload buffer
268  * @param payload_size size of payload buffer
269  * @see FawkesNetworkEmitter::broadcast()
270  */
271 void
272 FawkesNetworkServerThread::broadcast(unsigned short int component_id,
273  unsigned short int msg_id,
274  void * payload,
275  unsigned int payload_size)
276 {
277  FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id, payload, payload_size);
278  broadcast(m);
279 }
280 
281 /** Broadcast message without payload.
282  * @param component_id component ID
283  * @param msg_id message type ID
284  */
285 void
286 FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
287 {
288  FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
289  broadcast(m);
290 }
291 
292 /** Send a message.
293  * Method to send a message to a specific client.
294  * The client ID provided in the message is used to determine the correct
295  * recipient. If no client is connected for the given client ID the message
296  * shall be silently ignored.
297  * This method will take ownership of the passed message. If you want to use
298  * if after enqueing it you must reference it explicitly before calling this
299  * method.
300  * Implemented Emitter interface message.
301  * @param msg Message to send
302  */
303 void
305 {
306  MutexLocker lock(clients.mutex());
307  unsigned int clid = msg->clid();
308  if (clients.find(clid) != clients.end()) {
309  if (clients[clid]->alive()) {
310  clients[clid]->enqueue(msg);
311  } else {
312  throw Exception("Client %u not alive", clid);
313  }
314  } else {
315  throw Exception("Client %u not found", clid);
316  }
317 }
318 
319 /** Send a message.
320  * A FawkesNetworkMessage is created and sent via the emitter.
321  * @param to_clid client ID of recipient
322  * @param component_id component ID
323  * @param msg_id message type id
324  * @param payload payload buffer
325  * @param payload_size size of payload buffer
326  * @see FawkesNetworkEmitter::broadcast()
327  */
328 void
329 FawkesNetworkServerThread::send(unsigned int to_clid,
330  unsigned short int component_id,
331  unsigned short int msg_id,
332  void * payload,
333  unsigned int payload_size)
334 {
336  new FawkesNetworkMessage(to_clid, component_id, msg_id, payload, payload_size);
337  send(m);
338 }
339 
340 /** Send a message.
341  * A FawkesNetworkMessage is created and sent via the emitter.
342  * @param to_clid client ID of recipient
343  * @param component_id component ID
344  * @param msg_id message type id
345  * @param content Fawkes complex network message content
346  * @see FawkesNetworkEmitter::broadcast()
347  */
348 void
349 FawkesNetworkServerThread::send(unsigned int to_clid,
350  unsigned short int component_id,
351  unsigned short int msg_id,
353 {
354  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id, content);
355  send(m);
356 }
357 
358 /** Send a message without payload.
359  * A FawkesNetworkMessage with empty payload is created and sent via the emitter.
360  * This is particularly useful for simple status messages that you want to send.
361  * @param to_clid client ID of recipient
362  * @param component_id component ID
363  * @param msg_id message type id
364  * @see FawkesNetworkEmitter::broadcast()
365  */
366 void
367 FawkesNetworkServerThread::send(unsigned int to_clid,
368  unsigned short int component_id,
369  unsigned short int msg_id)
370 {
371  FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
372  send(m);
373 }
374 
375 /** Dispatch messages.
376  * Actually messages are just put into the inbound message queue and dispatched
377  * during the next loop iteration. So after adding all the messages you have
378  * to wakeup the thread to get them actually dispatched.
379  * @param msg message to dispatch
380  */
381 void
383 {
384  msg->ref();
385  inbound_messages->push_locked(msg);
386 }
387 
388 } // end namespace fawkes
Fawkes Network Client Thread for server.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Definition: message_queue.h:32
Network Acceptor Thread.
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
virtual void broadcast(FawkesNetworkMessage *msg)
Broadcast a message.
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
void unlock() const
Unlock list.
Definition: lock_queue.h:128
unsigned short int cid() const
Get component ID.
Definition: message.cpp:285
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
Fawkes library namespace.
unsigned int clid() const
Get client ID.
Definition: message.cpp:276
Mutex locking helper.
Definition: mutex_locker.h:33
Representation of a message that is sent over the network.
Definition: message.h:76
Thread class encapsulation of pthreads.
Definition: thread.h:45
virtual void send(FawkesNetworkMessage *msg)
Send a message.
unsigned short int id() const
Get the component ID for this handler.
Definition: handler.cpp:76
TCP stream socket over IP.
Definition: stream.h:31
Fawkes network message content.
void add_connection(StreamSocket *s)
Add a new connection.
void unlock()
Unlock the mutex.
virtual ~FawkesNetworkServerThread()
Destructor.
void set_clid(unsigned int client_id)
Set client ID.
Base class for exceptions in Fawkes.
Definition: exception.h:35
virtual void remove_handler(FawkesNetworkHandler *handler)
Remove handler.
virtual void add_handler(FawkesNetworkHandler *handler)
Add a handler.
void ref()
Increment reference count.
Definition: refcount.cpp:67
Network handler abstract base class.
Definition: handler.h:31
virtual void add(ThreadList &tl)=0
Add multiple threads.
FawkesNetworkServerThread(bool enable_ipv4, bool enable_ipv6, const std::string &listen_ipv4, const std::string &listen_ipv6, unsigned int fawkes_port, ThreadCollector *thread_collector=0)
Constructor.
virtual void loop()
Fawkes network thread loop.
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:135
void lock() const
Lock queue.
Definition: lock_queue.h:114
void force_send()
Force sending of all pending messages.
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499