XRootD
Loading...
Searching...
No Matches
XrdXrootdCallBack.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d X r o o t d C a l l B a c k . c c */
4/* */
5/* (c) 2006 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cerrno>
32#include <cstdio>
33#include <cstring>
34#include <netinet/in.h>
35#include <sys/uio.h>
36
37#include "Xrd/XrdScheduler.hh"
39#include "XProtocol/XPtypes.hh"
40#include "XrdSys/XrdSysError.hh"
49
50/******************************************************************************/
51/* L o c a l C l a s s e s */
52/******************************************************************************/
53
55{
56public:
57
59 const char *Path, int rval);
60
61 void DoIt();
62
63inline void Recycle(){myMutex.Lock();
64 Next = FreeJob;
65 FreeJob = this;
66 myMutex.UnLock();
67 }
68
70 XrdOucErrInfo *erp,
71 const char *path,
72 int rval)
73 : XrdJob("async response"),
74 cbFunc(cbp), eInfo(erp), Path(path),
75 Result(rval) {}
76
78
79private:
80XrdXrootdFile *DoClose(XrdOucErrInfo *eInfo);
81void DoStatx(XrdOucErrInfo *eInfo);
82static XrdSysMutex myMutex;
83static XrdXrootdCBJob *FreeJob;
84
85XrdXrootdCBJob *Next;
86XrdXrootdCallBack *cbFunc;
87XrdOucErrInfo *eInfo;
88const char *Path;
89int Result;
90};
91
92/******************************************************************************/
93/* G l o b a l s */
94/******************************************************************************/
95
97
98namespace
99{
103 int Port;
104}
105
106 XrdSysMutex XrdXrootdCBJob::myMutex;
107 XrdXrootdCBJob *XrdXrootdCBJob::FreeJob;
108
109/******************************************************************************/
110/* X r d X r o o t d C B J o b */
111/******************************************************************************/
112/******************************************************************************/
113/* A l l o c */
114/******************************************************************************/
115
117 XrdOucErrInfo *erp,
118 const char *Path,
119 int rval)
120{
121 XrdXrootdCBJob *cbj;
122
123// Obtain a call back object by trying to avoid new()
124//
125 myMutex.Lock();
126 if (!(cbj = FreeJob)) cbj = new XrdXrootdCBJob(cbF, erp, Path, rval);
127 else {cbj->cbFunc = cbF, cbj->eInfo = erp;
128 cbj->Result = rval;cbj->Path = Path;
129 FreeJob = cbj->Next;
130 }
131 myMutex.UnLock();
132
133// Return the new object
134//
135 return cbj;
136}
137
138/******************************************************************************/
139/* D o I t */
140/******************************************************************************/
141
143{
144 static const char *TraceID = "DoIt";
145 XrdXrootdFile *fP = 0;
146
147// Do some tracing here
148//
149 TRACE(RSP, eInfo->getErrUser() <<' ' <<cbFunc->Func() <<" async callback");
150
151// Some operations differ in the way we handle them. For instance, for open()
152// if it succeeds then we must force the client to retry the open request
153// because we can't attach the file to the client here. We do this by asking
154// the client to wait zero seconds. Protocol demands a client retry. Close
155// operations are always final and we need to do some cleanup.
156//
157 if (*(cbFunc->Func()) == 'c') fP = DoClose(eInfo);
158 else if (SFS_OK == Result)
159 {if (*(cbFunc->Func()) == 'o')
160 {int rc = 0; cbFunc->sendResp(eInfo, kXR_wait, &rc);}
161 else {if (*(cbFunc->Func()) == 'x') DoStatx(eInfo);
162 cbFunc->sendResp(eInfo, kXR_ok, 0, eInfo->getErrText(),
163 eInfo->getErrTextLen());
164 }
165 }
166 else cbFunc->sendError(Result, eInfo, Path);
167
168// Tell the requestor that the callback has completed
169//
170 if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo);
171 else delete eInfo;
172 eInfo = 0;
173 if (fP) delete fP;
174 Recycle();
175}
176
177/******************************************************************************/
178/* D o C l o s e */
179/******************************************************************************/
180
181XrdXrootdFile *XrdXrootdCBJob::DoClose(XrdOucErrInfo *eInfo)
182{
183 XrdXrootdFile *fP = (XrdXrootdFile *)eInfo->getErrArg();
184
185// For close the main argument is the file pointer. Set the main arg to
186// be the request identifier which is saved in the file object.
187//
188 eInfo->setErrArg(fP->cbArg);
189
190// Responses to close() must be final; otherwise it's a system error.
191//
192 if (Result != SFS_OK && Result != SFS_ERROR)
193 {char buff[64];
194 SI->errorCnt++;
195 sprintf(buff, "Invalid close() callback result of %d for", Result);
196 eDest->Emsg("DoClose", buff, Path);
197 Result = SFS_ERROR;
198 eInfo->setErrInfo(kXR_FSError, "Internal error; file close forced");
199 }
200
201// Send appropriate response (OK or error)
202//
203 if (Result == SFS_OK) cbFunc->sendResp(eInfo, kXR_ok);
204 else cbFunc->sendError(Result, eInfo, Path);
205
206// Return the file object for disposal
207//
208 return fP;
209}
210
211/******************************************************************************/
212/* D o S t a t x */
213/******************************************************************************/
214
215void XrdXrootdCBJob::DoStatx(XrdOucErrInfo *einfo)
216{
217 const char *tp = einfo->getErrText();
218 char cflags[2];
219 int flags;
220
221// Skip to the third token
222//
223 while(*tp && *tp == ' ') tp++;
224 while(*tp && *tp != ' ') tp++; // 1st
225 while(*tp && *tp == ' ') tp++;
226 while(*tp && *tp != ' ') tp++; // 2nd
227
228// Convert to flags
229//
230 flags = atoi(tp);
231
232// Convert to proper indicator
233//
234 if (flags & kXR_offline) cflags[0] = (char)kXR_offline;
235 else if (flags & kXR_isDir) cflags[0] = (char)kXR_isDir;
236 else cflags[0] = (char)kXR_file;
237
238// Set the new response
239//
240 cflags[1] = '\0';
241 einfo->setErrInfo(0, cflags);
242}
243
244/******************************************************************************/
245/* X r d X r o o t d C a l l B a c k */
246/******************************************************************************/
247/******************************************************************************/
248/* D o n e */
249/******************************************************************************/
250
251void XrdXrootdCallBack::Done(int &Result, //I/O: Function result
252 XrdOucErrInfo *eInfo, // In: Error information
253 const char *Path) // In: Path related
254{
255 XrdXrootdCBJob *cbj;
256
257// Sending an async response may take a long time. So, we schedule the task
258// to run asynchronously from the forces that got us here.
259//
260 if (!(cbj = XrdXrootdCBJob::Alloc(this, eInfo, Path, Result)))
261 {eDest->Emsg("Done",ENOMEM,"get call back job; user",eInfo->getErrUser());
262 if (eInfo->getErrCB()) eInfo->getErrCB()->Done(Result, eInfo);
263 else delete eInfo;
264 } else Sched->Schedule((XrdJob *)cbj);
265}
266
267/******************************************************************************/
268/* S a m e */
269/******************************************************************************/
270
271int XrdXrootdCallBack::Same(unsigned long long arg1, unsigned long long arg2)
272{
273 XrdXrootdReqID ReqID1(arg1), ReqID2(arg2);
274 unsigned char sid1[2], sid2[2];
275 unsigned int inst1, inst2;
276 int lid1, lid2;
277
278 ReqID1.getID(sid1, lid1, inst1);
279 ReqID2.getID(sid2, lid2, inst2);
280 return lid1 == lid2;
281}
282
283/******************************************************************************/
284/* s e n d E r r o r */
285/******************************************************************************/
286
288 XrdOucErrInfo *eInfo,
289 const char *Path)
290{
291 static const char *TraceID = "fsError";
292 static int Xserr = kXR_ServerError;
293 int ecode;
294 const char *eMsg = eInfo->getErrText(ecode);
295 const char *User = eInfo->getErrUser();
296
297// Process the data response vector (we need to do this here)
298//
299 if (rc == SFS_DATAVEC)
300 {if (ecode > 1) sendVesp(eInfo, kXR_ok, (struct iovec *)eMsg, ecode);
301 else sendResp(eInfo, kXR_ok, 0);
302 return;
303 }
304
305// Optimize error message handling here
306//
307 if (eMsg && !*eMsg) eMsg = 0;
308
309// Process standard errors
310//
311 if (rc == SFS_ERROR)
312 {SI->errorCnt++;
313 rc = XProtocol::mapError(ecode);
314 sendResp(eInfo, kXR_error, &rc, eMsg, eInfo->getErrTextLen()+1);
315 return;
316 }
317
318// Process the redirection (error msg is host:port)
319//
320 if (rc == SFS_REDIRECT)
321 {SI->redirCnt++;
322 if (ecode <= 0) ecode = (ecode ? -ecode : Port);
323 TRACE(REDIR, User <<" async redir to " << eMsg <<':' <<ecode <<' '
324 <<(Path ? Path : ""));
325 sendResp(eInfo, kXR_redirect, &ecode, eMsg, eInfo->getErrTextLen());
327 XrdXrootdMonitor::Redirect(eInfo->getErrMid(),eMsg,ecode,Opcode,Path);
328 return;
329 }
330
331// Process the deferral
332//
333 if (rc >= SFS_STALL)
334 {SI->stallCnt++;
335 TRACE(STALL, "Stalling " <<User <<" for " <<rc <<" sec");
336 sendResp(eInfo, kXR_wait, &rc, eMsg, eInfo->getErrTextLen()+1);
337 return;
338 }
339
340// Process the data response
341//
342 if (rc == SFS_DATA)
343 {if (ecode) sendResp(eInfo, kXR_ok, 0, eMsg, ecode);
344 else sendResp(eInfo, kXR_ok, 0);
345 return;
346 }
347
348// Unknown conditions, report it
349//
350 {char buff[64];
351 SI->errorCnt++;
352 ecode = sprintf(buff, "Unknown sfs response code %d", rc);
353 eDest->Emsg("sendError", buff);
354 sendResp(eInfo, kXR_error, &Xserr, buff, ecode+1);
355 return;
356 }
357}
358
359/******************************************************************************/
360/* s e n d R e s p */
361/******************************************************************************/
362
364 XResponseType Status,
365 int *Data,
366 const char *Msg,
367 int Mlen)
368{
369 static const char *TraceID = "sendResp";
370 struct iovec rspVec[4];
371 XrdXrootdReqID ReqID;
372 int dlen = 0, n = 1;
373 kXR_int32 xbuf;
374
375 if (Data)
376 {xbuf = static_cast<kXR_int32>(htonl(*Data));
377 rspVec[n].iov_base = (caddr_t)(&xbuf);
378 dlen = rspVec[n].iov_len = sizeof(xbuf); n++; // 1
379 }
380 if (Msg && *Msg)
381 { rspVec[n].iov_base = (caddr_t)Msg;
382 dlen += rspVec[n].iov_len = Mlen; n++; // 2
383 }
384
385// Set the destination
386//
387 ReqID.setID(eInfo->getErrArg());
388
389// Send the async response
390//
391 if (XrdXrootdResponse::Send(ReqID, Status, rspVec, n, dlen) < 0)
392 eDest->Emsg("sendResp", eInfo->getErrUser(), Opname,
393 "async resp aborted; user gone.");
394 else if (TRACING(TRACE_RSP))
395 {XrdXrootdResponse theResp;
396 theResp.Set(ReqID.Stream());
397 TRACE(RSP, eInfo->getErrUser() <<" async " <<theResp.ID()
398 <<' ' <<Opname <<" status " <<Status);
399 }
400
401// Release any external buffer from the errinfo object
402//
403 if (eInfo->extData()) eInfo->Reset();
404}
405
406/******************************************************************************/
407/* s e n d V e s p */
408/******************************************************************************/
409
411 XResponseType Status,
412 struct iovec *ioV,
413 int ioN)
414{
415 static const char *TraceID = "sendVesp";
416 XrdXrootdReqID ReqID;
417 int dlen = 0;
418
419// Calculate the amount of data being sent
420//
421 for (int i = 1; i < ioN; i++) dlen += ioV[i].iov_len;
422
423// Set the destination
424//
425 ReqID.setID(eInfo->getErrArg());
426
427// Send the async response
428//
429 if (XrdXrootdResponse::Send(ReqID, Status, ioV, ioN, dlen) < 0)
430 eDest->Emsg("sendResp", eInfo->getErrUser(), Opname,
431 "async resp aborted; user gone.");
432 else if (TRACING(TRACE_RSP))
433 {XrdXrootdResponse theResp;
434 theResp.Set(ReqID.Stream());
435 TRACE(RSP, eInfo->getErrUser() <<" async " <<theResp.ID()
436 <<' ' <<Opname <<" status " <<Status);
437 }
438
439// Release any external buffer from the errinfo object
440//
441 if (eInfo->extData()) eInfo->Reset();
442}
443
444/******************************************************************************/
445/* S e t V a l s */
446/******************************************************************************/
447
449 XrdXrootdStats *SIp,
450 XrdScheduler *schp,
451 int port)
452{
453// Set values into our unnamed static space
454//
455 eDest = erp;
456 SI = SIp;
457 Sched = schp;
458 Port = port;
459}
@ kXR_ServerError
@ kXR_FSError
Definition XProtocol.hh:995
XResponseType
Definition XProtocol.hh:898
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
@ kXR_file
@ kXR_isDir
@ kXR_offline
int kXR_int32
Definition XPtypes.hh:89
static XrdSysError eDest(0,"crypto_")
#define TRACE_RSP
XrdOucString Path
#define eMsg(x)
#define SFS_DATAVEC
#define SFS_DATA
#define SFS_ERROR
#define SFS_REDIRECT
#define SFS_STALL
#define SFS_OK
#define TRACE(act, x)
Definition XrdTrace.hh:63
#define TRACING(x)
Definition XrdTrace.hh:70
XrdSysTrace XrdXrootdTrace
static int mapError(int rc)
virtual void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)=0
const char * getErrUser()
XrdOucEICB * getErrCB()
void setErrArg(unsigned long long cbarg=0)
unsigned long long getErrArg()
const char * getErrText()
int setErrInfo(int code, const char *emsg)
void Reset()
Reset object to no message state. Call this method to release appendages.
void Schedule(XrdJob *jp)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdXrootdCBJob(XrdXrootdCallBack *cbp, XrdOucErrInfo *erp, const char *path, int rval)
static XrdXrootdCBJob * Alloc(XrdXrootdCallBack *cbF, XrdOucErrInfo *erp, const char *Path, int rval)
void Done(int &Result, XrdOucErrInfo *eInfo, const char *Path=0)
static void setVals(XrdSysError *erp, XrdXrootdStats *SIp, XrdScheduler *schp, int port)
int Same(unsigned long long arg1, unsigned long long arg2)
void sendError(int rc, XrdOucErrInfo *eInfo, const char *Path)
void sendResp(XrdOucErrInfo *eInfo, XResponseType xrt, int *Data=0, const char *Msg=0, int Mlen=0)
void sendVesp(XrdOucErrInfo *eInfo, XResponseType xrt, struct iovec *ioV, int ioN)
void setID(unsigned long long id)
unsigned char * Stream()
unsigned long long getID()
void Set(XrdLink *lp)
XrdScheduler Sched
Definition XrdLinkCtl.cc:54
XrdXrootdStats * SI