32 #include <curl/curl.h>
35 #include <D4Attributes.h>
36 #include <XMLWriter.h>
38 #include <BESIndent.h>
41 #include <BESInternalError.h>
43 #include "DmrppRequestHandler.h"
44 #include "DmrppCommon.h"
45 #include "DmrppArray.h"
52 #define prolog std::string("DmrppCommon::").append(__func__).append("() - ")
57 static const string dmrpp_3 =
"dmrpp:3";
58 static const string dmrpp_4 =
"dmrpp:4";
60 bool DmrppCommon::d_print_chunks =
false;
61 string DmrppCommon::d_dmrpp_ns =
"http://xml.opendap.org/dap/dmrpp/1.0.0#";
62 string DmrppCommon::d_ns_prefix =
"dmrpp";
74 void join_threads(pthread_t threads[],
unsigned int num_threads)
77 for (
unsigned int i = 0; i < num_threads; ++i) {
79 BESDEBUG(dmrpp_3,
"Join thread " << i <<
" after an exception was caught." << endl);
81 if ((status = pthread_join(threads[i], (
void **) &error)) < 0) {
82 BESDEBUG(dmrpp_3,
"Could not join thread " << i <<
", " << strerror(status)<< endl);
85 else if (error != NULL) {
86 BESDEBUG(dmrpp_3,
"Joined thread " << i <<
", error exit: " << *error << endl);
90 BESDEBUG(dmrpp_3,
"Joined thread " << i <<
", successful exit." << endl);
105 void DmrppCommon::parse_chunk_dimension_sizes(
const string &chunk_dims_string)
107 d_chunk_dimension_sizes.clear();
109 if (chunk_dims_string.empty())
return;
111 string chunk_dims = chunk_dims_string;
113 if (chunk_dims.find_first_not_of(
"1234567890 ") != string::npos)
114 throw BESInternalError(
"while processing chunk dimension information, illegal character(s)", __FILE__, __LINE__);
123 if (chunk_dims.find(space) != string::npos) {
125 while ((strPos = chunk_dims.find(space)) != string::npos) {
126 strVal = chunk_dims.substr(0, strPos);
128 d_chunk_dimension_sizes.push_back(strtol(strVal.c_str(), NULL, 10));
129 chunk_dims.erase(0, strPos + space.length());
135 d_chunk_dimension_sizes.push_back(strtol(chunk_dims.c_str(), NULL, 10));
144 void DmrppCommon::ingest_compression_type(
const string &compression_type_string)
146 if (compression_type_string.empty())
return;
152 string deflate(
"deflate");
153 string shuffle(
"shuffle");
156 if (compression_type_string.find(deflate) != string::npos) {
160 if (compression_type_string.find(shuffle) != string::npos) {
170 void DmrppCommon::ingest_byte_order(
const string &byte_order_string) {
172 if (byte_order_string.empty())
return;
175 if (byte_order_string.compare(
"LE") == 0) {
177 d_twiddle_bytes = is_host_big_endian();
179 if (byte_order_string.compare(
"BE") == 0) {
181 d_twiddle_bytes = !(is_host_big_endian());
189 std::string DmrppCommon::get_byte_order()
199 unsigned long DmrppCommon::add_chunk(
200 const string &data_url,
201 const string &byte_order,
202 unsigned long long size,
203 unsigned long long offset,
204 const string &position_in_array)
207 vector<unsigned int> cpia_vector;
208 Chunk::parse_chunk_position_in_array_string(position_in_array, cpia_vector);
209 return add_chunk(data_url, byte_order, size, offset, cpia_vector);
212 unsigned long DmrppCommon::add_chunk(
213 const string &data_url,
214 const string &byte_order,
215 unsigned long long size,
216 unsigned long long offset,
217 const vector<unsigned int> &position_in_array)
219 std::shared_ptr<Chunk> chunk(
new Chunk(data_url, byte_order, size, offset, position_in_array));
224 msg << prolog <<
"ERROR DmrrpCommon::add_chunk() may only be called on an instance of DmrppArray. ";
225 msg <<
"The variable";
226 auto bt =
dynamic_cast<libdap::BaseType *
>(
this);
228 msg <<
" " << bt->type_name() <<
" " << bt->name();
230 msg <<
" is not an instance of DmrppArray.";
231 msg <<
"this: " << (
void **)
this <<
" ";
232 msg <<
"byte_order: " << byte_order <<
" ";
233 msg <<
"size: " << size <<
" ";
234 msg <<
"offset: " << offset <<
" ";
238 if(d_super_chunks.empty())
239 d_super_chunks.push_back( shared_ptr<SuperChunk>(
new SuperChunk()));
241 auto currentSuperChunk = d_super_chunks.back();
243 bool chunk_was_added = currentSuperChunk->add_chunk(chunk);
244 if(!chunk_was_added){
245 if(currentSuperChunk->empty()){
247 msg << prolog <<
"ERROR! Failed to add a Chunk to an empty SuperChunk. This should not happen.";
251 currentSuperChunk = shared_ptr<SuperChunk>(
new SuperChunk());
252 chunk_was_added = currentSuperChunk->add_chunk(chunk);
253 if(!chunk_was_added) {
255 msg << prolog <<
"ERROR! Failed to add a Chunk to an empty SuperChunk. This should not happen.";
258 d_super_chunks.push_back(currentSuperChunk);
262 d_chunks.push_back(chunk);
263 return d_chunks.size();
284 DmrppCommon::read_atomic(
const string &name)
286 auto chunk_refs = get_chunks();
288 if (chunk_refs.size() != 1)
289 throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name, __FILE__, __LINE__);
291 auto chunk = chunk_refs[0];
295 return chunk->get_rbuf();
302 DmrppCommon::print_chunks_element(XMLWriter &xml,
const string &name_space)
305 if (xmlTextWriterStartElementNS(xml.get_writer(), (
const xmlChar*)name_space.c_str(), (
const xmlChar*)
"chunks", NULL) < 0)
306 throw BESInternalError(
"Could not start chunks element.", __FILE__, __LINE__);
308 string compression =
"";
309 if (is_shuffle_compression() && is_deflate_compression())
310 compression =
"deflate shuffle";
311 else if (is_shuffle_compression())
312 compression.append(
"shuffle");
313 else if (is_deflate_compression())
314 compression.append(
"deflate");
316 if (!compression.empty())
317 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"compressionType", (
const xmlChar*) compression.c_str()) < 0)
318 throw BESInternalError(
"Could not write compression attribute.", __FILE__, __LINE__);
321 if(!get_chunks().empty()){
322 auto first_chunk = get_chunks().front();
323 if (!first_chunk->get_byte_order().empty()) {
324 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"byteOrder",
325 (
const xmlChar *) first_chunk->get_byte_order().c_str()) < 0)
326 throw BESInternalError(
"Could not write attribute byteOrder", __FILE__, __LINE__);
330 if (d_chunk_dimension_sizes.size() > 0) {
333 copy(d_chunk_dimension_sizes.begin(), d_chunk_dimension_sizes.end(), ostream_iterator<unsigned int>(oss,
" "));
334 string sizes = oss.str();
335 sizes.erase(sizes.size() - 1, 1);
337 if (xmlTextWriterWriteElementNS(xml.get_writer(), (
const xmlChar*) name_space.c_str(), (
const xmlChar*)
"chunkDimensionSizes", NULL,
338 (
const xmlChar*) sizes.c_str()) < 0)
throw BESInternalError(
"Could not write chunkDimensionSizes attribute.", __FILE__, __LINE__);
344 for(
auto chunk: get_chunks()){
346 if (xmlTextWriterStartElementNS(xml.get_writer(), (
const xmlChar*)name_space.c_str(), (
const xmlChar*)
"chunk", NULL) < 0)
347 throw BESInternalError(
"Could not start element chunk", __FILE__, __LINE__);
350 ostringstream offset;
351 offset << chunk->get_offset();
352 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"offset", (
const xmlChar*) offset.str().c_str()) < 0)
353 throw BESInternalError(
"Could not write attribute offset", __FILE__, __LINE__);
356 ostringstream nBytes;
357 nBytes << chunk->get_size();
358 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"nBytes", (
const xmlChar*) nBytes.str().c_str()) < 0)
359 throw BESInternalError(
"Could not write attribute nBytes", __FILE__, __LINE__);
361 if (chunk->get_position_in_array().size() > 0) {
363 vector<unsigned int> pia = chunk->get_position_in_array();
366 copy(pia.begin(), pia.end(), ostream_iterator<unsigned int>(oss,
","));
367 string pia_str = oss.str();
368 if (pia.size() > 0) pia_str.replace(pia_str.size() - 1, 1,
"]");
369 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"chunkPositionInArray", (
const xmlChar*) pia_str.c_str()) < 0)
370 throw BESInternalError(
"Could not write attribute position in array", __FILE__, __LINE__);
374 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
throw BESInternalError(
"Could not end chunk element", __FILE__, __LINE__);
377 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
throw BESInternalError(
"Could not end chunks element", __FILE__, __LINE__);
384 DmrppCommon::print_compact_element(XMLWriter &xml,
const string &name_space,
const std::string &encoded)
388 copy(encoded.begin(), encoded.end(), ostream_iterator<char>(oss,
""));
389 string sizes = oss.str();
391 if (xmlTextWriterWriteElementNS(xml.get_writer(), (
const xmlChar *) name_space.c_str(),
392 (
const xmlChar *)
"compact", NULL,
393 (
const xmlChar *) sizes.c_str()) < 0)
394 throw BESInternalError(
"Could not write compact element.", __FILE__, __LINE__);
407 void DmrppCommon::print_dmrpp(XMLWriter &xml,
bool constrained )
409 BaseType &bt =
dynamic_cast<BaseType&
>(*this);
410 if (constrained && !bt.send_p())
413 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar*)bt.type_name().c_str()) < 0)
414 throw InternalErr(__FILE__, __LINE__,
"Could not write " + bt.type_name() +
" element");
416 if (!bt.name().empty())
417 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar*)
"name", (
const xmlChar*)bt.name().c_str()) < 0)
418 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
421 bt.attributes()->print_dap4(xml);
423 if (!bt.is_dap4() && bt.get_attr_table().get_size() > 0)
424 bt.get_attr_table().print_xml_writer(xml);
427 if (DmrppCommon::d_print_chunks && get_immutable_chunks().size() > 0)
428 print_chunks_element(xml, DmrppCommon::d_ns_prefix);
430 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
431 throw InternalErr(__FILE__, __LINE__,
"Could not end " + bt.type_name() +
" element");
434 void DmrppCommon::dump(ostream & strm)
const
436 strm << BESIndent::LMarg <<
"is_deflate: " << (is_deflate_compression() ?
"true" :
"false") << endl;
437 strm << BESIndent::LMarg <<
"is_shuffle_compression: " << (is_shuffle_compression() ?
"true" :
"false") << endl;
439 const vector<unsigned int> &chunk_dim_sizes = get_chunk_dimension_sizes();
441 strm << BESIndent::LMarg <<
"chunk dimension sizes: [";
442 for (
unsigned int i = 0; i < chunk_dim_sizes.size(); i++) {
443 strm << (i ?
"][" :
"") << chunk_dim_sizes[i];
447 auto chunk_refs = get_immutable_chunks();
448 strm << BESIndent::LMarg <<
"Chunks (aka chunks):" << (chunk_refs.size() ?
"" :
"None Found.") << endl;
450 for (
auto & chunk_ref : chunk_refs) {
451 strm << BESIndent::LMarg;
452 chunk_ref->dump(strm);
exception thrown if internal error encountered
Extend libdap::Array so that a handler can read data using a DMR++ file.