24#include <core/exceptions/system.h>
25#include <fvutils/compression/jpeg_compressor.h>
26#include <fvutils/ipc/shm_image.h>
27#include <fvutils/ipc/shm_lut.h>
28#include <fvutils/net/fuse_image_content.h>
29#include <fvutils/net/fuse_imagelist_content.h>
30#include <fvutils/net/fuse_lut_content.h>
31#include <fvutils/net/fuse_lutlist_content.h>
32#include <fvutils/net/fuse_message_queue.h>
33#include <fvutils/net/fuse_server.h>
34#include <fvutils/net/fuse_server_client_thread.h>
35#include <fvutils/net/fuse_transceiver.h>
36#include <logging/liblogger.h>
37#include <netcomm/socket/stream.h>
38#include <netcomm/utils/exceptions.h>
39#include <netinet/in.h>
63:
Thread(
"FuseServerClientThread")
65 fuse_server_ = fuse_server;
67 jpeg_compressor_ = NULL;
74 greetmsg->
version = htonl(FUSE_CURRENT_VERSION);
75 outbound_queue_->push(
85 delete jpeg_compressor_;
87 for (bit_ = buffers_.begin(); bit_ != buffers_.end(); ++bit_) {
92 for (lit_ = luts_.begin(); lit_ != luts_.end(); ++lit_) {
97 while (!inbound_queue_->empty()) {
100 inbound_queue_->pop();
103 while (!outbound_queue_->empty()) {
106 outbound_queue_->pop();
109 delete inbound_queue_;
110 delete outbound_queue_;
117 if (!outbound_queue_->empty()) {
150 if (ntohl(gm->
version) != FUSE_CURRENT_VERSION) {
151 throw Exception(
"Invalid version on other side");
156FuseServerClientThread::get_shmimgbuf(
const char *
id)
158 char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1];
159 tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0;
160 strncpy(tmp_image_id,
id, IMAGE_ID_MAX_LENGTH);
162 if ((bit_ = buffers_.find(tmp_image_id)) == buffers_.end()) {
166 buffers_[tmp_image_id] = b;
192 outbound_queue_->push(nm);
196 if (irm->
format == FUSE_IF_RAW) {
199 }
else if (irm->
format == FUSE_IF_JPEG) {
200 if (!jpeg_compressor_) {
207 unsigned char *compressed_buffer =
214 long int sec = 0, usec = 0;
215 b->capture_time(&sec, &usec);
219 compressed_buffer_size,
226 free(compressed_buffer);
232 outbound_queue_->push(nm);
250 strncpy(ii->
image_id, b->image_id(), IMAGE_ID_MAX_LENGTH - 1);
252 ii->
width = htonl(b->width());
253 ii->
height = htonl(b->height());
254 ii->
buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height());
258 outbound_queue_->push(nm);
264 outbound_queue_->push(nm);
276 char tmp_lut_id[LUT_ID_MAX_LENGTH + 1];
277 tmp_lut_id[LUT_ID_MAX_LENGTH] = 0;
278 strncpy(tmp_lut_id, idm->
lut_id, LUT_ID_MAX_LENGTH);
280 if ((lit_ = luts_.find(tmp_lut_id)) != luts_.end()) {
287 luts_[tmp_lut_id] = b;
296 outbound_queue_->push(nm);
309 strncpy(reply->lut_id, lc->
lut_id(), LUT_ID_MAX_LENGTH - 1);
313 if ((lit_ = luts_.find(lc->
lut_id())) != luts_.end()) {
321 outbound_queue_->push(
323 e.
append(
"Cannot open shared memory lookup table %s", lc->
lut_id());
324 LibLogger::log_warn(
"FuseServerClientThread", e);
332 outbound_queue_->push(
334 LibLogger::log_warn(
"FuseServerClientThread",
335 "LUT upload: dimensions do not match. "
336 "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)",
347 outbound_queue_->push(
410FuseServerClientThread::process_inbound()
412 inbound_queue_->
lock();
413 while (!inbound_queue_->empty()) {
425 default:
throw Exception(
"Unknown message type received\n");
428 e.
append(
"FUSE protocol error");
429 LibLogger::log_warn(
"FuseServerClientThread", e);
435 inbound_queue_->pop();
450 p = socket_->
poll(10);
456 if ((p & Socket::POLL_ERR) || (p & Socket::POLL_HUP) || (p & Socket::POLL_RDHUP)) {
459 }
else if (p & Socket::POLL_IN) {
Thrown if the connection died during an operation.
Base class for exceptions in Fawkes.
void append(const char *format,...) noexcept
Append messages to the message list.
The current system call has been interrupted (for instance by a signal).
void lock() const
Lock queue.
void unlock() const
Unlock list.
void unref()
Decrement reference count and conditionally delete this instance.
void set(void *memptr)
Copies data from the memptr to shared memory.
virtual short poll(int timeout=-1, short what=POLL_IN|POLL_HUP|POLL_PRI|POLL_RDHUP)
Wait for some event on socket.
virtual void close()
Close socket.
TCP stream socket over IP.
Thread class encapsulation of pthreads.
void add_imageinfo(const char *image_id, colorspace_t colorspace, unsigned int pixel_width, unsigned int pixel_height)
Add image info.
FUSE lookup table content.
unsigned char * buffer() const
Get buffer.
unsigned int height() const
Height of LUT.
unsigned int bytes_per_cell() const
Bytes per cell in LUT.
unsigned int depth() const
Depth of LUT.
unsigned int width() const
Width of LUT.
const char * lut_id() const
Get LUT ID.
FUSE lookup table list content.
void add_lutinfo(const char *lut_id, unsigned int width, unsigned int height, unsigned int depth, unsigned int bytes_per_cell)
Add LUT info.
A LockQueue of FuseNetworkMessage to hold messages in inbound and outbound queues.
void * payload() const
Get pointer to payload.
MT * msg() const
Get correctly casted payload.
MT * msgc() const
Get correctly parsed output.
size_t payload_size() const
Get payload size.
uint32_t type() const
Get message type.
static void send(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq)
Send messages.
static void recv(fawkes::StreamSocket *s, FuseNetworkMessageQueue *msgq, unsigned int max_num_msgs=8)
Receive data.
void process_setlut_message(FuseNetworkMessage *m)
Process LUT setting.
virtual ~FuseServerClientThread()
Destructor.
void process_getimageinfo_message(FuseNetworkMessage *m)
Process image info request message.
void process_getimage_message(FuseNetworkMessage *m)
Process image request message.
void process_greeting_message(FuseNetworkMessage *m)
Process greeting message.
void send()
Send all messages in outbound queue.
virtual void loop()
Code to execute in the thread.
void process_getlut_message(FuseNetworkMessage *m)
Process LUT request message.
FuseServerClientThread(FuseServer *fuse_server, fawkes::StreamSocket *s)
Constructor.
void process_getimagelist_message(FuseNetworkMessage *m)
Process image list request message.
void process_getlutlist_message(FuseNetworkMessage *m)
Process LUT list request message.
FireVision FUSE protocol server.
void connection_died(FuseServerClientThread *client) noexcept
Connection died.
@ COMP_DEST_MEM
write compressed image to buffer in memory
virtual void compress()
Compress image.
virtual size_t compressed_size()
Get compressed size.
virtual size_t recommended_compressed_buffer_size()
Get the recommended size for the compressed buffer.
virtual void set_image_buffer(colorspace_t cspace, unsigned char *buffer)
Set image buffer to compress.
virtual void set_compression_destination(ImageCompressor::CompressionDestination cd)
Set compression destination.
virtual void set_destination_buffer(unsigned char *buf, unsigned int buf_size)
Set destination buffer (if compressing to memory).
virtual void set_image_dimensions(unsigned int width, unsigned int height)
Set dimensions of image to compress.
Shared memory image buffer.
Shared memory lookup table.
unsigned int bytes_per_cell() const
Get bytes per cell.
unsigned int width() const
Get LUT width.
unsigned int depth() const
Get LUT depth.
unsigned int height() const
Get LUT height.
Fawkes library namespace.
version packet, bi-directional
uint32_t version
version from FUSE_version_t
Image description message.
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
uint32_t colorspace
color space
uint32_t height
height in pixels
uint32_t width
width in pixels
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
uint32_t buffer_size
size of following image buffer in bytes
char image_id[IMAGE_ID_MAX_LENGTH]
image ID
uint32_t format
requested image format, see FUSE_image_format_t
char lut_id[LUT_ID_MAX_LENGTH]
LUT ID.