XRootD
Loading...
Searching...
No Matches
XrdSsiClient.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S s i G e t C l i e n t S e r v i c e . c c */
4/* */
5/* (c) 2013 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* Produced by Andrew Hanushevsky for Stanford University under contract */
7/* DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include <cerrno>
31#include <fcntl.h>
32#include <cstdio>
33#include <string>
34#include <cstring>
35#include <ctime>
36#include <unistd.h>
37#include <sys/types.h>
38
39#include "Xrd/XrdScheduler.hh"
40#include "Xrd/XrdTrace.hh"
41
43
44#include "XrdNet/XrdNetAddr.hh"
46
51#include "XrdSsi/XrdSsiScale.hh"
52#include "XrdSsi/XrdSsiTrace.hh"
53
56#include "XrdSys/XrdSysError.hh"
58#include "XrdSys/XrdSysTrace.hh"
59
60/******************************************************************************/
61/* N a m e S p a c e G l o b a l s */
62/******************************************************************************/
63
// Globals shared by the SSI client provider code in this file.
// NOTE(review): the listing numbers below skip (67->69, 69->72, 72->76), so
// further declarations presumably exist that are not visible here (names such
// as clEnvP, schedP, clMutex, sidScale, msgCB and msgCBCl are used later in
// the file without a visible declaration) -- confirm against the real source.
//
64namespace XrdSsi
65{
// Logging and tracing objects; declared extern, so they are defined elsewhere.
//
66extern XrdSysError Log;
67extern XrdSysLogger *Logger;
69extern XrdSysTrace Trace;
72
// contactN : sequence number GetService() uses to name registry entries.
// maxTCB   : passed as the 'maxw' argument of XrdScheduler::setParms() by
//            SetScheduler(); changed by SetCBThreads() and "cbThreads".
// maxCLW   : stored as the "WorkerThreads" environment value by
//            SetScheduler() when non-zero; changed by SetCBThreads() and
//            "netThreads".
// maxPEL   : stored as the "ParallelEvtLoop" environment value by
//            GetService(); changed by the "pollers" option.
// initDone : set once GetService() has performed its first-call setup.
//
76 Atomic(int) contactN(1);
77 short maxTCB = 300;
78 short maxCLW = 30;
79 short maxPEL = 10;
80 Atomic(bool) initDone(false);
// The three *Set flags are raised by SetTimeout() when the caller supplies the
// corresponding timeout; GetService() only installs its own maximum defaults
// for the ones that are still false.
//
81 bool dsTTLSet = false;
82 bool reqTOSet = false;
83 bool strTOSet = false;
// Raised by the "hiResTime" option of SetConfig() and read by SetLogger().
//
84 bool hiResTime= false;
85
// Request dispatch modes selected via the "reqDispatch" option of SetConfig():
// a negative option value picks rDispRand, a positive one rDispRR, and zero
// rDispNone.
//
86static const int rDispNone = 0;
87static const int rDispRand = -1;
88static const int rDispRR = 1;
89
// Current dispatch mode (round-robin unless changed).
// NOTE(review): rDisp is a plain char, whose signedness is implementation
// defined; where char is unsigned a stored rDispRand (-1) would not compare
// equal to rDispRand. No such comparison is visible in this file -- confirm
// that none exists elsewhere.
//
90 char rDisp = rDispRR;
91}
92
93using namespace XrdSsi;
94
95/******************************************************************************/
96/* L o c a l C l a s s e s */
97/******************************************************************************/
98
100{
// Body of the client-side provider class.
// NOTE(review): the class header line and the first line of the GetService()
// declaration are not present in this listing (the numbering skips 99 and
// 103), and lines 129-130 are also absent -- confirm against the real source.
// The visible GetService() fragment shows that oHold defaults to 256.
//
101public:
102
104 const std::string &contact,
105 int oHold=256
106 );
107
// Init: all arguments are ignored and success is always reported.
//
108virtual bool Init(XrdSsiLogger *logP,
109 XrdSsiCluster *clsP,
110 std::string cfgFn,
111 std::string parms,
112 int argc,
113 char **argv
114 ) {return true;}
115
// QueryResource: both arguments are ignored; always answers notPresent.
//
116virtual rStat QueryResource(const char *rName,
117 const char *contact=0
118 ) {return notPresent;}
119
// Sets the callback (cbNum) and network worker (ntNum) thread limits.
//
120virtual void SetCBThreads(int cbNum, int ntNum);
121
// Sets a named option; returns false with eInfo filled in when the option
// name is unknown or the value is rejected.
//
122virtual bool SetConfig(XrdSsiErrInfo &eInfo,
123 std::string &optname, int optvalue);
124
// Forwards the spread size to the sidScale object.
//
125virtual void SetSpread(short ssz);
126
// Stores a positive timeout value in the client environment under the key
// selected by 'what'.
//
127virtual void SetTimeout(tmoType what, int tmoval);
128
131
132private:
// One-time setup helpers invoked from GetService() while clMutex is held.
//
133void SetLogger();
134void SetScheduler();
135};
136
137/******************************************************************************/
138/* X r d S s i C l i e n t P r o v i d e r : : G e t S e r v i c e */
139/******************************************************************************/
140
142 const std::string &contact,
143 int oHold)
144{
// Create a service object for the given contact.
//
// contact: host specification; a comma in it, or a failed
//          XrdNetUtils::Singleton() test, routes it through the registry.
// oHold  : passed unchanged to the XrdSsiServReal constructor.
// Returns a new XrdSsiServReal on success. On failure returns 0 after
// recording a message and EINVAL in eInfo.
//
// NOTE(review): the first line of this definition (return type, qualified
// name and the eInfo parameter) and listing line 157 are absent from this
// view -- confirm against the real source.
//
145 static const int maxTMO = 0x7fffffff;
146 XrdNetAddr netAddr;
147 std::string eMsg;
148 const char *eText = 0;
149 char buff[512];
150
151// First-call setup: create the logger and scheduler if they do not exist and
// install environment defaults. Timeouts the caller has already set through
// SetTimeout() (flagged by dsTTLSet/reqTOSet/strTOSet) are left alone; the
// others are set to maxTMO. "IPNoShuffle" is set for every dispatch mode
// except rDispRand.
// NOTE(review): initDone is not re-tested after clMutex is taken, so two
// racing first calls would both run the PutInt() sequence -- confirm this is
// acceptable.
152//
153 if (!Atomic_GET(initDone))
154 {clMutex.Lock();
155 if (!Logger) SetLogger();
156 if (!schedP) SetScheduler();
158 if (!dsTTLSet) clEnvP->PutInt("DataServerTTL", maxTMO);
159 if (!reqTOSet) clEnvP->PutInt("RequestTimeout", maxTMO);
160 if (!strTOSet) clEnvP->PutInt("StreamTimeout", maxTMO);
161 clEnvP->PutInt("ParallelEvtLoop",maxPEL);
162 if (rDisp == rDispNone || rDisp == rDispRR)
163 clEnvP->PutInt("IPNoShuffle", 1);
164 initDone = true;
165 clMutex.UnLock();
166 }
167
168// If no contact is given then declare an error
169//
170 if (contact.empty())
171 {eInfo.Set("Contact not specified.", EINVAL); return 0;}
172
173// A contact list (or a contact that is not a singleton) is registered under a
174// generated name "<pfx>contact-<N>:4901", with rotation enabled only for
// round-robin dispatch; buff then holds that generated name. A true singleton
// needs no registry entry: it is validated and formatted into buff directly.
175//
176 if (contact.find(',') != std::string::npos
177 || !XrdNetUtils::Singleton(contact.c_str()))
178 {int cNum = contactN++;
179 bool rotate = rDisp == rDispRR;
180 snprintf(buff,sizeof(buff),"%ccontact-%d:4901",XrdNetRegistry::pfx,cNum);
181 if (!XrdNetRegistry::Register(buff, contact.c_str(), &eMsg, rotate))
182 eText = (eMsg.size() ? eMsg.c_str() : "reason unknown");
183
184 } else {
185
// netAddr.Set() yields a non-null error text on failure; Format() is only
// attempted when Set() succeeded.
186 if (!(eText = netAddr.Set(contact.c_str()))
187 && !netAddr.Format(buff, sizeof(buff), XrdNetAddrInfo::fmtName))
188 eText = "formatting failed";
189 }
190
191// Check if validation or registration failed (buff is reused for the message)
192//
193 if (eText)
194 {snprintf(buff, sizeof(buff), "Unable to validate contact; %s", eText);
195 eInfo.Set(buff, EINVAL);
196 return 0;
197 }
198
199// Allocate a service object for the name now held in buff and return it
200//
201 return new XrdSsiServReal(buff, oHold);
202}
203
204/******************************************************************************/
205/* X r d S s i C l i e n t P r o v i d e r : : S e t C B T h r e a d s */
206/******************************************************************************/
207
208void XrdSsiClientProvider::SetCBThreads(int cbNum, int ntNum)
209{
210// Validate thread number
211//
212 if (cbNum > 1)
213 {if (cbNum > 32767) cbNum = 32767; // Max short value
214 if (ntNum < 1) ntNum = cbNum*10/100;
215 if (ntNum < 3) ntNum = 0;
216 else if (ntNum > 100) ntNum = 100;
217 clMutex.Lock();
218 maxTCB = static_cast<short>(cbNum);
219 maxCLW = static_cast<short>(ntNum);
220 clMutex.UnLock();
221 }
222}
223
224/******************************************************************************/
225/* X r d S s i C l i e n t P r o v i d e r : : S e t C o n f i g */
226/******************************************************************************/
227
229 std::string &optname, int optvalue)
230{
// Apply one named configuration option.
//
// optname : one of "cbThreads", "hiResTime", "netThreads", "pollers" or
//           "reqDispatch"; anything else is rejected.
// optvalue: the option's value; see the individual branches below.
// Returns true when the option was accepted. Otherwise eInfo receives a
// message with EINVAL and false is returned.
//
// NOTE(review): the first line of this definition (return type, qualified
// name and the eInfo parameter) is absent from this listing -- confirm
// against the real source.
//
231// Look for an option we recognize
232//
// "cbThreads": must be at least 1; capped at 32767 before being stored in the
// short maxTCB.
233 if (optname == "cbThreads")
234 {if (optvalue < 1)
235 {eInfo.Set("invalid cbThreads value.", EINVAL); return false;}
236 if (optvalue > 32767) optvalue = 32767;
237 clMutex.Lock();
238 maxTCB = static_cast<short>(optvalue);
239 clMutex.UnLock();
240 }
// "hiResTime": optvalue is not examined; the flag is always switched on and
// is written without taking clMutex.
241 else if (optname == "hiResTime") hiResTime = true;
// "netThreads": must be at least 1; capped at 32767 and stored in maxCLW.
// NOTE(review): SetCBThreads() caps the same variable at 100 -- confirm the
// different limits are intended.
242 else if (optname == "netThreads")
243 {if (optvalue < 1)
244 {eInfo.Set("invalid netThreads value.", EINVAL); return false;}
245 if (optvalue > 32767) optvalue = 32767;
246 clMutex.Lock();
247 maxCLW = static_cast<short>(optvalue);
248 clMutex.UnLock();
249 }
// "pollers": must be at least 1; capped at 32767 and stored in maxPEL.
250 else if (optname == "pollers")
251 {if (optvalue < 1)
252 {eInfo.Set("invalid pollers value.", EINVAL); return false;}
253 if (optvalue > 32767) optvalue = 32767;
254 clMutex.Lock();
255 maxPEL = static_cast<short>(optvalue);
256 clMutex.UnLock();
257 }
// "reqDispatch": only the sign of optvalue matters (negative -> rDispRand,
// positive -> rDispRR, zero -> rDispNone).
258 else if (optname == "reqDispatch")
259 {clMutex.Lock();
260 if (optvalue < 0) rDisp = rDispRand;
261 else if (optvalue > 0) rDisp = rDispRR;
262 else rDisp = rDispNone;
263 clMutex.UnLock();
264 }
265 else {eInfo.Set("invalid option name.", EINVAL); return false;}
266
267 return true;
268}
269
270/******************************************************************************/
271/* X r d S s i C l i e n t P r o v i d e r : : S e t L o g g e r */
272/******************************************************************************/
273
// Create the logger object on a duplicate of standard error and apply the
// environment-driven logging switches.
// NOTE(review): listing lines 291-292 and 302 are absent from this view, so
// statements are presumably missing after the setHiRes() call and after the
// logParms assignments (logParms is filled in but no use of it is visible) --
// confirm against the real source.
//
274void XrdSsiClientProvider::SetLogger()
275{
276 int eFD;
277
278// Get a close-on-exec file descriptor mirroring standard error. Where
// F_DUPFD_CLOEXEC is available on Linux/GNU this is done in one fcntl() call;
// otherwise the descriptor is duplicated first and flagged afterwards.
// NOTE(review): the result of fcntl()/dup() is not checked, so a failure
// would hand -1 to the logger constructor -- confirm that is tolerated.
279//
280#if ( defined(__linux__) || defined(__GNU__) ) && defined(F_DUPFD_CLOEXEC)
281 eFD = fcntl(STDERR_FILENO, F_DUPFD_CLOEXEC, 0);
282#else
283 eFD = dup(STDERR_FILENO);
284 fcntl(eFD, F_SETFD, FD_CLOEXEC);
285#endif
286
287// Now we need to get a logger object. We make this a real dumb one.
// High resolution timestamps are enabled when either the hiResTime flag is
// set or the XRDSSI_HIRESLOG environment variable exists; debug tracing is
// enabled when the XRDSSIDEBUG environment variable exists.
288//
289 Logger = new XrdSysLogger(eFD, 0);
290 if (hiResTime || getenv("XRDSSI_HIRESLOG")) Logger->setHiRes();
293 if (getenv("XRDSSIDEBUG") != 0) Trace.What = TRACESSI_Debug;
294
295// Check for a message callback object. This must be set at global init time.
// When one exists it is copied to msgCB and placed in the logging parameters
// as the log plugin, with the buffer size set to 0.
296//
297 if (msgCBCl)
298 {XrdSysLogging::Parms logParms;
299 msgCB = msgCBCl;
300 logParms.logpi = msgCBCl;
301 logParms.bufsz = 0;
303 }
304}
305
306/******************************************************************************/
307/* X r d S s i C l i e n t P r o v i d e r : : S e t S c h e d u l e r */
308/******************************************************************************/
309
310// This may not be called before the logger object is created!
311
// Allocate the scheduler object and apply the configured thread limits.
// NOTE(review): listing lines 332 and 338 are absent from this view; the
// closing brace after the "WorkerThreads" call has no visible opening brace
// and the "Start the scheduler" comment has no visible statement -- confirm
// against the real source.
//
312void XrdSsiClientProvider::SetScheduler()
313{
// Trace object handed to the scheduler; it is bound to Log's logger, which is
// why the logger has to exist before this function runs.
314 static XrdSysTrace myTrc("XrdSsi", Log.logger());
315
316// Now construct the proper trace object (note that we do not set tracing if
317// message forwarding is on because these messages will not be forwarded).
318//
319 if (!msgCBCl && Trace.What & TRACESSI_Debug) myTrc.What = TRACE_SCHED;
320
321// We can now allocate a scheduler
322//
323 XrdSsi::schedP = new XrdScheduler(&Log, &myTrc);
324
325// Set thread count for callbacks (maxTCB is the 'maxw' argument of setParms)
326//
327 XrdSsi::schedP->setParms(-1, maxTCB, -1, -1, 0);
328
329// Set number of framework worker threads if need be (skipped when maxCLW is 0)
330//
331 if (maxCLW)
333 clEnvP->PutInt("WorkerThreads", maxCLW);
334 }
335
336// Start the scheduler
337//
339}
340
341/******************************************************************************/
342/* X r d S s i C l i e n t P r o v i d e r : : S e t S p r e a d */
343/******************************************************************************/
344
346{
// Hand the requested spread size straight to the sidScale object.
// NOTE(review): the header line of this definition is absent from this
// listing; the class declares it as SetSpread(short ssz) -- confirm.
//
347 sidScale.setSpread(ssz);
348}
349
350/******************************************************************************/
351/* X r d S s i C l i e n t P r o v i d e r : : S e t T i m e o u t */
352/******************************************************************************/
353
355{
// Store a timeout (or retry count) in the client environment.
//
// what  : selects which environment key receives the value.
// tmoval: the value to store; values of zero or less are ignored.
//
// NOTE(review): the header line of this definition and listing line 364 are
// absent from this view (the class declares SetTimeout(tmoType what,
// int tmoval)) -- confirm against the real source.
//
356
357// Ignore invalid timeouts
358//
359 if (tmoval <= 0) return;
360
361// Get global environment (everything below runs with clMutex held)
362//
363 clMutex.Lock();
365
366// Set requested timeout. For the idle-close, request and stream timeouts the
// matching *Set flag is raised as well, which stops GetService() from
// replacing the value with its own default during first-call setup.
// Unrecognized selectors are silently ignored.
367//
368 switch(what)
369 {case connect_N: clEnvP->PutInt("ConnectionRetry", tmoval);
370 break;
371 case connect_T: clEnvP->PutInt("ConnectionWindow", tmoval);
372 break;
373 case idleClose: clEnvP->PutInt("DataServerTTL", tmoval);
374 dsTTLSet = true;
375 break;
376 case request_T: clEnvP->PutInt("RequestTimeout", tmoval);
377 reqTOSet = true;
378 break;
379 case stream_T: clEnvP->PutInt("StreamTimeout", tmoval);
380 strTOSet = true;
381 break;
382 default: break;
383 }
384
385// All done
386//
387 clMutex.UnLock();
388}
389
390/******************************************************************************/
391/* G l o b a l S t a t i c s */
392/******************************************************************************/
393
394namespace
395{
396XrdSsiClientProvider ClientProvider;
397}
398
int fcntl(int fd, int cmd,...)
#define eMsg(x)
#define Atomic(type)
#define Atomic_GET(x)
XrdSsiProvider * XrdSsiProviderClient
#define TRACESSI_Debug
#define TRACE_SCHED
Definition XrdTrace.hh:42
static Env * GetEnv()
Get default client environment.
bool PutInt(const std::string &key, int value)
Definition XrdClEnv.cc:110
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtName
Hostname if it is resolvable o/w use fmtAddr.
const char * Set(const char *hSpec, int pNum=PortInSpec)
static bool Register(const char *hName, const char *hList[], int hLNum, std::string *eText=0, bool rotate=false)
static const char pfx
Registry names must start with this character.
static bool Singleton(const char *hSpec, const char **eText=0)
void setParms(int minw, int maxw, int avlt, int maxi, int once=0)
virtual void SetSpread(short ssz)
virtual bool SetConfig(XrdSsiErrInfo &eInfo, std::string &optname, int optvalue)
virtual ~XrdSsiClientProvider()
virtual rStat QueryResource(const char *rName, const char *contact=0)
virtual void SetCBThreads(int cbNum, int ntNum)
XrdSsiService * GetService(XrdSsiErrInfo &eInfo, const std::string &contact, int oHold=256)
virtual void SetTimeout(tmoType what, int tmoval)
virtual bool Init(XrdSsiLogger *logP, XrdSsiCluster *clsP, std::string cfgFn, std::string parms, int argc, char **argv)
void Set(const char *eMsg=0, int eNum=0, int eArg=0)
void MCB_t(struct timeval const &mtime, unsigned long tID, const char *msg, int mlen)
Length of message text.
@ idleClose
Time before an idle socket is closed (client)
@ stream_T
Time to wait for socket activity (Client)
@ connect_T
Time to wait for a connection (client)
@ request_T
Time to wait for a request to finish (client)
@ connect_N
Number of times to try connection (client)
void setSpread(short sval)
XrdSysLogger * logger(XrdSysLogger *lp=0)
void setHiRes()
Set log file timestamp to high resolution (hh:mm:ss.uuuu).
static bool Configure(XrdSysLogger &logr, Parms &parms)
void SetLogger(XrdSysLogger *logp)
XrdScheduler * schedP
short maxPEL
XrdSsiScale sidScale
XrdCl::Env * clEnvP
XrdSysMutex clMutex
XrdSsiLogger::MCB_t * msgCB
XrdSsiLogger::MCB_t * msgCBCl
short maxCLW
static const int rDispRR
short maxTCB
XrdSysTrace Trace
Definition XrdSsiSfs.cc:107
static const int rDispRand
XrdSysLogger * Logger
static const int rDispNone
bool reqTOSet
char rDisp
bool strTOSet
XrdSysError Log
bool dsTTLSet
bool hiResTime
Parameters to be passed to configure.
XrdSysLogPI_t logpi
-> log plugin object or nil if none
int bufsz
size of message buffer, -1 default, or 0