bes Updated for version 3.20.10
Chunk.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of the BES
4
5// Copyright (c) 2016 OPeNDAP, Inc.
6// Author: Nathan Potter <ndp@opendap.org>
7//
8// This library is free software; you can redistribute it and/or
9// modify it under the terms of the GNU Lesser General Public
10// License as published by the Free Software Foundation; either
11// version 2.1 of the License, or (at your option) any later version.
12//
13// This library is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16// Lesser General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public
19// License along with this library; if not, write to the Free Software
20// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21//
22// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23
24#include "config.h"
25
26#include <sstream>
27#include <cstring>
28#include <cassert>
29
30#include <zlib.h>
31
32#include <BESDebug.h>
33#include <BESLog.h>
34#include <BESInternalError.h>
35#include <BESSyntaxUserError.h>
36#include <BESForbiddenError.h>
37#include <BESContextManager.h>
38#include <BESUtil.h>
39
40#define PUGIXML_NO_XPATH
41#define PUGIXML_HEADER_ONLY
42#include <pugixml.hpp>
43
44#include "Chunk.h"
45#include "CurlUtils.h"
46#include "CurlHandlePool.h"
47#include "EffectiveUrlCache.h"
48#include "DmrppRequestHandler.h"
49#include "DmrppNames.h"
50
51using namespace std;
53
// Prefix for debug, log and exception text: "Chunk::<enclosing function>() - ".
// This has to remain a macro so that __func__ names the function where it is expanded.
#define prolog std::string("Chunk::").append(__func__).append("() - ")

// Number of bytes occupied by the Fletcher32 checksum at the end of a chunk's data.
#define FLETCHER32_CHECKSUM 4

// Comparing Fletcher32 checksums costs time; set this to 0 to skip the comparison
// (filter_chunk() still strips the checksum bytes either way).
#define ACTUALLY_USE_FLETCHER32_CHECKSUM 1
58
59namespace dmrpp {
60
73size_t chunk_header_callback(char *buffer, size_t /*size*/, size_t nitems, void *data) {
74 // received header is nitems * size long in 'buffer' NOT ZERO TERMINATED
75 // 'userdata' is set with CURLOPT_HEADERDATA
76 // 'size' is always 1
77
78 // -2 strips of the CRLF at the end of the header
79 string header(buffer, buffer + nitems - 2);
80
81 // Look for the content type header and store its value in the Chunk
82 if (header.find("Content-Type") != string::npos) {
83 // Header format 'Content-Type: <value>'
84 auto c_ptr = reinterpret_cast<Chunk *>(data);
85 c_ptr->set_response_content_type(header.substr(header.find_last_of(' ') + 1));
86 }
87
88 return nitems;
89}
90
96void process_s3_error_response(const shared_ptr<http::url> &data_url, const string &xml_message)
97{
98#if 0
99 string json_message = xml2json(xml_message.c_str());
101 d.Parse(json_message.c_str());
102 // rapidjson::Value &message = d["Error"]["Message"];
103 rapidjson::Value &code = d["Error"]["Code"];
104#endif
105 // See https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
106 // for the low-down on this XML document.
107 pugi::xml_document error;
108 pugi::xml_parse_result result = error.load_string(xml_message.c_str());
109 if (!result)
110 throw BESInternalError("The underlying data store returned an unintelligible error message.", __FILE__, __LINE__);
111
112 pugi::xml_node err_elmnt = error.document_element();
113 if (!err_elmnt || (strcmp(err_elmnt.name(), "Error") != 0))
114 throw BESInternalError("The underlying data store returned a bogus error message.", __FILE__, __LINE__);
115
116 string code = err_elmnt.child_value("Code");
117 string message = err_elmnt.child_value("Message");
118
119 // We might want to get the "Code" from the "Error" if these text messages
120 // are not good enough. But the "Code" is not really suitable for normal humans...
121 // jhrg 12/31/19
122
123 if (code == "AccessDenied") {
124 stringstream msg;
125 msg << prolog << "ACCESS DENIED - The underlying object store has refused access to: ";
126 msg << data_url->str() << " Object Store Message: " << message;
127 BESDEBUG(MODULE, msg.str() << endl);
128 VERBOSE(msg.str() << endl);
129 throw BESForbiddenError(msg.str(), __FILE__, __LINE__);
130 }
131 else {
132 stringstream msg;
133 msg << prolog << "ERROR - The underlying object store returned an error. ";
134 msg << "(Tried: " << data_url->str() << ") Object Store Message: " << message;
135 BESDEBUG(MODULE, msg.str() << endl);
136 VERBOSE(msg.str() << endl);
137 throw BESInternalError(msg.str(), __FILE__, __LINE__);
138 }
139}
140
154size_t chunk_write_data(void *buffer, size_t size, size_t nmemb, void *data) {
155 BESDEBUG(MODULE, prolog << "BEGIN " << endl);
156 size_t nbytes = size * nmemb;
157 auto chunk = reinterpret_cast<Chunk *>(data);
158
159
160 auto data_url = chunk->get_data_url();
161 BESDEBUG(MODULE, prolog << "chunk->get_data_url():" << data_url << endl);
162
163 // When Content-Type is 'application/xml,' that's an error. jhrg 6/9/20
164 BESDEBUG(MODULE, prolog << "chunk->get_response_content_type():" << chunk->get_response_content_type() << endl);
165 if (chunk->get_response_content_type().find("application/xml") != string::npos) {
166 // At this point we no longer care about great performance - error msg readability
167 // is more important. jhrg 12/30/19
168 string xml_message = reinterpret_cast<const char *>(buffer);
169 xml_message.erase(xml_message.find_last_not_of("\t\n\v\f\r 0") + 1);
170 // Decode the AWS XML error message. In some cases this will fail because pub keys,
171 // which maybe in this error text, may have < or > chars in them. the XML parser
172 // will be sad if that happens. jhrg 12/30/19
173 try {
174 process_s3_error_response(data_url, xml_message); // throws a BESError
175 }
176 catch (BESError) {
177 // re-throw any BESError - added for the future if we make BESError a child
178 // of std::exception as it should be. jhrg 12/30/19
179 throw;
180 }
181 catch (std::exception &e) {
182 stringstream msg;
183 msg << prolog << "Caught std::exception when accessing object store data.";
184 msg << " (Tried: " << data_url->str() << ")" << " Message: " << e.what();
185 BESDEBUG(MODULE, msg.str() << endl);
186 throw BESSyntaxUserError(msg.str(), __FILE__, __LINE__);
187 }
188 }
189
190 // rbuf: |******++++++++++----------------------|
191 // ^ ^ bytes_read + nbytes
192 // | bytes_read
193
194 unsigned long long bytes_read = chunk->get_bytes_read();
195
196 // If this fails, the code will write beyond the buffer.
197 if (bytes_read + nbytes > chunk->get_rbuf_size()) {
198 stringstream msg;
199 msg << prolog << "ERROR! The number of bytes_read: " << bytes_read << " plus the number of bytes to read: "
200 << nbytes << " is larger than the target buffer size: " << chunk->get_rbuf_size();
201 BESDEBUG(MODULE, msg.str() << endl);
202 DmrppRequestHandler::curl_handle_pool->release_all_handles();
203 throw BESInternalError(msg.str(), __FILE__, __LINE__);
204 }
205
206 memcpy(chunk->get_rbuf() + bytes_read, buffer, nbytes);
207 chunk->set_bytes_read(bytes_read + nbytes);
208
209 BESDEBUG(MODULE, prolog << "END" << endl);
210
211 return nbytes;
212}
213
224void inflate(char *dest, unsigned long long dest_len, char *src, unsigned long long src_len) {
225 /* Sanity check */
226 assert(src_len > 0);
227 assert(src);
228 assert(dest_len > 0);
229 assert(dest);
230
231 /* Input; uncompress */
232 z_stream z_strm; /* zlib parameters */
233
234 /* Set the decompression parameters */
235 memset(&z_strm, 0, sizeof(z_strm));
236 z_strm.next_in = (Bytef *) src;
237 z_strm.avail_in = src_len;
238 z_strm.next_out = (Bytef *) dest;
239 z_strm.avail_out = dest_len;
240
241 /* Initialize the decompression routines */
242 if (Z_OK != inflateInit(&z_strm))
243 throw BESError("Failed to initialize inflate software.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
244
245 /* Loop to uncompress the buffer */
246 int status = Z_OK;
247 do {
248 /* Uncompress some data */
249 status = inflate(&z_strm, Z_SYNC_FLUSH);
250
251 /* Check if we are done decompressing data */
252 if (Z_STREAM_END == status) break; /*done*/
253
254 /* Check for error */
255 if (Z_OK != status) {
256 stringstream err_msg;
257 err_msg << "Failed to inflate data chunk.";
258 char *err_msg_cstr = z_strm.msg;
259 if(err_msg_cstr)
260 err_msg << " zlib message: " << err_msg_cstr;
261 (void) inflateEnd(&z_strm);
262 throw BESError(err_msg.str(), BES_INTERNAL_ERROR, __FILE__, __LINE__);
263 }
264 else {
265 /* If we're not done and just ran out of buffer space, it's an error.
266 * The HDF5 library code would extend the buffer as-needed, but for
267 * this handler, we should always know the size of the decompressed chunk.
268 */
269 if (0 == z_strm.avail_out) {
270 throw BESError("Data buffer is not big enough for uncompressed data.", BES_INTERNAL_ERROR, __FILE__, __LINE__);
271#if 0
272 /* Here's how to extend the buffer if needed. This might be useful some day... */
273 void *new_outbuf; /* Pointer to new output buffer */
274
275 /* Allocate a buffer twice as big */
276 nalloc *= 2;
277 if (NULL == (new_outbuf = H5MM_realloc(outbuf, nalloc))) {
278 (void) inflateEnd(&z_strm);
279 HGOTO_ERROR(H5E_RESOURCE, H5E_NOSPACE, 0, "memory allocation failed for inflate decompression")
280 } /* end if */
281 outbuf = new_outbuf;
282
283 /* Update pointers to buffer for next set of uncompressed data */
284 z_strm.next_out = (unsigned char*) outbuf + z_strm.total_out;
285 z_strm.avail_out = (uInt) (nalloc - z_strm.total_out);
286#endif
287 } /* end if */
288 } /* end else */
289 } while (true /* status == Z_OK */); // Exit via the break statement after the call to inflate(). jhrg 11/8/21
290
291 /* Finish decompressing the stream */
292 (void) inflateEnd(&z_strm);
293}
294
/**
 * @brief Reverse the HDF5 byte-shuffle filter (see H5Zshuffle.c).
 *
 * A shuffled buffer stores byte 0 of every element, then byte 1 of every element, and
 * so on: one "plane" per byte of the element width. This routine puts each element's
 * bytes back together. Trailing bytes that do not make up a whole element are copied
 * unchanged to the end of @p dest.
 *
 * @param dest Output buffer; must hold at least @p src_size bytes.
 * @param src Shuffled input.
 * @param src_size Number of bytes in @p src.
 * @param width Element width in bytes; must be non-zero (it is used as a divisor).
 */
void unshuffle(char *dest, const char *src, unsigned long long src_size, unsigned long long width) {
    const unsigned long long elems = src_size / width; // integer division rounds down

    // Single-byte elements, or fewer than two whole elements: nothing to rearrange.
    if (width <= 1 || elems <= 1) {
        memcpy(dest, src, src_size);
        return;
    }

    // Walk the input sequentially. Each byte plane is scattered into dest at a stride of
    // 'width', starting at the plane's byte offset inside an element.
    const char *in = src;
    for (unsigned long long plane = 0; plane < width; ++plane) {
        char *out = dest + plane;
        for (unsigned long long e = 0; e < elems; ++e) {
            *out = *in++;
            out += width;
        }
    }

    // Leftover bytes that don't fill a whole element sit after all the planes.
    const unsigned long long leftover = src_size % width;
    if (leftover > 0)
        memcpy(dest + elems * width, in, leftover);
}
388
/**
 * @brief Append the comma-separated unsigned integers in @p s to @p res.
 *
 * Values parsed before a failing token remain in @p res.
 *
 * @param s Text such as "1,22,333" (no surrounding brackets).
 * @param res Vector that receives the values, in order.
 * @throws std::invalid_argument or std::out_of_range (from stoull) for a bad token,
 *         including an empty one.
 */
static void split_by_comma(const string &s, vector<unsigned long long> &res)
{
    string::size_type start = 0;
    for (;;) {
        const string::size_type comma = s.find(',', start);
        const bool last_token = (comma == string::npos);
        const string token = last_token ? s.substr(start) : s.substr(start, comma - start);
        res.push_back(stoull(token));
        if (last_token)
            break;
        start = comma + 1;
    }
}
408
409void Chunk::parse_chunk_position_in_array_string(const string &pia, vector<unsigned long long> &cpia_vect)
410{
411 if (pia.empty()) return;
412
413 if (!cpia_vect.empty()) cpia_vect.clear();
414
415 // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
416 // [1] is a minimal 'position in array' string.
417 if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
418 throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
419
420 if (pia.find_first_not_of("[]1234567890,") != string::npos)
421 throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
422
423#if 0
424 // strip off []; iss holds x,y,...,z
425 istringstream iss(pia.substr(1, pia.length() - 2));
426
427 char c;
428 unsigned int i;
429 while (!iss.eof()) {
430 iss >> i; // read an integer
431 cpia_vect.push_back(i);
432 iss >> c; // read a separator (,)
433 }
434#else
435 try {
436 split_by_comma(pia.substr(1, pia.length() - 2), cpia_vect);
437 }
438 catch(std::invalid_argument &e) {
439 throw BESInternalError(string("while parsing a DMR++, chunk position string illegal character(s): ").append(e.what()), __FILE__, __LINE__);
440 }
441#endif
442}
443
444
458void Chunk::set_position_in_array(const string &pia) {
459#if 0
460 if (pia.empty()) return;
461
462 if (d_chunk_position_in_array.size()) d_chunk_position_in_array.clear();
463
464 // Assume input is [x,y,...,z] where x, ..., are integers; modest syntax checking
465 // [1] is a minimal 'position in array' string.
466 if (pia.find('[') == string::npos || pia.find(']') == string::npos || pia.length() < 3)
467 throw BESInternalError("while parsing a DMR++, chunk position string malformed", __FILE__, __LINE__);
468
469 if (pia.find_first_not_of("[]1234567890,") != string::npos)
470 throw BESInternalError("while parsing a DMR++, chunk position string illegal character(s)", __FILE__, __LINE__);
471
472 // strip off []; iss holds x,y,...,z
473 istringstream iss(pia.substr(1, pia.length() - 2));
474
475 char c;
476 unsigned int i;
477 while (!iss.eof()) {
478 iss >> i; // read an integer
479 d_chunk_position_in_array.push_back(i);
480 iss >> c; // read a separator (,)
481 }
482#endif
483 parse_chunk_position_in_array_string(pia,d_chunk_position_in_array);
484}
485
494void Chunk::set_position_in_array(const std::vector<unsigned long long> &pia) {
495 if (pia.empty()) return;
496
497 if (!d_chunk_position_in_array.empty()) d_chunk_position_in_array.clear();
498
499 d_chunk_position_in_array = pia;
500}
501
510 return curl::get_range_arg_string(d_offset, d_size);
511}
512
527
528 // If there is no data url then there is nothing to add the parameter too.
529 if(d_data_url == nullptr)
530 return;
531
532 bool found = false;
533 string cloudydap_context_value = BESContextManager::TheManager()->get_context(S3_TRACKING_CONTEXT, found);
534 if (!found)
535 return;
536
551 bool add_tracking = false;
552
553 // All S3 buckets, virtual host style URL
554 // Simpler regex that's likely equivalent:
555 // ^https?:\/\/[a-z0-9]([-.a-z0-9]){1,61}[a-z0-9]\.s3[-.]us-(east|west)-[12])?\.amazonaws\.com\/.*$
556 string s3_vh_regex_str = R"(^https?:\/\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\.s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/.*$)";
557
558 BESRegex s3_vh_regex(s3_vh_regex_str.c_str());
559 int match_result = s3_vh_regex.match(d_data_url->str().c_str(), d_data_url->str().length());
560 if(match_result>=0) {
561 auto match_length = (unsigned int) match_result;
562 if (match_length == d_data_url->str().length()) {
563 BESDEBUG(MODULE,
564 prolog << "FULL MATCH. pattern: " << s3_vh_regex_str << " url: " << d_data_url->str() << endl);
565 add_tracking = true;;
566 }
567 }
568
569 if(!add_tracking){
570 // All S3 buckets, path style URL
571 string s3_path_regex_str = R"(^https?:\/\/s3((\.|-)us-(east|west)-(1|2))?\.amazonaws\.com\/([a-z]|[0-9])(([a-z]|[0-9]|\.|-){1,61})([a-z]|[0-9])\/.*$)";
572 BESRegex s3_path_regex(s3_path_regex_str.c_str());
573 match_result = s3_path_regex.match(d_data_url->str().c_str(), d_data_url->str().length());
574 if(match_result>=0) {
575 auto match_length = (unsigned int) match_result;
576 if (match_length == d_data_url->str().length()) {
577 BESDEBUG(MODULE,
578 prolog << "FULL MATCH. pattern: " << s3_vh_regex_str << " url: " << d_data_url->str() << endl);
579 add_tracking = true;;
580 }
581 }
582 }
583
584 if (add_tracking) {
585 // Yup, headed to S3.
586 d_query_marker.append(S3_TRACKING_CONTEXT).append("=").append(cloudydap_context_value);
587 }
588}
589
/**
 * @brief Compute the HDF5 flavor of the Fletcher32 checksum (after H5_checksum_fletcher32()).
 *
 * Bytes are summed as big-endian 16-bit words. An odd trailing byte is treated as the
 * high byte of a final word whose low byte is zero.
 *
 * @param _data Data to checksum; must not be null.
 * @param _len Number of bytes in @p _data; must be > 0.
 * @return (sum2 << 16) | sum1, each reduced to 16 bits.
 */
uint32_t
checksum_fletcher32(const void *_data, size_t _len)
{
    // Sanity check
    assert(_data);
    assert(_len > 0);

    const auto *bytes = static_cast<const uint8_t *>(_data);
    size_t words_left = _len / 2;   // length in 16-bit words
    uint32_t s1 = 0;
    uint32_t s2 = 0;

    // 360 is the largest number of words that can be summed before the 32-bit
    // accumulators could overflow, so fold the sums back to 16 bits after each block.
    while (words_left > 0) {
        size_t block = (words_left > 360) ? 360 : words_left;
        words_left -= block;
        for (; block > 0; --block, bytes += 2) {
            const uint32_t word = (static_cast<uint32_t>(bytes[0]) << 8) | static_cast<uint32_t>(bytes[1]);
            s1 += word;
            s2 += s1;
        }
        s1 = (s1 & 0xffff) + (s1 >> 16);
        s2 = (s2 & 0xffff) + (s2 >> 16);
    }

    // Odd byte count: the last byte becomes the high half of a final word.
    if (_len % 2 != 0) {
        s1 += static_cast<uint32_t>(*bytes) << 8;
        s2 += s1;
        s1 = (s1 & 0xffff) + (s1 >> 16);
        s2 = (s2 & 0xffff) + (s2 >> 16);
    }

    // Second reduction step to bring both sums down to 16 bits.
    s1 = (s1 & 0xffff) + (s1 >> 16);
    s2 = (s2 & 0xffff) + (s2 >> 16);

    return (s2 << 16) | s1;
} /* end checksum_fletcher32() */
635
// The former Chunk::inflate_chunk(bool deflate, bool shuffle, bool fletcher32, ...) used to
// live here, disabled with '#if 0'. Chunk::filter_chunk(), which undoes a space-separated
// list of filters in reverse order, superseded it, so the dead copy has been removed.
744
755void Chunk::filter_chunk(const string &filters, unsigned long long chunk_size, unsigned long long elem_width) {
756
757 if (d_is_inflated)
758 return;
759
760 chunk_size *= elem_width;
761
762 vector<string> filter_array = BESUtil::split(filters, ' ' );
763
764 for (auto i = filter_array.rbegin(), e = filter_array.rend(); i != e; ++i){
765 string filter = *i;
766
767 if (filter == "deflate"){
768 char *dest = new char[chunk_size];
769 try {
770 inflate(dest, chunk_size, get_rbuf(), get_rbuf_size());
771 // This replaces (and deletes) the original read_buffer with dest.
772#if DMRPP_USE_SUPER_CHUNKS
773 set_read_buffer(dest, chunk_size, chunk_size, true);
774#else
775 set_rbuf(dest, chunk_size);
776#endif
777 }
778 catch (...) {
779 delete[] dest;
780 throw;
781 }
782 }// end if(filter == deflate)
783 else if (filter == "shuffle"){
784 // The internal buffer is chunk's full size at this point.
785 char *dest = new char[get_rbuf_size()];
786 try {
787 unshuffle(dest, get_rbuf(), get_rbuf_size(), elem_width);
788#if DMRPP_USE_SUPER_CHUNKS
790#else
791 set_rbuf(dest, get_rbuf_size());
792#endif
793 }
794 catch (...) {
795 delete[] dest;
796 throw;
797 }
798 }//end if(filter == shuffle)
799 else if (filter == "fletcher32"){
800 // Compute the fletcher32 checksum and compare to the value of the last four bytes of the chunk.
801#if ACTUALLY_USE_FLETCHER32_CHECKSUM
802 // Get the last four bytes of chunk's data (which is a byte array) and treat that as the four-byte
803 // integer fletcher32 checksum. jhrg 10/15/21
804#pragma GCC diagnostic push
805#pragma GCC diagnostic ignored "-Wcast-align"
806 assert(get_rbuf_size() > FLETCHER32_CHECKSUM);
807 //assert((get_rbuf_size() - FLETCHER32_CHECKSUM) % 4 == 0); //probably wrong
808 auto f_checksum = *(uint32_t *)(get_rbuf() + get_rbuf_size() - FLETCHER32_CHECKSUM);
809#pragma GCC diagnostic pop
810
811 // If the code should actually use the checksum (they can be expensive to compute), does it match
812 // with once computed on the data actually read? Maybe make this a bes.conf parameter?
813 // jhrg 10/15/21
814 uint32_t calc_checksum = checksum_fletcher32((const void *)get_rbuf(), get_rbuf_size() - FLETCHER32_CHECKSUM);
815 if (f_checksum != calc_checksum) {
816 throw BESInternalError("Data read from the DMR++ handler did not match the Fletcher32 checksum.",
817 __FILE__, __LINE__);
818 }
819#endif
820 if (d_read_buffer_size > FLETCHER32_CHECKSUM)
821 d_read_buffer_size -= FLETCHER32_CHECKSUM;
822 else {
823 throw BESInternalError("Data filtered with fletcher32 don't include the four-byte checksum.",
824 __FILE__, __LINE__);
825 }
826 } //end if(filter == fletcher32)
827 }// end for loop
828 d_is_inflated = true;
829}
830
841 if (d_is_read) {
842 BESDEBUG(MODULE, prolog << "Already been read! Returning." << endl);
843 return;
844 }
845
847
848 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(this);
849 if (!handle)
850 throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
851
852 try {
853 handle->read_data(); // throws if error
854 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
855 }
856 catch(...) {
857 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
858 throw;
859 }
860
861 // If the expected byte count was not read, it's an error.
862 if (get_size() != get_bytes_read()) {
863 ostringstream oss;
864 oss << "Wrong number of bytes read for chunk; read: " << get_bytes_read() << ", expected: " << get_size();
865 throw BESInternalError(oss.str(), __FILE__, __LINE__);
866 }
867
868 d_is_read = true;
869}
870
880void Chunk::dump(ostream &oss) const {
881 oss << "Chunk";
882 oss << "[ptr='" << (void *) this << "']";
883 oss << "[data_url='" << d_data_url->str() << "']";
884 oss << "[offset=" << d_offset << "]";
885 oss << "[size=" << d_size << "]";
886 oss << "[chunk_position_in_array=(";
887 for (unsigned long i = 0; i < d_chunk_position_in_array.size(); i++) {
888 if (i) oss << ",";
889 oss << d_chunk_position_in_array[i];
890 }
891 oss << ")]";
892 oss << "[is_read=" << d_is_read << "]";
893 oss << "[is_inflated=" << d_is_inflated << "]";
894}
895
896string Chunk::to_string() const {
897 std::ostringstream oss;
898 dump(oss);
899 return oss.str();
900}
901
902
903std::shared_ptr<http::url> Chunk::get_data_url() const {
904
905 std::shared_ptr<http::EffectiveUrl> effective_url = EffectiveUrlCache::TheCache()->get_effective_url(d_data_url);
906 BESDEBUG(MODULE, prolog << "Using data_url: " << effective_url->str() << endl);
907
908 //A conditional call to void Chunk::add_tracking_query_param()
909 // here for the NASA cost model work THG's doing. jhrg 8/7/18
910 if (!d_query_marker.empty()) {
911 string url_str = effective_url->str();
912 if(url_str.find("?") != string::npos){
913 url_str += "&";
914 }
915 else {
916 url_str +="?";
917 }
918 url_str += d_query_marker;
919 shared_ptr<http::url> query_marker_url( new http::url(url_str));
920 return query_marker_url;
921 }
922
923 return effective_url;
924}
925
926} // namespace dmrpp
927
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
static bool IsSet(const std::string &flagName)
see if the debug context flagName is set to true
Definition: BESDebug.h:168
static std::ostream * GetStrm()
return the debug stream
Definition: BESDebug.h:187
Abstract exception class for the BES with basic string message.
Definition: BESError.h:58
error thrown if the BES is not allowed to access the resource requested
exception thrown if internal error encountered
Regular expression matching.
Definition: BESRegex.h:53
int match(const char *s, int len, int pos=0) const
Does the pattern match.
Definition: BESRegex.cc:127
error thrown if there is a user syntax error in the request or any other user error
static std::vector< std::string > split(const std::string &s, char delim='/', bool skip_empty=true)
Splits the string s into the return vector of tokens using the delimiter delim and skipping empty val...
Definition: BESUtil.cc:1159
virtual void dump(std::ostream &strm) const
Definition: Chunk.cc:880
virtual char * get_rbuf()
Definition: Chunk.h:381
virtual void read_chunk()
Definition: Chunk.cc:840
void add_tracking_query_param()
Modify this chunk's data URL so that it includes tracking info.
Definition: Chunk.cc:526
virtual std::string get_curl_range_arg_string()
Returns a curl range argument. The libcurl requires a string argument for range-get activities,...
Definition: Chunk.cc:509
virtual std::shared_ptr< http::url > get_data_url() const
Get the data url for this Chunk's data block.
Definition: Chunk.cc:903
virtual void set_rbuf_to_size()
Allocates the internal read buffer to be d_size bytes.
Definition: Chunk.h:366
virtual unsigned long long get_bytes_read() const
Get the number of bytes read so far for this Chunk.
Definition: Chunk.h:338
void set_position_in_array(const std::string &pia)
parse the chunk position string
Definition: Chunk.cc:458
virtual unsigned long long get_rbuf_size() const
Definition: Chunk.h:442
virtual unsigned long long get_size() const
Get the size of this Chunk's data block on disk.
Definition: Chunk.h:309
void set_read_buffer(char *buf, unsigned long long buf_size, unsigned long long bytes_read=0, bool assume_ownership=true)
Set the target read buffer for this chunk.
Definition: Chunk.h:423
virtual void filter_chunk(const std::string &filters, unsigned long long chunk_size, unsigned long long elem_width)
filter data in the chunk
Definition: Chunk.cc:755
Bundle a libcurl easy handle with other information.
void read_data()
This is the read_data() method for all transfers.
GenericValue< UTF8<> > Value
GenericValue with UTF8 encoding.
Definition: document.h:2189
GenericDocument< UTF8<> > Document
GenericDocument with UTF8 encoding.
Definition: document.h:2585