Fawkes API Fawkes Development Version
server_thread.cpp
1
2/***************************************************************************
3 * server_thread.cpp - Fawkes Network Protocol (server part)
4 *
5 * Created: Sun Nov 19 15:08:30 2006
6 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version. A runtime exception applies to
14 * this software (see LICENSE.GPL_WRE file mentioned below for details).
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22 */
23
24#include <core/exception.h>
25#include <core/threading/mutex.h>
26#include <core/threading/mutex_locker.h>
27#include <core/threading/thread_collector.h>
28#include <netcomm/fawkes/handler.h>
29#include <netcomm/fawkes/message.h>
30#include <netcomm/fawkes/message_content.h>
31#include <netcomm/fawkes/message_queue.h>
32#include <netcomm/fawkes/server_client_thread.h>
33#include <netcomm/fawkes/server_thread.h>
34#include <netcomm/utils/acceptor_thread.h>
35
36#include <unistd.h>
37
38namespace fawkes {
39
40/** @class FawkesNetworkServerThread <netcomm/fawkes/server_thread.h>
41 * Fawkes Network Thread.
42 * Maintains a list of clients and reacts on events triggered by the clients.
43 * Also runs the acceptor thread.
44 *
45 * @ingroup NetComm
46 * @author Tim Niemueller
47 */
48
49/** Constructor.
50 * @param enable_ipv4 true to listen on the IPv4 TCP port
51 * @param enable_ipv6 true to listen on the IPv6 TCP port
52 * @param listen_ipv4 IPv4 address to listen on for incoming connections,
53 * 0.0.0.0 to listen on any local address
54 * @param listen_ipv6 IPv6 address to listen on for incoming connections,
55 * :: to listen on any local address
56 * @param fawkes_port port for Fawkes network protocol
57 * @param thread_collector thread collector to register new threads with
58 */
60 bool enable_ipv6,
61 const std::string &listen_ipv4,
62 const std::string &listen_ipv6,
63 unsigned int fawkes_port,
64 ThreadCollector * thread_collector)
65: Thread("FawkesNetworkServerThread", Thread::OPMODE_WAITFORWAKEUP)
66{
67 this->thread_collector = thread_collector;
68 clients.clear();
69 next_client_id = 1;
70 inbound_messages = new FawkesNetworkMessageQueue();
71
72 if (enable_ipv4) {
73 acceptor_threads.push_back(new NetworkAcceptorThread(
74 this, Socket::IPv4, listen_ipv4, fawkes_port, "FawkesNetworkAcceptorThread"));
75 }
76 if (enable_ipv6) {
77 acceptor_threads.push_back(new NetworkAcceptorThread(
78 this, Socket::IPv6, listen_ipv6, fawkes_port, "FawkesNetworkAcceptorThread"));
79 }
80
81 if (thread_collector) {
82 for (size_t i = 0; i < acceptor_threads.size(); ++i) {
83 thread_collector->add(acceptor_threads[i]);
84 }
85 } else {
86 for (size_t i = 0; i < acceptor_threads.size(); ++i) {
87 acceptor_threads[i]->start();
88 }
89 }
90}
91
92/** Destructor. */
94{
95 for (cit = clients.begin(); cit != clients.end(); ++cit) {
96 if (thread_collector) {
97 thread_collector->remove((*cit).second);
98 } else {
99 (*cit).second->cancel();
100 (*cit).second->join();
101 }
102 delete (*cit).second;
103 }
104 for (size_t i = 0; i < acceptor_threads.size(); ++i) {
105 if (thread_collector) {
106 thread_collector->remove(acceptor_threads[i]);
107 } else {
108 acceptor_threads[i]->cancel();
109 acceptor_threads[i]->join();
110 }
111 delete acceptor_threads[i];
112 }
113 acceptor_threads.clear();
114
115 delete inbound_messages;
116}
117
118/** Add a new connection.
119 * Called by the NetworkAcceptorThread if a new client connected.
120 * @param s socket for new client
121 */
122void
124{
126
127 clients.lock();
128 client->set_clid(next_client_id);
129 if (thread_collector) {
130 thread_collector->add(client);
131 } else {
132 client->start();
133 }
134 unsigned int cid = next_client_id++;
135 clients[cid] = client;
136 clients.unlock();
137
138 MutexLocker handlers_lock(handlers.mutex());
139 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
140 (*hit).second->client_connected(cid);
141 }
142 handlers_lock.unlock();
143
144 wakeup();
145}
146
147/** Add a handler.
148 * @param handler to add.
149 */
150void
152{
153 MutexLocker handlers_lock(handlers.mutex());
154 if (handlers.find(handler->id()) != handlers.end()) {
155 throw Exception("Handler already registered");
156 }
157 handlers[handler->id()] = handler;
158}
159
160/** Remove handler.
161 * @param handler handler to remove
162 */
163void
165{
166 MutexLocker handlers_lock(handlers.mutex());
167 if (handlers.find(handler->id()) != handlers.end()) {
168 handlers.erase(handler->id());
169 }
170}
171
172/** Fawkes network thread loop.
173 * The thread loop will check all clients for their alivness and dead
174 * clients are removed. Then inbound messages are processed and dispatched
175 * properly to registered handlers. Then the thread waits for a new event
176 * to happen (event emitting threads need to wakeup this thread!).
177 */
178void
180{
181 std::list<unsigned int> dead_clients;
182 clients.lock();
183 // check for dead clients
184 for (cit = clients.begin(); cit != clients.end(); ++cit) {
185 if (!cit->second->alive()) {
186 dead_clients.push_back(cit->first);
187 }
188 }
189 clients.unlock();
190
191 std::list<unsigned int>::iterator dci;
192 for (dci = dead_clients.begin(); dci != dead_clients.end(); ++dci) {
193 const unsigned int clid = *dci;
194
195 {
196 MutexLocker handlers_lock(handlers.mutex());
197 for (hit = handlers.begin(); hit != handlers.end(); ++hit) {
198 (*hit).second->client_disconnected(clid);
199 }
200 }
201
202 {
203 MutexLocker clients_lock(clients.mutex());
204 if (thread_collector) {
205 thread_collector->remove(clients[clid]);
206 } else {
207 clients[clid]->cancel();
208 clients[clid]->join();
209 }
210 usleep(5000);
211 delete clients[clid];
212 clients.erase(clid);
213 }
214 }
215
216 // dispatch messages
217 inbound_messages->lock();
218 while (!inbound_messages->empty()) {
219 FawkesNetworkMessage *m = inbound_messages->front();
220 {
221 MutexLocker handlers_lock(handlers.mutex());
222 if (handlers.find(m->cid()) != handlers.end()) {
223 handlers[m->cid()]->handle_network_message(m);
224 }
225 }
226 m->unref();
227 inbound_messages->pop();
228 }
229 inbound_messages->unlock();
230}
231
232/** Force sending of all pending messages. */
233void
235{
236 clients.lock();
237 for (cit = clients.begin(); cit != clients.end(); ++cit) {
238 (*cit).second->force_send();
239 }
240 clients.unlock();
241}
242
243/** Broadcast a message.
244 * Method to broadcast a message to all connected clients. This method will take
245 * ownership of the passed message. If you want to use if after enqueing it you
246 * must reference it explicitly before calling this method.
247 * @param msg Message to broadcast
248 */
249void
251{
252 clients.lock();
253 for (cit = clients.begin(); cit != clients.end(); ++cit) {
254 if ((*cit).second->alive()) {
255 msg->ref();
256 (*cit).second->enqueue(msg);
257 }
258 }
259 clients.unlock();
260 msg->unref();
261}
262
263/** Broadcast a message.
264 * A FawkesNetworkMessage is created and broacasted via the emitter.
265 * @param component_id component ID
266 * @param msg_id message type id
267 * @param payload payload buffer
268 * @param payload_size size of payload buffer
269 * @see FawkesNetworkEmitter::broadcast()
270 */
271void
272FawkesNetworkServerThread::broadcast(unsigned short int component_id,
273 unsigned short int msg_id,
274 void * payload,
275 unsigned int payload_size)
276{
277 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id, payload, payload_size);
278 broadcast(m);
279}
280
281/** Broadcast message without payload.
282 * @param component_id component ID
283 * @param msg_id message type ID
284 */
285void
286FawkesNetworkServerThread::broadcast(unsigned short int component_id, unsigned short int msg_id)
287{
288 FawkesNetworkMessage *m = new FawkesNetworkMessage(component_id, msg_id);
289 broadcast(m);
290}
291
292/** Send a message.
293 * Method to send a message to a specific client.
294 * The client ID provided in the message is used to determine the correct
295 * recipient. If no client is connected for the given client ID or the client
296 * is not alive, the message shall be silently ignored.
297 * This method will take ownership of the passed message. If you want to use
298 * if after enqueing it you must reference it explicitly before calling this
299 * method.
300 * Implemented Emitter interface message.
301 * @param msg Message to send
302 */
303void
305{
306 MutexLocker lock(clients.mutex());
307 unsigned int clid = msg->clid();
308 if (clients.find(clid) != clients.end()) {
309 if (clients[clid]->alive()) {
310 clients[clid]->enqueue(msg);
311 }
312 }
313}
314
315/** Send a message.
316 * A FawkesNetworkMessage is created and sent via the emitter.
317 * @param to_clid client ID of recipient
318 * @param component_id component ID
319 * @param msg_id message type id
320 * @param payload payload buffer
321 * @param payload_size size of payload buffer
322 * @see FawkesNetworkEmitter::broadcast()
323 */
324void
326 unsigned short int component_id,
327 unsigned short int msg_id,
328 void * payload,
329 unsigned int payload_size)
330{
332 new FawkesNetworkMessage(to_clid, component_id, msg_id, payload, payload_size);
333 send(m);
334}
335
336/** Send a message.
337 * A FawkesNetworkMessage is created and sent via the emitter.
338 * @param to_clid client ID of recipient
339 * @param component_id component ID
340 * @param msg_id message type id
341 * @param content Fawkes complex network message content
342 * @see FawkesNetworkEmitter::broadcast()
343 */
344void
346 unsigned short int component_id,
347 unsigned short int msg_id,
349{
350 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id, content);
351 send(m);
352}
353
354/** Send a message without payload.
355 * A FawkesNetworkMessage with empty payload is created and sent via the emitter.
356 * This is particularly useful for simple status messages that you want to send.
357 * @param to_clid client ID of recipient
358 * @param component_id component ID
359 * @param msg_id message type id
360 * @see FawkesNetworkEmitter::broadcast()
361 */
362void
364 unsigned short int component_id,
365 unsigned short int msg_id)
366{
367 FawkesNetworkMessage *m = new FawkesNetworkMessage(to_clid, component_id, msg_id);
368 send(m);
369}
370
371/** Dispatch messages.
372 * Actually messages are just put into the inbound message queue and dispatched
373 * during the next loop iteration. So after adding all the messages you have
374 * to wakeup the thread to get them actually dispatched.
375 * @param msg message to dispatch
376 */
377void
379{
380 msg->ref();
381 inbound_messages->push_locked(msg);
382}
383
384} // end namespace fawkes
Base class for exceptions in Fawkes.
Definition: exception.h:36
Network handler abstract base class.
Definition: handler.h:32
unsigned short int id() const
Get the component ID for this handler.
Definition: handler.cpp:76
Fawkes network message content.
A LockQueue of FawkesNetworkMessage to hold messages in inbound and outbound queues.
Definition: message_queue.h:33
Representation of a message that is sent over the network.
Definition: message.h:77
unsigned short int cid() const
Get component ID.
Definition: message.cpp:285
unsigned int clid() const
Get client ID.
Definition: message.cpp:276
Fawkes Network Client Thread for server.
void set_clid(unsigned int client_id)
Set client ID.
virtual void broadcast(FawkesNetworkMessage *msg)
Broadcast a message.
void add_connection(StreamSocket *s) noexcept
Add a new connection.
void dispatch(FawkesNetworkMessage *msg)
Dispatch messages.
virtual void add_handler(FawkesNetworkHandler *handler)
Add a handler.
virtual void loop()
Fawkes network thread loop.
virtual ~FawkesNetworkServerThread()
Destructor.
void force_send()
Force sending of all pending messages.
virtual void remove_handler(FawkesNetworkHandler *handler)
Remove handler.
virtual void send(FawkesNetworkMessage *msg)
Send a message.
FawkesNetworkServerThread(bool enable_ipv4, bool enable_ipv6, const std::string &listen_ipv4, const std::string &listen_ipv6, unsigned int fawkes_port, ThreadCollector *thread_collector=0)
Constructor.
void push_locked(const Type &x)
Push element to queue with lock protection.
Definition: lock_queue.h:135
void lock() const
Lock queue.
Definition: lock_queue.h:114
void unlock() const
Unlock list.
Definition: lock_queue.h:128
Mutex locking helper.
Definition: mutex_locker.h:34
void unlock()
Unlock the mutex.
Network Acceptor Thread.
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
void ref()
Increment reference count.
Definition: refcount.cpp:67
@ IPv4
IPv4.
Definition: socket.h:77
@ IPv6
IPv6.
Definition: socket.h:78
TCP stream socket over IP.
Definition: stream.h:32
virtual void add(ThreadList &tl)=0
Add multiple threads.
virtual void remove(ThreadList &tl)=0
Remove multiple threads.
Thread class encapsulation of pthreads.
Definition: thread.h:46
void start(bool wait=true)
Call this method to start the thread.
Definition: thread.cpp:499
Fawkes library namespace.