46 #include <D4Attributes.h>
50 #include "BESInternalError.h"
53 #include "BESStopWatch.h"
55 #include "byteswap_compat.h"
56 #include "CurlHandlePool.h"
58 #include "DmrppArray.h"
59 #include "DmrppRequestHandler.h"
60 #include "DmrppNames.h"
// BESDEBUG channel names used throughout this file.
64 #define dmrpp_3 "dmrpp:3"
65 #define dmrpp_4 "dmrpp:4"
// One mebibyte; read_contiguous() divides a chunk's size by this to pick the number of parallel sub-reads.
70 #define MB (1024*1024)
// Prefix for log/debug messages: "DmrppArray::<function>() - ".
71 #define prolog std::string("DmrppArray::").append(__func__).append("() - ")
// Timeout, in milliseconds, passed to get_next_future() when polling in-flight futures.
72 #define WAIT_FOR_FUTURE_MS 1
// std::async task bodies that process one SuperChunk; defined below.
77 void *one_super_chunk_thread(
void *arg_list);
78 void *one_super_chunk_unconstrained_thread(
void *arg_list);
// Held by the start_super_chunk*_thread() helpers while they check thread_counter and launch a task.
81 std::mutex thread_pool_mtx;
// Number of in-flight super-chunk tasks, compared against DmrppRequestHandler::d_max_parallel_transfers.
// NOTE(review): the increment/decrement sites are not visible in this excerpt — confirm where they happen.
82 atomic_uint thread_counter(0);
// get_next_future(): poll `futures` for a task that has finished.
// Walks the list from the front, calling wait_for(timeout) on each future. Per the debug
// messages, a future that did not time out has get() called on it and is erased from the list.
// @param futures  In-flight tasks returned by std::async.
// @param timeout  Per-future wait, in milliseconds.
// @return presumably whether a future was joined; the return statement is not visible in this
//         excerpt — TODO confirm.
92 bool get_next_future(list<std::future<void *>> &futures,
unsigned long timeout) {
95 std::chrono::milliseconds timeout_ms (timeout);
98 auto futr = futures.begin();
99 auto fend = futures.end();
100 while(!joined && futr != fend){
102 if((*futr).wait_for(timeout_ms) != std::future_status::timeout){
105 BESDEBUG(dmrpp_3, prolog <<
"Called future::get() on a ready future." << endl);
109 BESDEBUG(dmrpp_3, prolog <<
"future::wait_for() timed out. (timeout: "<<
110 timeout <<
" ms) There are currently "<< futures.size() <<
" futures in process." << endl);
116 BESDEBUG(dmrpp_3, prolog <<
"Erased future from futures list. There are currently " <<
117 futures.size() <<
" futures in process." << endl);
119 done = joined || futures.empty();
// start_super_chunk_thread(): while thread_counter is below
// DmrppRequestHandler::d_max_parallel_transfers, launch one_super_chunk_thread(args) with
// std::async(std::launch::async, ...) and append the resulting future to `futures`.
// thread_pool_mtx is locked (via unique_lock) for the check and the launch.
// @return presumably true if a task was started; the return statements are not visible — TODO confirm.
131 bool start_super_chunk_thread(list<std::future<void *>> &futures, one_super_chunk_args *args) {
133 std::unique_lock<std::mutex> lck (thread_pool_mtx);
134 if (thread_counter < DmrppRequestHandler::d_max_parallel_transfers) {
136 futures.push_back(std::async(std::launch::async, one_super_chunk_thread, (
void *) args));
138 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '"<< futures.size() <<
139 "' from std::async for " << args->super_chunk->to_string(
false) << endl);
// one_super_chunk_thread(): std::async task body. Casts `arg_list` back to
// one_super_chunk_args and calls process_super_chunk(super_chunk, array).
// NOTE(review): releasing `args` and any thread_counter bookkeeping happen on lines not
// visible in this excerpt — confirm.
145 void *one_super_chunk_thread(
void *arg_list)
147 auto *args =
reinterpret_cast<one_super_chunk_args *
>(arg_list);
150 process_super_chunk(args->super_chunk, args->array);
// start_super_chunk_unconstrained_thread(): unconstrained counterpart of
// start_super_chunk_thread(). While thread_counter is below d_max_parallel_transfers, it
// launches one_super_chunk_unconstrained_thread(args) with std::async and records the future.
// thread_pool_mtx is locked for the check and the launch.
// @return presumably true if a task was started; the return statements are not visible — TODO confirm.
171 bool start_super_chunk_unconstrained_thread(list<std::future<void *>> &futures, one_super_chunk_args *args) {
173 std::unique_lock<std::mutex> lck (thread_pool_mtx);
174 if(thread_counter < DmrppRequestHandler::d_max_parallel_transfers) {
176 futures.push_back(std::async(std::launch::async, one_super_chunk_unconstrained_thread, (
void *)args));
178 BESDEBUG(dmrpp_3, prolog <<
"Got std::future '"<< futures.size() <<
179 "' from std::async for " << args->super_chunk->to_string(
false) << endl);
// one_super_chunk_unconstrained_thread(): std::async task body. Casts `arg_list` back to
// one_super_chunk_args and calls process_super_chunk_unconstrained(super_chunk, array).
// NOTE(review): `args` is allocated with new by read_chunks_unconstrained_concurrent(); its
// release is not visible in this excerpt — confirm it is freed.
184 void *one_super_chunk_unconstrained_thread(
void *arg_list)
186 auto args =
reinterpret_cast<one_super_chunk_args *
>(arg_list);
189 process_super_chunk_unconstrained(args->super_chunk, args->array);
// Construction, copying and assignment for DmrppArray. Each one forwards to the Array base
// and to DmrppCommon.
209 void DmrppArray::_duplicate(
const DmrppArray &)
213 DmrppArray::DmrppArray(
const string &n, BaseType *v) :
214 Array(n, v, true ), DmrppCommon()
218 DmrppArray::DmrppArray(
const string &n,
const string &d, BaseType *v) :
219 Array(n, d, v, true), DmrppCommon()
// Virtual copy: returns a heap-allocated copy made with the copy constructor.
224 DmrppArray::ptr_duplicate()
226 return new DmrppArray(*
this);
229 DmrppArray::DmrppArray(
const DmrppArray &rhs) :
230 Array(rhs), DmrppCommon(rhs)
// Copy assignment: self-assignment is a no-op. Otherwise the Array part is assigned first,
// then the DmrppCommon state is copied with m_duplicate_common().
236 DmrppArray::operator=(
const DmrppArray &rhs)
238 if (
this == &rhs)
return *
this;
240 dynamic_cast<Array &
>(*this) = rhs;
243 DmrppCommon::m_duplicate_common(rhs);
// is_projected(): true if any dimension's constrained size differs from its full size, i.e. the
// constraint actually subsets the array. The fall-through return is not visible in this excerpt
// (presumably false).
252 bool DmrppArray::is_projected()
254 for (Dim_iter p = dim_begin(), e = dim_end(); p != e; ++p)
255 if (dimension_size(p,
true) != dimension_size(p,
false))
return true;
/**
 * @brief Compute the row-major linear element offset of an address within a shape.
 *
 * The last dimension varies fastest. For example, the address {1, 2} in the shape {3, 4}
 * maps to 1 * 4 + 2 = 6.
 *
 * @param address_in_target Per-dimension element indices (outermost dimension first).
 * @param target_shape Per-dimension extents; must have the same length as the address.
 * @return The linear element (not byte) offset. A rank-0 (empty) address yields 0.
 */
static unsigned long long
get_index(const std::vector<unsigned int> &address_in_target, const std::vector<unsigned int> &target_shape)
{
    assert(address_in_target.size() == target_shape.size());

    // Dereferencing rbegin() of an empty vector is undefined behavior, so handle rank 0 explicitly.
    if (address_in_target.empty() || target_shape.empty())
        return 0;

    auto shape_index = target_shape.rbegin();
    const auto shape_end = target_shape.rend();
    auto index = address_in_target.rbegin();
    const auto index_end = address_in_target.rend();

    // Start from the innermost (fastest-varying) dimension.
    unsigned long long multiplier = *shape_index++;
    unsigned long long offset = *index++;

    // Also stop at the end of the shape, so that mismatched lengths in release builds
    // (where the assert above is compiled out) cannot read past target_shape.
    while (index != index_end && shape_index != shape_end) {
        assert(*index < *shape_index);
        offset += multiplier * *index++;
        multiplier *= *shape_index++;
    }

    return offset;
}
// get_size(): product of the dimension sizes, using constrained sizes when `constrained` is true
// and full sizes otherwise. The accumulator starts at 1. The return statement is not visible in
// this excerpt (presumably it returns `size`).
305 unsigned long long DmrppArray::get_size(
bool constrained)
308 unsigned long long size = 1;
309 for (Dim_iter dim = dim_begin(), end = dim_end(); dim != end; dim++) {
310 size *= dimension_size(dim, constrained);
// get_shape(): per-dimension sizes in dimension order, constrained or full according to
// `constrained`. Capacity is reserved up front for one entry per dimension.
321 vector<unsigned int> DmrppArray::get_shape(
bool constrained)
323 Dim_iter dim = dim_begin(), edim = dim_end();
324 vector<unsigned int> shape;
328 shape.reserve(edim - dim);
330 for (; dim != edim; dim++) {
331 shape.push_back(dimension_size(dim, constrained));
342 DmrppArray::dimension DmrppArray::get_dimension(
unsigned int i)
344 assert(i <= (dim_end() - dim_begin()));
345 return *(dim_begin() + i);
// insert_constrained_contiguous(): recursively copy the elements selected by the constraint
// from `src_buf` (laid out with `array_shape`) into this array's value buffer (get_buf()),
// writing at element position *target_index.
// - subset_addr accumulates the source index of each outer dimension as the recursion descends;
//   get_index() turns it into a linear element offset into src_buf.
// - When the last dimension is reached with stride 1, the run start..stop is copied element by
//   element in one loop; otherwise each selected index is handled individually.
// NOTE(review): the advance of dim_iter before the `dim_iter == dim_end()` test and the
// increments of *target_index are on lines not shown in this excerpt.
361 void DmrppArray::insert_constrained_contiguous(Dim_iter dim_iter,
unsigned long *target_index,
362 vector<unsigned int> &subset_addr,
363 const vector<unsigned int> &array_shape,
char *src_buf)
365 BESDEBUG(
"dmrpp",
"DmrppArray::" << __func__ <<
"() - subsetAddress.size(): " << subset_addr.size() << endl);
367 unsigned int bytes_per_elem = prototype()->width();
369 char *dest_buf = get_buf();
371 unsigned int start = this->dimension_start(dim_iter,
true);
372 unsigned int stop = this->dimension_stop(dim_iter,
true);
373 unsigned int stride = this->dimension_stride(dim_iter,
true);
379 if (dim_iter == dim_end() && stride == 1) {
381 subset_addr.push_back(start);
382 unsigned long start_index = get_index(subset_addr, array_shape);
383 subset_addr.pop_back();
385 subset_addr.push_back(stop);
386 unsigned long stop_index = get_index(subset_addr, array_shape);
387 subset_addr.pop_back();
391 for (
unsigned long source_index = start_index; source_index <= stop_index; source_index++) {
392 unsigned long target_byte = *target_index * bytes_per_elem;
393 unsigned long source_byte = source_index * bytes_per_elem;
395 for (
unsigned long i = 0; i < bytes_per_elem; i++) {
396 dest_buf[target_byte++] = src_buf[source_byte++];
402 for (
unsigned int myDimIndex = start; myDimIndex <= stop; myDimIndex += stride) {
405 if (dim_iter != dim_end()) {
407 subset_addr.push_back(myDimIndex);
408 insert_constrained_contiguous(dim_iter, target_index, subset_addr, array_shape, src_buf);
409 subset_addr.pop_back();
413 subset_addr.push_back(myDimIndex);
// NOTE(review): get_index() returns unsigned long long; storing the result in unsigned int can
// truncate the offset for very large arrays.
414 unsigned int sourceIndex = get_index(subset_addr, array_shape);
415 subset_addr.pop_back();
418 unsigned long target_byte = *target_index * bytes_per_elem;
419 unsigned long source_byte = sourceIndex * bytes_per_elem;
421 for (
unsigned int i = 0; i < bytes_per_elem; i++) {
422 dest_buf[target_byte++] = src_buf[source_byte++];
// one_child_chunk_thread(): pthread entry point used by read_contiguous().
// It reads one child Chunk and memcpy's the bytes read into the master chunk's read buffer, at
// the child's offset relative to the master chunk. It then writes its thread id (args->tid) to
// the pipe write end args->fds[1] so the parent can join it.
// On what appears to be the error path, it also writes the id and exits with a heap-allocated
// string holding the error's verbose message.
440 void *one_child_chunk_thread(
void *arg_list)
442 one_child_chunk_args *args =
reinterpret_cast<one_child_chunk_args *
>(arg_list);
445 args->child_chunk->read_chunk();
447 assert(args->master_chunk->get_rbuf());
448 assert(args->child_chunk->get_rbuf());
449 assert(args->child_chunk->get_bytes_read() == args->child_chunk->get_size());
462 unsigned int offset_within_master_chunk = args->child_chunk->get_offset() - args->master_chunk->get_offset();
464 memcpy(args->master_chunk->get_rbuf() + offset_within_master_chunk, args->child_chunk->get_rbuf(),
465 args->child_chunk->get_bytes_read());
468 write(args->fds[1], &args->tid,
sizeof(args->tid));
470 pthread_exit(
new string(error.get_verbose_message()));
475 write(args->fds[1], &args->tid,
sizeof(args->tid));
// read_contiguous(): read a variable stored as a single (contiguous) chunk.
// - If parallel transfers are disabled, or the chunk is no larger than
//   DmrppRequestHandler::d_min_size, the chunk is read directly.
// - Otherwise the byte range is split into at most d_max_parallel_transfers child chunks of at
//   least MB bytes each; the last child also takes the remainder. Each child is read by a pthread
//   running one_child_chunk_thread(), which reports completion by writing its id to a pipe.
//   The parent reads ids from the pipe, joins finished threads, and starts new ones until the
//   queue of child chunks is empty.
// - The data are then inflated according to the array's deflate/shuffle flags. The buffer is
//   either handed to val2buf() as-is (no projection) or subset with insert_constrained_contiguous().
// @throws BESInternalError if the variable does not have exactly one chunk, if the pipe cannot be
//         opened, or if a thread id cannot be read back (other error paths are partly elided).
498 void DmrppArray::read_contiguous()
507 auto chunk_refs = get_chunks();
509 if (chunk_refs.size() != 1)
510 throw BESInternalError(
string(
"Expected only a single chunk for variable ") + name(), __FILE__, __LINE__);
513 auto master_chunk = chunk_refs[0];
515 unsigned long long master_chunk_size = master_chunk->get_size();
519 if (!DmrppRequestHandler::d_use_parallel_transfers || master_chunk_size <= DmrppRequestHandler::d_min_size) {
521 master_chunk->read_chunk();
526 master_chunk->set_rbuf_to_size();
// NOTE(review): master_chunk_size / MB is integer division, so floor() has no effect. If the chunk
// is smaller than MB, num_chunks is 0 and `master_chunk_size / num_chunks` below divides by zero —
// confirm that d_min_size is always at least MB.
531 unsigned int num_chunks = floor(master_chunk_size / MB);
532 if (num_chunks >= DmrppRequestHandler::d_max_parallel_transfers)
533 num_chunks = DmrppRequestHandler::d_max_parallel_transfers;
537 int status = pipe(fds);
539 throw BESInternalError(
string(
"Could not open a pipe for thread communication: ").append(strerror(errno)),
543 unsigned long long chunk_size = master_chunk_size / num_chunks;
544 unsigned long long chunk_offset = master_chunk->get_offset();
545 std::string chunk_byteorder = master_chunk->get_byte_order();
549 unsigned int chunk_remainder = master_chunk->get_size() % num_chunks;
551 string chunk_url = master_chunk->get_data_url();
554 queue<shared_ptr<Chunk>> chunks_to_read;
556 for (
unsigned int i = 0; i < num_chunks - 1; i++) {
557 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size, (chunk_size * i) + chunk_offset)));
560 chunks_to_read.push(shared_ptr<Chunk>(
new Chunk(chunk_url, chunk_byteorder, chunk_size + chunk_remainder,
561 (chunk_size * (num_chunks - 1)) + chunk_offset)));
// NOTE(review): if d_max_parallel_transfers is not a compile-time constant, this is a
// variable-length array, which is a compiler extension rather than standard C++.
564 pthread_t threads[DmrppRequestHandler::d_max_parallel_transfers];
565 memset(&threads[0], 0,
sizeof(pthread_t) * DmrppRequestHandler::d_max_parallel_transfers);
568 unsigned int num_threads = 0;
571 for (
unsigned int i = 0;
572 i < (
unsigned int) DmrppRequestHandler::d_max_parallel_transfers && !chunks_to_read.empty(); ++i) {
573 shared_ptr<Chunk> current_chunk = chunks_to_read.front();
574 chunks_to_read.pop();
577 one_child_chunk_args *args =
new one_child_chunk_args(fds, i, current_chunk, master_chunk);
578 status = pthread_create(&threads[i], NULL, dmrpp::one_child_chunk_thread, (
void *) args);
582 BESDEBUG(dmrpp_3,
"started thread: " << i << endl);
585 ostringstream oss(
"Could not start process_one_chunk_unconstrained thread for master_chunk ",
587 oss << i <<
": " << strerror(status);
588 BESDEBUG(dmrpp_3, oss.str());
// Harvest finished threads via the pipe and reuse their slots for the remaining child chunks.
594 while (num_threads > 0) {
597 int bytes = ::read(fds[0], &tid,
sizeof(tid));
598 if (bytes !=
sizeof(tid))
599 throw BESInternalError(
string(
"Could not read the thread id: ").append(strerror(errno)), __FILE__,
602 if (tid >= DmrppRequestHandler::d_max_parallel_transfers) {
603 ostringstream oss(
"Invalid thread id read after thread exit: ", std::ios::ate);
609 status = pthread_join(threads[tid], (
void **) &error);
611 BESDEBUG(dmrpp_3,
"joined thread: " << (
unsigned int) tid <<
", there are: " << num_threads << endl);
614 ostringstream oss(
"Could not join process_one_chunk_unconstrained thread for master_chunk ",
616 oss << tid <<
": " << strerror(status);
619 else if (error != 0) {
624 else if (chunks_to_read.size() > 0) {
625 auto current_chunk = chunks_to_read.front();
626 chunks_to_read.pop();
629 one_child_chunk_args *args =
new one_child_chunk_args(fds, tid, current_chunk, master_chunk);
630 int status = pthread_create(&threads[tid], NULL, dmrpp::one_child_chunk_thread, (
void *) args);
634 oss <<
"Could not start process_one_chunk_unconstrained thread for master_chunk " << tid <<
": "
639 BESDEBUG(dmrpp_3,
"started thread: " << (
unsigned int) tid <<
", there are: " << num_threads << endl);
650 join_threads(threads, DmrppRequestHandler::d_max_parallel_transfers);
660 master_chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(),
664 if (!is_projected()) {
665 val2buf(master_chunk->get_rbuf());
668 vector<unsigned int> array_shape = get_shape(
false);
671 reserve_value_capacity(get_size(
true));
672 unsigned long target_index = 0;
673 vector<unsigned int> subset;
675 insert_constrained_contiguous(dim_begin(), &target_index, subset, array_shape, master_chunk->get_rbuf());
// multiplier(): presumably returns the product of the extents of the dimensions after `k`, i.e.
// the element stride of dimension k in a row-major layout. The iterator positioning and the loop
// are not visible in this excerpt — TODO confirm.
// Requires shape.size() > 1 and shape.size() > k + 1 (both asserted).
697 static unsigned long multiplier(
const vector<unsigned int> &shape,
unsigned int k)
699 assert(shape.size() > 1);
700 assert(shape.size() > k + 1);
702 vector<unsigned int>::const_iterator i = shape.begin(), e = shape.end();
704 unsigned long multiplier = *i++;
// insert_chunk_unconstrained(): copy one chunk into this array's value buffer when no constraint
// is applied. The function recurses over dimensions starting at `dim`.
// - array_offset / chunk_offset are running element offsets into the array and into the chunk.
// - Each chunk extent is clipped to the dimension's stop, so edge chunks copy only in-bounds elements.
// - On the last dimension, array_offset is advanced by the chunk origin and the row is copied with
//   a single memcpy.
// - On other dimensions, each chunk row index is mapped to new chunk/array offsets using
//   multiplier() of the chunk and array shapes, then the function recurses on dim + 1.
731 void DmrppArray::insert_chunk_unconstrained(shared_ptr<Chunk> chunk,
unsigned int dim,
unsigned long long array_offset,
732 const vector<unsigned int> &array_shape,
733 unsigned long long chunk_offset,
const vector<unsigned int> &chunk_shape,
734 const vector<unsigned int> &chunk_origin)
739 dimension thisDim = this->get_dimension(dim);
740 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
741 if ((
unsigned) thisDim.stop < end_element) {
742 end_element = thisDim.stop;
745 unsigned long long chunk_end = end_element - chunk_origin[dim];
747 unsigned int last_dim = chunk_shape.size() - 1;
748 if (dim == last_dim) {
749 unsigned int elem_width = prototype()->width();
751 array_offset += chunk_origin[dim];
754 unsigned long long chunk_bytes = (end_element - chunk_origin[dim] + 1) * elem_width;
755 char *source_buffer = chunk->get_rbuf();
756 char *target_buffer = get_buf();
757 memcpy(target_buffer + (array_offset * elem_width), source_buffer + (chunk_offset * elem_width), chunk_bytes);
760 unsigned long mc = multiplier(chunk_shape, dim);
761 unsigned long ma = multiplier(array_shape, dim);
764 for (
unsigned int chunk_index = 0 ; chunk_index <= chunk_end; ++chunk_index) {
765 unsigned long long next_chunk_offset = chunk_offset + (mc * chunk_index);
766 unsigned long long next_array_offset = array_offset + (ma * (chunk_index + chunk_origin[dim]));
769 insert_chunk_unconstrained(chunk, dim + 1, next_array_offset, array_shape, next_chunk_offset, chunk_shape,
// one_chunk_unconstrained_thread(): pthread entry point that runs process_one_chunk_unconstrained()
// for one chunk. Each visible exit path writes args->tid to the pipe write end args->fds[1] so the
// parent can join this thread. The error paths (a caught error with a verbose message,
// std::exception, or an unknown exception) also log via ERROR_LOG and exit with a heap-allocated
// string describing the failure.
780 void *one_chunk_unconstrained_thread(
void *arg_list)
782 one_chunk_unconstrained_args *args =
reinterpret_cast<one_chunk_unconstrained_args *
>(arg_list);
785 process_one_chunk_unconstrained(args->chunk, args->array, args->array_shape, args->chunk_shape);
789 msg << prolog <<
"ERROR. tid: " << +(args->tid) <<
" message: " << error.get_verbose_message() << endl;
790 ERROR_LOG(msg.str());
791 write(args->fds[1], &args->tid,
sizeof(args->tid));
793 pthread_exit(
new string(msg.str()));
795 catch (std::exception &e){
797 msg << prolog <<
"ERROR. tid: " << +(args->tid) <<
" process_one_chunk_unconstrained() "
798 "failed. Message: " << e.what() << endl;
799 ERROR_LOG(msg.str());
800 write(args->fds[1], &args->tid,
sizeof(args->tid));
802 pthread_exit(
new string(msg.str()));
807 msg << prolog <<
"ERROR. tid: " << +(args->tid) <<
" process_one_chunk_unconstrained() "
808 "failed for an unknown reason." << endl;
809 ERROR_LOG(msg.str());
810 write(args->fds[1], &args->tid,
sizeof(args->tid));
812 pthread_exit(
new string(msg.str()));
817 write(args->fds[1], &args->tid,
sizeof(args->tid));
// process_super_chunk_unconstrained(): copy every chunk of `super_chunk` into `array` with
// insert_chunk_unconstrained(). It uses the array's shape (get_shape(true)) and its chunk
// dimension sizes. Reading/inflating of the chunks happens on lines not visible in this excerpt.
// NOTE(review): target_element_address and chunk_source_address are built but not passed to
// insert_chunk_unconstrained() in the visible lines — possibly unused.
831 void process_super_chunk_unconstrained(
const shared_ptr<SuperChunk>& super_chunk,
DmrppArray *array)
833 BESDEBUG(dmrpp_3, prolog <<
"BEGIN" << endl );
837 const vector<unsigned int> array_shape = array->
get_shape(
true);
839 const vector<unsigned int> chunk_shape = array->get_chunk_dimension_sizes();
842 for(
auto &chunk :super_chunk->get_chunks()){
847 vector<unsigned int> target_element_address = chunk->get_position_in_array();
848 vector<unsigned int> chunk_source_address(array->dimensions(), 0);
850 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
// process_one_chunk_unconstrained(): insert one chunk into `array` with insert_chunk_unconstrained(),
// starting at dimension 0 with zero array/chunk offsets.
// The partially visible call ending in `array->var()->width());` is presumably the chunk inflate
// step (compare process_one_chunk) — TODO confirm.
854 void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk,
DmrppArray *array,
const vector<unsigned int> &array_shape,
855 const vector<unsigned int> &chunk_shape)
857 BESDEBUG(dmrpp_3, prolog <<
"BEGIN" << endl );
863 array->var()->width());
865 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
866 BESDEBUG(dmrpp_3, prolog <<
"END" << endl );
// read_chunks_unconstrained_concurrent(): process the SuperChunks in `super_chunks` with concurrent
// std::async tasks, limited by d_max_parallel_transfers.
// It alternates between polling finished futures with get_next_future() and starting new tasks with
// start_super_chunk_unconstrained_thread(). Per the debug message, a SuperChunk whose task cannot be
// started is returned to the queue. The remaining futures are drained with get() at the end.
// NOTE(review): the following happen on lines not shown in this excerpt:
// - the pop of a started SuperChunk;
// - freeing `args` when no thread was started;
// - removal of drained futures in the final `while(!futures.empty())` loop.
// Confirm each, otherwise the final loop never terminates.
876 void read_chunks_unconstrained_concurrent(DmrppArray *array, queue<shared_ptr<SuperChunk>> &super_chunks)
886 list<future<void *>> futures;
894 joined = get_next_future(futures, WAIT_FOR_FUTURE_MS);
902 bool thread_started =
true;
903 while(thread_started && !super_chunks.empty()) {
904 auto super_chunk = super_chunks.front();
905 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
907 auto *args =
new one_super_chunk_args(super_chunk, array);
908 thread_started = start_super_chunk_unconstrained_thread(futures, args);
910 if (thread_started) {
912 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for" << super_chunk->to_string(
false) << endl);
916 BESDEBUG(dmrpp_3, prolog <<
"Thread not started, Returned SuperChunk to queue. " <<
917 "thread_count: " << thread_counter << endl);
921 else if(!super_chunks.empty()){
925 msg << prolog <<
"No threads joined, yet " << super_chunks.size() <<
" SuperChunks remain unread.";
937 while(!futures.empty()){
938 futures.back().get();
// read_chunks_unconstrained(): read all chunks of an array that has no projection.
// Chunks are grouped into SuperChunks in get_chunks() order; a new SuperChunk is started when
// SuperChunk::add_chunk() presumably rejects a chunk (the `if` is elided). The value buffer is then
// reserved and the SuperChunks are processed serially, or via read_chunks_unconstrained_concurrent()
// when d_use_parallel_transfers is set.
// @throws BESInternalError if there are fewer than 2 chunks. A chunk that cannot be added even to a
//         fresh SuperChunk is reported via the message built below (the throw itself is not visible).
958 void DmrppArray::read_chunks_unconstrained()
963 auto chunk_refs = get_chunks();
964 if (chunk_refs.size() < 2)
965 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
969 queue<shared_ptr<SuperChunk>> super_chunks;
970 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk()) ;
971 super_chunks.push(current_super_chunk);
974 for(
const auto& chunk: get_chunks()){
975 bool added = current_super_chunk->add_chunk(chunk);
977 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk());
978 super_chunks.push(current_super_chunk);
979 if(!current_super_chunk->add_chunk(chunk)){
981 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
986 reserve_value_capacity(get_size());
988 const vector<unsigned int> array_shape = get_shape(
true);
990 const vector<unsigned int> chunk_shape = get_chunk_dimension_sizes();
993 BESDEBUG(dmrpp_3, __func__ << endl);
994 BESDEBUG(dmrpp_3,
"d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
995 BESDEBUG(dmrpp_3,
"d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
997 if (!DmrppRequestHandler::d_use_parallel_transfers) {
998 while(!super_chunks.empty()) {
999 auto super_chunk = super_chunks.front();
1001 process_super_chunk_unconstrained(super_chunk,
this);
1010 read_chunks_unconstrained_concurrent(
this,super_chunks);
// get_chunk_start(): index, within a chunk whose first element is at `chunk_origin`, of the first
// element selected by the dimension's start/stride.
// - If the selection starts before the chunk, the result is the distance from chunk_origin to the
//   next stride-aligned element. This is 0 when stride is 1 or the origin is already aligned.
// - Otherwise (presumably the elided `else`) it is thisDim.start - chunk_origin.
1031 unsigned long long DmrppArray::get_chunk_start(
const dimension &thisDim,
unsigned int chunk_origin)
1034 unsigned long long first_element_offset = 0;
1035 if ((
unsigned) (thisDim.start) < chunk_origin) {
1037 if (thisDim.stride != 1) {
1039 first_element_offset = (chunk_origin - thisDim.start) % thisDim.stride;
1041 if (first_element_offset != 0) {
1043 first_element_offset = thisDim.stride - first_element_offset;
1048 first_element_offset = thisDim.start - chunk_origin;
1051 return first_element_offset;
// find_needed_chunks(): recursively decide whether `chunk` holds any element selected by the
// constraint.
// - Returns early (the return statements are not visible) when the chunk lies outside [start, stop]
//   on dimension `dim`, or when the first selected element falls past the chunk.
// - On the last dimension the chunk is reported as needed.
// - Otherwise each selected index along `dim` is recorded in *target_element_address and the search
//   recurses on dim + 1.
// NOTE(review): `thisDim.start > chunk_origin + chunk_shape` and `chunk_start > chunk_shape` use `>`.
// A start equal to origin+extent is already outside the chunk, so `>=` may be intended — confirm.
1076 DmrppArray::find_needed_chunks(
unsigned int dim, vector<unsigned int> *target_element_address, shared_ptr<Chunk> chunk)
1078 BESDEBUG(dmrpp_3, prolog <<
" BEGIN, dim: " << dim << endl);
1081 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1084 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1086 dimension thisDim = this->get_dimension(dim);
1089 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) ||
1090 (
unsigned) thisDim.stop < chunk_origin[dim]) {
1095 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1098 if (chunk_start > chunk_shape[dim]) {
1103 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1104 if ((
unsigned) thisDim.stop < end_element) {
1105 end_element = thisDim.stop;
1108 unsigned long long chunk_end = end_element - chunk_origin[dim];
1110 unsigned int last_dim = chunk_shape.size() - 1;
1111 if (dim == last_dim) {
1112 BESDEBUG(dmrpp_3, prolog <<
" END, This is the last_dim. chunk: " << chunk->to_string() << endl);
1117 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1118 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1121 auto needed = find_needed_chunks(dim + 1, target_element_address, chunk);
1123 BESDEBUG(dmrpp_3, prolog <<
" END, Found chunk: " << needed->to_string() << endl);
1129 BESDEBUG(dmrpp_3, prolog <<
" END, dim: " << dim << endl);
// insert_chunk(): copy the constrained portion of one chunk into this array's value buffer.
// It recurses over dimensions; the leading `dim` parameter is on a line not shown.
// - *target_element_address / *chunk_element_address track the element position in the constrained
//   array and in the chunk, respectively.
// - On the last dimension with stride 1, the selected run is copied with a single memcpy; otherwise
//   elements are copied one at a time.
// - On other dimensions, each selected index is recorded and the function recurses on dim + 1.
// NOTE(review): the byte offsets are held in unsigned int, so they can overflow for buffers larger
// than 4 GiB even though get_index() returns unsigned long long.
1153 void DmrppArray::insert_chunk(
1155 vector<unsigned int> *target_element_address,
1156 vector<unsigned int> *chunk_element_address,
1157 shared_ptr<Chunk> chunk,
1158 const vector<unsigned int> &constrained_array_shape){
1161 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1164 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1166 dimension thisDim = this->get_dimension(dim);
1169 unsigned long long chunk_start = get_chunk_start(thisDim, chunk_origin[dim]);
1172 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1173 if ((
unsigned) thisDim.stop < end_element) {
1174 end_element = thisDim.stop;
1177 unsigned long long chunk_end = end_element - chunk_origin[dim];
1179 unsigned int last_dim = chunk_shape.size() - 1;
1180 if (dim == last_dim) {
1181 char *source_buffer = chunk->get_rbuf();
1182 char *target_buffer = get_buf();
1183 unsigned int elem_width = prototype()->width();
1185 if (thisDim.stride == 1) {
1187 unsigned long long start_element = chunk_origin[dim] + chunk_start;
1189 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1192 (*target_element_address)[dim] = (start_element - thisDim.start);
1194 (*chunk_element_address)[dim] = chunk_start;
1197 unsigned int target_char_start_index =
1198 get_index(*target_element_address, constrained_array_shape) * elem_width;
1199 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1201 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index,
1202 chunk_constrained_inner_dim_bytes);
1206 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1208 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1211 (*chunk_element_address)[dim] = chunk_index;
1214 unsigned int target_char_start_index =
1215 get_index(*target_element_address, constrained_array_shape) * elem_width;
1216 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1218 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1224 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1225 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1226 (*chunk_element_address)[dim] = chunk_index;
1229 insert_chunk(dim + 1, target_element_address, chunk_element_address, chunk, constrained_array_shape);
// one_chunk_thread(): pthread entry point that runs process_one_chunk() for one chunk, then writes
// args->tid to the pipe write end args->fds[1] so the parent can join it. On the (partially visible)
// error path it also writes the id and exits with a heap-allocated copy of the error's verbose message.
1234 void *one_chunk_thread(
void *arg_list)
1236 one_chunk_args *args =
reinterpret_cast<one_chunk_args *
>(arg_list);
1239 process_one_chunk(args->chunk, args->array, args->array_shape);
1242 write(args->fds[1], &args->tid,
sizeof(args->tid));
1244 pthread_exit(
new string(error.get_verbose_message()));
1249 write(args->fds[1], &args->tid,
sizeof(args->tid));
// process_one_chunk(): read one chunk, inflate it if the array uses deflate and/or shuffle, and insert
// its constrained portion into `array` with insert_chunk(), starting at dimension 0.
// @param constrained_array_shape  Shape of the constrained array, i.e. the layout of the target buffer.
1271 void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array,
const vector<unsigned int> &constrained_array_shape)
1273 BESDEBUG(dmrpp_3, prolog <<
"BEGIN" << endl );
1275 chunk->read_chunk();
1277 if (array->is_deflate_compression() || array->is_shuffle_compression())
1278 chunk->inflate_chunk(array->is_deflate_compression(), array->is_shuffle_compression(),
1279 array->get_chunk_size_in_elements(), array->var()->width());
1281 vector<unsigned int> target_element_address = chunk->get_position_in_array();
1282 vector<unsigned int> chunk_source_address(array->dimensions(), 0);
1284 array->insert_chunk(0 , &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
1285 BESDEBUG(dmrpp_3, prolog <<
"END" << endl );
// process_super_chunk(): read the bytes of `super_chunk` with SuperChunk::read(), then insert the
// constrained portion of each contained chunk into `array` with insert_chunk().
// Any per-chunk inflate step would be on the lines between the loop header and the insert, which are
// not visible in this excerpt — TODO confirm.
1296 void process_super_chunk(
const shared_ptr<SuperChunk>& super_chunk,
DmrppArray *array)
1298 BESDEBUG(dmrpp_3, prolog <<
"BEGIN" << endl );
1299 super_chunk->read();
1301 vector<unsigned int> constrained_array_shape = array->
get_shape(
true);
1303 for(
auto &chunk :super_chunk->get_chunks()){
1308 vector<unsigned int> target_element_address = chunk->get_position_in_array();
1309 vector<unsigned int> chunk_source_address(array->dimensions(), 0);
1311 array->insert_chunk(0 , &target_element_address, &chunk_source_address, chunk, constrained_array_shape);
1314 BESDEBUG(dmrpp_3, prolog <<
"END" << endl );
// read_chunks_concurrent(): constrained counterpart of read_chunks_unconstrained_concurrent().
// It polls futures with get_next_future() (only when the list is non-empty) and starts
// one_super_chunk_thread tasks via start_super_chunk_thread(), limited by d_max_parallel_transfers.
// The remaining futures are drained with get() at the end.
// NOTE(review): the declaration of `args` passed to start_super_chunk_thread() and the removal of
// drained futures are on lines not shown — confirm.
1325 void read_chunks_concurrent(
DmrppArray *array, queue<shared_ptr<SuperChunk>> &super_chunks)
1335 list<future<void *>> futures;
1342 if(!futures.empty())
1343 joined = get_next_future(futures, WAIT_FOR_FUTURE_MS);
1351 bool thread_started =
true;
1352 while(thread_started && !super_chunks.empty()) {
1353 auto super_chunk = super_chunks.front();
1354 BESDEBUG(dmrpp_3, prolog <<
"Starting thread for " << super_chunk->to_string(
false) << endl);
1357 thread_started = start_super_chunk_thread(futures, args);
1359 if (thread_started) {
1361 BESDEBUG(dmrpp_3, prolog <<
"STARTED thread for" << super_chunk->to_string(
false) << endl);
1365 BESDEBUG(dmrpp_3, prolog <<
"Thread not started, Returned SuperChunk to queue. " <<
1366 "thread_count: " << thread_counter << endl);
1370 else if(!super_chunks.empty()){
1374 msg << prolog <<
"No threads joined, yet " << super_chunks.size() <<
" SuperChunks remain unread.";
1386 while(!futures.empty()){
1387 futures.back().get();
// read_chunks(): read a chunked array that has a projection (constraint) applied.
// - For each chunk, find_needed_chunks() is consulted and the chunk is grouped into SuperChunks
//   (a new SuperChunk is started when add_chunk() fails).
// - The value buffer is reserved for the constrained size.
// - The SuperChunks are processed serially, or via read_chunks_concurrent() when
//   d_use_parallel_transfers is set.
// @throws BESInternalError if there are fewer than 2 chunks.
// NOTE(review): how `needed` is used (presumably to skip chunks with no selected elements) is on
// lines not shown in this excerpt.
1401 void DmrppArray::read_chunks()
1406 auto chunk_refs = get_chunks();
1407 if (chunk_refs.size() < 2)
1408 throw BESInternalError(
string(
"Expected chunks for variable ") + name(), __FILE__, __LINE__);
1412 queue<shared_ptr<SuperChunk>> super_chunks;
1413 auto current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk()) ;
1414 super_chunks.push(current_super_chunk);
1421 for(
const auto& chunk: get_chunks()){
1422 vector<unsigned int> target_element_address = chunk->get_position_in_array();
1423 auto needed = find_needed_chunks(0 , &target_element_address, chunk);
1425 bool added = current_super_chunk->add_chunk(chunk);
1427 current_super_chunk = shared_ptr<SuperChunk>(
new SuperChunk());
1428 super_chunks.push(current_super_chunk);
1429 if(!current_super_chunk->add_chunk(chunk)){
1431 msg << prolog <<
"Failed to add Chunk to new SuperChunk. chunk: " << chunk->to_string();
1438 reserve_value_capacity(get_size(
true));
1440 BESDEBUG(dmrpp_3, prolog <<
"d_use_parallel_transfers: " << DmrppRequestHandler::d_use_parallel_transfers << endl);
1441 BESDEBUG(dmrpp_3, prolog <<
"d_max_parallel_transfers: " << DmrppRequestHandler::d_max_parallel_transfers << endl);
1442 BESDEBUG(dmrpp_3, prolog <<
"SuperChunks.size(): " << super_chunks.size() << endl);
1444 if (!DmrppRequestHandler::d_use_parallel_transfers) {
1447 while (!super_chunks.empty()) {
1448 auto super_chunk = super_chunks.front();
1450 BESDEBUG(dmrpp_3, prolog << super_chunk->to_string(
true) << endl );
1451 process_super_chunk(super_chunk,
this);
1460 read_chunks_concurrent(
this, super_chunks);
// Legacy serial chunk reader, compiled only when USE_READ_SERIAL is defined.
// NOTE(review): get_chunk_start(dim, chunk_origin) below does not match the
// get_chunk_start(const dimension &, unsigned int) signature defined earlier in this file. This path
// may no longer compile if enabled — confirm before defining USE_READ_SERIAL.
1466 #ifdef USE_READ_SERIAL
// insert_chunk_serial(): like insert_chunk(), but it skips chunks outside the selection and reads and
// inflates the chunk itself when it reaches the last dimension.
1488 void DmrppArray::insert_chunk_serial(
unsigned int dim, vector<unsigned int> *target_element_address, vector<unsigned int> *chunk_element_address,
1491 BESDEBUG(
"dmrpp", __func__ <<
" dim: "<< dim <<
" BEGIN "<< endl);
1494 const vector<unsigned int> &chunk_shape = get_chunk_dimension_sizes();
1497 const vector<unsigned int> &chunk_origin = chunk->get_position_in_array();
1499 dimension thisDim = this->get_dimension(dim);
1502 if ((
unsigned) thisDim.start > (chunk_origin[dim] + chunk_shape[dim]) || (
unsigned) thisDim.stop < chunk_origin[dim]) {
1507 unsigned int first_element_offset = get_chunk_start(dim, chunk_origin);
1510 if (first_element_offset > chunk_shape[dim]) {
1515 unsigned long long end_element = chunk_origin[dim] + chunk_shape[dim] - 1;
1516 if ((
unsigned) thisDim.stop < end_element) {
1517 end_element = thisDim.stop;
1520 unsigned long long chunk_start = first_element_offset;
1521 unsigned long long chunk_end = end_element - chunk_origin[dim];
1522 vector<unsigned int> constrained_array_shape = get_shape(
true);
1524 unsigned int last_dim = chunk_shape.size() - 1;
1525 if (dim == last_dim) {
1527 chunk->read_chunk();
1529 chunk->inflate_chunk(is_deflate_compression(), is_shuffle_compression(), get_chunk_size_in_elements(), var()->width());
1531 char *source_buffer = chunk->get_rbuf();
1532 char *target_buffer = get_buf();
1533 unsigned int elem_width = prototype()->width();
1535 if (thisDim.stride == 1) {
1537 unsigned long long start_element = chunk_origin[dim] + first_element_offset;
1539 unsigned long long chunk_constrained_inner_dim_bytes = (end_element - start_element + 1) * elem_width;
1542 (*target_element_address)[dim] = (start_element - thisDim.start) / thisDim.stride;
1544 (*chunk_element_address)[dim] = first_element_offset;
1546 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1547 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1549 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, chunk_constrained_inner_dim_bytes);
1553 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1555 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1558 (*chunk_element_address)[dim] = chunk_index;
1560 unsigned int target_char_start_index = get_index(*target_element_address, constrained_array_shape) * elem_width;
1561 unsigned int chunk_char_start_index = get_index(*chunk_element_address, chunk_shape) * elem_width;
1563 memcpy(target_buffer + target_char_start_index, source_buffer + chunk_char_start_index, elem_width);
1569 for (
unsigned int chunk_index = chunk_start; chunk_index <= chunk_end; chunk_index += thisDim.stride) {
1570 (*target_element_address)[dim] = (chunk_index + chunk_origin[dim] - thisDim.start) / thisDim.stride;
1571 (*chunk_element_address)[dim] = chunk_index;
1574 insert_chunk_serial(dim + 1, target_element_address, chunk_element_address, chunk);
// read_chunks_serial(): reserve the constrained value buffer, then call insert_chunk_serial() for each
// chunk in turn, starting at dimension 0.
// @throws BESInternalError if the variable has no chunks.
1579 void DmrppArray::read_chunks_serial()
1581 BESDEBUG(
"dmrpp", __func__ <<
" for variable '" << name() <<
"' - BEGIN" << endl);
1583 vector<Chunk> &chunk_refs = get_chunk_vec();
1584 if (chunk_refs.size() == 0)
throw BESInternalError(
string(
"Expected one or more chunks for variable ") + name(), __FILE__, __LINE__);
1587 reserve_value_capacity(get_size(
true));
1595 for (
unsigned long i = 0; i < chunk_refs.size(); i++) {
1596 Chunk &chunk = chunk_refs[i];
1598 vector<unsigned int> chunk_source_address(dimensions(), 0);
1599 vector<unsigned int> target_element_address = chunk.get_position_in_array();
1602 insert_chunk_serial(0, &target_element_address, &chunk_source_address, &chunk);
1607 BESDEBUG(
"dmrpp",
"DmrppArray::"<< __func__ <<
"() for " << name() <<
" END"<< endl);
// read(): returns immediately if the data were already read (read_p()). Otherwise it dispatches on
// the chunk layout:
// - a single chunk: read_contiguous();
// - multiple chunks with no projection: read_chunks_unconstrained();
// - otherwise: read_chunks().
// If twiddle_bytes() is set, the uint16/uint32/uint64 cases shown byte-swap the values in place.
// NOTE(review): the per-element advance of `local` in the swap loops is on lines not shown; confirm
// that each loop covers all `num` elements.
1622 bool DmrppArray::read()
1624 if (read_p())
return true;
1628 if (get_immutable_chunks().size() == 1) {
1629 BESDEBUG(dmrpp_4,
"Calling read_contiguous() for " << name() << endl);
1633 if (!is_projected()) {
1634 BESDEBUG(dmrpp_4,
"Calling read_chunks_unconstrained() for " << name() << endl);
1635 read_chunks_unconstrained();
1638 BESDEBUG(dmrpp_4,
"Calling read_chunks() for " << name() << endl);
1643 if (this->twiddle_bytes()) {
1644 int num = this->length();
1645 Type var_type = this->var()->type();
1649 case dods_uint16_c: {
1650 dods_uint16 *local =
reinterpret_cast<dods_uint16*
>(this->get_buf());
1652 *local = bswap_16(*local);
1658 case dods_uint32_c: {
1659 dods_uint32 *local =
reinterpret_cast<dods_uint32*
>(this->get_buf());;
1661 *local = bswap_32(*local);
1667 case dods_uint64_c: {
1668 dods_uint64 *local =
reinterpret_cast<dods_uint64*
>(this->get_buf());;
1670 *local = bswap_64(*local);
// PrintD4ArrayDimXMLWriter: functor that writes one Dim element per array dimension.
// - A `name` attribute is written when printing unconstrained with a non-empty name, or when
//   use_sdim_for_slice is set. The name is the shared dimension's fully qualified name if d.dim is
//   set, otherwise d.name.
// - A `size` attribute holds the constrained or full size, per d_constrained.
// NOTE(review): the error message for a failed `size` attribute says "name". Also,
// std::unary_function is deprecated in C++11 and removed in C++17.
1686 class PrintD4ArrayDimXMLWriter :
public unary_function<Array::dimension &, void> {
1693 PrintD4ArrayDimXMLWriter(XMLWriter &xml,
bool c) :
1694 xml(xml), d_constrained(c)
1698 void operator()(Array::dimension &d)
1704 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *)
"Dim") < 0)
1705 throw InternalErr(__FILE__, __LINE__,
"Could not write Dim element");
1707 string name = (d.dim) ? d.dim->fully_qualified_name() : d.name;
1710 if (!d_constrained && !name.empty()) {
1711 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
1712 (
const xmlChar *) name.c_str()) < 0)
1713 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1715 else if (d.use_sdim_for_slice) {
1716 assert(!name.empty());
1717 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name",
1718 (
const xmlChar *) name.c_str()) < 0)
1719 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1723 size << (d_constrained ? d.c_size : d.size);
1724 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"size",
1725 (
const xmlChar *) size.str().c_str()) < 0)
1726 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
1729 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1730 throw InternalErr(__FILE__, __LINE__,
"Could not end Dim element");
// Function object that forwards to BaseType::print_dap4(xml, d_constrained)
// for each variable it is applied to. print_dap4 uses it on the members of a
// constructor-type prototype.
// NOTE(review): std::unary_function was deprecated in C++11 and removed in
// C++17; the base class is not needed by for_each.
1734 class PrintD4ConstructorVarXMLWriter :
public unary_function<BaseType *, void> {
1738 PrintD4ConstructorVarXMLWriter(XMLWriter &xml,
bool c) :
1739 xml(xml), d_constrained(c)
1743 void operator()(BaseType *btp)
1745 btp->print_dap4(xml, d_constrained);
// Function object constructed with an XMLWriter and applied to each D4Map*.
// print_dap4 uses it over maps()->map_begin() .. map_end().
// NOTE(review): std::unary_function was deprecated in C++11 and removed in
// C++17; the base class is not needed by for_each.
1749 class PrintD4MapXMLWriter :
public unary_function<D4Map *, void> {
1753 PrintD4MapXMLWriter(XMLWriter &xml) :
1758 void operator()(D4Map *m)
// Write this array as DMR++ XML.
// If constrained is true and send_p() is false, nothing is written.
// Otherwise the element is named by the template variable's type
// (var()->type_name()) and contains, in order:
//   - name and, for enums, enum attributes;
//   - the members of a constructor-type prototype;
//   - one <Dim> per dimension;
//   - the attributes;
//   - the maps.
// When DmrppCommon::d_print_chunks is set, the chunks element is also written
// (if any chunks exist). For compact-layout data that has already been read,
// the values are written as base64-encoded compact elements.
// Throws InternalErr on libxml2 writer failures or an unhandled element type.
1788 void DmrppArray::print_dap4(XMLWriter &xml,
bool constrained )
1790 if (constrained && !send_p())
return;
// NOTE(review): the element name comes from var()->type_name(), but the error
// message uses type_name(). The message may not name the element actually written.
1792 if (xmlTextWriterStartElement(xml.get_writer(), (
const xmlChar *) var()->type_name().c_str()) < 0)
1793 throw InternalErr(__FILE__, __LINE__,
"Could not write " + type_name() +
" element");
// The name attribute is only written for named variables.
1795 if (!name().empty())
1796 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"name", (
const xmlChar *) name().c_str()) <
1798 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for name");
// For enum arrays, build the enumeration's path and write it as the enum attribute.
1801 if (var()->type() == dods_enum_c) {
1802 D4Enum *e =
static_cast<D4Enum *
>(var());
1803 string path = e->enumeration()->name();
// Prefix with the FQN of the enumeration's parent's parent. That object is
// presumably the D4Group that owns the enum definitions — confirm.
1804 if (e->enumeration()->parent()) {
1806 path =
static_cast<D4Group *
>(e->enumeration()->parent()->parent())->FQN() + path;
1808 if (xmlTextWriterWriteAttribute(xml.get_writer(), (
const xmlChar *)
"enum", (
const xmlChar *) path.c_str()) < 0)
1809 throw InternalErr(__FILE__, __LINE__,
"Could not write attribute for enum");
// Arrays of constructors: print each member variable of the prototype.
1812 if (prototype()->is_constructor_type()) {
1813 Constructor &c =
static_cast<Constructor &
>(*prototype());
1814 for_each(c.var_begin(), c.var_end(), PrintD4ConstructorVarXMLWriter(xml, constrained));
1819 for_each(dim_begin(), dim_end(), PrintD4ArrayDimXMLWriter(xml, constrained));
1821 attributes()->print_dap4(xml);
1823 for_each(maps()->map_begin(), maps()->map_end(), PrintD4MapXMLWriter(xml));
// DMR++-specific content: chunk information, controlled by d_print_chunks.
1827 if (DmrppCommon::d_print_chunks && get_immutable_chunks().size() > 0)
1828 print_chunks_element(xml, DmrppCommon::d_ns_prefix);
// Compact-layout data that has been read is embedded as base64 text.
1837 if (DmrppCommon::d_print_chunks && is_compact_layout() && read_p()) {
1838 switch (var()->type()) {
1852 case dods_float32_c:
1853 case dods_float64_c: {
// buf2val() returns the byte count written through `values`; those raw
// bytes are base64-encoded into a single compact element.
// NOTE(review): `values` is passed in as a null pointer, so buf2val() supplies
// the storage. Confirm it is released after encoding.
1854 u_int8_t *values = 0;
1856 size_t size = buf2val(
reinterpret_cast<void **
>(&values));
1857 string encoded = base64::Base64::encode(values, size);
1859 print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
// String values: each of the length() strings becomes its own base64 compact element.
1873 buf2val(
reinterpret_cast<void **
>(&values));
1875 for (
int i = 0; i < length(); ++i) {
1876 str = (*(
static_cast<string *
> (values) + i));
1877 string encoded = base64::Base64::encode(
reinterpret_cast<const u_int8_t *
>(str.c_str()), str.size());
1878 print_compact_element(xml, DmrppCommon::d_ns_prefix, encoded);
// NOTE(review): this message names Vector::val2buf, but it is raised from
// DmrppArray::print_dap4. A message naming this function would be clearer.
1890 throw InternalErr(__FILE__, __LINE__,
"Vector::val2buf: bad type");
1893 if (xmlTextWriterEndElement(xml.get_writer()) < 0)
1894 throw InternalErr(__FILE__, __LINE__,
"Could not end " + type_name() +
" element");
// Write a debugging description of this object to strm.
// The output starts with a header line carrying the object's address. Then,
// one indent level deeper, come DmrppCommon::dump() output and a placeholder
// "value: ----" line; the value buffer itself is not printed.
1897 void DmrppArray::dump(ostream &strm)
const
1899 strm << BESIndent::LMarg <<
"DmrppArray::" << __func__ <<
"(" << (
void *)
this <<
")" << endl;
1900 BESIndent::Indent();
1901 DmrppCommon::dump(strm);
1903 strm << BESIndent::LMarg <<
"value: " <<
"----" << endl;
1904 BESIndent::UnIndent();
static bool IsSet(const std::string &flagName)
Checks whether the debug context flagName is set to true.
Abstract exception class for the BES with a basic string message.
Exception thrown if an internal error is encountered.
virtual bool start(std::string name)
Extend libdap::Array so that a handler can read data using a DMR++ file.
virtual std::vector< unsigned int > get_shape(bool constrained)
Get the array shape.
virtual unsigned int get_chunk_size_in_elements() const
Get the number of elements in this chunk.
virtual bool is_shuffle_compression() const
Returns true if this object utilizes shuffle compression.
virtual bool is_deflate_compression() const
Returns true if this object utilizes deflate compression.