45#include <libdap/D4Dimensions.h>
46#include <libdap/D4StreamMarshaller.h>
48#include "BESInternalError.h"
51#include "TheBESKeys.h"
54#include "BESStopWatch.h"
59#include "EffectiveUrl.h"
60#include "EffectiveUrlCache.h"
61#include "RemoteResource.h"
64#include "CredentialsManager.h"
65#include "AccessCredentials.h"
66#include "CredentialsManager.h"
67#include "CurlHandlePool.h"
68#include "DmrppCommon.h"
69#include "DmrppRequestHandler.h"
71#include "DmrppArray.h"
73#include "DmrppTypeFactory.h"
74#include "DmrppD4Group.h"
75#include "DmrppParserSax2.h"
// NOTE(review): this listing is an extraction with gaps; the number fused to the start of
// a statement is its line number in the original file, and intervening lines are missing.
// When true, BESDebug::SetUp() is called during BES initialization further down.
84bool bes_debug =
false;
// Log-message prefix of the form "retriever::<function>() - ".
90#define prolog std::string("retriever::").append(__func__).append("() - ")
// SHA-256 hex digest of an empty payload; sent as the x-amz-content-sha256 header value
// when a request is AWS-signed.
92#define NULL_BODY_HASH "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
100 char *s_err = strerror(errno);
104 return "Unknown error.";
118 const string &bes_config_file,
119 const string &bes_log_file,
120 const string &bes_debug_log_file,
121 const string &bes_debug_keys,
122 const string &http_netrc_file,
123 const string &http_cache_dir
125 if (debug) cerr << prolog <<
"BEGIN" << endl;
132 if (bes_debug)
BESDebug::SetUp(bes_debug_log_file +
"," + bes_debug_keys);
135 if (!http_netrc_file.empty()) {
139 if (!http_cache_dir.empty()) {
146 if (debug) cerr << prolog <<
"END" << endl;
// Adds AWS Signature V4 request headers for target_url to request_headers and returns the
// resulting list.
//
// @param target_url       URL being requested.
// @param request_headers  existing curl header list (may be null); the returned pointer
//                         replaces it.
// @return the header list, extended with "Authorization", "x-amz-content-sha256" and
//         "x-amz-date" entries when signing was performed.
//
// NOTE(review): the statement that defines `credentials` is not visible in this listing;
// presumably it is looked up from the CredentialsManager for target_url — confirm.
150curl_slist *aws_sign_request_url(shared_ptr<http::url> &target_url, curl_slist *request_headers) {
152 if (debug) cerr << prolog <<
"BEGIN" << endl;
// Only sign when a credentials object exists and it reports itself as an S3 credential.
155 if (credentials && credentials->
is_s3_cred()) {
157 cerr << prolog <<
"Got AWS S3 AccessCredentials instance: " << endl << credentials->to_json() << endl;
// A single timestamp is captured so the x-amz-date header below uses the same value.
160 const std::time_t request_time = std::time(0);
162 const std::string auth_header =
163 AWSV4::compute_awsv4_signature(
166 credentials->
get(AccessCredentials::ID_KEY),
167 credentials->
get(AccessCredentials::KEY_KEY),
168 credentials->
get(AccessCredentials::REGION_KEY),
174 request_headers = curl::append_http_header(request_headers,
"Authorization", auth_header);
// The request carries no body, so the content hash is the fixed empty-payload digest.
177 request_headers = curl::append_http_header(request_headers,
"x-amz-content-sha256", NULL_BODY_HASH);
178 request_headers = curl::append_http_header(request_headers,
"x-amz-date", AWSV4::ISO8601_date(request_time));
180 if (debug) cerr << prolog <<
"END" << endl;
181 return request_headers;
// Determines the size in bytes of the resource at target_url by issuing a body-less
// (CURLOPT_NOBODY) request and reading the "content-length" response header.
//
// @param target_url   URL of the resource to measure.
// @param aws_signing  NOTE(review): presumably gates the aws_sign_request_url() call below;
//                     the guarding condition is not visible in this listing.
// @return the parsed content-length value.
// @throws BESInternalError when the size cannot be determined (see the message below).
189size_t get_remote_size(shared_ptr<http::url> &target_url,
bool aws_signing) {
190 if (debug) cerr << prolog <<
"BEGIN" << endl;
192 char error_buffer[CURL_ERROR_SIZE];
193 std::vector<std::string> resp_hdrs;
194 curl_slist *request_headers =
nullptr;
196 request_headers = curl::add_edl_auth_headers(request_headers);
199 request_headers = aws_sign_request_url(target_url, request_headers);
201 CURL *ceh = curl::init(target_url->str(), request_headers, &resp_hdrs);
202 curl::set_error_buffer(ceh, error_buffer);
// Ask curl not to download the body; only the response headers are needed.
205 CURLcode curl_status = curl_easy_setopt(ceh, CURLOPT_NOBODY, 1L);
206 curl::eval_curl_easy_setopt_result(curl_status, prolog,
"CURLOPT_NOBODY", error_buffer, __FILE__, __LINE__);
208 if (Debug) cerr << prolog <<
"cURL HEAD request is configured" << endl;
// NOTE(review): if this call throws, the header list and easy handle released below
// appear not to be freed — confirm against the full source.
210 curl::super_easy_perform(ceh);
212 curl::unset_error_buffer(ceh);
214 curl_slist_free_all(request_headers);
216 curl_easy_cleanup(ceh);
219 size_t how_big_it_is = 0;
220 string content_length_hdr_key(
"content-length: ");
// `done` and `lc_header` are declared on lines missing from this listing; lc_header is
// presumably the lower-cased form of resp_hdrs[i] — confirm.
221 for (
size_t i = 0; !done && i < resp_hdrs.size(); i++) {
222 if (Debug) cerr << prolog <<
"HEADER[" << i <<
"]: " << resp_hdrs[i] << endl;
224 size_t index = lc_header.find(content_length_hdr_key);
// The value is taken as everything after the key's length, i.e. this assumes the key
// was found at position 0 of the header line.
226 string value = lc_header.substr(content_length_hdr_key.size());
// NOTE(review): stol() yields a signed long and throws std::invalid_argument /
// std::out_of_range on malformed input; the result is stored in a size_t.
227 how_big_it_is = stol(value);
232 throw BESInternalError(prolog +
"Failed to determine size of target resource: " + target_url->str(), __FILE__, __LINE__);
234 if (debug) cerr << prolog <<
"END" << endl;
236 return how_big_it_is;
// Chooses how many bytes to retrieve: max_target_size when it is non-zero, otherwise the
// size reported by get_remote_size() for target_url (requested with aws_signing = true).
// The tail of the function (presumably returning target_size) is missing from this listing.
238size_t get_max_retrival_size(
const size_t &max_target_size, shared_ptr<http::url> &target_url) {
239 size_t target_size = max_target_size;
240 if (max_target_size == 0) {
241 target_size = get_remote_size(target_url,
true);
// NOTE(review): this message prints max_target_size, which is always 0 inside this
// branch; target_size looks like the intended value — confirm and fix in the real source.
242 if (debug) cerr << prolog <<
"Remote resource size is " << max_target_size <<
" bytes. " << endl;
// Retrieves target_url_str in one request and writes it to the file
// "<output_file_base>_simple_get.out", then prints each response header to cerr.
//
// @param target_url_str    URL to fetch, as a string.
// @param output_file_base  prefix for the output file name.
//
// NOTE(review): the declarations of `fd` and `sw`, the open() failure handling, and the
// remaining arguments of http_get_and_write_resource() are on lines missing from this listing.
252void simple_get(
const string target_url_str,
const string output_file_base) {
254 string output_file = output_file_base +
"_simple_get.out";
255 vector<string> resp_hdrs;
// Created files get permissions rw-r--r-- (subject to the process umask).
256 mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
// Existing output is truncated; the file is created when absent.
258 if ((fd = open(output_file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, mode)) < 0) {
263 sw.
start(prolog +
"url: " + target_url_str);
264 shared_ptr<http::url> target_url(
new http::url(target_url_str));
265 curl::http_get_and_write_resource(target_url, fd,
271 for (
size_t i = 0; i < resp_hdrs.size(); i++) {
272 cerr << prolog <<
"ResponseHeader[" << i <<
"]: " << resp_hdrs[i] << endl;
// Splits a resource of target_size bytes into chunk_count equally sized dmrpp::Chunk
// objects, plus one remainder chunk when target_size is not a multiple of the chunk size,
// and appends them to `chunks`.
//
// @param target_url   URL each chunk will read from.
// @param target_size  total number of bytes to cover.
// @param chunk_count  number of equal-size chunks to create.
// @param chunks       output vector; receives raw pointers allocated with `new`, so the
//                     caller is responsible for deleting them.
//
// NOTE(review): the declaration of `chunk_index` and the guards around the cerr
// statements are on lines missing from this listing.
285void make_chunks(shared_ptr<http::url> &target_url,
const size_t &target_size,
const size_t &chunk_count,
286 vector<dmrpp::Chunk *> &chunks) {
287 if (debug) cerr << prolog <<
"BEGIN" << endl;
// Integer division: any remainder is handled after the loop.
// NOTE(review): a chunk_count of 0 divides by zero here.
288 size_t chunk_size = target_size / chunk_count;
289 size_t chunk_start = 0;
291 for (chunk_index = 0; chunk_index < chunk_count; chunk_index++) {
// The chunk's position in the (one-dimensional) array is its ordinal index.
292 vector<unsigned long long> position_in_array;
293 position_in_array.push_back(chunk_index);
295 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
296 << chunk_size << endl;
297 auto chunk =
new dmrpp::Chunk(target_url,
"LE", chunk_size, chunk_start, position_in_array);
298 chunk_start += chunk_size;
299 chunks.push_back(chunk);
// NOTE(review): when target_size < chunk_count, chunk_size is 0 and this modulo divides
// by zero; no zero check is visible here (add_chunks() throws "Chunk size was zero.") —
// confirm against the full source.
301 if (target_size % chunk_size) {
// chunk_start now equals chunk_count * chunk_size, so this is the leftover byte count.
303 size_t last_chunk_size = target_size - chunk_start;
305 cerr << prolog <<
"Remainder chunk. chunk[" << chunks.size() <<
"] last_chunk_size: " << last_chunk_size
308 cerr << prolog <<
"Remainder chunk! target_size: " << target_size <<
" index: " << chunk_index
309 <<
" last_chunk_start: " << chunk_start <<
" last_chunk_size: " << last_chunk_size << endl;
310 if (last_chunk_size > 0) {
311 vector<unsigned long long> position_in_array;
312 position_in_array.push_back(chunk_index);
314 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
315 << last_chunk_size << endl;
316 auto last_chunk =
new dmrpp::Chunk(target_url,
"LE", last_chunk_size, chunk_start, position_in_array);
317 chunks.push_back(last_chunk);
320 if (debug) cerr << prolog <<
"END chunks: " << chunks.size() << endl;
// Retrieves the target resource one chunk at a time, in order, writing each chunk's read
// buffer to "<output_file_base>_serial_chunky_get.out".
//
// @param target_url        URL of the resource.
// @param target_size       number of bytes to retrieve; 0 means "use the remote size"
//                          (see get_max_retrival_size()).
// @param chunk_count       number of equal-size chunks to split the retrieval into.
// @param output_file_base  prefix for the output file name.
// @throws BESInternalError when the output file cannot be opened.
//
// NOTE(review): the declarations of `effectiveUrl`, `ofs` and `ss`, and the body of the
// final loop, are on lines missing from this listing.
330void serial_chunky_get(shared_ptr<http::url> &target_url,
const size_t target_size,
const unsigned long chunk_count,
331 const string &output_file_base) {
334 if (debug) cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl->str() << endl;
335 size_t retrieval_size = get_max_retrival_size(target_size, effectiveUrl);
337 string output_file = output_file_base +
"_serial_chunky_get.out";
338 vector<dmrpp::Chunk *> chunks;
// NOTE(review): the size above was obtained via effectiveUrl but the chunks are built
// against target_url — confirm this is intentional.
339 make_chunks(target_url, retrieval_size, chunk_count, chunks);
// NOTE(review): the mode includes std::fstream::in although only writes are visible here.
342 ofs.open(output_file, std::fstream::in | std::fstream::out | std::ofstream::trunc | std::ofstream::binary);
344 throw BESInternalError(prolog +
"Failed to open file: " + output_file, __FILE__, __LINE__);
346 for (
size_t i = 0; i < chunks.size(); i++) {
// Build a per-chunk label reused by the debug messages below.
348 ss << prolog <<
"chunk={index: " << i <<
", offset: " << chunks[i]->get_offset() <<
", size: "
349 << chunks[i]->get_size() <<
"}";
354 chunks[i]->read_chunk();
357 if (debug) cerr << ss.str() <<
" retrieval from: " << target_url <<
" completed, timing finished." << endl;
358 ofs.write(chunks[i]->get_rbuf(), chunks[i]->get_rbuf_size());
359 if (debug) cerr << ss.str() <<
" has been written to: " << output_file << endl;
// Presumably this loop deletes the Chunk objects allocated by make_chunks() — the loop
// body is not visible; confirm.
361 auto itr = chunks.begin();
362 while (itr != chunks.end()) {
// Loads a dmr++ document named by a path or URL, builds a DMR from it and prints the
// resulting dataset to cerr.
//
// @param dmrpp_filename_url  an "http://" or "https://" URL (fetched through a remote
//                            resource and read from its cache file), a "file://" URL, or
//                            otherwise a value that gets "file://" prepended.
// @throws BESInternalError when the name is empty or the file cannot be opened.
//
// NOTE(review): the declarations of `target_file`, `target_resource`, `dmr` and `msg`,
// and the parsing call itself, are on lines missing from this listing.
371void parse_dmrpp(
const string &dmrpp_filename_url){
372 if(debug) cerr << prolog <<
"BEGIN" << endl;
375 string target_file_url = dmrpp_filename_url;
378 const string http_protocol(
"http://");
379 const string https_protocol(
"https://");
380 const string file_protocol(
"file://");
382 if(debug) cerr << prolog <<
"dmrpp_filename_url: " << dmrpp_filename_url << endl;
384 if(target_file_url.empty())
385 throw BESInternalError(prolog +
"The dmr++ filename was empty.", __FILE__, __LINE__);
// rfind(prefix, 0) == 0 is a "starts with" test: the search can only match at index 0.
388 if(target_file_url.rfind(http_protocol,0)==0 || target_file_url.rfind(https_protocol,0)==0 ){
390 shared_ptr<http::url> tfile_url(
new http::url(target_file_url));
392 target_resource.retrieveResource();
393 target_file = target_resource.getCacheFileName();
395 else if(target_file_url.rfind(file_protocol,0)==0){
// Strip the scheme to obtain the local path.
396 target_file = target_file_url.substr(file_protocol.length());
// Presumably the final branch for values with no recognized scheme — the enclosing
// `else` is not visible; confirm.
399 target_file_url = file_protocol + target_file_url;
402 if(debug) cerr << prolog <<
" target_file: " << target_file << endl;
404 ifstream ifs(target_file);
406 throw BESInternalError(prolog +
"Failed open to dmr++ file: " + dmrpp_filename_url, __FILE__, __LINE__);
410 dmr.set_href(target_file_url);
412 msg << prolog << dmrpp_filename_url;
420 cerr << prolog <<
"Built dataset: " << endl;
422 libdap::XMLWriter xmlWriter;
423 dmr.print_dmrpp(xmlWriter, dmr.get_href());
424 cerr << xmlWriter.get_doc() << endl;
426 if(debug) cerr << prolog <<
"END" << endl;
// Adds chunk descriptions covering target_size bytes to target_array: chunk_count chunks
// of equal size, plus one remainder chunk when target_size is not a multiple of that size.
//
// @param target_url   URL each chunk will read from.
// @param target_size  total number of bytes to cover.
// @param chunk_count  number of equal-size chunks to add.
// @throws BESInternalError with "Chunk size was zero." (the guarding condition is on a
//         line missing from this listing).
//
// NOTE(review): the final parameter (the `target_array` used below) and the declaration
// of `chunk_index` are on lines missing from this listing.
439void add_chunks(shared_ptr<http::url> &target_url,
const size_t &target_size,
const size_t &chunk_count,
442 if (debug) cerr << prolog <<
"BEGIN" << endl;
// Integer division: any remainder is handled after the loop.
444 size_t chunk_size = target_size / chunk_count;
446 throw BESInternalError(prolog +
"Chunk size was zero.", __FILE__, __LINE__);
// The chunk size is rendered as text; presumably passed to
// parse_chunk_dimension_sizes() on a line not shown — confirm.
447 stringstream chunk_dim_size;
448 chunk_dim_size << chunk_size;
451 size_t chunk_start = 0;
453 for (chunk_index = 0; chunk_index < chunk_count; chunk_index++) {
// Unlike make_chunks(), the position-in-array here is the chunk's starting offset,
// not its ordinal index.
454 vector<unsigned long long> position_in_array;
455 position_in_array.push_back(chunk_start);
457 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
458 << chunk_size <<
" chunk_poa: " << position_in_array[0] << endl;
459 target_array->
add_chunk(target_url,
"LE", chunk_size, chunk_start, position_in_array);
460 chunk_start += chunk_size;
462 if (target_size % chunk_size) {
// chunk_start now equals chunk_count * chunk_size, so this is the leftover byte count.
464 size_t last_chunk_size = target_size - chunk_start;
466 cerr << prolog <<
"Remainder chunk! target_size: " << target_size <<
" index: " << chunk_index
467 <<
" last_chunk_start: " << chunk_start <<
" last_chunk_size: " << last_chunk_size << endl;
468 if (last_chunk_size > 0) {
469 vector<unsigned long long> position_in_array;
470 position_in_array.push_back(chunk_start);
472 cerr << prolog <<
"chunks[" << chunk_index <<
"] chunk_start: " << chunk_start <<
" chunk_size: "
473 << last_chunk_size <<
" chunk_poa: " << position_in_array[0] << endl;
474 target_array->
add_chunk(target_url,
"LE", last_chunk_size, chunk_start, position_in_array);
477 if (debug) cerr << prolog <<
"END" << endl;
// Retrieves target_size bytes from target_url by modelling the resource as a single
// one-dimensional chunked array inside a DMR and serializing the root group to
// "<output_file_base>_array_get.out".
//
// @param target_url        URL of the resource.
// @param target_size       number of bytes to retrieve (also the array's dimension size).
// @param chunk_count       number of equal-size chunks to split the array into.
// @param output_file_base  prefix for the output file name.
// @return the number of bytes written to the output stream by the serialization, measured
//         as the difference of the stream position before and after.
// @throws BESInternalError when the output file cannot be opened.
//
// NOTE(review): the declarations of `ofs`, `target_array`, `dmr`, `root` and `sw` are on
// lines missing from this listing.
489size_t array_get(shared_ptr<http::url> &target_url,
const size_t &target_size,
const size_t &chunk_count,
490 const string &output_file_base) {
492 if (debug) cerr << prolog <<
"BEGIN" << endl;
493 string output_file = output_file_base +
"_array_get.out";
495 ofs.open(output_file, std::fstream::in | std::fstream::out | std::ofstream::trunc | std::ofstream::binary);
497 throw BESInternalError(prolog +
"Failed to open file: " + output_file, __FILE__, __LINE__);
// One dimension whose length is the byte count.
503 target_array->append_dim(target_size);
504 add_chunks(target_url, target_size, chunk_count, target_array);
// Mark the variable for sending.
505 target_array->set_send_p(
true);
509 dmr.set_href(target_url->str());
// add_var_nocopy: the array pointer itself is added to the root group rather than a copy.
511 root->add_var_nocopy(target_array);
512 root->set_in_selection(
true);
515 cerr << prolog <<
"Built dataset: " << endl;
517 libdap::XMLWriter xmlWriter;
518 dmr.print_dmrpp(xmlWriter, dmr.get_href());
519 cerr << xmlWriter.get_doc() << endl;
// Compose the stopwatch label describing this run's size, chunking and threading mode.
523 stringstream timer_msg;
524 timer_msg << prolog <<
"DmrppD4Group.intern_data() for " << target_size <<
" bytes in " << chunk_count <<
525 " chunks, parallel transfers ";
526 if (dmrpp::DmrppRequestHandler::d_use_transfer_threads) {
527 timer_msg <<
"enabled. (max: " << dmrpp::DmrppRequestHandler::d_max_transfer_threads <<
")";
529 timer_msg <<
"disabled.";
532 sw.
start(timer_msg.str());
// NOTE(review): tellp() returns -1 on failure; converting that to size_t yields a huge value.
536 size_t started = ofs.tellp();
537 libdap::D4StreamMarshaller streamMarshaller(ofs);
538 root->serialize(streamMarshaller, dmr);
540 size_t stopped = ofs.tellp();
541 size_t numberOfBytesWritten = stopped - started;
542 if (debug) cerr << prolog <<
"target_size: " << target_size <<
" numberOfBytesWritten: " << numberOfBytesWritten << endl;
545 if (debug) cerr << prolog <<
"END" << endl;
546 return numberOfBytesWritten;
// Runs a timing matrix of array_get() calls: for each chunk-count step, `reps` runs with
// transfer threads disabled, then for each thread-count step `reps` runs with transfer
// threads enabled and d_max_transfer_threads set to the current thread count.
//
// @param target_url                URL of the resource (as a string).
// @param output_prefix             not referenced in the visible lines.
// @param reps                      repetitions per configuration.
// @param retrieval_size            bytes to retrieve; 0 means "use the remote size".
// @param power_of_two_chunk_count  number of chunk-count steps (loop runs 1..value).
// @param power_of_two_threads_max  number of thread-count steps (loop runs 1..value).
// @param output_file_base          prefix for output file names.
// @return an int status; the return statements are on lines missing from this listing.
//
// NOTE(review): chunk_count and thread_count both start at 2; presumably each is doubled
// per step on lines not shown — confirm. The declaration of `effectiveUrl` and the
// try/catch structure are also not visible.
581int test_plan_01(
const string &target_url,
582 const string &output_prefix,
583 const unsigned int reps,
584 const size_t retrieval_size,
585 const unsigned int power_of_two_chunk_count,
586 const unsigned int power_of_two_threads_max,
587 const string &output_file_base
591 cerr << prolog <<
"BEGIN" << endl;
596 cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl << endl;
597 size_t target_size = get_max_retrival_size(retrieval_size, effectiveUrl);
600 size_t chunk_count = 2;
601 for (
size_t chunk_pwr = 1; chunk_pwr <= power_of_two_chunk_count; chunk_pwr++) {
// Serial baseline for this chunk count.
604 dmrpp::DmrppRequestHandler::d_use_transfer_threads =
false;
605 for (
unsigned int rep = 0; rep < reps; rep++) {
606 array_get(effectiveUrl, target_size, chunk_count, output_file_base );
// Threaded runs for this chunk count.
610 dmrpp::DmrppRequestHandler::d_use_transfer_threads =
true;
611 unsigned int thread_count = 2;
612 for (
unsigned int tpwr = 1; tpwr <= power_of_two_threads_max; tpwr++) {
613 dmrpp::DmrppRequestHandler::d_max_transfer_threads = thread_count;
614 for (
unsigned int rep = 0; rep < reps; rep++) {
615 array_get(effectiveUrl, target_size, chunk_count, output_file_base);
625 cerr << prolog <<
"Caught BESError. Message: " << e.
get_message() <<
" " << e.
get_file()<<
":" << e. get_line() << endl;
629 cerr << prolog <<
"Caught Unknown Exception." <<
633 cerr << prolog <<
"END" << endl;
// Command-line entry point for the retriever test tool: sets defaults, parses options,
// echoes the effective configuration to cerr, configures transfer threading, initializes
// the BES, and then runs one of several retrieval routines.
//
// NOTE(review): the `case` labels of the option switch, the selection logic among the
// retrieval routines, the try/catch structure and several declarations (`option_char`,
// `bes_config_file`, `bes_log_file`, `effectiveUrl`, `result`, `retrieval_size`, ...) are on
// lines missing from this listing, so the option-letter-to-variable mapping cannot be
// confirmed from here.
644int main(
int argc,
char *argv[]) {
// Defaults, overridable from the command line.
648 string bes_debug_log_file =
"cerr";
649 string bes_debug_keys =
"bes,http,curl,dmrpp,dmrpp:3,dmrpp:4,rr";
650 shared_ptr<http::url> target_url(
new http::url(
"https://www.opendap.org/pub/binary/hyrax-1.16/centos-7.x/bes-debuginfo-3.20.7-1.static.el7.x86_64.rpm"));
651 string output_file_base(
"retriever");
652 string http_cache_dir;
// Chunk and thread counts are given as exponents of two (see the shifts below).
654 size_t pwr2_number_o_chunks = 18;
655 size_t max_target_size = 0;
656 string http_netrc_file;
657 unsigned int reps=10;
658 unsigned pwr2_parallel_reads = 0;
661 char *prefixCstr = getenv(
"prefix");
// In the option string a letter followed by ':' takes an argument; d, b and D are flags.
670 GetOpt getopt(argc, argv,
"h:r:n:C:c:o:u:l:S:dbDp:");
672 while ((option_char = getopt()) != -1) {
673 switch (option_char) {
690 bes_config_file = getopt.optarg;
693 target_url = shared_ptr<http::url>(
new http::url(getopt.optarg));
696 bes_log_file = getopt.optarg;
699 http_netrc_file = getopt.optarg;
702 output_file_base = getopt.optarg;
// NOTE(review): atol() reports no errors (non-numeric text yields 0) and its long result
// is narrowed into size_t / unsigned here — negative or oversized input is not rejected.
705 pwr2_number_o_chunks = atol(getopt.optarg);
708 max_target_size = atol(getopt.optarg);
711 pwr2_parallel_reads = atol(getopt.optarg);
714 reps = atol(getopt.optarg);
717 http_cache_dir = getopt.optarg;
// Default the BES log file name from the output base when none was supplied.
725 if (bes_log_file.empty()) {
726 bes_log_file = output_file_base +
"_bes.log";
// Echo the effective configuration.
729 cerr << prolog <<
"-- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - " << endl;
730 cerr << prolog <<
"debug: " << (debug ?
"true" :
"false") << endl;
731 cerr << prolog <<
"Debug: " << (Debug ?
"true" :
"false") << endl;
732 cerr << prolog <<
"bes_debug: " << (bes_debug ?
"true" :
"false") << endl;
733 cerr << prolog <<
"output_file_base: '" << output_file_base <<
"'" << endl;
734 cerr << prolog <<
"bes_config_file: '" << bes_config_file <<
"'" << endl;
735 cerr << prolog <<
"bes_log_file: '" << bes_log_file <<
"'" << endl;
736 cerr << prolog <<
"bes_debug_log_file: '" << bes_debug_log_file <<
"'" << endl;
737 cerr << prolog <<
"bes_debug_keys: '" << bes_debug_keys <<
"'" << endl;
738 cerr << prolog <<
"http_netrc_file: '" << http_netrc_file <<
"'" << endl;
739 cerr << prolog <<
"target_url: '" << target_url->str() <<
"'" << endl;
740 cerr << prolog <<
"max_target_size: '" << max_target_size <<
"'" << endl;
741 cerr << prolog <<
"number_o_chunks: 2^" << pwr2_number_o_chunks << endl;
742 cerr << prolog <<
"reps: " << reps << endl;
743 if (pwr2_parallel_reads)
744 cerr << prolog <<
"parallel_reads: ENABLED (max: 2^" << pwr2_parallel_reads <<
")" << endl;
746 cerr << prolog <<
"parallel_reads: DISABLED" << endl;
747 cerr << prolog <<
"-- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - -- - " << endl;
// A non-zero exponent enables transfer threads with a cap of 2^pwr2_parallel_reads;
// otherwise threading is disabled and the cap is set to 1.
// NOTE(review): an exponent of 64 or more makes the shift undefined — no range check is visible.
751 if(pwr2_parallel_reads){
752 unsigned long long int max_threads = 1ULL << pwr2_parallel_reads;
753 dmrpp::DmrppRequestHandler::d_use_transfer_threads =
true;
754 dmrpp::DmrppRequestHandler::d_max_transfer_threads = max_threads;
757 dmrpp::DmrppRequestHandler::d_use_transfer_threads =
false;
758 dmrpp::DmrppRequestHandler::d_max_transfer_threads = 1;
762 bes_debug_keys, http_netrc_file,http_cache_dir);
765 if (debug) cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl << endl;
766 size_t target_size = get_max_retrival_size(max_target_size, effectiveUrl);
// Convert the exponent into an actual chunk count.
768 unsigned long long int chunks = 1ULL << pwr2_number_o_chunks;
769 if (debug) cerr << prolog <<
"Dividing target into " << chunks <<
" chunks." << endl;
773 array_get(effectiveUrl, target_size, chunks, output_file_base);
777 result = test_plan_01(
782 pwr2_number_o_chunks,
786 simple_get(effectiveUrl, output_file_base);
// NOTE(review): this passes the exponent pwr2_number_o_chunks where serial_chunky_get()
// expects a chunk count (the array_get() call above passes `chunks`) — confirm intent.
787 serial_chunky_get( effectiveUrl, max_target_size, pwr2_number_o_chunks, output_file_base);
789 parse_dmrpp(target_url);
794 cerr << prolog <<
"curl::retrieve_effective_url() returned: " << effectiveUrl << endl;
795 target_size = get_max_retrival_size(retrieval_size, effectiveUrl);
// NOTE(review): target_size is computed on the previous statement but this call passes
// max_target_size (0 by default) and the exponent rather than a chunk count — confirm intent.
796 array_get(effectiveUrl, max_target_size, pwr2_number_o_chunks, output_file_base);
799 curl_global_cleanup();
808 cerr << prolog <<
"Caught Unknown Exception." << endl;
virtual std::string get(const std::string &key)
virtual bool is_s3_cred()
Do the URL, ID, Key and Region items make up an S3 Credential?
static void SetUp(const std::string &values)
Sets up debugging for the bes.
Abstract exception class for the BES with basic string message.
virtual int get_line()
get the line number where the exception was thrown
virtual std::string get_file()
get the file name where the exception was thrown
virtual std::string get_message()
get the error message for this exception
exception thrown if internal error encountered
virtual bool start(std::string name)
static std::string lowercase(const std::string &s)
static std::string assemblePath(const std::string &firstPart, const std::string &secondPart, bool leadingSlash=false, bool trailingSlash=false)
Assemble path fragments making sure that they are separated by a single '/' character.
AccessCredentials * get(std::shared_ptr< http::url > &url)
static CredentialsManager * theCM()
Returns the singleton instance of the CredentialsManager.
static TheBESKeys * TheKeys()
void set_key(const std::string &key, const std::string &val, bool addto=false)
allows the user to set key/value pairs from within the application.
static std::string ConfigFile
Provide a way to print the DMR++ response.
Extend libdap::Array so that a handler can read data using a DMR++ file.
static bool d_print_chunks
if true, print_dap4() prints chunk elements
virtual void parse_chunk_dimension_sizes(const std::string &chunk_dim_sizes_string)
Set the dimension sizes for a chunk.
virtual unsigned long add_chunk(std::shared_ptr< http::url > d_data_url, const std::string &byte_order, unsigned long long size, unsigned long long offset, const std::string &position_in_array)
Add a new chunk as defined by an h4:byteStream element.
void intern(std::istream &f, libdap::DMR *dest_dmr)
static EffectiveUrlCache * TheCache()
Get the singleton EffectiveUrlCache instance.
std::shared_ptr< EffectiveUrl > get_effective_url(std::shared_ptr< url > source_url)