XRootD
Loading...
Searching...
No Matches
XrdClXRootDMsgHandler.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
26#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
31#include "XrdCl/XrdClURL.hh"
32#include "XrdCl/XrdClUtils.hh"
39#include "XrdCl/XrdClSocket.hh"
40#include "XrdCl/XrdClTls.hh"
42
43#include "XrdOuc/XrdOucCRC.hh"
45
46#include "XrdSys/XrdSysPlatform.hh" // same as above
49#include <memory>
50#include <sstream>
51#include <numeric>
52
53namespace
54{
55 //----------------------------------------------------------------------------
56 // We need an extra task what will run the handler in the future, because
57 // tasks get deleted and we need the handler
58 //----------------------------------------------------------------------------
59 class WaitTask: public XrdCl::Task
60 {
61 public:
62 WaitTask( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
63 {
64 std::ostringstream o;
65 o << "WaitTask for: 0x" << handler->GetRequest();
66 SetName( o.str() );
67 }
68
69 virtual time_t Run( time_t now )
70 {
71 pHandler->WaitDone( now );
72 return 0;
73 }
74 private:
76 };
77}
78
79namespace XrdCl
80{
81 //----------------------------------------------------------------------------
82 // Delegate the response handling to the thread-pool
83 //----------------------------------------------------------------------------
85 {
86 public:
87 HandleRspJob( XrdCl::XRootDMsgHandler *handler ): pHandler( handler )
88 {
89
90 }
91
92 virtual ~HandleRspJob()
93 {
94
95 }
96
97 virtual void Run( void *arg )
98 {
99 pHandler->HandleResponse();
100 delete this;
101 }
102 private:
103 XrdCl::XRootDMsgHandler *pHandler;
104 };
105
106 //----------------------------------------------------------------------------
107 // Examine an incoming message, and decide on the action to be taken
108 //----------------------------------------------------------------------------
109 uint16_t XRootDMsgHandler::Examine( std::shared_ptr<Message> &msg )
110 {
111 //--------------------------------------------------------------------------
112 // if the MsgHandler is already being used to process another request
113 // (kXR_oksofar) we need to wait
114 //--------------------------------------------------------------------------
115 if( pOksofarAsAnswer )
116 {
117 XrdSysCondVarHelper lck( pCV );
118 while( pResponse ) pCV.Wait();
119 }
120 else
121 {
122 if( pResponse )
123 {
124 Log *log = DefaultEnv::GetLog();
125 log->Warning( ExDbgMsg, "[%s] MsgHandler is examining a response although "
126 "it already owns a response: %p (message: %s ).",
127 pUrl.GetHostId().c_str(), this,
128 pRequest->GetObfuscatedDescription().c_str() );
129 }
130 }
131
132 if( msg->GetSize() < 8 )
133 return Ignore;
134
135 ServerResponse *rsp = (ServerResponse *)msg->GetBuffer();
136 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
137 uint16_t status = 0;
138 uint32_t dlen = 0;
139
140 //--------------------------------------------------------------------------
141 // We only care about async responses, but those are extracted now
142 // in the SocketHandler.
143 //--------------------------------------------------------------------------
144 if( rsp->hdr.status == kXR_attn )
145 {
146 return Ignore;
147 }
148 //--------------------------------------------------------------------------
149 // We got a sync message - check if it belongs to us
150 //--------------------------------------------------------------------------
151 else
152 {
153 if( rsp->hdr.streamid[0] != req->header.streamid[0] ||
154 rsp->hdr.streamid[1] != req->header.streamid[1] )
155 return Ignore;
156
157 status = rsp->hdr.status;
158 dlen = rsp->hdr.dlen;
159 }
160
161 //--------------------------------------------------------------------------
162 // We take the ownership of the message and decide what we will do
163 // with the handler itself, the options are:
164 // 1) we want to either read in raw mode (the Raw flag) or have the message
165 // body reconstructed for us by the TransportHandler by the time
166 // Process() is called (default, no extra flag)
167 // 2) we either got a full response in which case we don't want to be
168 // notified about anything anymore (RemoveHandler) or we got a partial
169 // answer and we need to wait for more (default, no extra flag)
170 //--------------------------------------------------------------------------
171 pResponse = msg;
172 pBodyReader->SetDataLength( dlen );
173
174 Log *log = DefaultEnv::GetLog();
175 switch( status )
176 {
177 //------------------------------------------------------------------------
178 // Handle the cached cases
179 //------------------------------------------------------------------------
180 case kXR_error:
181 case kXR_redirect:
182 case kXR_wait:
183 return RemoveHandler;
184
185 case kXR_waitresp:
186 {
187 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response to "
188 "message %s", pUrl.GetHostId().c_str(),
189 pRequest->GetObfuscatedDescription().c_str() );
190
191 pResponse.reset();
192 return Ignore; // This must be handled synchronously!
193 }
194
195 //------------------------------------------------------------------------
196 // Handle the potential raw cases
197 //------------------------------------------------------------------------
198 case kXR_ok:
199 {
200 //----------------------------------------------------------------------
201 // For kXR_read we read in raw mode
202 //----------------------------------------------------------------------
203 uint16_t reqId = ntohs( req->header.requestid );
204 if( reqId == kXR_read )
205 {
206 return Raw | RemoveHandler;
207 }
208
209 //----------------------------------------------------------------------
210 // kXR_readv is the same as kXR_read
211 //----------------------------------------------------------------------
212 if( reqId == kXR_readv )
213 {
214 return Raw | RemoveHandler;
215 }
216
217 //----------------------------------------------------------------------
218 // For everything else we just take what we got
219 //----------------------------------------------------------------------
220 return RemoveHandler;
221 }
222
223 //------------------------------------------------------------------------
224 // kXR_oksofars are special, they are not full responses, so we reset
225 // the response pointer to 0 and add the message to the partial list
226 //------------------------------------------------------------------------
227 case kXR_oksofar:
228 {
229 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request "
230 "%s", pUrl.GetHostId().c_str(),
231 pRequest->GetObfuscatedDescription().c_str() );
232
233 if( !pOksofarAsAnswer )
234 {
235 pPartialResps.emplace_back( std::move( pResponse ) );
236 }
237
238 //----------------------------------------------------------------------
239 // For kXR_read we either read in raw mode if the message has not
240 // been fully reconstructed already, if it has, we adjust
241 // the buffer offset to prepare for the next one
242 //----------------------------------------------------------------------
243 uint16_t reqId = ntohs( req->header.requestid );
244 if( reqId == kXR_read )
245 {
246 pTimeoutFence.store( true, std::memory_order_relaxed );
247 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
248 }
249
250 //----------------------------------------------------------------------
251 // kXR_readv is similar to read, except that the payload is different
252 //----------------------------------------------------------------------
253 if( reqId == kXR_readv )
254 {
255 pTimeoutFence.store( true, std::memory_order_relaxed );
256 return Raw | ( pOksofarAsAnswer ? None : NoProcess );
257 }
258
259 return ( pOksofarAsAnswer ? None : NoProcess );
260 }
261
262 case kXR_status:
263 {
264 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request "
265 "%s", pUrl.GetHostId().c_str(),
266 pRequest->GetObfuscatedDescription().c_str() );
267
268 uint16_t reqId = ntohs( req->header.requestid );
269 if( reqId == kXR_pgwrite )
270 {
271 //--------------------------------------------------------------------
272 // In case of pgwrite by definition this wont be a partial response
273 // so we can already remove the handler from the in-queue
274 //--------------------------------------------------------------------
275 return RemoveHandler;
276 }
277
278 //----------------------------------------------------------------------
279 // Otherwise (pgread), first of all we need to read the body of the
280 // kXR_status response, we can handle the raw data (if any) only after
281 // we have the whole kXR_status body
282 //----------------------------------------------------------------------
283 pTimeoutFence.store( true, std::memory_order_relaxed );
284 return None;
285 }
286
287 //------------------------------------------------------------------------
288 // Default
289 //------------------------------------------------------------------------
290 default:
291 return RemoveHandler;
292 }
293 return RemoveHandler;
294 }
295
296 //----------------------------------------------------------------------------
297 // Reexamine the incoming message, and decide on the action to be taken
298 //----------------------------------------------------------------------------
300 {
301 if( !pResponse )
302 return 0;
303
304 Log *log = DefaultEnv::GetLog();
305 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
306
307 //--------------------------------------------------------------------------
308 // Additional action is only required for kXR_status
309 //--------------------------------------------------------------------------
310 if( rsp->hdr.status != kXR_status ) return 0;
311
312 //--------------------------------------------------------------------------
313 // Ignore malformed status response
314 //--------------------------------------------------------------------------
315 if( pResponse->GetSize() < sizeof( ServerResponseStatus ) )
316 {
317 log->Error( XRootDMsg, "[%s] kXR_status: invalid message size.", pUrl.GetHostId().c_str() );
318 return Corrupted;
319 }
320
321 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
322 uint16_t reqId = ntohs( req->header.requestid );
323 //--------------------------------------------------------------------------
324 // Unmarshal the status body
325 //--------------------------------------------------------------------------
326 XRootDStatus st = XRootDTransport::UnMarshalStatusBody( *pResponse, reqId );
327
328 if( !st.IsOK() && st.code == errDataError )
329 {
330 log->Error( XRootDMsg, "[%s] %s", pUrl.GetHostId().c_str(),
331 st.GetErrorMessage().c_str() );
332 return Corrupted;
333 }
334
335 if( !st.IsOK() )
336 {
337 log->Error( XRootDMsg, "[%s] Failed to unmarshall status body.",
338 pUrl.GetHostId().c_str() );
339 pStatus = st;
340 HandleRspOrQueue();
341 return Ignore;
342 }
343
344 //--------------------------------------------------------------------------
345 // Common handling for partial results
346 //--------------------------------------------------------------------------
347 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
349 {
350 pPartialResps.push_back( std::move( pResponse ) );
351 }
352
353 //--------------------------------------------------------------------------
354 // Decide the actions that we need to take
355 //--------------------------------------------------------------------------
356 uint16_t action = 0;
357 if( reqId == kXR_pgread )
358 {
359 //----------------------------------------------------------------------
360 // The message contains only Status header and body but no raw data
361 //----------------------------------------------------------------------
362 if( !pPageReader )
363 pPageReader.reset( new AsyncPageReader( *pChunkList, pCrc32cDigests ) );
364 pPageReader->SetRsp( rspst );
365
366 action |= Raw;
367
369 action |= NoProcess;
370 else
371 action |= RemoveHandler;
372 }
373 else if( reqId == kXR_pgwrite )
374 {
375 // if data corruption has been detected on the server side we will
376 // send some additional data pointing to the pages that need to be
377 // retransmitted
378 if( size_t( sizeof( ServerResponseHeader ) + rspst->status.hdr.dlen + rspst->status.bdy.dlen ) >
379 pResponse->GetCursor() )
380 action |= More;
381 }
382
383 return action;
384 }
385
386 //----------------------------------------------------------------------------
387 // Get handler sid
388 //----------------------------------------------------------------------------
390 {
391 ClientRequest* req = (ClientRequest*) pRequest->GetBuffer();
392 return ((uint16_t)req->header.streamid[1] << 8) | (uint16_t)req->header.streamid[0];
393 }
394
395 //----------------------------------------------------------------------------
397 //----------------------------------------------------------------------------
399 {
400 Log *log = DefaultEnv::GetLog();
401
402 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
403
404 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
405
406 //--------------------------------------------------------------------------
407 // If it is a local file, it can be only a metalink redirector
408 //--------------------------------------------------------------------------
409 if( pUrl.IsLocalFile() && pUrl.IsMetalink() )
410 pHosts->back().protocol = kXR_PROTOCOLVERSION;
411
412 //--------------------------------------------------------------------------
413 // We got an answer, check who we were talking to
414 //--------------------------------------------------------------------------
415 else
416 {
417 AnyObject qryResult;
418 int *qryResponse = 0;
419 pPostMaster->QueryTransport( pUrl, XRootDQuery::ServerFlags, qryResult );
420 qryResult.Get( qryResponse );
421 pHosts->back().flags = *qryResponse; delete qryResponse; qryResponse = 0;
422 pPostMaster->QueryTransport( pUrl, XRootDQuery::ProtocolVersion, qryResult );
423 qryResult.Get( qryResponse );
424 pHosts->back().protocol = *qryResponse; delete qryResponse;
425 }
426
427 //--------------------------------------------------------------------------
428 // Process the message
429 //--------------------------------------------------------------------------
430 Status st = XRootDTransport::UnMarshallBody( pResponse.get(), req->header.requestid );
431 if( !st.IsOK() )
432 {
433 pStatus = Status( stFatal, errInvalidMessage );
434 HandleResponse();
435 return;
436 }
437
438 //--------------------------------------------------------------------------
439 // we have an response for the message so it's not in fly anymore
440 //--------------------------------------------------------------------------
441 pMsgInFly = false;
442
443 //--------------------------------------------------------------------------
444 // Reset the aggregated wait (used to omit wait response in case of Metalink
445 // redirector)
446 //--------------------------------------------------------------------------
447 if( rsp->hdr.status != kXR_wait )
448 pAggregatedWaitTime = 0;
449
450 switch( rsp->hdr.status )
451 {
452 //------------------------------------------------------------------------
453 // kXR_ok - we're done here
454 //------------------------------------------------------------------------
455 case kXR_ok:
456 {
457 log->Dump( XRootDMsg, "[%s] Got a kXR_ok response to request %s",
458 pUrl.GetHostId().c_str(),
459 pRequest->GetObfuscatedDescription().c_str() );
460 pStatus = Status();
461 HandleResponse();
462 return;
463 }
464
465 case kXR_status:
466 {
467 log->Dump( XRootDMsg, "[%s] Got a kXR_status response to request %s",
468 pUrl.GetHostId().c_str(),
469 pRequest->GetObfuscatedDescription().c_str() );
470 pStatus = Status();
471 HandleResponse();
472 return;
473 }
474
475 //------------------------------------------------------------------------
476 // kXR_ok - we're serving partial result to the user
477 //------------------------------------------------------------------------
478 case kXR_oksofar:
479 {
480 log->Dump( XRootDMsg, "[%s] Got a kXR_oksofar response to request %s",
481 pUrl.GetHostId().c_str(),
482 pRequest->GetObfuscatedDescription().c_str() );
483 pStatus = Status( stOK, suContinue );
484 HandleResponse();
485 return;
486 }
487
488 //------------------------------------------------------------------------
489 // kXR_error - we've got a problem
490 //------------------------------------------------------------------------
491 case kXR_error:
492 {
493 char *errmsg = new char[rsp->hdr.dlen-3]; errmsg[rsp->hdr.dlen-4] = 0;
494 memcpy( errmsg, rsp->body.error.errmsg, rsp->hdr.dlen-4 );
495 log->Dump( XRootDMsg, "[%s] Got a kXR_error response to request %s "
496 "[%d] %s", pUrl.GetHostId().c_str(),
497 pRequest->GetObfuscatedDescription().c_str(), rsp->body.error.errnum,
498 errmsg );
499 delete [] errmsg;
500
501 HandleError( Status(stError, errErrorResponse, rsp->body.error.errnum) );
502 return;
503 }
504
505 //------------------------------------------------------------------------
506 // kXR_redirect - they tell us to go elsewhere
507 //------------------------------------------------------------------------
508 case kXR_redirect:
509 {
510 if( rsp->hdr.dlen <= 4 )
511 {
512 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
513 pUrl.GetHostId().c_str() );
514 pStatus = Status( stError, errInvalidResponse );
515 HandleResponse();
516 return;
517 }
518
519 char *urlInfoBuff = new char[rsp->hdr.dlen-3];
520 urlInfoBuff[rsp->hdr.dlen-4] = 0;
521 memcpy( urlInfoBuff, rsp->body.redirect.host, rsp->hdr.dlen-4 );
522 std::string urlInfo = urlInfoBuff;
523 delete [] urlInfoBuff;
524 log->Dump( XRootDMsg, "[%s] Got kXR_redirect response to "
525 "message %s: %s, port %d", pUrl.GetHostId().c_str(),
526 pRequest->GetObfuscatedDescription().c_str(), urlInfo.c_str(),
527 rsp->body.redirect.port );
528
529 //----------------------------------------------------------------------
530 // Check if we can proceed
531 //----------------------------------------------------------------------
532 if( !pRedirectCounter )
533 {
534 log->Warning( XRootDMsg, "[%s] Redirect limit has been reached for "
535 "message %s, the last known error is: %s",
536 pUrl.GetHostId().c_str(),
537 pRequest->GetObfuscatedDescription().c_str(),
538 pLastError.ToString().c_str() );
539
540
541 pStatus = Status( stFatal, errRedirectLimit );
542 HandleResponse();
543 return;
544 }
545 --pRedirectCounter;
546
547 //----------------------------------------------------------------------
548 // Keep the info about this server if we still need to find a load
549 // balancer
550 //----------------------------------------------------------------------
551 uint32_t flags = pHosts->back().flags;
552 if( !pHasLoadBalancer )
553 {
554 if( flags & kXR_isManager )
555 {
556 //------------------------------------------------------------------
557 // If the current server is a meta manager then it supersedes
558 // any existing load balancer, otherwise we assign a load-balancer
559 // only if it has not been already assigned
560 //------------------------------------------------------------------
561 if( ( flags & kXR_attrMeta ) || !pLoadBalancer.url.IsValid() )
562 {
563 pLoadBalancer = pHosts->back();
564 log->Dump( XRootDMsg, "[%s] Current server has been assigned "
565 "as a load-balancer for message %s",
566 pUrl.GetHostId().c_str(),
567 pRequest->GetObfuscatedDescription().c_str() );
568 HostList::iterator it;
569 for( it = pHosts->begin(); it != pHosts->end(); ++it )
570 it->loadBalancer = false;
571 pHosts->back().loadBalancer = true;
572 }
573 }
574 }
575
576 //----------------------------------------------------------------------
577 // If the redirect comes from a data server safe the URL because
578 // in case of a failure we will use it as the effective data server URL
579 // for the tried CGI opaque info
580 //----------------------------------------------------------------------
581 if( flags & kXR_isServer )
582 pEffectiveDataServerUrl = new URL( pHosts->back().url );
583
584 //----------------------------------------------------------------------
585 // Build the URL and check it's validity
586 //----------------------------------------------------------------------
587 std::vector<std::string> urlComponents;
588 std::string newCgi;
589 Utils::splitString( urlComponents, urlInfo, "?" );
590
591 std::ostringstream o;
592
593 o << urlComponents[0];
594 if( rsp->body.redirect.port > 0 )
595 o << ":" << rsp->body.redirect.port << "/";
596 else if( rsp->body.redirect.port < 0 )
597 {
598 //--------------------------------------------------------------------
599 // check if the manager wants to enforce write recovery at himself
600 // (beware we are dealing here with negative flags)
601 //--------------------------------------------------------------------
602 if( ~uint32_t( rsp->body.redirect.port ) & kXR_recoverWrts )
603 pHosts->back().flags |= kXR_recoverWrts;
604
605 //--------------------------------------------------------------------
606 // check if the manager wants to collapse the communication channel
607 // (the redirect host is to replace the current host)
608 //--------------------------------------------------------------------
609 if( ~uint32_t( rsp->body.redirect.port ) & kXR_collapseRedir )
610 {
611 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
612 pPostMaster->CollapseRedirect( pUrl, url );
613 }
614
615 if( ~uint32_t( rsp->body.redirect.port ) & kXR_ecRedir )
616 {
617 std::string url( rsp->body.redirect.host, rsp->hdr.dlen-4 );
618 if( Utils::CheckEC( pRequest, url ) )
619 pRedirectAsAnswer = true;
620 }
621 }
622
623 URL newUrl = URL( o.str() );
624 if( !newUrl.IsValid() )
625 {
627 log->Error( XRootDMsg, "[%s] Got invalid redirection URL: %s",
628 pUrl.GetHostId().c_str(), urlInfo.c_str() );
629 HandleResponse();
630 return;
631 }
632
633 if( pUrl.GetUserName() != "" && newUrl.GetUserName() == "" )
634 newUrl.SetUserName( pUrl.GetUserName() );
635
636 if( pUrl.GetPassword() != "" && newUrl.GetPassword() == "" )
637 newUrl.SetPassword( pUrl.GetPassword() );
638
639 //----------------------------------------------------------------------
640 // Forward any "xrd.*" params from the original client request also to
641 // the new redirection url
642 // Also, we need to preserve any "xrdcl.*' as they are important for
643 // our internal workflows.
644 //----------------------------------------------------------------------
645 std::ostringstream ossXrd;
646 const URL::ParamsMap &urlParams = pUrl.GetParams();
647
648 for(URL::ParamsMap::const_iterator it = urlParams.begin();
649 it != urlParams.end(); ++it )
650 {
651 if( it->first.compare( 0, 4, "xrd." ) &&
652 it->first.compare( 0, 6, "xrdcl." ) )
653 continue;
654
655 ossXrd << it->first << '=' << it->second << '&';
656 }
657
658 std::string xrdCgi = ossXrd.str();
659 pRedirectUrl = newUrl.GetURL();
660
661 URL cgiURL;
662 if( urlComponents.size() > 1 )
663 {
664 pRedirectUrl += "?";
665 pRedirectUrl += urlComponents[1];
666 std::ostringstream o;
667 o << "fake://fake:111//fake?";
668 o << urlComponents[1];
669
670 if( urlComponents.size() == 3 )
671 o << '?' << urlComponents[2];
672
673 if (!xrdCgi.empty())
674 {
675 o << '&' << xrdCgi;
676 pRedirectUrl += '&';
677 pRedirectUrl += xrdCgi;
678 }
679
680 cgiURL = URL( o.str() );
681 }
682 else {
683 if (!xrdCgi.empty())
684 {
685 std::ostringstream o;
686 o << "fake://fake:111//fake?";
687 o << xrdCgi;
688 cgiURL = URL( o.str() );
689 pRedirectUrl += '?';
690 pRedirectUrl += xrdCgi;
691 }
692 }
693
694 //----------------------------------------------------------------------
695 // Check if we need to return the URL as a response
696 //----------------------------------------------------------------------
697 if( newUrl.GetProtocol() != "root" && newUrl.GetProtocol() != "xroot" &&
698 newUrl.GetProtocol() != "roots" && newUrl.GetProtocol() != "xroots" &&
699 !newUrl.IsLocalFile() )
700 pRedirectAsAnswer = true;
701
702 if( pRedirectAsAnswer )
703 {
704 pStatus = Status( stError, errRedirect );
705 HandleResponse();
706 return;
707 }
708
709 //----------------------------------------------------------------------
710 // Rewrite the message in a way required to send it to another server
711 //----------------------------------------------------------------------
712 newUrl.SetParams( cgiURL.GetParams() );
713 Status st = RewriteRequestRedirect( newUrl );
714 if( !st.IsOK() )
715 {
716 pStatus = st;
717 HandleResponse();
718 return;
719 }
720
721 //----------------------------------------------------------------------
722 // Make sure we don't change the protocol by accident (root vs roots)
723 //----------------------------------------------------------------------
724 if( ( pUrl.GetProtocol() == "roots" || pUrl.GetProtocol() == "xroots" ) &&
725 ( newUrl.GetProtocol() == "root" || newUrl.GetProtocol() == "xroot" ) )
726 newUrl.SetProtocol( "roots" );
727
728 //----------------------------------------------------------------------
729 // Send the request to the new location
730 //----------------------------------------------------------------------
731 HandleError( RetryAtServer( newUrl, RedirectEntry::EntryRedirect ) );
732 return;
733 }
734
735 //------------------------------------------------------------------------
736 // kXR_wait - we wait, and re-issue the request later
737 //------------------------------------------------------------------------
738 case kXR_wait:
739 {
740 uint32_t waitSeconds = 0;
741
742 if( rsp->hdr.dlen >= 4 )
743 {
744 char *infoMsg = new char[rsp->hdr.dlen-3];
745 infoMsg[rsp->hdr.dlen-4] = 0;
746 memcpy( infoMsg, rsp->body.wait.infomsg, rsp->hdr.dlen-4 );
747 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of %d seconds to "
748 "message %s: %s", pUrl.GetHostId().c_str(),
749 rsp->body.wait.seconds, pRequest->GetObfuscatedDescription().c_str(),
750 infoMsg );
751 delete [] infoMsg;
752 waitSeconds = rsp->body.wait.seconds;
753 }
754 else
755 {
756 log->Dump( XRootDMsg, "[%s] Got kXR_wait response of 0 seconds to "
757 "message %s", pUrl.GetHostId().c_str(),
758 pRequest->GetObfuscatedDescription().c_str() );
759 }
760
761 pAggregatedWaitTime += waitSeconds;
762
763 // We need a special case if the data node comes from metalink
764 // redirector. In this case it might make more sense to try the
765 // next entry in the Metalink than wait.
766 if( OmitWait( *pRequest, pLoadBalancer.url ) )
767 {
768 int maxWait = DefaultMaxMetalinkWait;
769 DefaultEnv::GetEnv()->GetInt( "MaxMetalinkWait", maxWait );
770 if( pAggregatedWaitTime > maxWait )
771 {
772 UpdateTriedCGI();
773 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRedirectOnWait ) );
774 return;
775 }
776 }
777
778 //----------------------------------------------------------------------
779 // Some messages require rewriting before they can be sent again
780 // after wait
781 //----------------------------------------------------------------------
782 Status st = RewriteRequestWait();
783 if( !st.IsOK() )
784 {
785 pStatus = st;
786 HandleResponse();
787 return;
788 }
789
790 //----------------------------------------------------------------------
791 // Register a task to resend the message in some seconds, if we still
792 // have time to do that, and report a timeout otherwise
793 //----------------------------------------------------------------------
794 time_t resendTime = ::time(0)+waitSeconds;
795
796 if( resendTime < pExpiration )
797 {
798 log->Debug( ExDbgMsg, "[%s] Scheduling WaitTask for MsgHandler: %p (message: %s ).",
799 pUrl.GetHostId().c_str(), this,
800 pRequest->GetObfuscatedDescription().c_str() );
801
802 TaskManager *taskMgr = pPostMaster->GetTaskManager();
803 taskMgr->RegisterTask( new WaitTask( this ), resendTime );
804 }
805 else
806 {
807 log->Debug( XRootDMsg, "[%s] Wait time is too long, timing out %s",
808 pUrl.GetHostId().c_str(),
809 pRequest->GetObfuscatedDescription().c_str() );
810 HandleError( Status( stError, errOperationExpired) );
811 }
812 return;
813 }
814
815 //------------------------------------------------------------------------
816 // kXR_waitresp - the response will be returned in some seconds as an
817 // unsolicited message. Currently all messages of this type are handled
818 // one step before in the XrdClStream::OnIncoming as they need to be
819 // processed synchronously.
820 //------------------------------------------------------------------------
821 case kXR_waitresp:
822 {
823 if( rsp->hdr.dlen < 4 )
824 {
825 log->Error( XRootDMsg, "[%s] Got invalid waitresp response.",
826 pUrl.GetHostId().c_str() );
827 pStatus = Status( stError, errInvalidResponse );
828 HandleResponse();
829 return;
830 }
831
832 log->Dump( XRootDMsg, "[%s] Got kXR_waitresp response of %d seconds to "
833 "message %s", pUrl.GetHostId().c_str(),
834 rsp->body.waitresp.seconds,
835 pRequest->GetObfuscatedDescription().c_str() );
836 return;
837 }
838
839 //------------------------------------------------------------------------
840 // Default - unrecognized/unsupported response, declare an error
841 //------------------------------------------------------------------------
842 default:
843 {
844 log->Dump( XRootDMsg, "[%s] Got unrecognized response %d to "
845 "message %s", pUrl.GetHostId().c_str(),
846 rsp->hdr.status, pRequest->GetObfuscatedDescription().c_str() );
847 pStatus = Status( stError, errInvalidResponse );
848 HandleResponse();
849 return;
850 }
851 }
852
853 return;
854 }
855
856 //----------------------------------------------------------------------------
857 // Handle an event other that a message arrival - may be timeout
858 //----------------------------------------------------------------------------
860 XRootDStatus status )
861 {
862 Log *log = DefaultEnv::GetLog();
863 log->Dump( XRootDMsg, "[%s] Stream event reported for msg %s",
864 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
865
866 if( event == Ready )
867 return 0;
868
869 if( pTimeoutFence.load( std::memory_order_relaxed ) )
870 return 0;
871
872 HandleError( status );
873 return RemoveHandler;
874 }
875
876 //----------------------------------------------------------------------------
877 // Read message body directly from a socket
878 //----------------------------------------------------------------------------
880 Socket *socket,
881 uint32_t &bytesRead )
882 {
883 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
884 uint16_t reqId = ntohs( req->header.requestid );
885
886 if( reqId == kXR_pgread )
887 return pPageReader->Read( *socket, bytesRead );
888
889 return pBodyReader->Read( *socket, bytesRead );
890 }
891
892 //----------------------------------------------------------------------------
893 // We're here when we requested sending something over the wire
894 // and there has been a status update on this action
//
// On success the request is flagged as being in flight (pMsgInFly); on
// failure the status is handed to HandleError.
895 //----------------------------------------------------------------------------
897 XRootDStatus status )
898 {
899 Log *log = DefaultEnv::GetLog();
900
901 //--------------------------------------------------------------------------
902 // We were successful, so we now need to listen for a response
903 //--------------------------------------------------------------------------
904 if( status.IsOK() )
905 {
906 log->Dump( XRootDMsg, "[%s] Message %s has been successfully sent.",
907 pUrl.GetHostId().c_str(), message->GetObfuscatedDescription().c_str() );
908
909 log->Debug( ExDbgMsg, "[%s] Moving MsgHandler: %p (message: %s ) from out-queue to in-queue.",
910 pUrl.GetHostId().c_str(), this,
911 pRequest->GetObfuscatedDescription().c_str() );
912
// HandleResponse consults this flag when deciding whether the stream id
// may be released
913 pMsgInFly = true;
914 return;
915 }
916
917 //--------------------------------------------------------------------------
918 // We have failed, recover if possible
919 //--------------------------------------------------------------------------
920 log->Error( XRootDMsg, "[%s] Impossible to send message %s. Trying to "
921 "recover.", pUrl.GetHostId().c_str(),
922 message->GetObfuscatedDescription().c_str() );
923 HandleError( status );
924 }
925
926 //----------------------------------------------------------------------------
927 // Are we a raw writer or not?
//
// Returns true for kXR_write, kXR_writev and kXR_pgwrite requests. A
// kXR_chkpoint request with the kXR_ckpXeq opcode is judged by the request
// found sizeof( ClientRequest ) bytes into the message buffer: true unless
// that request is kXR_truncate. Everything else yields false.
928 //----------------------------------------------------------------------------
930 {
931 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
// request ids are converted with ntohs before they are compared
932 uint16_t reqId = ntohs( req->header.requestid );
933 if( reqId == kXR_write || reqId == kXR_writev || reqId == kXR_pgwrite )
934 return true;
935 // checkpoint + execute
936 if( reqId == kXR_chkpoint && req->chkpoint.opcode == kXR_ckpXeq )
937 {
// the request to be executed follows the checkpoint request in the buffer
938 ClientRequest *xeq = (ClientRequest*)pRequest->GetBuffer( sizeof( ClientRequest ) );
939 reqId = ntohs( xeq->header.requestid );
940 return reqId != kXR_truncate; // only checkpointed truncate does not have raw data
941 }
942
943 return false;
944 }
945
946 //----------------------------------------------------------------------------
947 // Write the message body
//
// One of three paths is taken, selected by the handler state:
//  - pChunkList and pCrc32cDigests both non-empty: page write, every page
//    of the first chunk is preceded by its crc32c digest (converted with
//    htonl)
//  - only pChunkList non-empty: the chunks are sent one after another
//  - pChunkList empty: the payload is taken from the kernel buffer pKBuff
// bytesWritten is incremented by the byte counts reported by socket->Send.
// A non-OK status, or an OK status carrying suRetry, is returned right
// away; progress is kept in members (pAsyncOffset, pAsyncChunkIndex,
// pPgWrtCurrentPageNb, pPgWrtCurrentPageOffset, pPgWrtCksumBuff),
// presumably so that a later call resumes where this one stopped - confirm
// against the caller.
948 //----------------------------------------------------------------------------
950 uint32_t &bytesWritten )
951 {
952 //--------------------------------------------------------------------------
953 // First check if it is a PgWrite
954 //--------------------------------------------------------------------------
955 if( !pChunkList->empty() && !pCrc32cDigests.empty() )
956 {
957 //------------------------------------------------------------------------
958 // PgWrite will have just one chunk
959 //------------------------------------------------------------------------
960 ChunkInfo chunk = pChunkList->front();
961 //------------------------------------------------------------------------
962 // Calculate the size of the first and last page (in case the chunk is not
963 // 4KB aligned)
964 //------------------------------------------------------------------------
965 int fLen = 0, lLen = 0;
966 size_t nbpgs = XrdOucPgrwUtils::csNum( chunk.offset, chunk.length, fLen, lLen );
967
968 //------------------------------------------------------------------------
969 // Set the crc32c buffer if not ready yet
970 //------------------------------------------------------------------------
971 if( pPgWrtCksumBuff.GetCursor() == 0 )
972 {
973 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
974 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
975 }
976
// Work out what is still outstanding: bytes left in the chunk, bytes left
// in the current page (the first page uses fLen, later pages a full page)
// and where in the user buffer to continue
977 uint32_t btsLeft = chunk.length - pAsyncOffset;
978 uint32_t pglen = ( pPgWrtCurrentPageNb == 0 ? fLen : XrdSys::PageSize ) - pPgWrtCurrentPageOffset;
979 if( pglen > btsLeft ) pglen = btsLeft;
980 char* pgbuf = static_cast<char*>( chunk.buffer ) + pAsyncOffset;
981
982 while( btsLeft > 0 )
983 {
984 // first write the crc32c digest
985 while( pPgWrtCksumBuff.GetCursor() < sizeof( uint32_t ) )
986 {
987 uint32_t dgstlen = sizeof( uint32_t ) - pPgWrtCksumBuff.GetCursor();
988 char* dgstbuf = pPgWrtCksumBuff.GetBufferAtCursor();
989 int btswrt = 0;
990 Status st = socket->Send( dgstbuf, dgstlen, btswrt );
991 if( !st.IsOK() ) return st;
992 bytesWritten += btswrt;
993 pPgWrtCksumBuff.AdvanceCursor( btswrt );
// the cursor is advanced before the retry check so the digest progress
// is recorded even when we have to come back later
994 if( st.code == suRetry ) return st;
995 }
996 // then write the raw data (one page)
997 int btswrt = 0;
998 Status st = socket->Send( pgbuf, pglen, btswrt );
999 if( !st.IsOK() ) return st;
1000 pgbuf += btswrt;
1001 pglen -= btswrt;
1002 btsLeft -= btswrt;
1003 bytesWritten += btswrt;
1004 pAsyncOffset += btswrt; // update the offset to the raw data
1005 if( st.code == suRetry ) return st;
1006 // if we managed to write all the data ...
1007 if( pglen == 0 )
1008 {
1009 // move to the next page
1010 ++pPgWrtCurrentPageNb;
1011 if( pPgWrtCurrentPageNb < nbpgs )
1012 {
1013 // set the digest buffer
1014 pPgWrtCksumBuff.SetCursor( 0 );
1015 uint32_t digest = htonl( pCrc32cDigests[pPgWrtCurrentPageNb] );
1016 memcpy( pPgWrtCksumBuff.GetBuffer(), &digest, sizeof( uint32_t ) );
1017 }
1018 // set the page length
1019 pglen = XrdSys::PageSize;
1020 if( pglen > btsLeft ) pglen = btsLeft;
1021 // reset offset in the current page
1022 pPgWrtCurrentPageOffset = 0;
1023 }
1024 else
1025 // otherwise just adjust the offset in the current page
1026 pPgWrtCurrentPageOffset += btswrt;
1027
1028 }
1029 }
// Plain chunk list: send the chunks in order, starting at the chunk and
// offset recorded in pAsyncChunkIndex / pAsyncOffset
1030 else if( !pChunkList->empty() )
1031 {
1032 size_t size = pChunkList->size();
1033 for( size_t i = pAsyncChunkIndex ; i < size; ++i )
1034 {
1035 char *buffer = (char*)(*pChunkList)[i].buffer;
// NOTE: this 'size' is the length of the current chunk and shadows the
// chunk count declared outside the loop
1036 uint32_t size = (*pChunkList)[i].length;
1037 size_t leftToBeWritten = size - pAsyncOffset;
1038
1039 while( leftToBeWritten )
1040 {
1041 int btswrt = 0;
1042 Status st = socket->Send( buffer + pAsyncOffset, leftToBeWritten, btswrt );
1043 bytesWritten += btswrt;
// NOTE(review): pAsyncOffset is not advanced when the status is suRetry;
// this is only correct if Send reports zero bytes in that case - confirm
1044 if( !st.IsOK() || st.code == suRetry ) return st;
1045 pAsyncOffset += btswrt;
1046 leftToBeWritten -= btswrt;
1047 }
1048 //----------------------------------------------------------------------
1049 // Remember that we have moved to the next chunk, also clear the offset
1050 // within the buffer as we are going to move to a new one
1051 //----------------------------------------------------------------------
1052 ++pAsyncChunkIndex;
1053 pAsyncOffset = 0;
1054 }
1055 }
// No chunks at all: the payload comes from the kernel buffer
1056 else
1057 {
1058 Log *log = DefaultEnv::GetLog();
1059
1060 //------------------------------------------------------------------------
1061 // If the socket is encrypted we cannot use a kernel buffer, we have to
1062 // convert to user space buffer
1063 //------------------------------------------------------------------------
1064 if( socket->IsEncrypted() )
1065 {
1066 log->Debug( XRootDMsg, "[%s] Channel is encrypted: cannot use kernel buffer.",
1067 pUrl.GetHostId().c_str() );
1068
// The kernel buffer content is moved to a user-space buffer which is
// appended to pChunkList as a single chunk; the function then calls
// itself so that the chunk list is sent by one of the branches above
1069 char *ubuff = 0;
1070 ssize_t ret = XrdSys::Move( *pKBuff, ubuff );
1071 if( ret < 0 ) return Status( stError, errInternal );
1072 pChunkList->push_back( ChunkInfo( 0, ret, ubuff ) );
1073 return WriteMessageBody( socket, bytesWritten );
1074 }
1075
1076 //------------------------------------------------------------------------
1077 // Send the data
1078 //------------------------------------------------------------------------
1079 while( !pKBuff->Empty() )
1080 {
1081 int btswrt = 0;
1082 Status st = socket->Send( *pKBuff, btswrt );
1083 bytesWritten += btswrt;
1084 if( !st.IsOK() || st.code == suRetry ) return st;
1085 }
1086
1087 log->Debug( XRootDMsg, "[%s] Request %s payload (kernel buffer) transferred to socket.",
1088 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str() );
1089 }
1090
// Everything has been handed to the socket
1091 return Status();
1092 }
1093
1094 //----------------------------------------------------------------------------
1095 // We're here when we got a time event. We needed to re-issue the request
1096 // in some time in the future, and that moment has arrived
1097 //----------------------------------------------------------------------------
1099 {
// Retry at the current server (pUrl), recording the attempt as a wait
// entry; whatever status RetryAtServer yields is passed on to HandleError
1100 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryWait ) );
1101 }
1102
1103 //----------------------------------------------------------------------------
1104 // Bookkeeping after partial response has been received.
// The only bookkeeping done here is clearing pTimeoutFence.
1105 //----------------------------------------------------------------------------
1107 {
1108 pTimeoutFence.store( false, std::memory_order_relaxed ); // Take down the timeout fence
1109 }
1110
1111 //----------------------------------------------------------------------------
1112 // Unpack the message and call the response handler
//
// Builds the status object (ProcessStatus) and, when it is OK, the response
// object (ParseResponse), then passes both together with the host list to
// pResponseHandler. A response is final unless pStatus is OK with code
// suContinue. After a final response the handler deletes itself; otherwise
// it stays alive and is reset for the next part.
1113 //----------------------------------------------------------------------------
1114 void XRootDMsgHandler::HandleResponse()
1115 {
1116 //--------------------------------------------------------------------------
1117 // Process the response and notify the listener
1118 //--------------------------------------------------------------------------
1120 XRootDStatus *status = ProcessStatus();
1121 AnyObject *response = 0;
1122
1123 Log *log = DefaultEnv::GetLog();
1124 log->Debug( ExDbgMsg, "[%s] Calling MsgHandler: %p (message: %s ) "
1125 "with status: %s.",
1126 pUrl.GetHostId().c_str(), this,
1127 pRequest->GetObfuscatedDescription().c_str(),
1128 status->ToString().c_str() );
1129
1130 if( status->IsOK() )
1131 {
1132 Status st = ParseResponse( response );
1133 if( !st.IsOK() )
1134 {
// Parsing failed: the parse error replaces the status and any response
// object is discarded
1135 delete status;
1136 delete response;
1137 status = new XRootDStatus( st );
1138 response = 0;
1139 }
1140 }
1141
1142 //--------------------------------------------------------------------------
1143 // Close the redirect entry if necessary
1144 //--------------------------------------------------------------------------
1145 if( pRdirEntry )
1146 {
1147 pRdirEntry->status = *status;
1148 pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
1149 }
1150
1151 //--------------------------------------------------------------------------
1152 // Is it a final response?
1153 //--------------------------------------------------------------------------
1154 bool finalrsp = !( pStatus.IsOK() && pStatus.code == suContinue );
1155
1156 //--------------------------------------------------------------------------
1157 // Release the stream id
1158 //--------------------------------------------------------------------------
1159 if( pSidMgr && finalrsp )
1160 {
1161 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
// The stream id is kept only when the request is in flight and failed with
// errOperationExpired or errOperationInterrupted
1162 if( status->IsOK() || !pMsgInFly ||
1163 !( status->code == errOperationExpired || status->code == errOperationInterrupted ) )
1164 pSidMgr->ReleaseSID( req->header.streamid );
1165 }
1166
// The host list is detached from pHosts and passed to the response handler;
// for a non-final response a copy is stored back in pHosts first
1167 HostList *hosts = pHosts.release();
1168 if( !finalrsp )
1169 pHosts.reset( new HostList( *hosts ) );
1170
1171 pResponseHandler->HandleResponseWithHosts( status, response, hosts );
1172
1173 //--------------------------------------------------------------------------
1174 // if it is the final response there is nothing more to do ...
1175 //--------------------------------------------------------------------------
1176 if( finalrsp )
1177 delete this;
1178 //--------------------------------------------------------------------------
1179 // on the other hand if it is not the final response, we have to keep the
1180 // MsgHandler and delete the current response
1181 //--------------------------------------------------------------------------
1182 else
1183 {
// Under the pCV lock: drop the current response, clear the timeout fence
// and wake up anybody waiting on pCV
1184 XrdSysCondVarHelper lck( pCV );
1185 pResponse.reset();
1186 pTimeoutFence.store( false, std::memory_order_relaxed );
1187 pCV.Broadcast();
1188 }
1189 }
1190
1191
1192 //----------------------------------------------------------------------------
1193 // Extract the status information from the stuff that we got
1194 //----------------------------------------------------------------------------
1195 XRootDStatus *XRootDMsgHandler::ProcessStatus()
1196 {
1197 XRootDStatus *st = new XRootDStatus( pStatus );
1198 ServerResponse *rsp = 0;
1199 if( pResponse )
1200 rsp = (ServerResponse *)pResponse->GetBuffer();
1201
1202 if( !pStatus.IsOK() && rsp )
1203 {
1204 if( pStatus.code == errErrorResponse )
1205 {
1206 st->errNo = rsp->body.error.errnum;
1207 // omit the last character as the string returned from the server
1208 // (acording to protocol specs) should be null-terminated
1209 std::string errmsg( rsp->body.error.errmsg, rsp->hdr.dlen-5 );
1210 if( st->errNo == kXR_noReplicas && !pLastError.IsOK() )
1211 errmsg += " Last seen error: " + pLastError.ToString();
1212 st->SetErrorMessage( errmsg );
1213 }
1214 else if( pStatus.code == errRedirect )
1215 st->SetErrorMessage( pRedirectUrl );
1216 }
1217 return st;
1218 }
1219
1220 //------------------------------------------------------------------------
1221 // Parse the response and put it in an object that could be passed to
1222 // the user
1223 //------------------------------------------------------------------------
1224 Status XRootDMsgHandler::ParseResponse( AnyObject *&response )
1225 {
1226 if( !pResponse )
1227 return Status();
1228
1229 ServerResponse *rsp = (ServerResponse *)pResponse->GetBuffer();
1230 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1231 Log *log = DefaultEnv::GetLog();
1232
1233 //--------------------------------------------------------------------------
1234 // Handle redirect as an answer
1235 //--------------------------------------------------------------------------
1236 if( rsp->hdr.status == kXR_redirect )
1237 {
1238 log->Error( XRootDMsg, "Internal Error: unable to process redirect" );
1239 return 0;
1240 }
1241
1242 Buffer buff;
1243 uint32_t length = 0;
1244 char *buffer = 0;
1245
1246 //--------------------------------------------------------------------------
1247 // We don't have any partial answers so pass what we have
1248 //--------------------------------------------------------------------------
1249 if( pPartialResps.empty() )
1250 {
1251 buffer = rsp->body.buffer.data;
1252 length = rsp->hdr.dlen;
1253 }
1254 //--------------------------------------------------------------------------
1255 // Partial answers, we need to glue them together before parsing
1256 //--------------------------------------------------------------------------
1257 else if( req->header.requestid != kXR_read &&
1258 req->header.requestid != kXR_readv )
1259 {
1260 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1261 {
1262 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1263 length += part->hdr.dlen;
1264 }
1265 length += rsp->hdr.dlen;
1266
1267 buff.Allocate( length );
1268 uint32_t offset = 0;
1269 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1270 {
1271 ServerResponse *part = (ServerResponse*)pPartialResps[i]->GetBuffer();
1272 buff.Append( part->body.buffer.data, part->hdr.dlen, offset );
1273 offset += part->hdr.dlen;
1274 }
1275 buff.Append( rsp->body.buffer.data, rsp->hdr.dlen, offset );
1276 buffer = buff.GetBuffer();
1277 }
1278
1279 //--------------------------------------------------------------------------
1280 // Right, but what was the question?
1281 //--------------------------------------------------------------------------
1282 switch( req->header.requestid )
1283 {
1284 //------------------------------------------------------------------------
1285 // kXR_mv, kXR_truncate, kXR_rm, kXR_mkdir, kXR_rmdir, kXR_chmod,
1286 // kXR_ping, kXR_close, kXR_write, kXR_sync
1287 //------------------------------------------------------------------------
1288 case kXR_mv:
1289 case kXR_truncate:
1290 case kXR_rm:
1291 case kXR_mkdir:
1292 case kXR_rmdir:
1293 case kXR_chmod:
1294 case kXR_ping:
1295 case kXR_close:
1296 case kXR_write:
1297 case kXR_writev:
1298 case kXR_sync:
1299 case kXR_chkpoint:
1300 return Status();
1301
1302 //------------------------------------------------------------------------
1303 // kXR_locate
1304 //------------------------------------------------------------------------
1305 case kXR_locate:
1306 {
1307 AnyObject *obj = new AnyObject();
1308
1309 char *nullBuffer = new char[length+1];
1310 nullBuffer[length] = 0;
1311 memcpy( nullBuffer, buffer, length );
1312
1313 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1314 "LocateInfo: %s", pUrl.GetHostId().c_str(),
1315 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1316 LocationInfo *data = new LocationInfo();
1317
1318 if( data->ParseServerResponse( nullBuffer ) == false )
1319 {
1320 delete obj;
1321 delete data;
1322 delete [] nullBuffer;
1323 return Status( stError, errInvalidResponse );
1324 }
1325 delete [] nullBuffer;
1326
1327 obj->Set( data );
1328 response = obj;
1329 return Status();
1330 }
1331
1332 //------------------------------------------------------------------------
1333 // kXR_stat
1334 //------------------------------------------------------------------------
1335 case kXR_stat:
1336 {
1337 AnyObject *obj = new AnyObject();
1338
1339 //----------------------------------------------------------------------
1340 // Virtual File System stat (kXR_vfs)
1341 //----------------------------------------------------------------------
1342 if( req->stat.options & kXR_vfs )
1343 {
1344 StatInfoVFS *data = new StatInfoVFS();
1345
1346 char *nullBuffer = new char[length+1];
1347 nullBuffer[length] = 0;
1348 memcpy( nullBuffer, buffer, length );
1349
1350 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1351 "StatInfoVFS: %s", pUrl.GetHostId().c_str(),
1352 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1353
1354 if( data->ParseServerResponse( nullBuffer ) == false )
1355 {
1356 delete obj;
1357 delete data;
1358 delete [] nullBuffer;
1359 return Status( stError, errInvalidResponse );
1360 }
1361 delete [] nullBuffer;
1362
1363 obj->Set( data );
1364 }
1365 //----------------------------------------------------------------------
1366 // Normal stat
1367 //----------------------------------------------------------------------
1368 else
1369 {
1370 StatInfo *data = new StatInfo();
1371
1372 char *nullBuffer = new char[length+1];
1373 nullBuffer[length] = 0;
1374 memcpy( nullBuffer, buffer, length );
1375
1376 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as StatInfo: "
1377 "%s", pUrl.GetHostId().c_str(),
1378 pRequest->GetObfuscatedDescription().c_str(), nullBuffer );
1379
1380 if( data->ParseServerResponse( nullBuffer ) == false )
1381 {
1382 delete obj;
1383 delete data;
1384 delete [] nullBuffer;
1385 return Status( stError, errInvalidResponse );
1386 }
1387 delete [] nullBuffer;
1388 obj->Set( data );
1389 }
1390
1391 response = obj;
1392 return Status();
1393 }
1394
1395 //------------------------------------------------------------------------
1396 // kXR_protocol
1397 //------------------------------------------------------------------------
1398 case kXR_protocol:
1399 {
1400 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ProtocolInfo",
1401 pUrl.GetHostId().c_str(),
1402 pRequest->GetObfuscatedDescription().c_str() );
1403
1404 if( rsp->hdr.dlen < 8 )
1405 {
1406 log->Error( XRootDMsg, "[%s] Got invalid redirect response.",
1407 pUrl.GetHostId().c_str() );
1408 return Status( stError, errInvalidResponse );
1409 }
1410
1411 AnyObject *obj = new AnyObject();
1412 ProtocolInfo *data = new ProtocolInfo( rsp->body.protocol.pval,
1413 rsp->body.protocol.flags );
1414 obj->Set( data );
1415 response = obj;
1416 return Status();
1417 }
1418
1419 //------------------------------------------------------------------------
1420 // kXR_dirlist
1421 //------------------------------------------------------------------------
1422 case kXR_dirlist:
1423 {
1424 AnyObject *obj = new AnyObject();
1425 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as "
1426 "DirectoryList", pUrl.GetHostId().c_str(),
1427 pRequest->GetObfuscatedDescription().c_str() );
1428
1429 char *path = new char[req->dirlist.dlen+1];
1430 path[req->dirlist.dlen] = 0;
1431 memcpy( path, pRequest->GetBuffer(24), req->dirlist.dlen );
1432
1433 DirectoryList *data = new DirectoryList();
1434 data->SetParentName( path );
1435 delete [] path;
1436
1437 char *nullBuffer = new char[length+1];
1438 nullBuffer[length] = 0;
1439 memcpy( nullBuffer, buffer, length );
1440
1441 bool invalidrsp = false;
1442
1443 if( !pDirListStarted )
1444 {
1445 pDirListWithStat = DirectoryList::HasStatInfo( nullBuffer );
1446 pDirListStarted = true;
1447
1448 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer );
1449 }
1450 else
1451 invalidrsp = !data->ParseServerResponse( pUrl.GetHostId(), nullBuffer, pDirListWithStat );
1452
1453 if( invalidrsp )
1454 {
1455 delete data;
1456 delete obj;
1457 delete [] nullBuffer;
1458 return Status( stError, errInvalidResponse );
1459 }
1460
1461 delete [] nullBuffer;
1462 obj->Set( data );
1463 response = obj;
1464 return Status();
1465 }
1466
1467 //------------------------------------------------------------------------
1468 // kXR_open - if we got the statistics, otherwise return 0
1469 //------------------------------------------------------------------------
1470 case kXR_open:
1471 {
1472 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as OpenInfo",
1473 pUrl.GetHostId().c_str(),
1474 pRequest->GetObfuscatedDescription().c_str() );
1475
1476 if( rsp->hdr.dlen < 4 )
1477 {
1478 log->Error( XRootDMsg, "[%s] Got invalid open response.",
1479 pUrl.GetHostId().c_str() );
1480 return Status( stError, errInvalidResponse );
1481 }
1482
1483 AnyObject *obj = new AnyObject();
1484 StatInfo *statInfo = 0;
1485
1486 //----------------------------------------------------------------------
1487 // Handle StatInfo if requested
1488 //----------------------------------------------------------------------
1489 if( req->open.options & kXR_retstat )
1490 {
1491 log->Dump( XRootDMsg, "[%s] Parsing StatInfo in response to %s",
1492 pUrl.GetHostId().c_str(),
1493 pRequest->GetObfuscatedDescription().c_str() );
1494
1495 if( rsp->hdr.dlen >= 12 )
1496 {
1497 char *nullBuffer = new char[rsp->hdr.dlen-11];
1498 nullBuffer[rsp->hdr.dlen-12] = 0;
1499 memcpy( nullBuffer, buffer+12, rsp->hdr.dlen-12 );
1500
1501 statInfo = new StatInfo();
1502 if( statInfo->ParseServerResponse( nullBuffer ) == false )
1503 {
1504 delete statInfo;
1505 statInfo = 0;
1506 }
1507 delete [] nullBuffer;
1508 }
1509
1510 if( rsp->hdr.dlen < 12 || !statInfo )
1511 {
1512 log->Error( XRootDMsg, "[%s] Unable to parse StatInfo in response "
1513 "to %s", pUrl.GetHostId().c_str(),
1514 pRequest->GetObfuscatedDescription().c_str() );
1515 delete obj;
1516 return Status( stError, errInvalidResponse );
1517 }
1518 }
1519
1520 OpenInfo *data = new OpenInfo( (uint8_t*)buffer,
1521 pResponse->GetSessionId(),
1522 statInfo );
1523 obj->Set( data );
1524 response = obj;
1525 return Status();
1526 }
1527
1528 //------------------------------------------------------------------------
1529 // kXR_read
1530 //------------------------------------------------------------------------
1531 case kXR_read:
1532 {
1533 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as ChunkInfo",
1534 pUrl.GetHostId().c_str(),
1535 pRequest->GetObfuscatedDescription().c_str() );
1536
1537 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1538 {
1539 //--------------------------------------------------------------------
1540 // we are expecting to have only the header in the message, the raw
1541 // data have been readout into the user buffer
1542 //--------------------------------------------------------------------
1543 if( pPartialResps[i]->GetSize() > 8 )
1544 return Status( stOK, errInternal );
1545 }
1546 //----------------------------------------------------------------------
1547 // we are expecting to have only the header in the message, the raw
1548 // data have been readout into the user buffer
1549 //----------------------------------------------------------------------
1550 if( pResponse->GetSize() > 8 )
1551 return Status( stOK, errInternal );
1552 //----------------------------------------------------------------------
1553 // Get the response for the end user
1554 //----------------------------------------------------------------------
1555 return pBodyReader->GetResponse( response );
1556 }
1557
1558 //------------------------------------------------------------------------
1559 // kXR_pgread
1560 //------------------------------------------------------------------------
1561 case kXR_pgread:
1562 {
1563 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as PageInfo",
1564 pUrl.GetHostId().c_str(),
1565 pRequest->GetObfuscatedDescription().c_str() );
1566
1567 //----------------------------------------------------------------------
1568 // Glue in the cached responses if necessary
1569 //----------------------------------------------------------------------
1570 ChunkInfo chunk = pChunkList->front();
1571 bool sizeMismatch = false;
1572 uint32_t currentOffset = 0;
1573 char *cursor = (char*)chunk.buffer;
1574 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1575 {
1576 ServerResponseV2 *part = (ServerResponseV2*)pPartialResps[i]->GetBuffer();
1577
1578 //--------------------------------------------------------------------
1579 // the actual size of the raw data without the crc32c checksums
1580 //--------------------------------------------------------------------
1581 size_t datalen = part->status.bdy.dlen - NbPgPerRsp( part->info.pgread.offset,
1582 part->status.bdy.dlen ) * CksumSize;
1583
1584 if( currentOffset + datalen > chunk.length )
1585 {
1586 sizeMismatch = true;
1587 break;
1588 }
1589
1590 currentOffset += datalen;
1591 cursor += datalen;
1592 }
1593
1594 ServerResponseV2 *rspst = (ServerResponseV2*)pResponse->GetBuffer();
1595 size_t datalen = rspst->status.bdy.dlen - NbPgPerRsp( rspst->info.pgread.offset,
1596 rspst->status.bdy.dlen ) * CksumSize;
1597 if( currentOffset + datalen <= chunk.length )
1598 currentOffset += datalen;
1599 else
1600 sizeMismatch = true;
1601
1602 //----------------------------------------------------------------------
1603 // Overflow
1604 //----------------------------------------------------------------------
1605 if( pChunkStatus.front().sizeError || sizeMismatch )
1606 {
1607 log->Error( XRootDMsg, "[%s] Handling response to %s: user supplied "
1608 "buffer is too small for the received data.",
1609 pUrl.GetHostId().c_str(),
1610 pRequest->GetObfuscatedDescription().c_str() );
1611 return Status( stError, errInvalidResponse );
1612 }
1613
1614 AnyObject *obj = new AnyObject();
1615 PageInfo *pgInfo = new PageInfo( chunk.offset, currentOffset, chunk.buffer,
1616 std::move( pCrc32cDigests) );
1617
1618 obj->Set( pgInfo );
1619 response = obj;
1620 return Status();
1621 }
1622
1623 //------------------------------------------------------------------------
1624 // kXR_pgwrite
1625 //------------------------------------------------------------------------
1626 case kXR_pgwrite:
1627 {
1628 std::vector<std::tuple<uint64_t, uint32_t>> retries;
1629
1630 ServerResponseV2 *rsp = (ServerResponseV2*)pResponse->GetBuffer();
1631 if( rsp->status.bdy.dlen > 0 )
1632 {
1633 ServerResponseBody_pgWrCSE *cse = (ServerResponseBody_pgWrCSE*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) );
1634 size_t pgcnt = ( rsp->status.bdy.dlen - 8 ) / sizeof( kXR_int64 );
1635 retries.reserve( pgcnt );
1636 kXR_int64 *pgoffs = (kXR_int64*)pResponse->GetBuffer( sizeof( ServerResponseV2 ) +
1637 sizeof( ServerResponseBody_pgWrCSE ) );
1638
1639 for( size_t i = 0; i < pgcnt; ++i )
1640 {
1641 uint32_t len = XrdSys::PageSize;
1642 if( i == 0 ) len = cse->dlFirst;
1643 else if( i == pgcnt - 1 ) len = cse->dlLast;
1644 retries.push_back( std::make_tuple( pgoffs[i], len ) );
1645 }
1646 }
1647
1648 RetryInfo *info = new RetryInfo( std::move( retries ) );
1649 AnyObject *obj = new AnyObject();
1650 obj->Set( info );
1651 response = obj;
1652
1653 return Status();
1654 }
1655
1656
1657 //------------------------------------------------------------------------
1658 // kXR_readv - we need to pass the length of the buffer to the user code
1659 //------------------------------------------------------------------------
1660 case kXR_readv:
1661 {
1662 log->Dump( XRootDMsg, "[%s] Parsing the response to %p as "
1663 "VectorReadInfo", pUrl.GetHostId().c_str(),
1664 pRequest->GetObfuscatedDescription().c_str() );
1665
1666 for( uint32_t i = 0; i < pPartialResps.size(); ++i )
1667 {
1668 //--------------------------------------------------------------------
1669 // we are expecting to have only the header in the message, the raw
1670 // data have been readout into the user buffer
1671 //--------------------------------------------------------------------
1672 if( pPartialResps[i]->GetSize() > 8 )
1673 return Status( stOK, errInternal );
1674 }
1675 //----------------------------------------------------------------------
1676 // we are expecting to have only the header in the message, the raw
1677 // data have been readout into the user buffer
1678 //----------------------------------------------------------------------
1679 if( pResponse->GetSize() > 8 )
1680 return Status( stOK, errInternal );
1681 //----------------------------------------------------------------------
1682 // Get the response for the end user
1683 //----------------------------------------------------------------------
1684 return pBodyReader->GetResponse( response );
1685 }
1686
1687 //------------------------------------------------------------------------
1688 // kXR_fattr
1689 //------------------------------------------------------------------------
1690 case kXR_fattr:
1691 {
1692 int len = rsp->hdr.dlen;
1693 char* data = rsp->body.buffer.data;
1694
1695 return ParseXAttrResponse( data, len, response );
1696 }
1697
1698 //------------------------------------------------------------------------
1699 // kXR_query
1700 //------------------------------------------------------------------------
1701 case kXR_query:
1702 case kXR_set:
1703 case kXR_prepare:
1704 default:
1705 {
1706 AnyObject *obj = new AnyObject();
1707 log->Dump( XRootDMsg, "[%s] Parsing the response to %s as BinaryData",
1708 pUrl.GetHostId().c_str(),
1709 pRequest->GetObfuscatedDescription().c_str() );
1710
1711 BinaryDataInfo *data = new BinaryDataInfo();
1712 data->Allocate( length );
1713 data->Append( buffer, length );
1714 obj->Set( data );
1715 response = obj;
1716 return Status();
1717 }
1718 };
1719 return Status( stError, errInvalidMessage );
1720 }
1721
1722 //------------------------------------------------------------------------
1723 // Parse the response to kXR_fattr request and put it in an object that
1724 // could be passed to the user
1725 //------------------------------------------------------------------------
1726 Status XRootDMsgHandler::ParseXAttrResponse( char *data, size_t len,
1727 AnyObject *&response )
1728 {
1729 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1730// Log *log = DefaultEnv::GetLog(); //TODO
1731
1732 switch( req->fattr.subcode )
1733 {
1734 case kXR_fattrDel:
1735 case kXR_fattrSet:
1736 {
1737 Status status;
1738
1739 kXR_char nerrs = 0;
1740 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1741 return status;
1742
1743 kXR_char nattr = 0;
1744 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1745 return status;
1746
1747 std::vector<XAttrStatus> resp;
1748 // read the namevec
1749 for( kXR_char i = 0; i < nattr; ++i )
1750 {
1751 kXR_unt16 rc = 0;
1752 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1753 return status;
1754 rc = ntohs( rc );
1755
1756 // count errors
1757 if( rc ) --nerrs;
1758
1759 std::string name;
1760 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1761 return status;
1762
1763 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1764 XRootDStatus();
1765 resp.push_back( XAttrStatus( name, st ) );
1766 }
1767
1768 // check if we read all the data and if the error count is OK
1769 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1770
1771 // set up the response object
1772 response = new AnyObject();
1773 response->Set( new std::vector<XAttrStatus>( std::move( resp ) ) );
1774
1775 return Status();
1776 }
1777
1778 case kXR_fattrGet:
1779 {
1780 Status status;
1781
1782 kXR_char nerrs = 0;
1783 if( !( status = ReadFromBuffer( data, len, nerrs ) ).IsOK() )
1784 return status;
1785
1786 kXR_char nattr = 0;
1787 if( !( status = ReadFromBuffer( data, len, nattr ) ).IsOK() )
1788 return status;
1789
1790 std::vector<XAttr> resp;
1791 resp.reserve( nattr );
1792
1793 // read the name vec
1794 for( kXR_char i = 0; i < nattr; ++i )
1795 {
1796 kXR_unt16 rc = 0;
1797 if( !( status = ReadFromBuffer( data, len, rc ) ).IsOK() )
1798 return status;
1799 rc = ntohs( rc );
1800
1801 // count errors
1802 if( rc ) --nerrs;
1803
1804 std::string name;
1805 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1806 return status;
1807
1808 XRootDStatus st = rc ? XRootDStatus( stError, errErrorResponse, rc ) :
1809 XRootDStatus();
1810 resp.push_back( XAttr( name, st ) );
1811 }
1812
1813 // read the value vec
1814 for( kXR_char i = 0; i < nattr; ++i )
1815 {
1816 kXR_int32 vlen = 0;
1817 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1818 return status;
1819 vlen = ntohl( vlen );
1820
1821 std::string value;
1822 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1823 return status;
1824
1825 resp[i].value.swap( value );
1826 }
1827
1828 // check if we read all the data and if the error count is OK
1829 if( len != 0 || nerrs != 0 ) return Status( stError, errDataError );
1830
1831 // set up the response object
1832 response = new AnyObject();
1833 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1834
1835 return Status();
1836 }
1837
1838 case kXR_fattrList:
1839 {
1840 Status status;
1841 std::vector<XAttr> resp;
1842
1843 while( len > 0 )
1844 {
1845 std::string name;
1846 if( !( status = ReadFromBuffer( data, len, name ) ).IsOK() )
1847 return status;
1848
1849 kXR_int32 vlen = 0;
1850 if( !( status = ReadFromBuffer( data, len, vlen ) ).IsOK() )
1851 return status;
1852 vlen = ntohl( vlen );
1853
1854 std::string value;
1855 if( !( status = ReadFromBuffer( data, len, vlen, value ) ).IsOK() )
1856 return status;
1857
1858 resp.push_back( XAttr( name, value ) );
1859 }
1860
1861 // set up the response object
1862 response = new AnyObject();
1863 response->Set( new std::vector<XAttr>( std::move( resp ) ) );
1864
1865 return Status();
1866 }
1867
1868 default:
1869 return Status( stError, errDataError );
1870 }
1871 }
1872
1873 //----------------------------------------------------------------------------
1874 // Perform the changes to the original request needed by the redirect
1875 // procedure - allocate new streamid, append redirection data and such
1876 //----------------------------------------------------------------------------
1877 Status XRootDMsgHandler::RewriteRequestRedirect( const URL &newUrl )
1878 {
1879 Log *log = DefaultEnv::GetLog();
1880
1881 Status st;
1882 // Append any "xrd.*" parameters present in newCgi so that any authentication
1883 // requirements are properly enforced
1884 const URL::ParamsMap &newCgi = newUrl.GetParams();
1885 std::string xrdCgi = "";
1886 std::ostringstream ossXrd;
1887 for(URL::ParamsMap::const_iterator it = newCgi.begin(); it != newCgi.end(); ++it )
1888 {
1889 if( it->first.compare( 0, 4, "xrd." ) )
1890 continue;
1891 ossXrd << it->first << '=' << it->second << '&';
1892 }
1893
1894 xrdCgi = ossXrd.str();
1895 // Redirection URL containing also any original xrd.* opaque parameters
1896 XrdCl::URL authUrl;
1897
1898 if (xrdCgi.empty())
1899 {
1900 authUrl = newUrl;
1901 }
1902 else
1903 {
1904 std::string surl = newUrl.GetURL();
1905 (surl.find('?') == std::string::npos) ? (surl += '?') :
1906 ((*surl.rbegin() != '&') ? (surl += '&') : (surl += ""));
1907 surl += xrdCgi;
1908 if (!authUrl.FromString(surl))
1909 {
1910 std::string surlLog = surl;
1911 if( unlikely( log->GetLevel() >= Log::ErrorMsg ) ) {
1912 surlLog = obfuscateAuth(surlLog);
1913 }
1914 log->Error( XRootDMsg, "[%s] Failed to build redirection URL from data: %s",
1915 newUrl.GetHostId().c_str(), surl.c_str());
1916 return Status(stError, errInvalidRedirectURL);
1917 }
1918 }
1919
1920 //--------------------------------------------------------------------------
1921 // Rewrite particular requests
1922 //--------------------------------------------------------------------------
1924 MessageUtils::RewriteCGIAndPath( pRequest, newCgi, true, newUrl.GetPath() );
1926 return Status();
1927 }
1928
1929 //----------------------------------------------------------------------------
1930 // Some requests need to be rewritten also after getting kXR_wait
1931 //----------------------------------------------------------------------------
1932 Status XRootDMsgHandler::RewriteRequestWait()
1933 {
1934 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1935
1936 XRootDTransport::UnMarshallRequest( pRequest );
1937
1938 //------------------------------------------------------------------------
1939 // For kXR_locate and kXR_open request the kXR_refresh bit needs to be
1940 // turned off after wait
1941 //------------------------------------------------------------------------
1942 switch( req->header.requestid )
1943 {
1944 case kXR_locate:
1945 {
1946 uint16_t refresh = kXR_refresh;
1947 req->locate.options &= (~refresh);
1948 break;
1949 }
1950
1951 case kXR_open:
1952 {
1953 uint16_t refresh = kXR_refresh;
1954 req->locate.options &= (~refresh);
1955 break;
1956 }
1957 }
1958
1959 XRootDTransport::SetDescription( pRequest );
1960 XRootDTransport::MarshallRequest( pRequest );
1961 return Status();
1962 }
1963
1964 //----------------------------------------------------------------------------
1965 // Recover error
1966 //----------------------------------------------------------------------------
1967 void XRootDMsgHandler::HandleError( XRootDStatus status )
1968 {
1969 //--------------------------------------------------------------------------
1970 // If there was no error then do nothing
1971 //--------------------------------------------------------------------------
1972 if( status.IsOK() )
1973 return;
1974
1975 if( pSidMgr && pMsgInFly && (
1976 status.code == errOperationExpired ||
1977 status.code == errOperationInterrupted ) )
1978 {
1979 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
1980 pSidMgr->TimeOutSID( req->header.streamid );
1981 }
1982
1983 bool noreplicas = ( status.code == errErrorResponse &&
1984 status.errNo == kXR_noReplicas );
1985
1986 if( !noreplicas ) pLastError = status;
1987
1988 Log *log = DefaultEnv::GetLog();
1989 log->Debug( XRootDMsg, "[%s] Handling error while processing %s: %s.",
1990 pUrl.GetHostId().c_str(), pRequest->GetObfuscatedDescription().c_str(),
1991 status.ToString().c_str() );
1992
1993 //--------------------------------------------------------------------------
1994 // Check if it is a fatal TLS error that has been marked as potentially
1995 // recoverable, if yes check if we can downgrade from fatal to error.
1996 //--------------------------------------------------------------------------
1997 if( status.IsFatal() && status.code == errTlsError && status.errNo == EAGAIN )
1998 {
1999 if( pSslErrCnt < MaxSslErrRetry )
2000 {
2001 status.status &= ~stFatal; // switch off fatal&error bits
2002 status.status |= stError; // switch on error bit
2003 }
2004 ++pSslErrCnt; // count number of consecutive SSL errors
2005 }
2006 else
2007 pSslErrCnt = 0;
2008
2009 //--------------------------------------------------------------------------
2010 // We have got an error message, we can recover it at the load balancer if:
2011 // 1) we haven't got it from the load balancer
2012 // 2) we have a load balancer assigned
2013 // 3) the error is either one of: kXR_FSError, kXR_IOError, kXR_ServerError,
2014 // kXR_NotFound
2015 // 4) in the case of kXR_NotFound a kXR_refresh flags needs to be set
2016 //--------------------------------------------------------------------------
2017 if( status.code == errErrorResponse )
2018 {
2019 if( RetriableErrorResponse( status ) )
2020 {
2021 UpdateTriedCGI(status.errNo);
2022 if( status.errNo == kXR_NotFound || status.errNo == kXR_Overloaded )
2023 SwitchOnRefreshFlag();
2024 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2025 return;
2026 }
2027 else
2028 {
2029 pStatus = status;
2030 HandleRspOrQueue();
2031 return;
2032 }
2033 }
2034
2035 //--------------------------------------------------------------------------
2036 // Nothing can be done if:
2037 // 1) a user timeout has occurred
2038 // 2) has a non-zero session id
2039 // 3) if another error occurred and the validity of the message expired
2040 //--------------------------------------------------------------------------
2041 if( status.code == errOperationExpired || pRequest->GetSessionId() ||
2042 status.code == errOperationInterrupted || time(0) >= pExpiration )
2043 {
2044 log->Error( XRootDMsg, "[%s] Unable to get the response to request %s",
2045 pUrl.GetHostId().c_str(),
2046 pRequest->GetObfuscatedDescription().c_str() );
2047 pStatus = status;
2048 HandleRspOrQueue();
2049 return;
2050 }
2051
2052 //--------------------------------------------------------------------------
2053 // At this point we're left with connection errors, we recover them
2054 // at a load balancer if we have one and if not on the current server
2055 // until we get a response, an unrecoverable error or a timeout
2056 //--------------------------------------------------------------------------
2057 if( pLoadBalancer.url.IsValid() &&
2058 pLoadBalancer.url.GetLocation() != pUrl.GetLocation() )
2059 {
2060 UpdateTriedCGI( kXR_ServerError );
2061 HandleError( RetryAtServer( pLoadBalancer.url, RedirectEntry::EntryRetry ) );
2062 return;
2063 }
2064 else
2065 {
2066 if( !status.IsFatal() && IsRetriable() )
2067 {
2068 log->Info( XRootDMsg, "[%s] Retrying request: %s.",
2069 pUrl.GetHostId().c_str(),
2070 pRequest->GetObfuscatedDescription().c_str() );
2071
2072 UpdateTriedCGI( kXR_ServerError );
2073 HandleError( RetryAtServer( pUrl, RedirectEntry::EntryRetry ) );
2074 return;
2075 }
2076 pStatus = status;
2077 HandleRspOrQueue();
2078 return;
2079 }
2080 }
2081
2082 //----------------------------------------------------------------------------
2083 // Retry the message at another server
2084 //----------------------------------------------------------------------------
2085 Status XRootDMsgHandler::RetryAtServer( const URL &url, RedirectEntry::Type entryType )
2086 {
2087 pResponse.reset();
2088 Log *log = DefaultEnv::GetLog();
2089
2090 //--------------------------------------------------------------------------
2091 // Set up a redirect entry
2092 //--------------------------------------------------------------------------
2093 if( pRdirEntry ) pRedirectTraceBack.push_back( std::move( pRdirEntry ) );
2094 pRdirEntry.reset( new RedirectEntry( pUrl.GetLocation(), url.GetLocation(), entryType ) );
2095
2096 if( pUrl.GetLocation() != url.GetLocation() )
2097 {
2098 pHosts->push_back( url );
2099
2100 //------------------------------------------------------------------------
2101 // Assign a new stream id to the message
2102 //------------------------------------------------------------------------
2103
2104 // first release the old stream id
2105 // (though it could be a redirect from a local
2106 // metalink file, in this case there's no SID)
2107 ClientRequestHdr *req = (ClientRequestHdr*)pRequest->GetBuffer();
2108 if( pSidMgr )
2109 {
2110 pSidMgr->ReleaseSID( req->streamid );
2111 pSidMgr.reset();
2112 }
2113
2114 // then get the new SIDManager
2115 // (again this could be a redirect to a local
2116 // file and in this case there is no SID)
2117 if( !url.IsLocalFile() )
2118 {
2119 pSidMgr = SIDMgrPool::Instance().GetSIDMgr( url );
2120 Status st = pSidMgr->AllocateSID( req->streamid );
2121 if( !st.IsOK() )
2122 {
2123 log->Error( XRootDMsg, "[%s] Impossible to send message %s.",
2124 pUrl.GetHostId().c_str(),
2125 pRequest->GetObfuscatedDescription().c_str() );
2126 return st;
2127 }
2128 }
2129
2130 pUrl = url;
2131 }
2132
2133 if( pUrl.IsMetalink() && pFollowMetalink )
2134 {
2135 log->Debug( ExDbgMsg, "[%s] Metaling redirection for MsgHandler: %p (message: %s ).",
2136 pUrl.GetHostId().c_str(), this,
2137 pRequest->GetObfuscatedDescription().c_str() );
2138
2139 return pPostMaster->Redirect( pUrl, pRequest, this );
2140 }
2141 else if( pUrl.IsLocalFile() )
2142 {
2143 HandleLocalRedirect( &pUrl );
2144 return Status();
2145 }
2146 else
2147 {
2148 log->Debug( ExDbgMsg, "[%s] Retry at server MsgHandler: %p (message: %s ).",
2149 pUrl.GetHostId().c_str(), this,
2150 pRequest->GetObfuscatedDescription().c_str() );
2151 return pPostMaster->Send( pUrl, pRequest, this, true, pExpiration );
2152 }
2153 }
2154
2155 //----------------------------------------------------------------------------
2156 // Update the "tried=" part of the CGI of the current message
2157 //----------------------------------------------------------------------------
2158 void XRootDMsgHandler::UpdateTriedCGI(uint32_t errNo)
2159 {
2160 URL::ParamsMap cgi;
2161 std::string tried;
2162
2163 //--------------------------------------------------------------------------
2164 // In case a data server responded with a kXR_redirect and we fail at the
2165 // node where we were redirected to, the original data server should be
2166 // included in the tried CGI opaque info (instead of the current one).
2167 //--------------------------------------------------------------------------
2168 if( pEffectiveDataServerUrl )
2169 {
2170 tried = pEffectiveDataServerUrl->GetHostName();
2171 delete pEffectiveDataServerUrl;
2172 pEffectiveDataServerUrl = 0;
2173 }
2174 //--------------------------------------------------------------------------
2175 // Otherwise use the current URL.
2176 //--------------------------------------------------------------------------
2177 else
2178 tried = pUrl.GetHostName();
2179
2180 // Report the reason for the failure to the next location
2181 //
2182 if (errNo)
2183 { if (errNo == kXR_NotFound) cgi["triedrc"] = "enoent";
2184 else if (errNo == kXR_IOError) cgi["triedrc"] = "ioerr";
2185 else if (errNo == kXR_FSError) cgi["triedrc"] = "fserr";
2186 else if (errNo == kXR_ServerError) cgi["triedrc"] = "srverr";
2187 }
2188
2189 //--------------------------------------------------------------------------
2190 // If our current load balancer is a metamanager and we failed either
2191 // at a diskserver or at an unidentified node we also exclude the last
2192 // known manager
2193 //--------------------------------------------------------------------------
2194 if( pLoadBalancer.url.IsValid() && (pLoadBalancer.flags & kXR_attrMeta) )
2195 {
2196 HostList::reverse_iterator it;
2197 for( it = pHosts->rbegin()+1; it != pHosts->rend(); ++it )
2198 {
2199 if( it->loadBalancer )
2200 break;
2201
2202 tried += "," + it->url.GetHostName();
2203
2204 if( it->flags & kXR_isManager )
2205 break;
2206 }
2207 }
2208
2209 cgi["tried"] = tried;
2210 XRootDTransport::UnMarshallRequest( pRequest );
2211 MessageUtils::RewriteCGIAndPath( pRequest, cgi, false, "" );
2212 XRootDTransport::MarshallRequest( pRequest );
2213 }
2214
2215 //----------------------------------------------------------------------------
2216 // Switch on the refresh flag for some requests
2217 //----------------------------------------------------------------------------
2218 void XRootDMsgHandler::SwitchOnRefreshFlag()
2219 {
2220 XRootDTransport::UnMarshallRequest( pRequest );
2221 ClientRequest *req = (ClientRequest *)pRequest->GetBuffer();
2222 switch( req->header.requestid )
2223 {
2224 case kXR_locate:
2225 {
2226 req->locate.options |= kXR_refresh;
2227 break;
2228 }
2229
2230 case kXR_open:
2231 {
2232 req->locate.options |= kXR_refresh;
2233 break;
2234 }
2235 }
2236 XRootDTransport::SetDescription( pRequest );
2237 XRootDTransport::MarshallRequest( pRequest );
2238 }
2239
2240 //------------------------------------------------------------------------
2241 // If the current thread is a worker thread from our thread-pool
2242 // handle the response, otherwise submit a new task to the thread-pool
2243 //------------------------------------------------------------------------
2244 void XRootDMsgHandler::HandleRspOrQueue()
2245 {
2246 JobManager *jobMgr = pPostMaster->GetJobManager();
2247 if( jobMgr->IsWorker() )
2248 HandleResponse();
2249 else
2250 {
2251 Log *log = DefaultEnv::GetLog();
2252 log->Debug( ExDbgMsg, "[%s] Passing to the thread-pool MsgHandler: %p (message: %s ).",
2253 pUrl.GetHostId().c_str(), this,
2254 pRequest->GetObfuscatedDescription().c_str() );
2255 jobMgr->QueueJob( new HandleRspJob( this ), 0 );
2256 }
2257 }
2258
2259 //------------------------------------------------------------------------
2260 // Notify the FileStateHandler to retry Open() with new URL
2261 //------------------------------------------------------------------------
2262 void XRootDMsgHandler::HandleLocalRedirect( URL *url )
2263 {
2264 Log *log = DefaultEnv::GetLog();
2265 log->Debug( ExDbgMsg, "[%s] Handling local redirect - MsgHandler: %p (message: %s ).",
2266 pUrl.GetHostId().c_str(), this,
2267 pRequest->GetObfuscatedDescription().c_str() );
2268
2269 if( !pLFileHandler )
2270 {
2271 HandleError( XRootDStatus( stFatal, errNotSupported ) );
2272 return;
2273 }
2274
2275 AnyObject *resp = 0;
2276 pLFileHandler->SetHostList( *pHosts );
2277 XRootDStatus st = pLFileHandler->Open( url, pRequest, resp );
2278 if( !st.IsOK() )
2279 {
2280 HandleError( st );
2281 return;
2282 }
2283
2284 pResponseHandler->HandleResponseWithHosts( new XRootDStatus(),
2285 resp,
2286 pHosts.release() );
2287 delete this;
2288
2289 return;
2290 }
2291
2292 //------------------------------------------------------------------------
2293 // Check if it is OK to retry this request
2294 //------------------------------------------------------------------------
2295 bool XRootDMsgHandler::IsRetriable()
2296 {
2297 std::string value;
2298 DefaultEnv::GetEnv()->GetString( "OpenRecovery", value );
2299 if( value == "true" ) return true;
2300
2301 // check if it is a mutable open (open + truncate or open + create)
2302 ClientRequest *req = reinterpret_cast<ClientRequest*>( pRequest->GetBuffer() );
2303 if( req->header.requestid == htons( kXR_open ) )
2304 {
2305 bool _mutable = ( req->open.options & htons( kXR_delete ) ) ||
2306 ( req->open.options & htons( kXR_new ) );
2307
2308 if( _mutable )
2309 {
2310 Log *log = DefaultEnv::GetLog();
2311 log->Debug( XRootDMsg,
2312 "[%s] Not allowed to retry open request (OpenRecovery disabled): %s.",
2313 pUrl.GetHostId().c_str(),
2314 pRequest->GetObfuscatedDescription().c_str() );
2315 // disallow retry if it is a mutable open
2316 return false;
2317 }
2318 }
2319
2320 return true;
2321 }
2322
2323 //------------------------------------------------------------------------
2324 // Check if for given request and Metalink redirector it is OK to omit
2325 // the kXR_wait and proceed straight to the next entry in the Metalink file
2326 //------------------------------------------------------------------------
2327 bool XRootDMsgHandler::OmitWait( Message &request, const URL &url )
2328 {
2329 // we can omit kXR_wait only if we have a Metalink redirector
2330 if( !url.IsMetalink() )
2331 return false;
2332
2333 // we can omit kXR_wait only for requests that can be redirected
2334 // (kXR_read is the only stateful request that can be redirected)
2335 ClientRequest *req = reinterpret_cast<ClientRequest*>( request.GetBuffer() );
2336 if( pStateful && req->header.requestid != kXR_read )
2337 return false;
2338
2339 // we can only omit kXR_wait if the Metalink redirect has more
2340 // replicas
2341 RedirectorRegistry &registry = RedirectorRegistry::Instance();
2342 VirtualRedirector *redirector = registry.Get( url );
2343
2344 // we need more than one server as the current one is not reflected
2345 // in tried CGI
2346 if( redirector->Count( request ) > 1 )
2347 return true;
2348
2349 return false;
2350 }
2351
2352 //------------------------------------------------------------------------
2353 // Checks if the given error returned by server is retriable.
2354 //------------------------------------------------------------------------
2355 bool XRootDMsgHandler::RetriableErrorResponse( const Status &status )
2356 {
2357 // we can only retry error response if we have a valid load-balancer and
2358 // it is not our current URL
2359 if( !( pLoadBalancer.url.IsValid() &&
2360 pUrl.GetLocation() != pLoadBalancer.url.GetLocation() ) )
2361 return false;
2362
2363 // following errors are retriable at any load-balancer
2364 if( status.errNo == kXR_FSError || status.errNo == kXR_IOError ||
2365 status.errNo == kXR_ServerError || status.errNo == kXR_NotFound ||
2366 status.errNo == kXR_Overloaded || status.errNo == kXR_NoMemory )
2367 return true;
2368
2369 // check if the load-balancer is a meta-manager, if yes there are
2370 // more errors that can be recovered
2371 if( !( pLoadBalancer.flags & kXR_attrMeta ) ) return false;
2372
2373 // those errors are retriable for meta-managers
2374 if( status.errNo == kXR_Unsupported || status.errNo == kXR_FileLocked )
2375 return true;
2376
2377 // in case of not-authorized error there is an imposed upper limit
2378 // on how many times we can retry this error
2379 if( status.errNo == kXR_NotAuthorized )
2380 {
2382 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", limit );
2383 bool ret = pNotAuthorizedCounter < limit;
2384 ++pNotAuthorizedCounter;
2385 if( !ret )
2386 {
2387 Log *log = DefaultEnv::GetLog();
2388 log->Error( XRootDMsg,
2389 "[%s] Reached limit of NotAuthorized retries!",
2390 pUrl.GetHostId().c_str() );
2391 }
2392 return ret;
2393 }
2394
2395 // check if the load-balancer is a virtual (metalink) redirector,
2396 // if yes there are even more errors that can be recovered
2397 if( !( pLoadBalancer.flags & kXR_attrVirtRdr ) ) return false;
2398
2399 // those errors are retriable for virtual (metalink) redirectors
2400 if( status.errNo == kXR_noserver || status.errNo == kXR_ArgTooLong )
2401 return true;
2402
2403 // otherwise it is a non-retriable error
2404 return false;
2405 }
2406
2407 //------------------------------------------------------------------------
2408 // Dump the redirect-trace-back into the log file
2409 //------------------------------------------------------------------------
2410 void XRootDMsgHandler::DumpRedirectTraceBack()
2411 {
2412 if( pRedirectTraceBack.empty() ) return;
2413
2414 std::stringstream sstrm;
2415
2416 sstrm << "Redirect trace-back:\n";
2417
2418 int counter = 0;
2419
2420 auto itr = pRedirectTraceBack.begin();
2421 sstrm << '\t' << counter << ". " << (*itr)->ToString() << '\n';
2422
2423 auto prev = itr;
2424 ++itr;
2425 ++counter;
2426
2427 for( ; itr != pRedirectTraceBack.end(); ++itr, ++prev, ++counter )
2428 sstrm << '\t' << counter << ". "
2429 << (*itr)->ToString( (*prev)->status.IsOK() ) << '\n';
2430
2431 int authlimit = DefaultNotAuthorizedRetryLimit;
2432 DefaultEnv::GetEnv()->GetInt( "NotAuthorizedRetryLimit", authlimit );
2433
2434 bool warn = !pStatus.IsOK() &&
2435 ( pStatus.code == errNotFound ||
2436 pStatus.code == errRedirectLimit ||
2437 ( pStatus.code == errAuthFailed && pNotAuthorizedCounter >= authlimit ) );
2438
2439 Log *log = DefaultEnv::GetLog();
2440 if( warn )
2441 log->Warning( XRootDMsg, sstrm.str().c_str() );
2442 else
2443 log->Debug( XRootDMsg, sstrm.str().c_str() );
2444 }
2445
2446 // Read data from buffer
2447 //------------------------------------------------------------------------
2448 template<typename T>
2449 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, T& result )
2450 {
2451 if( sizeof( T ) > buflen ) return Status( stError, errDataError );
2452
2453 memcpy(&result, buffer, sizeof(T));
2454
2455 buffer += sizeof( T );
2456 buflen -= sizeof( T );
2457
2458 return Status();
2459 }
2460
2461 //------------------------------------------------------------------------
2462 // Read a string from buffer
2463 //------------------------------------------------------------------------
2464 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen, std::string &result )
2465 {
2466 Status status;
2467 char c = 0;
2468
2469 while( true )
2470 {
2471 if( !( status = ReadFromBuffer( buffer, buflen, c ) ).IsOK() )
2472 return status;
2473
2474 if( c == 0 ) break;
2475 result += c;
2476 }
2477
2478 return status;
2479 }
2480
2481 //------------------------------------------------------------------------
2482 // Read a string from buffer
2483 //------------------------------------------------------------------------
2484 Status XRootDMsgHandler::ReadFromBuffer( char *&buffer, size_t &buflen,
2485 size_t size, std::string &result )
2486 {
2487 Status status;
2488
2489 if( size > buflen ) return Status( stError, errDataError );
2490
2491 result.append( buffer, size );
2492 buffer += size;
2493 buflen -= size;
2494
2495 return status;
2496 }
2497
2498}
@ kXR_NotAuthorized
@ kXR_NotFound
@ kXR_FileLocked
Definition XProtocol.hh:993
@ kXR_noReplicas
@ kXR_Unsupported
@ kXR_ServerError
@ kXR_Overloaded
@ kXR_ArgTooLong
Definition XProtocol.hh:992
@ kXR_noserver
@ kXR_IOError
Definition XProtocol.hh:997
@ kXR_FSError
Definition XProtocol.hh:995
@ kXR_NoMemory
Definition XProtocol.hh:998
#define kXR_isManager
@ kXR_fattrDel
Definition XProtocol.hh:270
@ kXR_fattrSet
Definition XProtocol.hh:273
@ kXR_fattrList
Definition XProtocol.hh:272
@ kXR_fattrGet
Definition XProtocol.hh:271
struct ClientFattrRequest fattr
Definition XProtocol.hh:854
#define kXR_collapseRedir
ServerResponseStatus status
#define kXR_attrMeta
union ServerResponse::@040373375333017131300127053271011057331004327334 body
kXR_char streamid[2]
Definition XProtocol.hh:156
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 options
Definition XProtocol.hh:481
struct ClientDirlistRequest dirlist
Definition XProtocol.hh:852
static const int kXR_ckpXeq
Definition XProtocol.hh:216
@ kXR_delete
Definition XProtocol.hh:453
@ kXR_refresh
Definition XProtocol.hh:459
@ kXR_new
Definition XProtocol.hh:455
@ kXR_retstat
Definition XProtocol.hh:463
struct ClientOpenRequest open
Definition XProtocol.hh:860
@ kXR_waitresp
Definition XProtocol.hh:906
@ kXR_redirect
Definition XProtocol.hh:904
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
@ kXR_ok
Definition XProtocol.hh:899
@ kXR_attn
Definition XProtocol.hh:901
@ kXR_wait
Definition XProtocol.hh:905
@ kXR_error
Definition XProtocol.hh:903
struct ServerResponseBody_Status bdy
struct ClientRequestHdr header
Definition XProtocol.hh:846
#define kXR_recoverWrts
union ServerResponseV2::@207342300141235315373173036347114307032363217365 info
kXR_unt16 requestid
Definition XProtocol.hh:157
@ kXR_read
Definition XProtocol.hh:125
@ kXR_open
Definition XProtocol.hh:122
@ kXR_writev
Definition XProtocol.hh:143
@ kXR_readv
Definition XProtocol.hh:137
@ kXR_mkdir
Definition XProtocol.hh:120
@ kXR_sync
Definition XProtocol.hh:128
@ kXR_chmod
Definition XProtocol.hh:114
@ kXR_dirlist
Definition XProtocol.hh:116
@ kXR_fattr
Definition XProtocol.hh:132
@ kXR_rm
Definition XProtocol.hh:126
@ kXR_query
Definition XProtocol.hh:113
@ kXR_write
Definition XProtocol.hh:131
@ kXR_set
Definition XProtocol.hh:130
@ kXR_rmdir
Definition XProtocol.hh:127
@ kXR_truncate
Definition XProtocol.hh:140
@ kXR_protocol
Definition XProtocol.hh:118
@ kXR_mv
Definition XProtocol.hh:121
@ kXR_ping
Definition XProtocol.hh:123
@ kXR_stat
Definition XProtocol.hh:129
@ kXR_pgread
Definition XProtocol.hh:142
@ kXR_chkpoint
Definition XProtocol.hh:124
@ kXR_locate
Definition XProtocol.hh:139
@ kXR_close
Definition XProtocol.hh:115
@ kXR_pgwrite
Definition XProtocol.hh:138
@ kXR_prepare
Definition XProtocol.hh:133
#define kXR_isServer
#define kXR_attrVirtRdr
struct ClientChkPointRequest chkpoint
Definition XProtocol.hh:849
struct ServerResponseHeader hdr
#define kXR_PROTOCOLVERSION
Definition XProtocol.hh:70
@ kXR_vfs
Definition XProtocol.hh:763
struct ClientStatRequest stat
Definition XProtocol.hh:873
#define kXR_ecRedir
struct ClientLocateRequest locate
Definition XProtocol.hh:856
ServerResponseHeader hdr
long long kXR_int64
Definition XPtypes.hh:98
int kXR_int32
Definition XPtypes.hh:89
unsigned short kXR_unt16
Definition XPtypes.hh:67
unsigned char kXR_char
Definition XPtypes.hh:65
void Get(Type &object)
Retrieve the object being held.
Object for reading out data from the PgRead response.
void AdvanceCursor(uint32_t delta)
Advance the cursor.
char * GetBufferAtCursor()
Get the buffer pointer at the append cursor.
const char * GetBuffer(uint32_t offset=0) const
Get the message buffer.
void SetCursor(uint32_t cursor)
Set the cursor.
uint32_t GetCursor() const
Get append cursor.
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
static bool HasStatInfo(const char *data)
Returns true if data contain stat info.
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
virtual void Run(void *arg)
The job logic.
HandleRspJob(XrdCl::XRootDMsgHandler *handler)
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void RewriteCGIAndPath(Message *msg, const URL::ParamsMap &newCgi, bool replace, const std::string &newPath)
Append cgi to the one already present in the message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
StreamEvent
Events that may have occurred to the stream.
@ Ready
The stream has become connected.
void CollapseRedirect(const URL &oldurl, const URL &newURL)
Collapse channel URL - replace the URL of the channel.
TaskManager * GetTaskManager()
Get the task manager object user by the post master.
Status QueryTransport(const URL &url, uint16_t query, AnyObject &result)
virtual void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
A network socket.
virtual XRootDStatus Send(const char *buffer, size_t size, int &bytesWritten)
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
URL representation.
Definition XrdClURL.hh:31
std::string GetHostId() const
Get the host part of the URL (user:password@host:port)
Definition XrdClURL.hh:99
bool IsMetalink() const
Is it a URL to a metalink.
Definition XrdClURL.cc:458
const std::string & GetPassword() const
Get the password.
Definition XrdClURL.hh:153
bool FromString(const std::string &url)
Parse a string and fill the URL fields.
Definition XrdClURL.cc:61
void SetPassword(const std::string &password)
Set the password.
Definition XrdClURL.hh:161
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:395
const std::string & GetUserName() const
Get the username.
Definition XrdClURL.hh:135
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
bool IsLocalFile() const
Definition XrdClURL.cc:467
void SetProtocol(const std::string &protocol)
Set protocol.
Definition XrdClURL.hh:126
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
const std::string & GetProtocol() const
Get the protocol.
Definition XrdClURL.hh:118
bool IsValid() const
Is the url valid.
Definition XrdClURL.cc:445
void SetUserName(const std::string &userName)
Set the username.
Definition XrdClURL.hh:143
static void splitString(Container &result, const std::string &input, const std::string &delimiter)
Split a string.
Definition XrdClUtils.hh:56
static bool CheckEC(const Message *req, const URL &url)
Check if this client can support given EC redirect.
Handle/Process/Forward XRootD messages.
virtual uint16_t Examine(std::shared_ptr< Message > &msg)
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten)
const Message * GetRequest() const
Get the request pointer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status)
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead)
virtual void OnStatusReady(const Message *message, XRootDStatus status)
The requested action has been performed and the status is available.
virtual bool IsRaw() const
Are we a raw writer or not?
virtual void Process()
Process the message if it was "taken" by the examine action.
virtual uint16_t InspectStatusRsp()
virtual uint16_t GetSid() const
const std::string & GetErrorMessage() const
Get error message.
static XRootDStatus UnMarshallBody(Message *msg, uint16_t reqType)
Unmarshall the body of the incoming message.
static XRootDStatus UnMarshallRequest(Message *msg)
static XRootDStatus UnMarshalStatusBody(Message &msg, uint16_t reqType)
Unmarshall the body of the status response.
static XRootDStatus MarshallRequest(Message *msg)
Marshal the outgoing message.
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
const uint16_t suRetry
const uint16_t errRedirectLimit
const int DefaultMaxMetalinkWait
const uint16_t errErrorResponse
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint16_t errNotFound
const uint64_t XRootDMsg
std::vector< HostInfo > HostList
const uint16_t errDataError
Data is corrupted.
const uint16_t errInternal
Internal error.
const uint16_t stOK
Everything went OK.
const uint64_t ExDbgMsg
const uint16_t errInvalidResponse
const uint16_t errInvalidRedirectURL
Buffer BinaryDataInfo
Binary buffer.
const uint16_t errOperationInterrupted
const uint16_t suContinue
const int DefaultNotAuthorizedRetryLimit
const uint16_t errRedirect
const uint16_t errAuthFailed
const uint16_t errInvalidMessage
none object for initializing empty Optional
XrdSysError Log
Definition XrdConfig.cc:112
@ kXR_PartialResult
static const int PageSize
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
Describe a data chunk for vector read.
void * buffer
length of the chunk
uint32_t length
offset in the file
URL url
URL of the host.
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
static const uint16_t ServerFlags
returns server flags
static const uint16_t ProtocolVersion
returns the protocol version