SuperChunk.cc
1// -*- mode: c++; c-basic-offset:4 -*-
2
3// This file is part of the BES
4
5// Copyright (c) 2018 OPeNDAP, Inc.
6// Author: Nathan Potter<ndp@opendap.org>
7//
8// This library is free software; you can redistribute it and/or
9// modify it under the terms of the GNU Lesser General Public
10// License as published by the Free Software Foundation; either
11// version 2.1 of the License, or (at your option) any later version.
12//
13// This library is distributed in the hope that it will be useful,
14// but WITHOUT ANY WARRANTY; without even the implied warranty of
15// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16// Lesser General Public License for more details.
17//
18// You should have received a copy of the GNU Lesser General Public
19// License along with this library; if not, write to the Free Software
20// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21//
22// You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23
24#include "config.h"
25
26#include <sstream>
27#include <vector>
28#include <string>
29
30#include "BESInternalError.h"
31#include "BESDebug.h"
32
33#include "DmrppRequestHandler.h"
34#include "CurlHandlePool.h"
35#include "DmrppArray.h"
36#include "DmrppNames.h"
37#include "Chunk.h"
38#include "SuperChunk.h"
39
40#define prolog std::string("SuperChunk::").append(__func__).append("() - ")
41
42#define SUPER_CHUNK_MODULE "dmrpp:3"
43
44using std::stringstream;
45using std::string;
46using std::vector;
47
48namespace dmrpp {
49
50// ThreadPool state variables.
51std::mutex chunk_processing_thread_pool_mtx; // mutex for critical section
52atomic_uint chunk_processing_thread_counter(0);
53#define COMPUTE_THREADS "compute_threads"
54
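/**
 * @brief Read one Chunk, apply the parent array's filters if present, and copy the
 * chunk's values into the (possibly constrained) target array.
 * If array is null the chunk is only read.
 */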
73void process_one_chunk(shared_ptr<Chunk> chunk, DmrppArray *array, const vector<unsigned long long> &constrained_array_shape)
74{
75 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
76
77 chunk->read_chunk();
78
79 if(array) {
80 if (!array->is_filters_empty())
81 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
82
83 vector<unsigned long long> target_element_address = chunk->get_position_in_array();
84 vector<unsigned long long> chunk_source_address(array->dimensions(), 0);
85
86 array->insert_chunk(0 /* dimension */, &target_element_address, &chunk_source_address, chunk,
87 constrained_array_shape);
88 }
89 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
90}
91
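/**
 * @brief Read one Chunk, apply the parent array's filters if present, and copy the
 * chunk's values into the unconstrained target array.
 */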
111void process_one_chunk_unconstrained(shared_ptr<Chunk> chunk, const vector<unsigned long long> &chunk_shape,
112 DmrppArray *array, const vector<unsigned long long> &array_shape)
113{
114 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
115
116 chunk->read_chunk();
117
118 if(array){
119 if (!array->is_filters_empty())
120 chunk->filter_chunk(array->get_filters(), array->get_chunk_size_in_elements(), array->var()->width());
121
122 array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
123 }
124 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
125}
126
127
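/**
 * @brief Thread entry point; calls process_one_chunk() for a single Chunk.
 * Run via std::async(); returns true so the associated future can be harvested.
 */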
133bool one_chunk_compute_thread(unique_ptr<one_chunk_args> args)
134{
135
136#if DMRPP_ENABLE_THREAD_TIMERS
137 stringstream timer_tag;
138 timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
139 " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id;
140 BESStopWatch sw(COMPUTE_THREADS);
141 sw.start(timer_tag.str());
142#endif
143
144 process_one_chunk(args->chunk, args->array, args->array_shape);
145 return true;
146}
147
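/**
 * @brief Thread entry point; calls process_one_chunk_unconstrained() for a single Chunk.
 * Run via std::async(); returns true so the associated future can be harvested.
 */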
153bool one_chunk_unconstrained_compute_thread(unique_ptr<one_chunk_unconstrained_args> args)
154{
155
156#if DMRPP_ENABLE_THREAD_TIMERS
157 stringstream timer_tag;
158 timer_tag << prolog << "tid: 0x" << std::hex << std::this_thread::get_id() <<
159 " parent_tid: 0x" << std::hex << args->parent_thread_id << " parent_sc: " << args->parent_super_chunk_id ;
160 BESStopWatch sw(COMPUTE_THREADS);
161 sw.start(timer_tag.str());
162#endif
163 process_one_chunk_unconstrained(args->chunk, args->chunk_shape, args->array, args->array_shape);
164 return true;
165}
166
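/**
 * @brief Start a new chunk-processing thread if the number of running compute threads
 * is less than DmrppRequestHandler::d_max_compute_threads.
 * @return True if the thread was started (ownership of args passes to the thread),
 * false if no thread slot was available and the Chunk must be retried later.
 */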
177bool start_one_chunk_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_args> args) {
178 bool retval = false;
179 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
180 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << " chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
181 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
182 chunk_processing_thread_counter++;
183 futures.push_back(std::async(std::launch::async, one_chunk_compute_thread, std::move(args)));
184 retval = true;
185 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
186 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
187 }
188 return retval;
189}
190
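/**
 * @brief As start_one_chunk_compute_thread(), but for the unconstrained (no subsetting) case.
 */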
201bool start_one_chunk_unconstrained_compute_thread(list<std::future<bool>> &futures, unique_ptr<one_chunk_unconstrained_args> args) {
202 bool retval = false;
203 std::unique_lock<std::mutex> lck (chunk_processing_thread_pool_mtx);
204 if (chunk_processing_thread_counter < DmrppRequestHandler::d_max_compute_threads) {
205 futures.push_back(std::async(std::launch::async, one_chunk_unconstrained_compute_thread, std::move(args)));
206 chunk_processing_thread_counter++;
207 retval = true;
208 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Got std::future '" << futures.size() <<
209 "' from std::async, chunk_processing_thread_counter: " << chunk_processing_thread_counter << endl);
210 }
211 return retval;
212}
213
214
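/**
 * @brief Process the queued Chunks concurrently for a constrained read.
 * Completed futures are harvested and new compute threads are started as capacity
 * allows, until the queue is empty and every future has been collected. If an
 * exception is thrown, all outstanding futures are completed before it is re-thrown.
 */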
236void process_chunks_concurrent(
237 const string &super_chunk_id,
238 queue<shared_ptr<Chunk>> &chunks,
239 DmrppArray *array,
240 const vector<unsigned long long> &constrained_array_shape ){
241
242 // We maintain a list of futures to track our parallel activities.
243 list<future<bool>> futures;
244 try {
245 bool done = false;
246 bool future_finished = true;
247 while (!done) {
248
249 if(!futures.empty())
250 future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
251
252 // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
253 // because future::get() was called or a call to future::valid() returned false.
254 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
255
256 if (!chunks.empty()){
257 // Next we try to add a new Chunk compute thread if we can - there might be room.
258 bool thread_started = true;
259 while(thread_started && !chunks.empty()) {
260 auto chunk = chunks.front();
261 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Starting thread for " << chunk->to_string() << endl);
262
263 auto args = unique_ptr<one_chunk_args>(new one_chunk_args(super_chunk_id, chunk, array, constrained_array_shape));
264 thread_started = start_one_chunk_compute_thread(futures, std::move(args));
265
266 if (thread_started) {
267 chunks.pop();
268 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "STARTED thread for " << chunk->to_string() << endl);
269 } else {
270 // Thread did not start, ownership of the arguments was not passed to the thread.
271 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started. args deleted, Chunk remains in queue. " <<
272 "chunk_processing_thread_counter: " << chunk_processing_thread_counter << " futures.size(): " << futures.size() << endl);
273 }
274 }
275 }
276 else {
277 // No more Chunks and no futures means we're done here.
278 if(futures.empty())
279 done = true;
280 }
281 future_finished = false;
282 }
283 }
284 catch (...) {
285 // Complete all of the futures, otherwise we'll have threads out there using up resources
286 while(!futures.empty()){
287 if(futures.back().valid())
288 futures.back().get();
289 futures.pop_back();
290 }
291 // re-throw the exception
292 throw;
293 }
294}
295
296
297
298
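/**
 * @brief Process the queued Chunks concurrently for an unconstrained read.
 * Works like process_chunks_concurrent(), but uses the unconstrained chunk insertion path.
 */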
321void process_chunks_unconstrained_concurrent(
322 const string &super_chunk_id,
323 queue<shared_ptr<Chunk>> &chunks,
324 const vector<unsigned long long> &chunk_shape,
325 DmrppArray *array,
326 const vector<unsigned long long> &array_shape){
327
328 // We maintain a list of futures to track our parallel activities.
329 list<future<bool>> futures;
330 try {
331 bool done = false;
332 bool future_finished = true;
333 while (!done) {
334
335 if(!futures.empty())
336 future_finished = get_next_future(futures, chunk_processing_thread_counter, DMRPP_WAIT_FOR_FUTURE_MS, prolog);
337
338 // If future_finished is true this means that the chunk_processing_thread_counter has been decremented,
339 // because future::get() was called or a call to future::valid() returned false.
340 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "future_finished: " << (future_finished ? "true" : "false") << endl);
341
342 if (!chunks.empty()){
343 // Next we try to add a new Chunk compute thread if we can - there might be room.
344 bool thread_started = true;
345 while(thread_started && !chunks.empty()) {
346 auto chunk = chunks.front();
347 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Starting thread for " << chunk->to_string() << endl);
348
349 auto args = unique_ptr<one_chunk_unconstrained_args>(
350 new one_chunk_unconstrained_args(super_chunk_id, chunk, array, array_shape, chunk_shape) );
351 thread_started = start_one_chunk_unconstrained_compute_thread(futures, std::move(args));
352
353 if (thread_started) {
354 chunks.pop();
355 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "STARTED thread for " << chunk->to_string() << endl);
356 } else {
357 // Thread did not start, ownership of the arguments was not passed to the thread.
358 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "Thread not started. args deleted, Chunk remains in queue." <<
359 " chunk_processing_thread_counter: " << chunk_processing_thread_counter <<
360 " futures.size(): " << futures.size() << endl);
361 }
362 }
363 }
364 else {
365 // No more Chunks and no futures means we're done here.
366 if(futures.empty())
367 done = true;
368 }
369 future_finished = false;
370 }
371 }
372 catch (...) {
373 // Complete all of the futures, otherwise we'll have threads out there using up resources
374 while(!futures.empty()){
375 if(futures.back().valid())
376 futures.back().get();
377 futures.pop_back();
378 }
379 // re-throw the exception
380 throw;
381 }
382}
383//#####################################################################################################################
384//#####################################################################################################################
385//#####################################################################################################################
386//
387// SuperChunk Code Begins Here
388//
389// = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
390
391
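/**
 * @brief Attempts to add a new Chunk to this SuperChunk.
 * The first Chunk is always accepted; later Chunks are accepted only if they are
 * contiguous with the bytes the SuperChunk already spans.
 * @return True if the Chunk was added, false otherwise.
 */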
400bool SuperChunk::add_chunk(const std::shared_ptr<Chunk> candidate_chunk) {
401 bool chunk_was_added = false;
402 if(d_chunks.empty()){
403 d_chunks.push_back(candidate_chunk);
404 d_offset = candidate_chunk->get_offset();
405 d_size = candidate_chunk->get_size();
406 d_data_url = candidate_chunk->get_data_url();
407 chunk_was_added = true;
408 }
409 else if(is_contiguous(candidate_chunk) ){
410 this->d_chunks.push_back(candidate_chunk);
411 d_size += candidate_chunk->get_size();
412 chunk_was_added = true;
413 }
414 return chunk_was_added;
415}
416
417
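/**
 * @brief Returns true if candidate_chunk uses the same data URL as this SuperChunk and
 * starts at the byte immediately following the bytes already spanned (d_offset + d_size).
 */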
430bool SuperChunk::is_contiguous(const std::shared_ptr<Chunk> candidate_chunk) {
431 // Are the URLs the same?
432 bool contiguous = candidate_chunk->get_data_url()->str() == d_data_url->str();
433 if(contiguous){
434 // If the URLs match then see if the locations are matching
435 contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
436 }
437 return contiguous;
438}
439
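/**
 * @brief Point each child Chunk's read buffer at its slice of the SuperChunk's
 * read buffer (d_read_buffer). Throws BESInternalError if the mapping would
 * exceed the SuperChunk's size.
 */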
448void SuperChunk::map_chunks_to_buffer()
449{
450 unsigned long long bindex = 0;
451 for(const auto &chunk : d_chunks){
452 chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(),0, false);
453 bindex += chunk->get_size();
454 if(bindex>d_size){
455 stringstream msg;
456 msg << "ERROR: The computed buffer index, " << bindex << " is larger than the expected size of the SuperChunk. ";
457 msg << "d_size: " << d_size;
458 throw BESInternalError(msg.str(), __FILE__, __LINE__);
459
460 }
461 }
462}
463
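/**
 * @brief Read the SuperChunk's entire byte range in a single transfer into d_read_buffer,
 * using a temporary Chunk and a curl easy handle from the handle pool.
 */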
468void SuperChunk::read_aggregate_bytes()
469{
470 // Since we already have a good infrastructure for reading Chunks, we just make a big-ol-Chunk to
471 // use for grabbing bytes. Then, once read, we'll use the child Chunks to do the dirty work of inflating
472 // and moving the results into the DmrppCommon object.
473 Chunk chunk(d_data_url, "NOT_USED", d_size, d_offset);
474
475 chunk.set_read_buffer(d_read_buffer, d_size,0,false);
476
477 dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
478 if (!handle)
479 throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
480
481 try {
482 handle->read_data(); // throws if error
483 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
484 }
485 catch(...) {
486 DmrppRequestHandler::curl_handle_pool->release_handle(handle);
487 throw;
488 }
489
490 // If the expected byte count was not read, it's an error.
491 if (d_size != chunk.get_bytes_read()) {
492 ostringstream oss;
493 oss << "Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() << ", expected: " << d_size;
494 throw BESInternalError(oss.str(), __FILE__, __LINE__);
495 }
496 d_is_read = true;
497}
498
499
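/**
 * @brief Cause the SuperChunk and all of its subordinate Chunks to be read.
 * Allocates the read buffer, maps the child Chunks onto it, reads the aggregate
 * bytes, and marks each child Chunk as read.
 */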
503void SuperChunk::retrieve_data() {
504 if (d_is_read) {
505 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "SuperChunk (" << (void **) this << ") has already been read! Returning." << endl);
506 return;
507 }
508
509 if(!d_read_buffer){
510 // Allocate memory for SuperChunk receive buffer.
511 // release memory in destructor.
512 d_read_buffer = new char[d_size];
513 }
514
515 // Massage the chunks so that their read/receive/intern data buffer
516 // points to the correct section of the d_read_buffer memory.
517 // "Slice it up!"
518 map_chunks_to_buffer();
519
520 // Read the bytes from the target URL. (pthreads, maybe depends on size...)
521 // Use one (or possibly more) thread(s) depending on d_size
522 // and utilize our friend cURL to stuff the bytes into d_read_buffer
523 read_aggregate_bytes();
524
525 // Set each Chunk's read state to true.
526 // Set each chunk's byte count to the expected
527 // size for the chunk - because upstream events
528 // have assured this to be true.
529 for(auto chunk : d_chunks){
530 chunk->set_is_read(true);
531 chunk->set_bytes_read(chunk->get_size());
532 }
533}
534
535
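/**
 * @brief Read the SuperChunk, then inflate/deshuffle the subordinate Chunks as required
 * and copy their values into the constrained parent array, serially or with compute threads.
 */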
540void SuperChunk::process_child_chunks() {
541 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
542 retrieve_data();
543
544 vector<unsigned long long> constrained_array_shape = d_parent_array->get_shape(true);
545 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_use_compute_threads: " << (DmrppRequestHandler::d_use_compute_threads ? "true" : "false") << endl);
546 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "d_max_compute_threads: " << DmrppRequestHandler::d_max_compute_threads << endl);
547
548 if(!DmrppRequestHandler::d_use_compute_threads){
549#if DMRPP_ENABLE_THREAD_TIMERS
550 BESStopWatch sw(SUPER_CHUNK_MODULE);
551 sw.start(prolog+"Serial Chunk Processing. id: " + d_id);
552#endif
553 for(const auto &chunk :get_chunks()){
554 process_one_chunk(chunk,d_parent_array,constrained_array_shape);
555 }
556 }
557 else {
558#if DMRPP_ENABLE_THREAD_TIMERS
559 stringstream timer_name;
560 timer_name << prolog << "Concurrent Chunk Processing. id: " << d_id;
561 BESStopWatch sw(SUPER_CHUNK_MODULE);
562 sw.start(timer_name.str());
563#endif
564 queue<shared_ptr<Chunk>> chunks_to_process;
565 for(const auto &chunk:get_chunks())
566 chunks_to_process.push(chunk);
567
568 process_chunks_concurrent(d_id, chunks_to_process, d_parent_array, constrained_array_shape);
569 }
570 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "END" << endl );
571}
572
573
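/**
 * @brief Read the SuperChunk, then inflate/deshuffle the subordinate Chunks as required
 * and copy their values into the unconstrained parent array, serially or with compute threads.
 */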
578void SuperChunk::process_child_chunks_unconstrained() {
579
580 BESDEBUG(SUPER_CHUNK_MODULE, prolog << "BEGIN" << endl );
581 retrieve_data();
582
583 // The size, in elements, of each of the array's dimensions
584 const vector<unsigned long long> array_shape = d_parent_array->get_shape(true);
585 // The size, in elements, of each of the chunk's dimensions
586 const vector<unsigned long long> chunk_shape = d_parent_array->get_chunk_dimension_sizes();
587
588 if(!DmrppRequestHandler::d_use_compute_threads){
589#if DMRPP_ENABLE_THREAD_TIMERS
590 BESStopWatch sw(SUPER_CHUNK_MODULE);
591 sw.start(prolog + "Serial Chunk Processing. sc_id: " + d_id );
592#endif
593 for(auto &chunk :get_chunks()){
594 process_one_chunk_unconstrained(chunk, chunk_shape, d_parent_array, array_shape);
595 }
596 }
597 else {
598#if DMRPP_ENABLE_THREAD_TIMERS
599 stringstream timer_name;
600 timer_name << prolog << "Concurrent Chunk Processing. sc_id: " << d_id;
601 BESStopWatch sw(SUPER_CHUNK_MODULE);
602 sw.start(timer_name.str());
603#endif
604 queue<shared_ptr<Chunk>> chunks_to_process;
605 for (auto &chunk:get_chunks())
606 chunks_to_process.push(chunk);
607
608 process_chunks_unconstrained_concurrent(d_id,chunks_to_process, chunk_shape, d_parent_array, array_shape);
609 }
610
611}
612
613
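/**
 * @brief Makes a string representation of the SuperChunk.
 * @param verbose If true, include a description of each child Chunk.
 */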
619string SuperChunk::to_string(bool verbose=false) const {
620 stringstream msg;
621 msg << "[SuperChunk: " << (void **)this;
622 msg << " offset: " << d_offset;
623 msg << " size: " << d_size ;
624 msg << " chunk_count: " << d_chunks.size();
625 //msg << " parent: " << d_parent->name();
626 msg << "]";
627 if (verbose) {
628 msg << endl;
629 for (auto chunk: d_chunks) {
630 msg << chunk->to_string() << endl;
631 }
632 }
633 return msg.str();
634}
635
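/**
 * @brief Writes the to_string() output to the stream strm.
 */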
640void SuperChunk::dump(ostream & strm) const {
641 strm << to_string(false) ;
642}
643
644} // namespace dmrpp