Fawkes API  Fawkes Development Version
mjpeg_reply.cpp
1 
2 /***************************************************************************
3  * mjpeg_reply.cpp - Web request MJPEG stream reply
4  *
5  * Created: Wed Feb 05 17:55:41 2014
6  * Copyright 2006-2014 Tim Niemueller [www.niemueller.de]
7  ****************************************************************************/
8 
9 /* This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17  * GNU Library General Public License for more details.
18  *
19  * Read the full text in the LICENSE.GPL file in the doc directory.
20  */
21 
22 #include "mjpeg_reply.h"
23 
24 #include <core/exceptions/system.h>
25 #include <core/threading/mutex.h>
26 #include <core/threading/wait_condition.h>
27 #include <fvcams/shmem.h>
28 #include <fvutils/compression/jpeg_compressor.h>
29 
30 #include <cerrno>
31 #include <cstring>
32 #include <unistd.h>
33 
34 using namespace firevision;
35 
36 namespace fawkes {
37 
38 /** @class DynamicMJPEGStreamWebReply <webview/file_reply.h>
39  * Dynamic raw file transfer reply.
40  * This dynamic file transfer reply transmits the given file with a mime type
41  * determined with libmagic.
42  * @author Tim Niemueller
43  */
44 
45 /** Constructor.
46  * @param stream_producer stream producer to query for JPEG buffers
47  */
48 DynamicMJPEGStreamWebReply::DynamicMJPEGStreamWebReply(
49  std::shared_ptr<WebviewJpegStreamProducer> stream_producer)
50 : DynamicWebReply(WebReply::HTTP_OK)
51 {
52  next_buffer_mutex_ = new fawkes::Mutex();
53  next_buffer_waitcond_ = new fawkes::WaitCondition(next_buffer_mutex_);
54  next_frame_ = true;
55 
56  add_header("Content-type", "multipart/x-mixed-replace;boundary=MJPEG-next-frame");
57  stream_producer_ = stream_producer;
58  stream_producer_->add_subscriber(this);
59 }
60 
61 /** Copy Constructor.
62  * @param other instance to copy from
63  */
65 : DynamicWebReply(WebReply::HTTP_OK)
66 {
67  next_buffer_mutex_ = new fawkes::Mutex();
68  next_buffer_waitcond_ = new fawkes::WaitCondition(next_buffer_mutex_);
69  next_frame_ = other.next_frame_;
70 
71  add_header("Content-type", "multipart/x-mixed-replace;boundary=MJPEG-next-frame");
72  stream_producer_ = other.stream_producer_;
73  stream_producer_->add_subscriber(this);
74 }
75 
76 /** Destructor. */
78 {
79  stream_producer_->remove_subscriber(this);
80  delete next_buffer_mutex_;
81  delete next_buffer_waitcond_;
82 }
83 
84 /** Assignment operator.
85  * @param other instance to copy from
86  * @return reference to this instance
87  */
90 {
91  stream_producer_->remove_subscriber(this);
92  next_frame_ = other.next_frame_;
93 
94  add_header("Content-type", "multipart/x-mixed-replace;boundary=MJPEG-next-frame");
95  stream_producer_ = other.stream_producer_;
96  stream_producer_->add_subscriber(this);
97 
98  return *this;
99 }
100 
101 size_t
103 {
104  return -1;
105 }
106 
107 void
108 DynamicMJPEGStreamWebReply::handle_buffer(std::shared_ptr<WebviewJpegStreamProducer::Buffer> buffer)
109 {
110  next_buffer_mutex_->lock();
111  next_buffer_ = buffer;
112  next_buffer_waitcond_->wake_all();
113  next_buffer_mutex_->unlock();
114 }
115 
116 size_t
117 DynamicMJPEGStreamWebReply::next_chunk(size_t pos, char *buffer, size_t buf_max_size)
118 {
119  if (buf_max_size == 0)
120  return 0;
121 
122  size_t written = 0;
123 
124  if (next_frame_) {
125  next_buffer_mutex_->lock();
126  while (!next_buffer_) {
127  next_buffer_waitcond_->wait();
128  }
129  buffer_ = next_buffer_;
130  next_buffer_.reset();
131  next_buffer_mutex_->unlock();
132 
133  char *header;
134  if (asprintf(&header,
135  "--MJPEG-next-frame\r\n"
136  "Content-type: image/jpeg\r\n"
137  "Content-length: %zu\r\n"
138  "\r\n",
139  buffer_->size())
140  == -1) {
141  return -2;
142  }
143  size_t header_len = strlen(header);
144  memcpy(buffer, header, header_len);
145  buffer += header_len;
146  buf_max_size -= header_len;
147  written += header_len;
148 
149  buffer_bytes_written_ = 0;
150  next_frame_ = false;
151  }
152 
153  size_t remaining = buffer_->size() - buffer_bytes_written_;
154  if (remaining <= buf_max_size) {
155  memcpy(buffer, buffer_->data() + buffer_bytes_written_, remaining);
156  next_frame_ = true;
157  written += remaining;
158  } else {
159  memcpy(buffer, buffer_->data() + buffer_bytes_written_, buf_max_size);
160  buffer_bytes_written_ += buf_max_size;
161  written += buf_max_size;
162  }
163 
164  return written;
165 }
166 
167 } // end namespace fawkes
DynamicMJPEGStreamWebReply(std::shared_ptr< WebviewJpegStreamProducer > stream_producer)
Constructor.
Definition: mjpeg_reply.cpp:48
Wait until a given condition holds.
virtual ~DynamicMJPEGStreamWebReply()
Destructor.
Definition: mjpeg_reply.cpp:77
Fawkes library namespace.
void unlock()
Unlock the mutex.
Definition: mutex.cpp:131
void wake_all()
Wake up all waiting threads.
virtual size_t size()
Total size of the web reply.
Dynamic web reply.
Definition: reply.h:125
virtual void handle_buffer(std::shared_ptr< WebviewJpegStreamProducer::Buffer > buffer)
Notification if a new buffer is available.
Dynamic raw file transfer reply.
Definition: mjpeg_reply.h:35
void wait()
Wait for the condition forever.
Basic web reply.
Definition: reply.h:33
DynamicMJPEGStreamWebReply & operator=(const DynamicMJPEGStreamWebReply &other)
Assignment operator.
Definition: mjpeg_reply.cpp:89
void add_header(const std::string &header, const std::string &content)
Add a HTTP header.
Definition: reply.cpp:123
virtual size_t next_chunk(size_t pos, char *buffer, size_t buf_max_size)
Get data of next chunk.
void lock()
Lock this mutex.
Definition: mutex.cpp:87
Mutex mutual exclusion lock.
Definition: mutex.h:32