Fawkes API Fawkes Development Version
fuse_server_client_thread.cpp
1
2/***************************************************************************
3 * fuse_server_client_thread.cpp - client thread for FuseServer
4 *
5 * Created: Tue Nov 13 20:00:55 2007
6 * Copyright 2005-2007 Tim Niemueller [www.niemueller.de]
7 *
8 ****************************************************************************/
9
10/* This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version. A runtime exception applies to
14 * this software (see LICENSE.GPL_WRE file mentioned below for details).
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Library General Public License for more details.
20 *
21 * Read the full text in the LICENSE.GPL_WRE file in the doc directory.
22 */
23
24#include <core/exceptions/system.h>
25#include <fvutils/compression/jpeg_compressor.h>
26#include <fvutils/ipc/shm_image.h>
27#include <fvutils/ipc/shm_lut.h>
28#include <fvutils/net/fuse_image_content.h>
29#include <fvutils/net/fuse_imagelist_content.h>
30#include <fvutils/net/fuse_lut_content.h>
31#include <fvutils/net/fuse_lutlist_content.h>
32#include <fvutils/net/fuse_message_queue.h>
33#include <fvutils/net/fuse_server.h>
34#include <fvutils/net/fuse_server_client_thread.h>
35#include <fvutils/net/fuse_transceiver.h>
36#include <logging/liblogger.h>
37#include <netcomm/socket/stream.h>
38#include <netcomm/utils/exceptions.h>
39#include <netinet/in.h>
40
41#include <cstdlib>
42#include <cstring>
43#include <unistd.h>
44
45using namespace fawkes;
46
47namespace firevision {
48
49/** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h>
50 * FUSE Server Client Thread.
51 * This thread is instantiated and started for each client that connects to a
52 * FuseServer.
53 * @ingroup FUSE
54 * @ingroup FireVision
55 * @author Tim Niemueller
56 */
57
58/** Constructor.
 * Stores the server and the socket, creates the inbound and outbound message
 * queues and queues a greeting message as the first outbound message.
59 * @param fuse_server parent FUSE server
60 * @param s socket to client, it is deleted again by the destructor
61 */
 // NOTE(review): the constructor signature (listing line 62) is missing from this extract.
63: Thread("FuseServerClientThread")
64{
65 fuse_server_ = fuse_server;
66 socket_ = s;
 // no compressor yet, one is created when the first JPEG image is requested
67 jpeg_compressor_ = NULL;
68
69 inbound_queue_ = new FuseNetworkMessageQueue();
70 outbound_queue_ = new FuseNetworkMessageQueue();
71
 // Announce our protocol version, converted to network byte order.
 // NOTE(review): the initializer of greetmsg (listing line 73) is missing from this extract.
72 FUSE_greeting_message_t *greetmsg =
74 greetmsg->version = htonl(FUSE_CURRENT_VERSION);
75 outbound_queue_->push(
76 new FuseNetworkMessage(FUSE_MT_GREETING, greetmsg, sizeof(FUSE_greeting_message_t)));
77
78 alive_ = true;
79}
80
81/** Destructor.
 * Deletes the socket and the JPEG compressor, deletes every image buffer and
 * lookup table stored in buffers_ and luts_, and unrefs all messages still
 * waiting in the inbound and outbound queues before deleting the queues.
 */
 // NOTE(review): the destructor signature (listing line 82) is missing from this extract.
83{
84 delete socket_;
85 delete jpeg_compressor_;
86
 // the maps hold owning pointers, so delete the values before clearing
87 for (bit_ = buffers_.begin(); bit_ != buffers_.end(); ++bit_) {
88 delete bit_->second;
89 }
90 buffers_.clear();
91
92 for (lit_ = luts_.begin(); lit_ != luts_.end(); ++lit_) {
93 delete lit_->second;
94 }
95 luts_.clear();
96
 // drop our reference on every message that was never processed
97 while (!inbound_queue_->empty()) {
98 FuseNetworkMessage *m = inbound_queue_->front();
99 m->unref();
100 inbound_queue_->pop();
101 }
102
 // same for every message that was never sent
103 while (!outbound_queue_->empty()) {
104 FuseNetworkMessage *m = outbound_queue_->front();
105 m->unref();
106 outbound_queue_->pop();
107 }
108
109 delete inbound_queue_;
110 delete outbound_queue_;
111}
112
113/** Send all messages in outbound queue.
 * Does nothing if the outbound queue is empty. If sending throws an Exception
 * the server is notified via connection_died() and the thread is marked as no
 * longer alive; the exception is not passed on to the caller.
 */
114void
 // NOTE(review): the line with the method name (listing line 115) is missing from this extract.
116{
117 if (!outbound_queue_->empty()) {
118 try {
119 FuseNetworkTransceiver::send(socket_, outbound_queue_);
120 } catch (Exception &e) {
 // unlike recv() the socket is not closed here
121 fuse_server_->connection_died(this);
122 alive_ = false;
123 }
124 }
125}
126
127/** Receive data.
128 * Hands the socket and the inbound queue to FuseNetworkTransceiver::recv().
129 * The received messages are not processed here, loop() calls
 * process_inbound() for that after this method returned.
 * If a ConnectionDiedException is thrown the socket is closed, the server is
 * notified via connection_died() and the thread is marked as no longer alive.
 * Any other exception is passed on to the caller.
130 */
131void
 // NOTE(review): the line with the method name (listing line 132) is missing from this extract.
133{
134 try {
135 FuseNetworkTransceiver::recv(socket_, inbound_queue_);
136 } catch (ConnectionDiedException &e) {
137 socket_->close();
138 fuse_server_->connection_died(this);
139 alive_ = false;
140 }
141}
142
143/** Process greeting message.
 * Compares the version field of the greeting, converted from network byte
 * order, against FUSE_CURRENT_VERSION.
144 * @param m received message
 * @exception Exception thrown if the versions differ
145 */
146void
 // NOTE(review): the line with the method name (listing line 147) is missing from this extract.
148{
 // NOTE(review): the declaration of gm (listing line 149) is missing from this extract,
 // presumably it is the greeting payload of m -- confirm.
150 if (ntohl(gm->version) != FUSE_CURRENT_VERSION) {
151 throw Exception("Invalid version on other side");
152 }
153}
154
/** Get the image buffer for an image ID.
 * Looks the ID up in buffers_ and returns the stored buffer if there is one.
 * Otherwise a new buffer is stored in buffers_ under that ID and returned.
 * NOTE(review): the return type line (listing line 155) is missing from this extract.
 * @param id image ID, at most IMAGE_ID_MAX_LENGTH characters of it are used
 * @return buffer stored in buffers_ for the ID
 * @exception Exception any exception thrown while creating the buffer is
 * passed on to the caller unchanged
 */
156FuseServerClientThread::get_shmimgbuf(const char *id)
157{
 // Bounded copy of the ID with one extra byte for the terminator: strncpy() does
 // not terminate the destination if id has IMAGE_ID_MAX_LENGTH or more characters.
158 char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
159 tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
160 strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH);
161
162 if ((bit_ = buffers_.find(tmp_image_id)) == buffers_.end()) {
163 // the buffer has not yet been opened
164 try {
 // NOTE(review): the line creating b (listing line 165) is missing from this extract.
166 buffers_[tmp_image_id] = b;
167 return b;
168 } catch (Exception &e) {
 // nothing is cleaned up here, the exception is simply re-thrown
169 throw;
170 }
171 } else {
172 return bit_->second;
173 }
174}
175
176/** Process image request message.
 * Queues a FUSE_MT_IMAGE reply for the requested format (FUSE_IF_RAW or
 * FUSE_IF_JPEG). If get_shmimgbuf() throws or the format is neither of the
 * two, a FUSE_MT_GET_IMAGE_FAILED message with a copy of the request payload
 * is queued instead.
177 * @param m received message
178 */
179void
 // NOTE(review): listing lines 180, 182 and 184 (method name and the declarations of
 // irm and b) are missing from this extract.
181{
183
185 try {
186 b = get_shmimgbuf(irm->image_id);
187 } catch (Exception &e) {
 // no buffer for this ID, echo the request back as failure
188 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
189 m->payload(),
190 m->payload_size(),
191 /* copy payload */ true);
192 outbound_queue_->push(nm);
193 return;
194 }
195
196 if (irm->format == FUSE_IF_RAW) {
 // NOTE(review): the line creating im (listing line 197) is missing from this extract.
198 outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
199 } else if (irm->format == FUSE_IF_JPEG) {
 // the compressor is created on the first JPEG request and kept afterwards
200 if (!jpeg_compressor_) {
201 jpeg_compressor_ = new JpegImageCompressor();
 // NOTE(review): listing line 202 is missing from this extract.
203 }
 // NOTE(review): if anything between lock_for_read() and unlock() throws, unlock() is
 // skipped and compressed_buffer is not freed; the malloc() result is not checked either.
204 b->lock_for_read();
205 jpeg_compressor_->set_image_dimensions(b->width(), b->height());
206 jpeg_compressor_->set_image_buffer(b->colorspace(), b->buffer());
207 unsigned char *compressed_buffer =
208 (unsigned char *)malloc(jpeg_compressor_->recommended_compressed_buffer_size());
209 jpeg_compressor_->set_destination_buffer(
210 compressed_buffer, jpeg_compressor_->recommended_compressed_buffer_size());
211 jpeg_compressor_->compress();
212 b->unlock();
213 size_t compressed_buffer_size = jpeg_compressor_->compressed_size();
214 long int sec = 0, usec = 0;
215 b->capture_time(&sec, &usec);
216 FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG,
217 b->image_id(),
218 compressed_buffer,
219 compressed_buffer_size,
220 CS_UNKNOWN,
221 b->width(),
222 b->height(),
223 sec,
224 usec);
225 outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im));
 // NOTE(review): freed right after the message was queued, so FuseImageContent
 // presumably keeps its own copy of the data -- confirm.
226 free(compressed_buffer);
227 } else {
 // unknown image format requested
228 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
229 m->payload(),
230 m->payload_size(),
231 /* copy payload */ true);
232 outbound_queue_->push(nm);
233 }
234}
235
236/** Process image info request message.
 * Queues a FUSE_MT_IMAGE_INFO reply with ID, colorspace, width, height and
 * buffer size of the requested image. If an Exception is thrown while the
 * reply is prepared, a FUSE_MT_GET_IMAGE_FAILED message with a copy of the
 * request payload is queued instead.
237 * @param m received message
238 */
239void
 // NOTE(review): listing lines 240, 242 and 244 (method name and the declarations of
 // idm and b) are missing from this extract.
241{
243
245 try {
246 b = get_shmimgbuf(idm->image_id);
247
 // zero-initialized, so the ID stays NUL-terminated after the bounded copy below
248 FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t));
249
250 strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH - 1);
251 ii->colorspace = htons(b->colorspace());
252 ii->width = htonl(b->width());
253 ii->height = htonl(b->height());
 // NOTE(review): unlike width and height this value is not passed through htonl() --
 // confirm that receivers expect buffer_size in host byte order.
254 ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
255
 // NOTE(review): the line declaring nm (listing line 256) is missing from this extract.
257 new FuseNetworkMessage(FUSE_MT_IMAGE_INFO, ii, sizeof(FUSE_imageinfo_t));
258 outbound_queue_->push(nm);
259 } catch (Exception &e) {
260 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED,
261 m->payload(),
262 m->payload_size(),
263 /* copy payload */ true);
264 outbound_queue_->push(nm);
265 }
266}
267
268/** Process LUT request message.
 * Queues a FUSE_MT_LUT reply built from the requested lookup table. A table
 * already stored in luts_ is re-used, otherwise a new one is stored there.
 * If that throws an Exception, a FUSE_MT_GET_LUT_FAILED message with a copy
 * of the request payload is queued instead.
269 * @param m received message
270 */
271void
 // NOTE(review): listing lines 272 and 274 (method name and the declaration of idm)
 // are missing from this extract.
273{
275
 // Bounded copy of the ID with one extra byte for the terminator: strncpy() does
 // not terminate the destination if the ID fills the whole lut_id field.
276 char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
277 tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
278 strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH);
279
280 if ((lit_ = luts_.find(tmp_lut_id)) != luts_.end()) {
281 // the buffer has already been opened
282 FuseLutContent *lm = new FuseLutContent(lit_->second);
283 outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
284 } else {
285 try {
 // NOTE(review): the line creating b (listing line 286) is missing from this extract.
287 luts_[tmp_lut_id] = b;
288 FuseLutContent *lm = new FuseLutContent(b);
289 outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_LUT, lm));
290 } catch (Exception &e) {
291 // could not open the shared memory segment for some reason, send failure
292 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED,
293 m->payload(),
294 m->payload_size(),
295 /* copy payload */ true);
296 outbound_queue_->push(nm);
297 }
298 }
299}
300
301/** Process LUT setting.
 * Copies the uploaded lookup table into the table with the same ID if width,
 * height, depth and bytes per cell all match, and queues
 * FUSE_MT_SET_LUT_SUCCEEDED. If the table cannot be opened or the dimensions
 * differ, FUSE_MT_SET_LUT_FAILED is queued and a warning is logged. On every
 * path exactly one reply is queued and lc is deleted.
302 * @param m received message
303 */
304void
 // NOTE(review): listing lines 305, 307, 308 and 312 (method name and the declarations
 // of lc, reply and b) are missing from this extract.
306{
 // NOTE(review): copies at most LUT_ID_MAX_LENGTH - 1 characters, which only leaves the
 // ID terminated if reply was zero-initialized on the missing line -- confirm.
309 strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH - 1);
310 // Currently we expect colormaps, so make sure we get sensible dimensions
311
313 if ((lit_ = luts_.find(lc->lut_id())) != luts_.end()) {
314 // the buffer had already been opened
315 b = lit_->second;
316 } else {
317 try {
318 b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false);
319 luts_[lc->lut_id()] = b;
320 } catch (Exception &e) {
321 outbound_queue_->push(
322 new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, reply, sizeof(FUSE_lutdesc_message_t)));
323 e.append("Cannot open shared memory lookup table %s", lc->lut_id());
324 LibLogger::log_warn("FuseServerClientThread", e);
325 delete lc;
326 return;
327 }
328 }
329
 // only accept the upload if it has exactly the layout of the existing table
330 if ((b->width() != lc->width()) || (b->height() != lc->height()) || (b->depth() != lc->depth())
331 || (b->bytes_per_cell() != lc->bytes_per_cell())) {
332 outbound_queue_->push(
333 new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, reply, sizeof(FUSE_lutdesc_message_t)));
334 LibLogger::log_warn("FuseServerClientThread",
335 "LUT upload: dimensions do not match. "
336 "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
337 b->width(),
338 b->height(),
339 b->depth(),
340 b->bytes_per_cell(),
341 lc->width(),
342 lc->height(),
343 lc->depth(),
344 lc->bytes_per_cell());
345 } else {
346 b->set(lc->buffer());
347 outbound_queue_->push(
348 new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED, reply, sizeof(FUSE_lutdesc_message_t)));
349 }
350
351 delete lc;
352}
353
354/** Process image list request message.
 * Iterates over the shared memory segments found for
 * FIREVISION_SHM_IMAGE_MAGIC_TOKEN, adds ID, colorspace, width and height of
 * each image buffer header to a list and queues it as FUSE_MT_IMAGE_LIST.
355 * @param m received message
356 */
357void
 // NOTE(review): listing lines 358, 360 and 362 (method name and the declarations of
 // ilm and h) are missing from this extract.
359{
361
363 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h);
364 SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
365
366 while (i != endi) {
 // NOTE(review): the line declaring ih (listing line 367) is missing from this extract.
368 dynamic_cast<const SharedMemoryImageBufferHeader *>(*i);
 // entries whose header is not an image buffer header are skipped
369 if (ih) {
370 ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height());
371 }
372
373 ++i;
374 }
375
376 delete h;
377
378 outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm));
379}
380
381/** Process LUT list request message.
 * Iterates over the shared memory segments found for
 * FIREVISION_SHM_LUT_MAGIC_TOKEN, adds ID, width, height, depth and bytes per
 * cell of each lookup table header to a list and queues it as
 * FUSE_MT_LUT_LIST.
382 * @param m received message
383 */
384void
 // NOTE(review): listing lines 385, 387 and 389 (method name and the declarations of
 // llm and h) are missing from this extract.
386{
388
390 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h);
391 SharedMemory::SharedMemoryIterator endi = SharedMemory::end();
392
393 while (i != endi) {
 // NOTE(review): the line declaring lh (listing line 394) is missing from this extract.
395 dynamic_cast<const SharedMemoryLookupTableHeader *>(*i);
 // entries whose header is not a lookup table header are skipped
396 if (lh) {
397 llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell());
398 }
399
400 ++i;
401 }
402
403 delete h;
404
405 outbound_queue_->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm));
406}
407
408/** Process inbound messages. */
409void
410FuseServerClientThread::process_inbound()
411{
412 inbound_queue_->lock();
413 while (!inbound_queue_->empty()) {
414 FuseNetworkMessage *m = inbound_queue_->front();
415
416 try {
417 switch (m->type()) {
418 case FUSE_MT_GREETING: process_greeting_message(m); break;
419 case FUSE_MT_GET_IMAGE: process_getimage_message(m); break;
420 case FUSE_MT_GET_IMAGE_INFO: process_getimageinfo_message(m); break;
421 case FUSE_MT_GET_IMAGE_LIST: process_getimagelist_message(m); break;
422 case FUSE_MT_GET_LUT_LIST: process_getlutlist_message(m); break;
423 case FUSE_MT_GET_LUT: process_getlut_message(m); break;
424 case FUSE_MT_SET_LUT: process_setlut_message(m); break;
425 default: throw Exception("Unknown message type received\n");
426 }
427 } catch (Exception &e) {
428 e.append("FUSE protocol error");
429 LibLogger::log_warn("FuseServerClientThread", e);
430 fuse_server_->connection_died(this);
431 alive_ = false;
432 }
433
434 m->unref();
435 inbound_queue_->pop();
436 }
437 inbound_queue_->unlock();
438}
439
/** Thread loop.
 * While the connection is alive the socket is polled for up to 10 ms. On
 * error or hang-up the server is notified and the thread is marked as no
 * longer alive. If data can be read it is received and processed; any
 * exception thrown while doing so also marks the connection as dead.
 * Afterwards queued outbound messages are sent if the connection is still
 * alive. Once the connection is no longer alive each iteration only sleeps.
 */
440void
 // NOTE(review): the line with the method name (listing line 441) is missing from this extract.
442{
443 if (!alive_) {
 // connection is gone, only sleep for 10 ms per iteration
444 usleep(10000);
445 return;
446 }
447
448 short p = 0;
449 try {
450 p = socket_->poll(10); // block for up to 10 ms
451 } catch (InterruptedException &e) {
452 // we just ignore this and try it again
453 return;
454 }
455
456 if ((p & Socket::POLL_ERR) || (p & Socket::POLL_HUP) || (p & Socket::POLL_RDHUP)) {
457 fuse_server_->connection_died(this);
458 alive_ = false;
459 } else if (p & Socket::POLL_IN) {
460 try {
461 // Data can be read
462 recv();
463 process_inbound();
464 } catch (...) {
 // anything that recv() or process_inbound() did not handle themselves
465 fuse_server_->connection_died(this);
466 alive_ = false;
467 }
468 }
469
 // flush replies queued above, and the greeting queued by the constructor
470 if (alive_) {
471 send();
472 }
473}
474
475} // end namespace firevision
Thrown if the connection died during an operation.
Definition: exceptions.h:32
Base class for exceptions in Fawkes.
Definition: exception.h:36
void append(const char *format,...) noexcept
Append messages to the message list.
Definition: exception.cpp:333
The current system call has been interrupted (for instance by a signal).
Definition: system.h:39
void lock() const
Lock queue.
Definition: lock_queue.h:114
void unlock() const
Unlock queue.
Definition: lock_queue.h:128
void unref()
Decrement reference count and conditionally delete this instance.
Definition: refcount.cpp:95
Shared Memory iterator.
Definition: shm.h:119
void set(void *memptr)
Copies data from the memptr to shared memory.
Definition: shm.cpp:774
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
Definition: socket.cpp:685
virtual void close()
Close socket.
Definition: socket.cpp:311
TCP stream socket over IP.
Definition: stream.h:32
Thread class encapsulation of pthreads.
Definition: thread.h:46
void add_imageinfo(const char *image_id, colorspace_t colorspace, unsigned int pixel_width, unsigned int pixel_height)
Add image info.
FUSE lookup table content.
unsigned char * buffer() const
Get buffer.
unsigned int height() const
Height of LUT.
unsigned int bytes_per_cell() const
Bytes per cell in LUT.
unsigned int depth() const
Depth of LUT.
unsigned int width() const
Width of LUT.
const char * lut_id() const
Get LUT ID.
FUSE lookup table list content.
void add_lutinfo(const char *lut_id, unsigned int width, unsigned int height, unsigned int depth, unsigned int bytes_per_cell)
Add LUT info.
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
FUSE Network Message.
Definition: fuse_message.h:40
void * payload() const
Get pointer to payload.
MT * msg() const
Get correctly casted payload.
Definition: fuse_message.h:67
MT * msgc() const
Get correctly parsed output.
Definition: fuse_message.h:108
size_t payload_size() const
Get payload size.
uint32_t type() const
Get message type.
static void send(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq)
Send messages.
static void recv(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
void process_setlut_message(FuseNetworkMessage *m)
Process LUT setting.
void process_getimageinfo_message(FuseNetworkMessage *m)
Process image info request message.
void process_getimage_message(FuseNetworkMessage *m)
Process image request message.
void process_greeting_message(FuseNetworkMessage *m)
Process greeting message.
void send()
Send all messages in outbound queue.
virtual void loop()
Code to execute in the thread.
void process_getlut_message(FuseNetworkMessage *m)
Process LUT request message.
FuseServerClientThread(FuseServer *fuse_server, fawkes::StreamSocket *s)
Constructor.
void process_getimagelist_message(FuseNetworkMessage *m)
Process image list request message.
void process_getlutlist_message(FuseNetworkMessage *m)
Process LUT list request message.
FireVision FUSE protocol server.
Definition: fuse_server.h:44
void connection_died(FuseServerClientThread *client) noexcept
Connection died.
@ COMP_DEST_MEM
write compressed image to buffer in memory
Jpeg image compressor.
virtual void compress()
Compress image.
virtual size_t compressed_size()
Get compressed size.
virtual size_t recommended_compressed_buffer_size()
Get the recommended size for the compressed buffer.
virtual void set_image_buffer(colorspace_t cspace, unsigned char *buffer)
Set image buffer to compress.
virtual void set_compression_destination(ImageCompressor::CompressionDestination cd)
Set compression destination.
virtual void set_destination_buffer(unsigned char *buf, unsigned int buf_size)
Set destination buffer (if compressing to memory).
virtual void set_image_dimensions(unsigned int width, unsigned int height)
Set dimensions of image to compress.
Shared memory image buffer header.
Definition: shm_image.h:67
const char * image_id() const
Get image ID.
Definition: shm_image.cpp:838
unsigned int width() const
Get width.
Definition: shm_image.cpp:814
unsigned int height() const
Get height.
Definition: shm_image.cpp:826
colorspace_t colorspace() const
Get colorspace.
Definition: shm_image.cpp:802
Shared memory image buffer.
Definition: shm_image.h:184
Shared memory lookup table header.
Definition: shm_lut.h:49
const char * lut_id() const
Get LUT ID.
Definition: shm_lut.cpp:486
unsigned int height() const
Get LUT height.
Definition: shm_lut.cpp:453
unsigned int depth() const
Get LUT depth.
Definition: shm_lut.cpp:464
unsigned int bytes_per_cell() const
Get bytes per cell.
Definition: shm_lut.cpp:475
unsigned int width() const
Get LUT width.
Definition: shm_lut.cpp:442
Shared memory lookup table.
Definition: shm_lut.h:113
unsigned int bytes_per_cell() const
Get bytes per cell.
Definition: shm_lut.cpp:177
unsigned int width() const
Get LUT width.
Definition: shm_lut.cpp:150
unsigned int depth() const
Get LUT depth.
Definition: shm_lut.cpp:168
unsigned int height() const
Get LUT height.
Definition: shm_lut.cpp:159
Fawkes library namespace.
version packet, bi-directional
Definition: fuse.h:98
uint32_t version
version from FUSE_version_t
Definition: fuse.h:99
Image description message.
Definition: fuse.h:156
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:157
Image info message.
Definition: fuse.h:168
uint32_t colorspace
color space
Definition: fuse.h:170
uint32_t height
height in pixels
Definition: fuse.h:173
uint32_t width
width in pixels
Definition: fuse.h:172
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:169
uint32_t buffer_size
size of following image buffer in bytes
Definition: fuse.h:174
Image request message.
Definition: fuse.h:148
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
Definition: fuse.h:149
uint32_t format
requested image format, see FUSE_image_format_t
Definition: fuse.h:150
LUT description message.
Definition: fuse.h:162
char lut_id[LUT_ID_MAX_LENGTH]
LUT ID.
Definition: fuse.h:163