XRootD
Loading...
Searching...
No Matches
XrdPfcConfiguration.cc
Go to the documentation of this file.
1#include "XrdPfc.hh"
2#include "XrdPfcTrace.hh"
3#include "XrdPfcInfo.hh"
4
5#include "XrdOss/XrdOss.hh"
6
7#include "XrdOuc/XrdOucEnv.hh"
11#include "XrdOuc/XrdOuca2x.hh"
12
13#include "XrdVersion.hh"
15#include "XrdSys/XrdSysXAttr.hh"
16
17#include <fcntl.h>
18
20
namespace XrdPfc
{
   // Text prefixes used when emitting trace output. The entries after the
   // empty first slot follow the order error, warning, info, debug, dump.
   const char *trace_what_strings[] =
   {
      "",
      "error ",
      "warning ",
      "info ",
      "debug ",
      "dump "
   };
}
25
26using namespace XrdPfc;
27
29
31 m_hdfsmode(false),
32 m_allow_xrdpfc_command(false),
33 m_data_space("public"),
34 m_meta_space("public"),
35 m_diskTotalSpace(-1),
36 m_diskUsageLWM(-1),
37 m_diskUsageHWM(-1),
38 m_fileUsageBaseline(-1),
39 m_fileUsageNominal(-1),
40 m_fileUsageMax(-1),
41 m_purgeInterval(300),
42 m_purgeColdFilesAge(-1),
43 m_purgeAgeBasedPeriod(10),
44 m_accHistorySize(20),
45 m_dirStatsMaxDepth(-1),
46 m_dirStatsStoreDepth(0),
47 m_bufferSize(128*1024),
48 m_RamAbsAvailable(0),
49 m_RamKeepStdBlocks(0),
50 m_wqueue_blocks(16),
51 m_wqueue_threads(4),
52 m_prefetch_max_blocks(10),
53 m_hdfsbsize(128*1024*1024),
54 m_flushCnt(2000),
55 m_cs_UVKeep(-1),
56 m_cs_Chk(CSChk_Net),
57 m_cs_ChkTLS(false),
58 m_onlyIfCachedMinSize(1024*1024),
59 m_onlyIfCachedMinFrac(1.0)
60{}
61
62
63bool Cache::cfg2bytes(const std::string &str, long long &store, long long totalSpace, const char *name)
64{
65 char errStr[1024];
66 snprintf(errStr, 1024, "ConfigParameters() Error parsing parameter %s", name);
67
68 if (::isalpha(*(str.rbegin())))
69 {
70 if (XrdOuca2x::a2sz(m_log, errStr, str.c_str(), &store, 0, totalSpace))
71 {
72 return false;
73 }
74 }
75 else
76 {
77 char *eP;
78 errno = 0;
79 double frac = strtod(str.c_str(), &eP);
80 if (errno || eP == str.c_str())
81 {
82 m_log.Emsg(errStr, str.c_str());
83 return false;
84 }
85
86 store = static_cast<long long>(totalSpace * frac + 0.5);
87 }
88
89 if (store < 0 || store > totalSpace)
90 {
91 snprintf(errStr, 1024, "ConfigParameters() Error: parameter %s should be between 0 and total available disk space (%lld) - it is %lld (given as %s)",
92 name, totalSpace, store, str.c_str());
93 m_log.Emsg(errStr, "");
94 return false;
95 }
96
97 return true;
98}
99
100/* Function: xcschk
101
102 Purpose: To parse the directive: cschk <parms>
103
104 parms: [[no]net] [[no]tls] [[no]cache] [uvkeep <arg>]
105
106 all Checksum check on cache & net transfers.
107 cache Checksum check on cache only, 'no' turns it off.
108 net Checksum check on net transfers 'no' turns it off.
109 tls use TLS if server doesn't support checksums 'no' turns it off.
110 uvkeep Maximum amount of time a cached file make be kept if it
111 contains unverified checksums as n[d|h|m|s], where 'n'
112 is a non-negative integer. A value of 0 prohibits disk
113 caching unless the checksum can be verified. You can
114 also specify "lru" which means the standard purge policy
115 is to be used.
116
117 Output: true upon success or false upon failure.
118 */
119bool Cache::xcschk(XrdOucStream &Config)
120{
121 const char *val, *val2;
122 struct cschkopts {const char *opname; int opval;} csopts[] =
123 {
124 {"off", CSChk_None},
125 {"cache", CSChk_Cache},
126 {"net", CSChk_Net},
127 {"tls", CSChk_TLS}
128 };
129 int i, numopts = sizeof(csopts)/sizeof(struct cschkopts);
130 bool isNo;
131
132 if (! (val = Config.GetWord()))
133 {m_log.Emsg("Config", "cschk parameter not specified"); return false; }
134
135 while(val)
136 {
137 if ((isNo = strncmp(val, "no", 2) == 0))
138 val2 = val + 2;
139 else
140 val2 = val;
141 for (i = 0; i < numopts; i++)
142 {
143 if (!strcmp(val2, csopts[i].opname))
144 {
145 if (isNo)
146 m_configuration.m_cs_Chk &= ~csopts[i].opval;
147 else if (csopts[i].opval)
148 m_configuration.m_cs_Chk |= csopts[i].opval;
149 else
150 m_configuration.m_cs_Chk = csopts[i].opval;
151 break;
152 }
153 }
154 if (i >= numopts)
155 {
156 if (strcmp(val, "uvkeep"))
157 {
158 m_log.Emsg("Config", "invalid cschk option -", val);
159 return false;
160 }
161 if (!(val = Config.GetWord()))
162 {
163 m_log.Emsg("Config", "cschk uvkeep value not specified");
164 return false;
165 }
166 if (!strcmp(val, "lru"))
167 m_configuration.m_cs_UVKeep = -1;
168 else
169 {
170 int uvkeep;
171 if (XrdOuca2x::a2tm(m_log, "uvkeep time", val, &uvkeep, 0))
172 return false;
173 m_configuration.m_cs_UVKeep = uvkeep;
174 }
175 }
176 val = Config.GetWord();
177 }
178 // Decompose into separate TLS state, it is only passed on to psx
179 m_configuration.m_cs_ChkTLS = m_configuration.m_cs_Chk & CSChk_TLS;
180 m_configuration.m_cs_Chk &= ~CSChk_TLS;
181
182 m_env->Put("psx.CSNet", m_configuration.is_cschk_net() ? (m_configuration.m_cs_ChkTLS ? "2" : "1") : "0");
183
184 return true;
185}
186
187
188/* Function: xdlib
189
190 Purpose: To parse the directive: decisionlib <path> [<parms>]
191
192 <path> the path of the decision library to be used.
193 <parms> optional parameters to be passed.
194
195
196 Output: true upon success or false upon failure.
197 */
198bool Cache::xdlib(XrdOucStream &Config)
199{
200 const char* val;
201
202 std::string libp;
203 if (! (val = Config.GetWord()) || ! val[0])
204 {
205 TRACE(Info," Cache::Config() decisionlib not specified; always caching files");
206 return true;
207 }
208 else
209 {
210 libp = val;
211 }
212
213 char params[4096];
214 if (val[0])
215 Config.GetRest(params, 4096);
216 else
217 params[0] = 0;
218
219 XrdOucPinLoader* myLib = new XrdOucPinLoader(&m_log, 0, "decisionlib",
220 libp.c_str());
221
222 Decision *(*ep)(XrdSysError&);
223 ep = (Decision *(*)(XrdSysError&))myLib->Resolve("XrdPfcGetDecision");
224 if (! ep) {myLib->Unload(true); return false; }
225
226 Decision * d = ep(m_log);
227 if (! d)
228 {
229 TRACE(Error, "Config() decisionlib was not able to create a decision object");
230 return false;
231 }
232 if (params[0])
233 d->ConfigDecision(params);
234
235 m_decisionpoints.push_back(d);
236 return true;
237}
238
239/* Function: xtrace
240
241 Purpose: To parse the directive: trace <level>
242 Output: true upon success or false upon failure.
243 */
244bool Cache::xtrace(XrdOucStream &Config)
245{
246 char *val;
247 static struct traceopts {const char *opname; int opval; } tropts[] =
248 {
249 {"none", 0},
250 {"error", 1},
251 {"warning", 2},
252 {"info", 3},
253 {"debug", 4},
254 {"dump", 5},
255 {"dumpxl", 6}
256 };
257 int numopts = sizeof(tropts)/sizeof(struct traceopts);
258
259 if (! (val = Config.GetWord()))
260 {m_log.Emsg("Config", "trace option not specified"); return 1; }
261
262 for (int i = 0; i < numopts; i++)
263 {
264 if (! strcmp(val, tropts[i].opname))
265 {
266 m_trace->What = tropts[i].opval;
267 return true;
268 }
269 }
270 m_log.Emsg("Config", "invalid trace option -", val);
271 return false;
272}
273
274// Determine if oss spaces are operational and if they support xattrs.
275bool Cache::test_oss_basics_and_features()
276{
277 static const char *epfx = "test_oss_basics_and_features()";
278
279 const auto &conf = m_configuration;
280 const char *user = conf.m_username.c_str();
281 XrdOucEnv env;
282
283 auto check_space = [&](const char *space, bool &has_xattr)
284 {
285 std::string fname("__prerun_test_pfc_");
286 fname += space;
287 fname += "_space__";
288 env.Put("oss.cgroup", space);
289
290 int res = m_oss->Create(user, fname.c_str(), 0600, env, XRDOSS_mkpath);
291 if (res != XrdOssOK) {
292 m_log.Emsg(epfx, "Can not create a file on space", space);
293 return false;
294 }
295 XrdOssDF *oss_file = m_oss->newFile(user);
296 res = oss_file->Open(fname.c_str(), O_RDWR, 0600, env);
297 if (res != XrdOssOK) {
298 m_log.Emsg(epfx, "Can not open a file on space", space);
299 return false;
300 }
301 res = oss_file->Write(fname.data(), 0, fname.length());
302 if (res != (int) fname.length()) {
303 m_log.Emsg(epfx, "Can not write into a file on space", space);
304 return false;
305 }
306
307 has_xattr = true;
308 long long fsize = fname.length();
309 res = XrdSysXAttrActive->Set("pfc.fsize", &fsize, sizeof(long long), 0, oss_file->getFD(), 0);
310 if (res != 0) {
311 m_log.Emsg(epfx, "Can not write xattr to a file on space", space);
312 has_xattr = false;
313 }
314
315 oss_file->Close();
316
317 if (has_xattr) {
318 char pfn[4096];
319 m_oss->Lfn2Pfn(fname.c_str(), pfn, 4096);
320 fsize = -1ll;
321 res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
322 if (res != sizeof(long long) || fsize != (long long) fname.length())
323 {
324 m_log.Emsg(epfx, "Can not read xattr from a file on space", space);
325 has_xattr = false;
326 }
327 }
328
329 res = m_oss->Unlink(fname.c_str());
330 if (res != XrdOssOK) {
331 m_log.Emsg(epfx, "Can not unlink a file on space", space);
332 return false;
333 }
334
335 return true;
336 };
337
338 bool aOK = true;
339 aOK &= check_space(conf.m_data_space.c_str(), m_dataXattr);
340 aOK &= check_space(conf.m_meta_space.c_str(), m_metaXattr);
341
342 return aOK;
343}
344
345//______________________________________________________________________________
346/* Function: Config
347
348 Purpose: To parse configuration file and configure Cache instance.
349 Output: true upon success or false upon failure.
350 */
351bool Cache::Config(const char *config_filename, const char *parameters)
352{
353 // Indicate whether or not we are a client instance
354 const char *theINS = getenv("XRDINSTANCE");
355 m_isClient = (theINS != 0 && strncmp("*client ", theINS, 8) == 0);
356
357 // Tell everyone else we are a caching proxy
358 XrdOucEnv::Export("XRDPFC", 1);
359
360 XrdOucEnv myEnv;
361 XrdOucStream Config(&m_log, theINS, &myEnv, "=====> ");
362
363 if (! config_filename || ! *config_filename)
364 {
365 TRACE(Error, "Config() configuration file not specified.");
366 return false;
367 }
368
369 int fd;
370 if ( (fd = open(config_filename, O_RDONLY, 0)) < 0)
371 {
372 TRACE( Error, "Config() can't open configuration file " << config_filename);
373 return false;
374 }
375
376 Config.Attach(fd);
377 static const char *cvec[] = { "*** pfc plugin config:", 0 };
378 Config.Capture(cvec);
379
380 // Obtain OFS configurator for OSS plugin.
381 XrdOfsConfigPI *ofsCfg = XrdOfsConfigPI::New(config_filename,&Config,&m_log,
382 &XrdVERSIONINFOVAR(XrdOucGetCache));
383 if (! ofsCfg) return false;
384
385 TmpConfiguration tmpc;
386
387 // Adjust default parameters for client/serverless caching
388 if (m_isClient)
389 {
390 m_configuration.m_bufferSize = 128 * 1024; // same as normal.
391 m_configuration.m_wqueue_blocks = 8;
392 m_configuration.m_wqueue_threads = 1;
393 }
394
395 // If network checksum processing is the default, indicate so.
396 if (m_configuration.is_cschk_net()) m_env->Put("psx.CSNet", m_configuration.m_cs_ChkTLS ? "2" : "1");
397
398 // Actual parsing of the config file.
399 bool retval = true, aOK = true;
400 char *var;
401 while ((var = Config.GetMyFirstWord()))
402 {
403 if (! strcmp(var,"pfc.osslib"))
404 {
405 retval = ofsCfg->Parse(XrdOfsConfigPI::theOssLib);
406 }
407 else if (! strcmp(var,"pfc.cschk"))
408 {
409 retval = xcschk(Config);
410 }
411 else if (! strcmp(var,"pfc.decisionlib"))
412 {
413 retval = xdlib(Config);
414 }
415 else if (! strcmp(var,"pfc.trace"))
416 {
417 retval = xtrace(Config);
418 }
419 else if (! strcmp(var,"pfc.allow_xrdpfc_command"))
420 {
421 m_configuration.m_allow_xrdpfc_command = true;
422 }
423 else if (! strncmp(var,"pfc.", 4))
424 {
425 retval = ConfigParameters(std::string(var+4), Config, tmpc);
426 }
427
428 if ( ! retval)
429 {
430 TRACE(Error, "Config() error in parsing");
431 aOK = false;
432 }
433 }
434
435 Config.Close();
436
437 // Load OSS plugin.
438 myEnv.Put("oss.runmode", "pfc");
439 if (m_configuration.is_cschk_cache())
440 {
441 char csi_conf[128];
442 if (snprintf(csi_conf, 128, "space=%s nofill", m_configuration.m_meta_space.c_str()) < 128)
443 {
444 ofsCfg->Push(XrdOfsConfigPI::theOssLib, "libXrdOssCsi.so", csi_conf);
445 } else {
446 TRACE(Error, "Config() buffer too small for libXrdOssCsi params.");
447 return false;
448 }
449 }
450 if (ofsCfg->Load(XrdOfsConfigPI::theOssLib, &myEnv))
451 {
452 ofsCfg->Plugin(m_oss);
453 }
454 else
455 {
456 TRACE(Error, "Config() Unable to create an OSS object");
457 return false;
458 }
459
460 // Test if OSS is operational, determine optional features.
461 aOK &= test_oss_basics_and_features();
462
463 // sets default value for disk usage
464 XrdOssVSInfo sP;
465 {
466 if (m_oss->StatVS(&sP, m_configuration.m_data_space.c_str(), 1) < 0)
467 {
468 m_log.Emsg("ConfigParameters()", "error obtaining stat info for data space ", m_configuration.m_data_space.c_str());
469 return false;
470 }
471 if (sP.Total < 10ll << 20)
472 {
473 m_log.Emsg("ConfigParameters()", "available data space is less than 10 MB (can be due to a mistake in oss.localroot directive) for space ",
474 m_configuration.m_data_space.c_str());
475 return false;
476 }
477
478 m_configuration.m_diskTotalSpace = sP.Total;
479
480 if (cfg2bytes(tmpc.m_diskUsageLWM, m_configuration.m_diskUsageLWM, sP.Total, "lowWatermark") &&
481 cfg2bytes(tmpc.m_diskUsageHWM, m_configuration.m_diskUsageHWM, sP.Total, "highWatermark"))
482 {
483 if (m_configuration.m_diskUsageLWM >= m_configuration.m_diskUsageHWM) {
484 m_log.Emsg("ConfigParameters()", "pfc.diskusage should have lowWatermark < highWatermark.");
485 aOK = false;
486 }
487 }
488 else aOK = false;
489
490 if ( ! tmpc.m_fileUsageMax.empty())
491 {
492 if (cfg2bytes(tmpc.m_fileUsageBaseline, m_configuration.m_fileUsageBaseline, sP.Total, "files baseline") &&
493 cfg2bytes(tmpc.m_fileUsageNominal, m_configuration.m_fileUsageNominal, sP.Total, "files nominal") &&
494 cfg2bytes(tmpc.m_fileUsageMax, m_configuration.m_fileUsageMax, sP.Total, "files max"))
495 {
496 if (m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageNominal ||
497 m_configuration.m_fileUsageBaseline >= m_configuration.m_fileUsageMax ||
498 m_configuration.m_fileUsageNominal >= m_configuration.m_fileUsageMax)
499 {
500 m_log.Emsg("ConfigParameters()", "pfc.diskusage files should have baseline < nominal < max.");
501 aOK = false;
502 }
503 }
504 else aOK = false;
505 }
506 }
507
508 // sets flush frequency
509 if ( ! tmpc.m_flushRaw.empty())
510 {
511 if (::isalpha(*(tmpc.m_flushRaw.rbegin())))
512 {
513 if (XrdOuca2x::a2sz(m_log, "Error getting number of bytes written before flush", tmpc.m_flushRaw.c_str(),
514 &m_configuration.m_flushCnt,
515 100 * m_configuration.m_bufferSize , 100000 * m_configuration.m_bufferSize))
516 {
517 return false;
518 }
519 m_configuration.m_flushCnt /= m_configuration.m_bufferSize;
520 }
521 else
522 {
523 if (XrdOuca2x::a2ll(m_log, "Error getting number of blocks written before flush", tmpc.m_flushRaw.c_str(),
524 &m_configuration.m_flushCnt, 100, 100000))
525 {
526 return false;
527 }
528 }
529 }
530
531 // get number of available RAM blocks after process configuration
532 if (m_configuration.m_RamAbsAvailable == 0)
533 {
534 m_configuration.m_RamAbsAvailable = m_isClient ? 256ll * 1024 * 1024 : 1024ll * 1024 * 1024;
535 char buff[1024];
536 snprintf(buff, sizeof(buff), "RAM usage pfc.ram is not specified. Default value %s is used.", m_isClient ? "256m" : "1g");
537 m_log.Say("Config info: ", buff);
538 }
539 // Setup number of standard-size blocks not released back to the system to 5% of total RAM.
540 m_configuration.m_RamKeepStdBlocks = (m_configuration.m_RamAbsAvailable / m_configuration.m_bufferSize + 1) * 5 / 100;
541
542 // Set tracing to debug if this is set in environment
543 char* cenv = getenv("XRDDEBUG");
544 if (cenv && ! strcmp(cenv,"1") && m_trace->What < 4) m_trace->What = 4;
545
546 if (aOK)
547 {
548 int loff = 0;
549// 000 001 010
550 const char *csc[] = {"off", "cache nonet", "nocache net notls",
551// 011
552 "cache net notls",
553// 100 101 110
554 "off", "cache nonet", "nocache net tls",
555// 111
556 "cache net tls"};
557 char buff[8192], uvk[32];
558 if (m_configuration.m_cs_UVKeep < 0)
559 strcpy(uvk, "lru");
560 else
561 sprintf(uvk, "%lld", (long long) m_configuration.m_cs_UVKeep);
562 float rg = (m_configuration.m_RamAbsAvailable) / float(1024*1024*1024);
563 loff = snprintf(buff, sizeof(buff), "Config effective %s pfc configuration:\n"
564 " pfc.cschk %s uvkeep %s\n"
565 " pfc.blocksize %lld\n"
566 " pfc.prefetch %d\n"
567 " pfc.ram %.fg\n"
568 " pfc.writequeue %d %d\n"
569 " # Total available disk: %lld\n"
570 " pfc.diskusage %lld %lld files %lld %lld %lld purgeinterval %d purgecoldfiles %d\n"
571 " pfc.spaces %s %s\n"
572 " pfc.trace %d\n"
573 " pfc.flush %lld\n"
574 " pfc.acchistorysize %d\n"
575 " pfc.onlyIfCachedMinBytes %lld\n"
576 " pfc.onlyIfCachedMinFrac %.2f\n",
577 config_filename,
578 csc[int(m_configuration.m_cs_Chk)], uvk,
579 m_configuration.m_bufferSize,
580 m_configuration.m_prefetch_max_blocks,
581 rg,
582 m_configuration.m_wqueue_blocks, m_configuration.m_wqueue_threads,
583 sP.Total,
584 m_configuration.m_diskUsageLWM, m_configuration.m_diskUsageHWM,
585 m_configuration.m_fileUsageBaseline, m_configuration.m_fileUsageNominal, m_configuration.m_fileUsageMax,
586 m_configuration.m_purgeInterval, m_configuration.m_purgeColdFilesAge,
587 m_configuration.m_data_space.c_str(),
588 m_configuration.m_meta_space.c_str(),
589 m_trace->What,
590 m_configuration.m_flushCnt,
591 m_configuration.m_accHistorySize,
592 m_configuration.m_onlyIfCachedMinSize,
593 m_configuration.m_onlyIfCachedMinFrac);
594
595 if (m_configuration.is_dir_stat_reporting_on())
596 {
597 loff += snprintf(buff + loff, sizeof(buff) - loff,
598 " pfc.dirstats maxdepth %d ((internal: store_depth %d, size_of_dirlist %d, size_of_globlist %d))\n",
599 m_configuration.m_dirStatsMaxDepth, m_configuration.m_dirStatsStoreDepth,
600 (int) m_configuration.m_dirStatsDirs.size(), (int) m_configuration.m_dirStatsDirGlobs.size());
601 loff += snprintf(buff + loff, sizeof(buff) - loff, " dirlist:\n");
602 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirs.begin(); i != m_configuration.m_dirStatsDirs.end(); ++i)
603 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s\n", i->c_str());
604 loff += snprintf(buff + loff, sizeof(buff) - loff, " globlist:\n");
605 for (std::set<std::string>::iterator i = m_configuration.m_dirStatsDirGlobs.begin(); i != m_configuration.m_dirStatsDirGlobs.end(); ++i)
606 loff += snprintf(buff + loff, sizeof(buff) - loff, " %s/*\n", i->c_str());
607 }
608
609 if (m_configuration.m_hdfsmode)
610 {
611 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.hdfsmode hdfsbsize %lld\n", m_configuration.m_hdfsbsize);
612 }
613
614 if (m_configuration.m_username.empty())
615 {
616 char unameBuff[256];
617 XrdOucUtils::UserName(getuid(), unameBuff, sizeof(unameBuff));
618 m_configuration.m_username = unameBuff;
619 }
620 else
621 {
622 loff += snprintf(buff + loff, sizeof(buff) - loff, " pfc.user %s\n", m_configuration.m_username.c_str());
623 }
624
625 m_log.Say(buff);
626
627 m_env->Put("XRDPFC.SEGSIZE", std::to_string(m_configuration.m_bufferSize).c_str());
628 }
629
630 // Derived settings
631 m_prefetch_enabled = m_configuration.m_prefetch_max_blocks > 0;
632 Info::s_maxNumAccess = m_configuration.m_accHistorySize;
633
634 m_gstream = (XrdXrootdGStream*) m_env->GetPtr("pfc.gStream*");
635
636 m_log.Say("Config Proxy File Cache g-stream has", m_gstream ? "" : " NOT", " been configured via xrootd.monitor directive");
637
638 m_log.Say("------ Proxy File Cache configuration parsing ", aOK ? "completed" : "failed");
639
640 if (ofsCfg) delete ofsCfg;
641
642 // XXXX-CKSUM Testing. To be removed after OssPgi is also merged and valildated.
643 // Building of xrdpfc_print fails when this is enabled.
644#ifdef XRDPFC_CKSUM_TEST
645 {
646 int xxx = m_configuration.m_cs_Chk;
647
648 for (m_configuration.m_cs_Chk = CSChk_None; m_configuration.m_cs_Chk <= CSChk_Both; ++m_configuration.m_cs_Chk)
649 {
650 Info::TestCksumStuff();
651 }
652
653 m_configuration.m_cs_Chk = xxx;
654 }
655#endif
656
657 return aOK;
658}
659
660//------------------------------------------------------------------------------
661
662bool Cache::ConfigParameters(std::string part, XrdOucStream& config, TmpConfiguration &tmpc)
663{
664 struct ConfWordGetter
665 {
666 XrdOucStream &m_config;
667 char *m_last_word;
668
669 ConfWordGetter(XrdOucStream& c) : m_config(c), m_last_word((char*)1) {}
670
671 const char* GetWord() { if (HasLast()) m_last_word = m_config.GetWord(); return HasLast() ? m_last_word : ""; }
672 bool HasLast() { return (m_last_word != 0); }
673 };
674
675 ConfWordGetter cwg(config);
676
677 XrdSysError err(0, "");
678 if ( part == "user" )
679 {
680 m_configuration.m_username = cwg.GetWord();
681 if ( ! cwg.HasLast())
682 {
683 m_log.Emsg("Config", "Error: pfc.user requires a parameter.");
684 return false;
685 }
686 }
687 else if ( part == "diskusage" )
688 {
689 tmpc.m_diskUsageLWM = cwg.GetWord();
690 tmpc.m_diskUsageHWM = cwg.GetWord();
691
692 if (tmpc.m_diskUsageHWM.empty())
693 {
694 m_log.Emsg("Config", "Error: pfc.diskusage parameter requires at least two arguments.");
695 return false;
696 }
697
698 const char *p = 0;
699 while ((p = cwg.GetWord()) && cwg.HasLast())
700 {
701 if (strcmp(p, "files") == 0)
702 {
703 tmpc.m_fileUsageBaseline = cwg.GetWord();
704 tmpc.m_fileUsageNominal = cwg.GetWord();
705 tmpc.m_fileUsageMax = cwg.GetWord();
706
707 if ( ! cwg.HasLast())
708 {
709 m_log.Emsg("Config", "Error: pfc.diskusage files directive requires three arguments.");
710 return false;
711 }
712 }
713 else if (strcmp(p, "sleep") == 0 || strcmp(p, "purgeinterval") == 0)
714 {
715 if (strcmp(p, "sleep") == 0) m_log.Emsg("Config", "warning sleep directive is deprecated in pfc.diskusage. Please use purgeinterval instead.");
716
717 if (XrdOuca2x::a2tm(m_log, "Error getting purgeinterval", cwg.GetWord(), &m_configuration.m_purgeInterval, 60, 3600))
718 {
719 return false;
720 }
721 }
722 else if (strcmp(p, "purgecoldfiles") == 0)
723 {
724 if (XrdOuca2x::a2tm(m_log, "Error getting purgecoldfiles age", cwg.GetWord(), &m_configuration.m_purgeColdFilesAge, 3600, 3600*24*360))
725 {
726 return false;
727 }
728 if (XrdOuca2x::a2i(m_log, "Error getting purgecoldfiles period", cwg.GetWord(), &m_configuration.m_purgeAgeBasedPeriod, 1, 1000))
729 {
730 return false;
731 }
732 }
733 else
734 {
735 m_log.Emsg("Config", "Error: diskusage stanza contains unknown directive", p);
736 }
737 }
738 }
739 else if ( part == "acchistorysize" )
740 {
741 if ( XrdOuca2x::a2i(m_log, "Error getting access-history-size", cwg.GetWord(), &m_configuration.m_accHistorySize, 20, 200))
742 {
743 return false;
744 }
745 }
746 else if ( part == "dirstats" )
747 {
748 const char *p = 0;
749 while ((p = cwg.GetWord()) && cwg.HasLast())
750 {
751 if (strcmp(p, "maxdepth") == 0)
752 {
753 if (XrdOuca2x::a2i(m_log, "Error getting maxdepth value", cwg.GetWord(), &m_configuration.m_dirStatsMaxDepth, 0, 16))
754 {
755 return false;
756 }
757 m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, m_configuration.m_dirStatsMaxDepth);
758 }
759 else if (strcmp(p, "dir") == 0)
760 {
761 p = cwg.GetWord();
762 if (p && p[0] == '/')
763 {
764 // XXX -- should we just store them as sets of PathTokenizer objects, not strings?
765
766 char d[1024]; d[0] = 0;
767 int depth = 0;
768 { // Compress multiple slashes and "measure" depth
769 const char *pp = p;
770 char *pd = d;
771 *(pd++) = *(pp++);
772 while (*pp != 0)
773 {
774 if (*(pd - 1) == '/')
775 {
776 if (*pp == '/')
777 {
778 ++pp; continue;
779 }
780 ++depth;
781 }
782 *(pd++) = *(pp++);
783 }
784 *(pd--) = 0;
785 // remove trailing but but not leading /
786 if (*pd == '/' && pd != d) *pd = 0;
787 }
788 int ld = strlen(d);
789 if (ld >= 2 && d[ld-1] == '*' && d[ld-2] == '/')
790 {
791 d[ld-2] = 0;
792 ld -= 2;
793 m_configuration.m_dirStatsDirGlobs.insert(d);
794 printf("Glob %s -> %s -- depth = %d\n", p, d, depth);
795 }
796 else
797 {
798 m_configuration.m_dirStatsDirs.insert(d);
799 printf("Dir %s -> %s -- depth = %d\n", p, d, depth);
800 }
801
802 m_configuration.m_dirStatsStoreDepth = std::max(m_configuration.m_dirStatsStoreDepth, depth);
803 }
804 else
805 {
806 m_log.Emsg("Config", "Error: dirstats dir parameter requires a directory argument starting with a '/'.");
807 return false;
808 }
809 }
810 else
811 {
812 m_log.Emsg("Config", "Error: dirstats stanza contains unknown directive '", p, "'");
813 return false;
814 }
815 }
816 }
817 else if ( part == "blocksize" )
818 {
819 long long minBSize = 4 * 1024;
820 long long maxBSize = 512 * 1024 * 1024;
821 if (XrdOuca2x::a2sz(m_log, "Error reading block-size", cwg.GetWord(), &m_configuration.m_bufferSize, minBSize, maxBSize))
822 {
823 return false;
824 }
825 if (m_configuration.m_bufferSize & 0xFFF)
826 {
827 m_configuration.m_bufferSize &= ~0x0FFF;
828 m_configuration.m_bufferSize += 0x1000;
829 m_log.Emsg("Config", "pfc.blocksize must be a multiple of 4 kB. Rounded up.");
830 }
831 }
832 else if ( part == "prefetch" || part == "nramprefetch" )
833 {
834 if (part == "nramprefetch")
835 {
836 m_log.Emsg("Config", "pfc.nramprefetch is deprecated, please use pfc.prefetch instead. Replacing the directive internally.");
837 }
838
839 if (XrdOuca2x::a2i(m_log, "Error setting prefetch block count", cwg.GetWord(), &m_configuration.m_prefetch_max_blocks, 0, 128))
840 {
841 return false;
842 }
843
844 }
845 else if ( part == "nramread" )
846 {
847 m_log.Emsg("Config", "pfc.nramread is deprecated, please use pfc.ram instead. Ignoring this directive.");
848 cwg.GetWord(); // Ignoring argument.
849 }
850 else if ( part == "ram" )
851 {
852 long long minRAM = m_isClient ? 256 * 1024 * 1024 : 1024 * 1024 * 1024;
853 long long maxRAM = 256 * minRAM;
854 if ( XrdOuca2x::a2sz(m_log, "get RAM available", cwg.GetWord(), &m_configuration.m_RamAbsAvailable, minRAM, maxRAM))
855 {
856 return false;
857 }
858 }
859 else if ( part == "writequeue")
860 {
861 if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-blocks", cwg.GetWord(), &m_configuration.m_wqueue_blocks, 1, 1024))
862 {
863 return false;
864 }
865 if (XrdOuca2x::a2i(m_log, "Error getting pfc.writequeue num-threads", cwg.GetWord(), &m_configuration.m_wqueue_threads, 1, 64))
866 {
867 return false;
868 }
869 }
870 else if ( part == "spaces" )
871 {
872 m_configuration.m_data_space = cwg.GetWord();
873 m_configuration.m_meta_space = cwg.GetWord();
874 if ( ! cwg.HasLast())
875 {
876 m_log.Emsg("Config", "spacenames requires two parameters: <data-space> <metadata-space>.");
877 return false;
878 }
879 }
880 else if ( part == "hdfsmode" )
881 {
882 m_log.Emsg("Config", "pfc.hdfsmode is currently unsupported.");
883 return false;
884
885 m_configuration.m_hdfsmode = true;
886
887 const char* params = cwg.GetWord();
888 if (params)
889 {
890 if (! strncmp("hdfsbsize", params, 9))
891 {
892 long long minBlSize = 32 * 1024;
893 long long maxBlSize = 128 * 1024 * 1024;
894 if ( XrdOuca2x::a2sz(m_log, "Error getting file fragment size", cwg.GetWord(), &m_configuration.m_hdfsbsize, minBlSize, maxBlSize))
895 {
896 return false;
897 }
898 }
899 else
900 {
901 m_log.Emsg("Config", "Error setting the fragment size parameter name");
902 return false;
903 }
904 }
905 }
906 else if ( part == "flush" )
907 {
908 tmpc.m_flushRaw = cwg.GetWord();
909 if ( ! cwg.HasLast())
910 {
911 m_log.Emsg("Config", "Error: pfc.flush requires a parameter.");
912 return false;
913 }
914 }
915 else if ( part == "onlyifcached" )
916 {
917 const char *p = 0;
918 while ((p = cwg.GetWord()) && cwg.HasLast())
919 {
920 if (strcmp(p, "minsize") == 0)
921 {
922 std::string minBytes = cwg.GetWord();
923 long long minBytesTop = 1024 * 1024 * 1024;
924 if (::isalpha(*(minBytes.rbegin())))
925 {
926 if (XrdOuca2x::a2sz(m_log, "Error in parsing minsize value for onlyifcached parameter", minBytes.c_str(), &m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
927 {
928 return false;
929 }
930 }
931 else
932 {
933 if (XrdOuca2x::a2ll(m_log, "Error in parsing numeric minsize value for onlyifcached parameter", minBytes.c_str(),&m_configuration.m_onlyIfCachedMinSize, 0, minBytesTop))
934 {
935 return false;
936 }
937 }
938 }
939 if (strcmp(p, "minfrac") == 0)
940 {
941 std::string minFrac = cwg.GetWord();
942 char *eP;
943 errno = 0;
944 double frac = strtod(minFrac.c_str(), &eP);
945 if (errno || eP == minFrac.c_str())
946 {
947 m_log.Emsg("Config", "Error setting fraction for only-if-cached directive");
948 return false;
949 }
950 m_configuration.m_onlyIfCachedMinFrac = frac;
951 }
952 else
953 {
954 m_log.Emsg("Config", "Error: onlyifcached stanza contains unknown directive", p);
955 }
956 }
957 }
958 else
959 {
960 m_log.Emsg("ConfigParameters() unmatched pfc parameter", part.c_str());
961 return false;
962 }
963
964 return true;
965}
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
XrdSysXAttr * XrdSysXAttrActive
XrdVERSIONINFO(XrdOucGetCache, XrdPfc)
XrdSysXAttr * XrdSysXAttrActive
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:84
#define open
Definition XrdPosix.hh:71
int isNo(int dflt, const char *Msg1, const char *Msg2, const char *Msg3)
#define TRACE(act, x)
Definition XrdTrace.hh:63
bool Parse(TheLib what)
bool Plugin(XrdAccAuthorize *&piP)
Get Authorization plugin.
static XrdOfsConfigPI * New(const char *cfn, XrdOucStream *cfgP, XrdSysError *errP, XrdVersionInfo *verP=0, XrdSfsFileSystem *sfsP=0)
bool Load(int what, XrdOucEnv *envP=0)
bool Push(TheLib what, const char *plugP, const char *parmP=0)
@ theOssLib
Oss plugin.
virtual int Close(long long *retsz=0)=0
virtual int getFD()
Definition XrdOss.hh:426
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
long long Total
Definition XrdOssVS.hh:90
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition XrdOss.hh:873
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition XrdOss.cc:117
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
static int Export(const char *Var, const char *Val)
Definition XrdOucEnv.cc:170
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void * Resolve(const char *symbl, int mcnt=1)
void Unload(bool dodel=false)
char * GetWord(int lowcase=0)
static int UserName(uid_t uID, char *uName, int uNsz)
static int a2i(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:45
static int a2sz(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:257
static int a2ll(XrdSysError &, const char *emsg, const char *item, long long *val, long long minv=-1, long long maxv=-1)
Definition XrdOuca2x.cc:70
static int a2tm(XrdSysError &, const char *emsg, const char *item, int *val, int minv=-1, int maxv=-1)
Definition XrdOuca2x.cc:288
bool Config(const char *config_filename, const char *parameters)
Parse configuration file.
Base class for selecting which files should be cached.
virtual bool ConfigDecision(const char *params)
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
static size_t s_maxNumAccess
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
virtual int Get(const char *Aname, void *Aval, int Avsz, const char *Path, int fd=-1)=0
virtual int Set(const char *Aname, const void *Aval, int Avsz, const char *Path, int fd=-1, int isNew=0)=0
const char * trace_what_strings[]
@ CSChk_Cache
long long m_hdfsbsize
used with m_hdfsmode, default 128MB
Definition XrdPfc.hh:108
long long m_RamAbsAvailable
available from configuration
Definition XrdPfc.hh:102
long long m_flushCnt
number of unsynced blocks on disk before flush is called
Definition XrdPfc.hh:109
int m_accHistorySize
max number of entries in access history part of cinfo file
Definition XrdPfc.hh:94
int m_wqueue_threads
number of threads writing blocks to disk
Definition XrdPfc.hh:105
long long m_diskTotalSpace
total disk space on configured partition or oss space
Definition XrdPfc.hh:85
long long m_fileUsageMax
cache purge - files usage maximum
Definition XrdPfc.hh:90
long long m_fileUsageBaseline
cache purge - files usage baseline
Definition XrdPfc.hh:88
int m_dirStatsStoreDepth
depth to which statistics should be collected
Definition XrdPfc.hh:99
bool m_allow_xrdpfc_command
flag for enabling access to /xrdpfc-command/ functionality.
Definition XrdPfc.hh:79
long long m_diskUsageHWM
cache purge - disk usage high water mark
Definition XrdPfc.hh:87
bool is_cschk_cache() const
Definition XrdPfc.hh:69
std::set< std::string > m_dirStatsDirGlobs
directory globs for which stat reporting was requested
Definition XrdPfc.hh:97
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
bool m_cs_ChkTLS
Allow TLS.
Definition XrdPfc.hh:113
long long m_fileUsageNominal
cache purge - files usage nominal
Definition XrdPfc.hh:89
int m_cs_Chk
Checksum check.
Definition XrdPfc.hh:112
int m_purgeAgeBasedPeriod
perform cold file / uvkeep purge every this many purge cycles
Definition XrdPfc.hh:93
bool m_hdfsmode
flag for enabling block-level operation
Definition XrdPfc.hh:78
int m_purgeColdFilesAge
purge files older than this age
Definition XrdPfc.hh:92
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
std::set< std::string > m_dirStatsDirs
directories for which stat reporting was requested
Definition XrdPfc.hh:96
long long m_diskUsageLWM
cache purge - disk usage low water mark
Definition XrdPfc.hh:86
int m_RamKeepStdBlocks
number of standard-sized blocks kept after release
Definition XrdPfc.hh:103
long long m_bufferSize
prefetch buffer size, default 128 kB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
int m_wqueue_blocks
maximum number of blocks written per write-queue loop
Definition XrdPfc.hh:104
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
bool is_cschk_net() const
Definition XrdPfc.hh:70
double m_onlyIfCachedMinFrac
minimum fraction of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:116
time_t m_cs_UVKeep
unverified checksum cache keep
Definition XrdPfc.hh:111
int m_dirStatsMaxDepth
maximum depth for statistics write out
Definition XrdPfc.hh:98
int m_purgeInterval
sleep interval between cache purges
Definition XrdPfc.hh:91
long long m_onlyIfCachedMinSize
minimum size of downloaded file, used by only-if-cached CGI option
Definition XrdPfc.hh:115
bool is_dir_stat_reporting_on() const
Definition XrdPfc.hh:62
std::string m_diskUsageLWM
Definition XrdPfc.hh:123
std::string m_diskUsageHWM
Definition XrdPfc.hh:124
std::string m_fileUsageBaseline
Definition XrdPfc.hh:125
std::string m_fileUsageNominal
Definition XrdPfc.hh:126
std::string m_flushRaw
Definition XrdPfc.hh:128
std::string m_fileUsageMax
Definition XrdPfc.hh:127