45 #include <D4ParserSax2.h>
46 #include <XMLWriter.h>
47 #include <BaseTypeFactory.h>
48 #include <D4BaseTypeFactory.h>
50 #include "PicoSHA2/picosha2.h"
53 #include "TheBESKeys.h"
56 #include "BESContextManager.h"
58 #include "BESRequestHandler.h"
59 #include "BESRequestHandlerList.h"
60 #include "BESNotFoundError.h"
62 #include "BESInternalError.h"
63 #include "BESInternalFatalError.h"
65 #include "GlobalMetadataStore.h"
67 #define DEBUG_KEY "metadata_store"
68 #define MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED 0
71 #define AT_EXIT(x) atexit((x))
85 #undef SYMETRIC_ADD_RESPONSES
91 static const unsigned int default_cache_size = 20;
92 static const string default_cache_prefix =
"mds";
93 static const string default_cache_dir =
"";
94 static const string default_ledger_name =
"mds_ledger.txt";
96 static const string PATH_KEY =
"DAP.GlobalMetadataStore.path";
97 static const string PREFIX_KEY =
"DAP.GlobalMetadataStore.prefix";
98 static const string SIZE_KEY =
"DAP.GlobalMetadataStore.size";
99 static const string LEDGER_KEY =
"DAP.GlobalMetadataStore.ledger";
100 static const string LOCAL_TIME_KEY =
"BES.LogTimeLocal";
103 bool GlobalMetadataStore::d_enabled =
true;
119 void GlobalMetadataStore::transfer_bytes(
int fd, ostream &os)
121 static const int BUFFER_SIZE = 16*1024;
123 #if _POSIX_C_SOURCE >= 200112L
125 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
127 ERROR(
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
130 char buf[BUFFER_SIZE + 1];
132 while(
int bytes_read = read(fd, buf, BUFFER_SIZE))
135 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
139 os.write(buf, bytes_read);
155 void GlobalMetadataStore::insert_xml_base(
int fd, ostream &os,
const string &xml_base)
157 static const int BUFFER_SIZE = 1024;
159 #if _POSIX_C_SOURCE >= 200112L
161 int status = posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
163 ERROR(
"Error calling posix_advise() in the GlobalMetadataStore: " << strerror(status) << endl);
166 char buf[BUFFER_SIZE + 1];
167 size_t bytes_read = read(fd, buf, BUFFER_SIZE);
169 if(bytes_read == (
size_t)-1)
170 throw BESInternalError(
"Could not read dds from the metadata store.", __FILE__, __LINE__);
187 while (buf[i++] !=
'>')
194 char xml_base_literal[] =
"xml:base";
195 while (i < bytes_read) {
197 os.write(buf + s, i - s);
198 os <<
" xml:base=\"" << xml_base <<
"\"";
201 else if (j ==
sizeof(xml_base_literal) - 1) {
202 os.write(buf + s, i - s);
203 while (buf[i++] !=
'=')
205 while (buf[i++] !=
'"')
207 while (buf[i++] !=
'"')
209 os <<
"=\"" << xml_base <<
"\"";
212 else if (buf[i] == xml_base_literal[j]) {
223 os.write(buf + i, bytes_read - i);
226 transfer_bytes(fd, os);
229 unsigned long GlobalMetadataStore::get_cache_size_from_config()
233 unsigned long size_in_megabytes = default_cache_size;
237 "GlobalMetadataStore::getCacheSizeFromConfig(): Located BES key " << SIZE_KEY <<
"=" << size << endl);
238 istringstream iss(size);
239 iss >> size_in_megabytes;
242 return size_in_megabytes;
245 string GlobalMetadataStore::get_cache_prefix_from_config()
248 string prefix = default_cache_prefix;
252 "GlobalMetadataStore::getCachePrefixFromConfig(): Located BES key " << PREFIX_KEY <<
"=" << prefix << endl);
260 string GlobalMetadataStore::get_cache_dir_from_config()
264 string cacheDir = default_cache_dir;
268 "GlobalMetadataStore::getCacheDirFromConfig(): Located BES key " << PATH_KEY<<
"=" << cacheDir << endl);
306 GlobalMetadataStore::get_instance(
const string &cache_dir,
const string &prefix,
unsigned long long size)
308 if (d_enabled && d_instance == 0) {
310 d_enabled = d_instance->cache_enabled();
315 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
318 AT_EXIT(delete_instance);
320 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
324 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance(dir,prefix,size) - d_instance: " << d_instance << endl);
336 GlobalMetadataStore::get_instance()
338 if (d_enabled && d_instance == 0) {
339 d_instance =
new GlobalMetadataStore(get_cache_dir_from_config(), get_cache_prefix_from_config(),
340 get_cache_size_from_config());
341 d_enabled = d_instance->cache_enabled();
345 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is DISABLED"<< endl);
348 AT_EXIT(delete_instance);
350 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::"<<__func__ <<
"() - " <<
"MDS is ENABLED"<< endl);
354 BESDEBUG(DEBUG_KEY,
"GlobalMetadataStore::get_instance() - d_instance: " << (
void *) d_instance << endl);
364 GlobalMetadataStore::initialize()
370 BESDEBUG(DEBUG_KEY,
"Located BES key " << LEDGER_KEY <<
"=" << d_ledger_name << endl);
373 d_ledger_name = default_ledger_name;
376 ofstream of(d_ledger_name.c_str(), ios::app);
379 string local_time =
"no";
381 d_use_local_time = (local_time ==
"YES" || local_time ==
"Yes" || local_time ==
"yes");
399 GlobalMetadataStore::GlobalMetadataStore()
400 :
BESFileLockingCache(get_cache_dir_from_config(), get_cache_prefix_from_config(), get_cache_size_from_config())
416 static void dump_time(ostream &os,
bool use_local_time)
420 char buf[
sizeof "YYYY-MM-DDTHH:MM:SSzone"];
430 status = strftime(buf,
sizeof buf,
"%FT%T%Z", gmtime(&now));
432 status = strftime(buf,
sizeof buf,
"%FT%T%Z", localtime(&now));
435 LOG(
"Error getting time for Metadata Store ledger.");
449 if (get_exclusive_lock(d_ledger_name, fd)) {
450 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Ledger " << d_ledger_name <<
" write locked." << endl);
453 dump_time(of, d_use_local_time);
454 of <<
" " << d_ledger_entry << endl;
455 VERBOSE(
"MDS Ledger name: '" << d_ledger_name <<
"', entry: '" << d_ledger_entry +
"'.");
464 LOG(
"Warning: Metadata store could not write to its ledger file.");
469 throw BESInternalError(
"Could not write lock '" + d_ledger_name, __FILE__, __LINE__);
483 throw BESInternalError(
"Empty name passed to the Metadata Store.", __FILE__, __LINE__);
485 return picosha2::hash256_hex_string(name[0] ==
'/' ? name :
"/" + name);
507 D4BaseTypeFactory factory;
508 DMR dmr(&factory, *d_dds);
515 d_dmr->print_dap4(xml);
528 d_dmr->getDDS()->print(os);
536 d_dds->print_das(os);
538 d_dmr->getDDS()->print_das(os);
559 const string &response_name)
561 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" BEGIN " << key << endl);
567 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Storing " << item_name << endl);
570 ofstream response(item_name.c_str(), ios::out|ios::app);
571 if (!response.is_open())
572 throw BESInternalError(
"Could not open '" + key +
"' to write the response.", __FILE__, __LINE__);
581 if (!
is_unlimited() || MAINTAIN_STORE_SIZE_EVEN_WHEN_UNLIMITED) {
599 VERBOSE(
"Metadata store: Wrote " << response_name <<
" response for '" << name <<
"'." << endl);
600 d_ledger_entry.append(
" ").append(key);
606 BESDEBUG(DEBUG_KEY,__FUNCTION__ <<
" Found " << item_name <<
" in the store already." << endl);
609 LOG(
"Metadata store: unable to store the " << response_name <<
" response for '" << name <<
"'." << endl);
614 throw BESInternalError(
"Could neither create or open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
648 d_ledger_entry = string(
"add DDS ").append(name);
661 #if SYMETRIC_ADD_RESPONSES
668 #if SYMETRIC_ADD_RESPONSES
669 return (stored_dds && stored_das && stored_dmr);
671 return (stored_dds && stored_das);
690 d_ledger_entry = string(
"add DMR ").append(name);
697 #if SYMETRIC_ADD_RESPONSES
710 #if SYMETRIC_ADD_RESPONSES
711 return (stored_dds && stored_das && stored_dmr);
733 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS hashing name '" << name <<
"', '" << suffix <<
"'"<< endl);
737 "GlobalMetadataStore::get_read_lock_helper(). That should never happen.", __FILE__, __LINE__);
742 BESDEBUG(DEBUG_KEY, __func__ <<
"() MDS lock for " << item_name <<
": " << lock() << endl);
745 LOG(
"MDS Cache hit for '" << name <<
"' and response " << object_name << endl);
747 LOG(
"MDS Cache miss for '" << name <<
"' and response " << object_name << endl);
975 time_t file_time = besRH->
get_lmt(realName);
981 if (file_time > cache_time){
1000 struct stat statbuf;
1002 if (stat(item_name.c_str(), &statbuf) == -1){
1006 return statbuf.st_mtime;
1025 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
1026 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1037 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
1051 const string &object_name)
1056 VERBOSE(
"Metadata store: Cache hit: read " << object_name <<
" response for '" << name <<
"'." << endl);
1057 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1070 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
1109 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
1111 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1114 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
1132 string xml_base = BESContextManager::TheManager()->
get_context(
"xml:base", found);
1134 #if XML_BASE_MISSING_MEANS_OMIT_ATTRIBUTE
1137 throw BESInternalError(
"Could not read the value of xml:base.", __FILE__, __LINE__);
1155 string hash =
get_hash(name + suffix);
1157 VERBOSE(
"Metadata store: Removed " << object_name <<
" response for '" << hash <<
"'." << endl);
1158 d_ledger_entry.append(
" ").append(hash);
1162 LOG(
"Metadata store: unable to remove the " << object_name <<
" response for '" << name <<
"' (" << strerror(errno) <<
")."<< endl);
1178 d_ledger_entry = string(
"remove ").append(name);
1190 #if SYMETRIC_ADD_RESPONSES
1191 return (removed_dds && removed_das && removed_dmr);
1193 return (removed_dds || removed_das || removed_dmr || removed_dmrpp);
1215 D4BaseTypeFactory d4_btf;
1216 auto_ptr<DMR> dmr(
new DMR(&d4_btf,
"mds"));
1218 D4ParserSax2 parser;
1219 parser.intern(oss.str(), dmr.get());
1221 dmr->set_factory(0);
1223 return dmr.release();
1252 fstream dds_fs(dds_tmp.
get_name().c_str(), std::fstream::out);
1262 BaseTypeFactory btf;
1263 auto_ptr<DDS> dds(
new DDS(&btf));
1267 fstream das_fs(das_tmp.
get_name().c_str(), std::fstream::out);
1277 auto_ptr<DAS> das(
new DAS());
1280 dds->transfer_attributes(das.get());
1281 dds->set_factory(0);
1283 return dds.release();
1288 GlobalMetadataStore::parse_das_from_mds(libdap::DAS* das,
const std::string &name) {
1289 string suffix =
"das_r";
1293 VERBOSE(
"Metadata store: Cache hit: read " <<
" response for '" << name <<
"'." << endl);
1294 BESDEBUG(DEBUG_KEY, __FUNCTION__ <<
" Found " << item_name <<
" in the store." << endl);
1297 das->parse(item_name);
1306 throw BESInternalError(
"Could not open '" + item_name +
"' in the metadata store.", __FILE__, __LINE__);
A container is something that holds data. E.G., a netcdf file or a database entry.
std::string get_relative_name() const
Get the relative name of the object in this container.
std::string get_container_type() const
retrieve the type of data this container holds, such as cedar or netcdf.
std::string get_real_name() const
retrieve the real name for this container, such as a file name.
virtual std::string get_context(const std::string &name, bool &found)
retrieve the value of the specified context from the BES
Implementation of a caching mechanism for compressed data.
virtual void unlock_and_close(const std::string &target)
const std::string get_cache_directory()
bool is_unlimited() const
Is this cache allowed to store as much as it wants?
virtual unsigned long long update_cache_info(const std::string &target)
Update the cache info file to include 'target'.
virtual bool create_and_lock(const std::string &target, int &fd)
Create a file in the cache and lock it for write access.
virtual void exclusive_to_shared_lock(int fd)
Transfer from an exclusive lock to a shared lock.
virtual bool get_read_lock(const std::string &target, int &fd)
Get a read-only lock on the file if it exists.
virtual void purge_file(const std::string &file)
Purge a single file from the cache.
virtual bool cache_too_big(unsigned long long current_size) const
look at the cache size; is it too large? Look at the cache size and see if it is too big.
virtual void update_and_purge(const std::string &new_file)
Purge files from the cache.
virtual std::string get_cache_file_name(const std::string &src, bool mangle=true)
exception thrown if internal error encountered
exception thrown if an internal error is found and is fatal to the BES
error thrown if the resource requested cannot be found
virtual BESRequestHandler * find_handler(const std::string &handler_name)
find and return the specified request handler
Represents a specific data type request handler.
virtual time_t get_lmt(const std::string &name)
Get the Last modified time for.
static std::string lowercase(const std::string &s)
void get_value(const std::string &s, std::string &val, bool &found)
Retrieve the value of a given key, if set.
static TheBESKeys * TheKeys()
Get a new temporary file.
std::string get_name() const