bes  Updated for version 3.20.8
SuperChunk.cc
1 // -*- mode: c++; c-basic-offset:4 -*-
2 
3 // This file is part of the BES
4 
5 // Copyright (c) 2018 OPeNDAP, Inc.
6 // Author: Nathan Potter<ndp@opendap.org>
7 //
8 // This library is free software; you can redistribute it and/or
9 // modify it under the terms of the GNU Lesser General Public
10 // License as published by the Free Software Foundation; either
11 // version 2.1 of the License, or (at your option) any later version.
12 //
13 // This library is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 // Lesser General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public
19 // License along with this library; if not, write to the Free Software
20 // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 //
22 // You can contact OPeNDAP, Inc. at PO Box 112, Saunderstown, RI. 02874-0112.
23 
24 #include "config.h"
25 
26 #include <sstream>
27 #include <vector>
28 #include <string>
29 
30 #include "BESInternalError.h"
31 #include "BESDebug.h"
32 #include "CurlUtils.h"
33 
34 #include "DmrppRequestHandler.h"
35 #include "CurlHandlePool.h"
36 #include "DmrppCommon.h"
37 #include "DmrppArray.h"
38 #include "DmrppNames.h"
39 #include "Chunk.h"
40 #include "SuperChunk.h"
41 
42 #define prolog std::string("SuperChunk::").append(__func__).append("() - ")
43 
44 using std::stringstream;
45 using std::string;
46 using std::vector;
47 
48 namespace dmrpp {
49 
50 #if 0
51 string SuperChunk::get_curl_range_arg_string() {
52  return curl::get_range_arg_string(d_offset, d_size);
53 }
54 #endif
55 
64 bool SuperChunk::add_chunk(const std::shared_ptr<Chunk> candidate_chunk) {
65  bool chunk_was_added = false;
66  if(d_chunks.empty()){
67  d_chunks.push_back(candidate_chunk);
68  d_offset = candidate_chunk->get_offset();
69  d_size = candidate_chunk->get_size();
70  d_data_url = candidate_chunk->get_data_url();
71  chunk_was_added = true;
72  }
73  else if(is_contiguous(candidate_chunk) ){
74  this->d_chunks.push_back(candidate_chunk);
75  d_size += candidate_chunk->get_size();
76  chunk_was_added = true;
77  }
78  return chunk_was_added;
79 }
80 
81 
94 bool SuperChunk::is_contiguous(const std::shared_ptr<Chunk> candidate_chunk) {
95  // Are the URLs the same?
96  bool contiguous = candidate_chunk->get_data_url() == d_data_url;
97  if(contiguous){
98  // If the URLs match then see if the locations are matching
99  contiguous = (d_offset + d_size) == candidate_chunk->get_offset();
100  }
101  return contiguous;
102 }
103 
112 void SuperChunk::map_chunks_to_buffer()
113 {
114  unsigned long long bindex = 0;
115  for(const auto &chunk : d_chunks){
116  chunk->set_read_buffer(d_read_buffer + bindex, chunk->get_size(),0, false);
117  bindex += chunk->get_size();
118  if(bindex>d_size){
119  stringstream msg;
120  msg << "ERROR The computed buffer index, " << bindex << " is larger than expected size of the SuperChunk. ";
121  msg << "d_size: " << d_size;
122  throw BESInternalError(msg.str(), __FILE__, __LINE__);
123 
124  }
125  }
126 }
127 
132 void SuperChunk::read_aggregate_bytes()
133 {
134  // Since we already have a good infrastructure for reading Chunks, we just make a big-ol-Chunk to
135  // use for grabbing bytes. Then, once read, we'll use the child Chunks to do the dirty work of inflating
136  // and moving the results into the DmrppCommon object.
137  Chunk chunk(d_data_url, "NOT_USED", d_size, d_offset);
138 
139  chunk.set_read_buffer(d_read_buffer, d_size,0,false);
140 
141  dmrpp_easy_handle *handle = DmrppRequestHandler::curl_handle_pool->get_easy_handle(&chunk);
142  if (!handle)
143  throw BESInternalError(prolog + "No more libcurl handles.", __FILE__, __LINE__);
144 
145  try {
146  handle->read_data(); // throws if error
147  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
148  }
149  catch(...) {
150  DmrppRequestHandler::curl_handle_pool->release_handle(handle);
151  throw;
152  }
153 
154  // If the expected byte count was not read, it's an error.
155  if (d_size != chunk.get_bytes_read()) {
156  ostringstream oss;
157  oss << "Wrong number of bytes read for chunk; read: " << chunk.get_bytes_read() << ", expected: " << d_size;
158  throw BESInternalError(oss.str(), __FILE__, __LINE__);
159  }
160 
161  d_is_read = true;
162 }
163 
164 
169  if (d_is_read) {
170  BESDEBUG(MODULE, prolog << "SuperChunk (" << (void **) this << ") has already been read! Returning." << endl);
171  return;
172  }
173 
174  if(!d_read_buffer){
175  // Allocate memory for SuperChunk receive buffer.
176  // release memory in destructor.
177  d_read_buffer = new char[d_size];
178  }
179 
180  // Massage the chunks so that their read/receive/intern data buffer
181  // points to the correct section of the d_read_buffer memory.
182  // "Slice it up!"
183  map_chunks_to_buffer();
184 
185  // Read the bytes from the target URL. (pthreads, maybe depends on size...)
186  // Use one (or possibly more) thread(s) depending on d_size
187  // and utilize our friend cURL to stuff the bytes into d_read_buffer
188  read_aggregate_bytes();
189 
190  // Set each Chunk's read state to true.
191  // Set each chunks byte count to the expected
192  // size for the chunk - because upstream events
193  // have assured this to be true.
194  for(auto chunk : d_chunks){
195  chunk->set_is_read(true);
196  chunk->set_bytes_read(chunk->get_size());
197  }
198 
199 }
200 #if 0
205 void SuperChunk::read_and_copy(DmrppArray *target_array) {
206  BESDEBUG(MODULE, prolog << "BEGIN" << endl );
207 
208  read();
209 
210  vector<unsigned int> constrained_array_shape = target_array->get_shape(true);
211 
212  for(auto &chunk :d_chunks){
213  if (target_array->is_deflate_compression() || target_array->is_shuffle_compression())
214  chunk->inflate_chunk(target_array->is_deflate_compression(), target_array->is_shuffle_compression(),
215  target_array->get_chunk_size_in_elements(), target_array->var()->width());
216 
217  vector<unsigned int> target_element_address = chunk->get_position_in_array();
218  vector<unsigned int> chunk_source_address(target_array->dimensions(), 0);
219 
220  target_array->insert_chunk(
221  0 /* dimension */,
222  &target_element_address,
223  &chunk_source_address,
224  chunk,
225  constrained_array_shape);
226  }
227 
228  BESDEBUG(MODULE, prolog << "END" << endl );
229 }
234 void SuperChunk::read_and_copy_unconstrained(DmrppArray *target_array) {
235  BESDEBUG(MODULE, prolog << "BEGIN" << endl );
236 
237  read();
238 
239  // The size in element of each of the array's dimensions
240  const vector<unsigned int> array_shape = target_array->get_shape(true);
241  // The size, in elements, of each of the chunk's dimensions
242  const vector<unsigned int> chunk_shape = target_array->get_chunk_dimension_sizes();
243 
244 
245  for(auto &chunk :d_chunks){
246  if (target_array->is_deflate_compression() || target_array->is_shuffle_compression())
247  chunk->inflate_chunk(target_array->is_deflate_compression(), target_array->is_shuffle_compression(),
248  target_array->get_chunk_size_in_elements(), target_array->var()->width());
249 
250  vector<unsigned int> target_element_address = chunk->get_position_in_array();
251  vector<unsigned int> chunk_source_address(target_array->dimensions(), 0);
252 
253  target_array->insert_chunk_unconstrained(chunk, 0, 0, array_shape, 0, chunk_shape, chunk->get_position_in_array());
254  }
255 
256  BESDEBUG(MODULE, prolog << "END" << endl );
257 }
258 #endif
259 
265 string SuperChunk::to_string(bool verbose=false) const {
266  stringstream msg;
267  msg << "[SuperChunk: " << (void **)this;
268  msg << " offset: " << d_offset;
269  msg << " size: " << d_size ;
270  msg << " chunk_count: " << d_chunks.size();
271  //msg << " parent: " << d_parent->name();
272  msg << "]";
273  if (verbose) {
274  msg << endl;
275  for (auto chunk: d_chunks) {
276  msg << chunk->to_string() << endl;
277  }
278  }
279  return msg.str();
280 }
281 
286 void SuperChunk::dump(ostream & strm) const {
287  strm << to_string(false) ;
288 }
289 
290 } // namespace dmrpp
exception thrown if internal error encountered
Extend libdap::Array so that a handler can read data using a DMR++ file.
Definition: DmrppArray.h:64
virtual std::vector< unsigned int > get_shape(bool constrained)
Get the array shape.
Definition: DmrppArray.cc:321
virtual unsigned int get_chunk_size_in_elements() const
Get the number of elements in this chunk.
Definition: DmrppCommon.h:166
virtual bool is_shuffle_compression() const
Returns true if this object utilizes shuffle compression.
Definition: DmrppCommon.h:129
virtual bool is_deflate_compression() const
Returns true if this object utilizes deflate compression.
Definition: DmrppCommon.h:119
virtual void read()
Cause the SuperChunk and all of its subordinate Chunks to be read.
Definition: SuperChunk.cc:168
virtual bool add_chunk(std::shared_ptr< Chunk > candidate_chunk)
Attempts to add a new Chunk to this SuperChunk.
Definition: SuperChunk.cc:64
std::string to_string(bool verbose) const
Makes a string representation of the SuperChunk.
Definition: SuperChunk.cc:265
virtual void dump(std::ostream &strm) const
Writes the to_string() output to the stream strm.
Definition: SuperChunk.cc:286