9#include "XrdVersion.hh"
// Definitions of TPCHandler's static data members.
// Monitoring-ID counter: incremented each time a push/pull request creates a
// file handle, and the value is passed to newFile() as its monitoring ID.
37uint64_t TPCHandler::m_monid{0};
// Minimum spacing between performance markers sent to the TPC client. It is
// added to time() values when computing the next marker time, so the unit is seconds.
38int TPCHandler::m_marker_period = 5;
// Block size (16 MiB) handed to the Stream constructor for pull requests that
// use more than one stream.
39size_t TPCHandler::m_block_size = 16*1024*1024;
// Block size (1 MiB) handed to the Stream constructor for single-stream pull requests.
40size_t TPCHandler::m_small_block_size = 1*1024*1024;
49TPCHandler::TPCLogRecord::~TPCLogRecord()
56 monInfo.
clID = clID.c_str();
58 gettimeofday(&monInfo.
endT, 0);
60 if (log_prefix ==
"PullRequest")
61 {monInfo.
dstURL = local.c_str();
62 monInfo.
srcURL = remote.c_str();
64 monInfo.
dstURL = remote.c_str();
65 monInfo.
srcURL = local.c_str();
69 if (!status) monInfo.
endRC = 0;
70 else if (tpc_status > 0) monInfo.
endRC = tpc_status;
71 else monInfo.
endRC = 1;
72 monInfo.
strm =
static_cast<unsigned char>(streams);
73 monInfo.
fSize = (bytes_transferred < 0 ? 0 : bytes_transferred);
76 tpcMonitor->
Report(monInfo);
86 if (curl) curl_easy_cleanup(curl);
101int TPCHandler::sockopt_callback(
void *clientp, curl_socket_t curlfd, curlsocktype purpose) {
102 TPCLogRecord * rec = (TPCLogRecord *)clientp;
103 if (purpose == CURLSOCKTYPE_IPCXN && rec && rec->pmarkManager.isEnabled()) {
106 return CURL_SOCKOPT_ALREADY_CONNECTED;
108 return CURL_SOCKOPT_OK;
120int TPCHandler::opensocket_callback(
void *clientp,
121 curlsocktype purpose,
122 struct curl_sockaddr *aInfo)
125 int fd = XrdSysFD_Socket(aInfo->family, aInfo->socktype, aInfo->protocol);
129 return CURL_SOCKET_BAD;
131 TPCLogRecord * rec = (TPCLogRecord *)clientp;
132 if (purpose == CURLSOCKTYPE_IPCXN && clientp)
135 && !thePeer.isMapped());
136 std::stringstream connectErrMsg;
138 if(!rec->pmarkManager.connect(fd, &(aInfo->addr), aInfo->addrlen, CONNECT_TIMEOUT, connectErrMsg)) {
139 rec->m_log->Emsg(rec->log_prefix.c_str(),
"Unable to connect socket:", connectErrMsg.str().c_str());
140 return CURL_SOCKET_BAD;
147int TPCHandler::closesocket_callback(
void *clientp, curl_socket_t fd) {
148 TPCLogRecord * rec = (TPCLogRecord *)clientp;
153 rec->pmarkManager.endPmark(fd);
173std::string TPCHandler::prepareURL(
XrdHttpExtReq &req,
bool & hasSetOpaque) {
179 return prepareURL(req,foundHeader);
193 std::stringstream parser(opaque);
194 std::string sequence;
195 std::stringstream output;
197 while (
getline(parser, sequence,
'&')) {
198 if (sequence.empty()) {
continue;}
199 size_t equal_pos = sequence.find(
'=');
201 if (equal_pos != std::string::npos)
202 val = curl_easy_escape(curl, sequence.c_str() + equal_pos + 1, sequence.size() - equal_pos - 1);
204 if (!val && equal_pos != std::string::npos) {
continue;}
206 if (!first) output <<
"&";
208 output << sequence.substr(0, equal_pos);
210 output <<
"=" << val;
222TPCHandler::ConfigureCurlCA(
CURL *curl)
224 auto ca_filename = m_ca_file ? m_ca_file->
CAFilename() :
"";
225 auto crl_filename = m_ca_file ? m_ca_file->
CRLFilename() :
"";
226 if (!ca_filename.empty() && !crl_filename.empty()) {
227 curl_easy_setopt(curl, CURLOPT_CAINFO, ca_filename.c_str());
231 std::ifstream in(crl_filename, std::ifstream::ate | std::ifstream::binary);
233 curl_easy_setopt(curl, CURLOPT_CRLFILE, crl_filename.c_str());
235 std::ostringstream oss;
236 oss <<
"No valid CRL file has been found in the file " << crl_filename <<
". Disabling CRL checking.";
237 m_log.
Log(
Warning,
"TpcHandler",oss.str().c_str());
240 else if (!m_cadir.empty()) {
241 curl_easy_setopt(curl, CURLOPT_CAPATH, m_cadir.c_str());
243 if (!m_cafile.empty()) {
244 curl_easy_setopt(curl, CURLOPT_CAINFO, m_cafile.c_str());
250 return !strcmp(verb,
"COPY") || !strcmp(verb,
"OPTIONS");
258 if (!strncmp(input.c_str(),
"davs://", 7)) {
259 return "https://" + input.substr(7);
269 if (req.
verb ==
"OPTIONS") {
270 return ProcessOptionsReq(req);
273 if (header != req.
headers.end()) {
274 if (header->second !=
"none") {
275 m_log.
Emsg(
"ProcessReq",
"COPY requested an unsupported credential type: ", header->second.c_str());
276 return req.
SendSimpleResp(400, NULL, NULL,
"COPY requestd an unsupported Credential type", 0);
280 if (header != req.
headers.end()) {
282 return ProcessPullReq(src, req);
285 if (header != req.
headers.end()) {
286 return ProcessPushReq(header->second, req);
288 m_log.
Emsg(
"ProcessReq",
"COPY verb requested but no source or destination specified.");
289 return req.
SendSimpleResp(400, NULL, NULL,
"No Source or Destination specified", 0);
306 m_fixed_route(false),
308 m_first_timeout(120),
309 m_log(log->logger(),
"TPC_"),
312 if (!Configure(config, myEnv)) {
313 throw std::runtime_error(
"Failed to configure the HTTP third-party-copy handler.");
331 return req.
SendSimpleResp(200, NULL, (
char *)
"DAV: 1\r\nDAV: <http://apache.org/dav/propset/fs/1>\r\nAllow: HEAD,GET,PUT,PROPFIND,DELETE,OPTIONS,COPY", NULL, 0);
341 if (authz_header != req.
headers.end()) {
342 char * quoted_url =
quote(authz_header->second.c_str());
343 std::stringstream ss;
344 ss <<
"authz=" << quoted_url;
355int TPCHandler::RedirectTransfer(
CURL *curl,
const std::string &redirect_resource,
360 if ((ptr == NULL) || (*ptr ==
'\0') || (port == 0)) {
362 std::stringstream ss;
363 ss <<
"Internal error: redirect without hostname";
364 logTransferEvent(
LogMask::Error, rec,
"REDIRECT_INTERNAL_ERROR", ss.str());
365 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
369 std::string rdr_info = ptr;
370 std::string host, opaque;
371 size_t pos = rdr_info.find(
'?');
372 host = rdr_info.substr(0, pos);
374 if (pos != std::string::npos) {
375 opaque = rdr_info.substr(pos + 1);
378 std::stringstream ss;
379 ss <<
"Location: http" << (m_desthttps ?
"s" :
"") <<
"://" << host <<
":" << port <<
"/" << redirect_resource;
381 if (!opaque.empty()) {
387 return req.
SendSimpleResp(rec.status, NULL,
const_cast<char *
>(ss.str().c_str()),
395int TPCHandler::OpenWaitStall(
XrdSfsFile &fh,
const std::string &resource,
397 const std::string &authz)
404 size_t pos = resource.find(
'?');
406 std::string path = resource.substr(0, pos);
408 if (pos != std::string::npos) {
409 opaque = resource.substr(pos + 1);
414 opaque += (opaque.empty() ?
"" :
"&");
417 open_result = fh.
open(path.c_str(), mode, openMode, &sec, opaque.c_str());
421 if (open_result ==
SFS_STARTED) {secs_to_stall = secs_to_stall/2 + 5;}
422 std::this_thread::sleep_for (std::chrono::seconds(secs_to_stall));
442 bool &success, TPCLogRecord &rec,
bool shouldReturnErrorToClient) {
444 curl_easy_setopt(curl, CURLOPT_NOBODY, 1);
446 res = curl_easy_perform(curl);
449 curl_easy_setopt(curl, CURLOPT_NOBODY, 0);
450 if (res == CURLE_HTTP_RETURNED_ERROR) {
451 std::stringstream ss;
452 ss <<
"Remote server failed request";
453 std::stringstream ss2;
454 ss2 << ss.str() <<
": " << curl_easy_strerror(res);
457 return shouldReturnErrorToClient ? req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
459 std::stringstream ss;
463 return shouldReturnErrorToClient ? req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0) : -1;
465 std::stringstream ss;
466 ss <<
"Internal transfer failure";
467 std::stringstream ss2;
468 ss2 << ss.str() <<
" - HTTP library failed: " << curl_easy_strerror(res);
471 return shouldReturnErrorToClient ? req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec, res).c_str(), 0) : -1;
473 std::stringstream ss;
474 ss <<
"Successfully determined remote size for pull request: "
481int TPCHandler::GetContentLengthTPCPull(
CURL *curl,
XrdHttpExtReq &req, uint64_t &contentLength,
bool & success, TPCLogRecord &rec) {
488 if ((result = DetermineXferSize(curl, req, state, success, rec,
false)) || !success) {
501 std::stringstream ss;
502 const std::string crlf =
"\n";
503 ss <<
"Perf Marker" << crlf;
504 ss <<
"Timestamp: " << time(NULL) << crlf;
505 ss <<
"Stripe Index: 0" << crlf;
507 ss <<
"Total Stripe Count: 1" << crlf;
512 ss <<
"RemoteConnections: " << desc << crlf;
517 return req.
ChunkResp(ss.str().c_str(), 0);
525int TPCHandler::SendPerfMarker(
XrdHttpExtReq &req, TPCLogRecord &rec, std::vector<State*> &state,
526 off_t bytes_transferred)
540 std::stringstream ss;
541 const std::string crlf =
"\n";
542 ss <<
"Perf Marker" << crlf;
543 ss <<
"Timestamp: " << time(NULL) << crlf;
544 ss <<
"Stripe Index: 0" << crlf;
545 ss <<
"Stripe Bytes Transferred: " << bytes_transferred << crlf;
546 ss <<
"Total Stripe Count: 1" << crlf;
550 std::stringstream ss2;
551 for (std::vector<State*>::const_iterator iter = state.begin();
552 iter != state.end(); iter++)
554 std::string desc = (*iter)->GetConnectionDescription();
556 ss2 << (first ?
"" :
",") << desc;
561 ss <<
"RemoteConnections: " << ss2.str() << crlf;
563 rec.bytes_transferred = bytes_transferred;
566 return req.
ChunkResp(ss.str().c_str(), 0);
578 CURLM *multi_handle = curl_multi_init();
582 "Failed to initialize a libcurl multi-handle");
583 std::stringstream ss;
584 ss <<
"Failed to initialize internal server memory";
585 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
591 mres = curl_multi_add_handle(multi_handle, curl);
594 std::stringstream ss;
595 ss <<
"Failed to add transfer to libcurl multi-handle: HTTP library failure=" << curl_multi_strerror(mres);
596 logTransferEvent(
LogMask::Error, rec,
"CURL_INIT_FAIL", ss.str());
597 curl_multi_cleanup(multi_handle);
598 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
602 int retval = req.
StartChunkedResp(201,
"Created",
"Content-Type: text/plain");
604 curl_multi_cleanup(multi_handle);
606 "Failed to send the initial response to the TPC client");
610 "Initial transfer response sent to the TPC client");
615 int running_handles = 1;
616 time_t last_marker = 0;
618 off_t last_advance_bytes = 0;
619 time_t last_advance_time = time(NULL);
620 time_t transfer_start = last_advance_time;
621 CURLcode res =
static_cast<CURLcode
>(-1);
623 time_t now = time(NULL);
624 time_t next_marker = last_marker + m_marker_period;
625 if (now >= next_marker) {
627 if (bytes_xfer > last_advance_bytes) {
628 last_advance_bytes = bytes_xfer;
629 last_advance_time = now;
631 if (SendPerfMarker(req, rec, state)) {
632 curl_multi_remove_handle(multi_handle, curl);
633 curl_multi_cleanup(multi_handle);
635 "Failed to send a perf marker to the TPC client");
638 int timeout = (transfer_start == last_advance_time) ? m_first_timeout : m_timeout;
639 if (now > last_advance_time + timeout) {
640 const char *log_prefix = rec.log_prefix.c_str();
641 bool tpc_pull = strncmp(
"Pull", log_prefix, 4) == 0;
644 std::stringstream ss;
645 ss <<
"Transfer failed because no bytes have been "
646 << (tpc_pull ?
"received from the source (pull mode) in "
647 :
"transmitted to the destination (push mode) in ") << timeout <<
" seconds.";
649 curl_multi_remove_handle(multi_handle, curl);
650 curl_multi_cleanup(multi_handle);
656 rec.pmarkManager.startTransfer();
657 mres = curl_multi_perform(multi_handle, &running_handles);
658 if (mres == CURLM_CALL_MULTI_PERFORM) {
662 }
else if (mres != CURLM_OK) {
664 }
else if (running_handles == 0) {
668 rec.pmarkManager.beginPMarks();
675 msg = curl_multi_info_read(multi_handle, &msgq);
676 if (msg && (msg->msg == CURLMSG_DONE)) {
677 CURL *easy_handle = msg->easy_handle;
678 res = msg->data.result;
679 curl_multi_remove_handle(multi_handle, easy_handle);
683 int64_t max_sleep_time = next_marker - time(NULL);
684 if (max_sleep_time <= 0) {
688#ifdef HAVE_CURL_MULTI_WAIT
689 mres = curl_multi_wait(multi_handle, NULL, 0, max_sleep_time*1000, &fd_count);
693 if (mres != CURLM_OK) {
696 }
while (running_handles);
698 if (mres != CURLM_OK) {
699 std::stringstream ss;
700 ss <<
"Internal libcurl multi-handle error: HTTP library failure=" << curl_multi_strerror(mres);
701 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_CURL_ERROR", ss.str());
703 curl_multi_remove_handle(multi_handle, curl);
704 curl_multi_cleanup(multi_handle);
706 if ((retval = req.
ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
708 "Failed to send error message to the TPC client");
718 msg = curl_multi_info_read(multi_handle, &msgq);
719 if (msg && (msg->msg == CURLMSG_DONE)) {
720 CURL *easy_handle = msg->easy_handle;
721 res = msg->data.result;
722 curl_multi_remove_handle(multi_handle, easy_handle);
726 if (!state.
GetErrorCode() && res ==
static_cast<CURLcode
>(-1)) {
727 curl_multi_remove_handle(multi_handle, curl);
728 curl_multi_cleanup(multi_handle);
729 std::stringstream ss;
730 ss <<
"Internal state error in libcurl";
731 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_CURL_ERROR", ss.str());
733 if ((retval = req.
ChunkResp(generateClientErr(ss, rec).c_str(), 0))) {
735 "Failed to send error message to the TPC client");
740 curl_multi_cleanup(multi_handle);
754 std::stringstream ss;
755 bool success =
false;
758 std::stringstream ss2;
759 ss2 <<
"Remote side failed with status code " << state.
GetStatusCode();
761 std::replace(err.begin(), err.end(),
'\n',
' ');
762 ss2 <<
"; error message: \"" << err <<
"\"";
764 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss2.str());
765 ss << generateClientErr(ss2, rec);
768 if (err.empty()) {err =
"(no error message provided)";}
769 else {std::replace(err.begin(), err.end(),
'\n',
' ');}
770 std::stringstream ss2;
771 ss2 <<
"Error when interacting with local filesystem: " << err;
772 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss2.str());
773 ss << generateClientErr(ss2, rec);
774 }
else if (res != CURLE_OK) {
775 std::stringstream ss2;
776 ss2 <<
"Internal transfer failure";
777 std::stringstream ss3;
778 ss3 << ss2.str() <<
": " << curl_easy_strerror(res);
779 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss3.str());
780 ss << generateClientErr(ss2, rec, res);
782 ss <<
"success: Created";
786 if ((retval = req.
ChunkResp(ss.str().c_str(), 0))) {
788 "Failed to send last update to remote client");
790 }
else if (success) {
805 const char *log_prefix = rec.log_prefix.c_str();
807 res = curl_easy_perform(curl);
812 if (err.empty()) {err =
"(no error message provided)";}
813 else {std::replace(err.begin(), err.end(),
'\n',
' ');}
814 std::stringstream ss2;
815 ss2 <<
"Error when interacting with local filesystem: " << err;
816 logTransferEvent(
LogMask::Error, rec,
"TRANSFER_FAIL", ss2.str());
817 ss <<
"failure: " << ss2.str();
818 }
else if (res == CURLE_HTTP_RETURNED_ERROR) {
819 m_log.
Emsg(log_prefix,
"Remote server failed request", curl_easy_strerror(res));
821 const_cast<char *
>(curl_easy_strerror(res)), 0);
823 std::stringstream ss;
824 ss <<
"Remote side failed with status code " << state.
GetStatusCode();
825 m_log.
Emsg(log_prefix,
"Remote server failed request", ss.str().c_str());
827 const_cast<char *
>(ss.str().c_str()), 0);
829 m_log.
Emsg(log_prefix,
"Curl failed", curl_easy_strerror(res));
830 char msg[] =
"Unknown internal transfer failure";
833 char msg[] =
"Created";
844int TPCHandler::ProcessPushReq(
const std::string & resource,
XrdHttpExtReq &req) {
845 TPCLogRecord rec(req);
846 rec.log_prefix =
"PushRequest";
848 rec.remote = resource;
852 if (name) rec.name = name;
853 logTransferEvent(
LogMask::Info, rec,
"PUSH_START",
"Starting a push request");
856 auto curl = curlPtr.get();
858 std::stringstream ss;
859 ss <<
"Failed to initialize internal transfer resources";
862 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
864 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
865 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (
long) CURL_HTTP_VERSION_1_1);
868 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
869 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
870 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
871 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
872 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
873 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
875 std::string redirect_resource = req.
resource;
876 if (query_header != req.
headers.end()) {
877 redirect_resource = query_header->second;
881 uint64_t file_monid =
AtomicInc(m_monid);
883 std::unique_ptr<XrdSfsFile> fh(m_sfs->
newFile(name, file_monid));
886 std::stringstream ss;
887 ss <<
"Failed to initialize internal transfer file handle";
890 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
892 std::string full_url = prepareURL(req);
894 std::string authz = GetAuthz(req);
896 int open_results = OpenWaitStall(*fh, full_url,
SFS_O_RDONLY, 0644,
899 int result = RedirectTransfer(curl, redirect_resource, req, fh->
error, rec);
901 }
else if (
SFS_OK != open_results) {
903 std::stringstream ss;
905 if (msg == NULL) ss <<
"Failed to open local resource";
908 if (code == EACCES) rec.status = 401;
909 else if (code == EEXIST) rec.status = 412;
911 int resp_result = req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
915 ConfigureCurlCA(curl);
916 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
918 Stream stream(std::move(fh), 0, 0, m_log);
923 return RunCurlWithUpdates(curl, req, state, rec);
925 return RunCurlBasic(curl, req, state, rec);
933int TPCHandler::ProcessPullReq(
const std::string &resource,
XrdHttpExtReq &req) {
934 TPCLogRecord rec(req);
935 rec.log_prefix =
"PullRequest";
937 rec.remote = resource;
941 if (name) rec.name = name;
942 logTransferEvent(
LogMask::Info, rec,
"PULL_START",
"Starting a pull request");
945 auto curl = curlPtr.get();
947 std::stringstream ss;
948 ss <<
"Failed to initialize internal transfer resources";
951 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
968 std::string host_used;
969 if (host_header != req.
headers.end()) {
970 host_used = host_header->second;
976 ip = (
char *)malloc(ip_size-1);
979 memcpy(ip, buff+1, ip_size-2);
983 curl_easy_setopt(curl, CURLOPT_INTERFACE, ip);
985 curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1);
986 curl_easy_setopt(curl, CURLOPT_HTTP_VERSION, (
long) CURL_HTTP_VERSION_1_1);
988 curl_easy_setopt(curl, CURLOPT_OPENSOCKETFUNCTION, opensocket_callback);
989 curl_easy_setopt(curl, CURLOPT_OPENSOCKETDATA, &rec);
990 curl_easy_setopt(curl, CURLOPT_SOCKOPTFUNCTION, sockopt_callback);
991 curl_easy_setopt(curl, CURLOPT_SOCKOPTDATA , &rec);
992 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETFUNCTION, closesocket_callback);
993 curl_easy_setopt(curl, CURLOPT_CLOSESOCKETDATA, &rec);
994 curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, CONNECT_TIMEOUT);
995 std::unique_ptr<XrdSfsFile> fh(m_sfs->
newFile(name, m_monid++));
997 std::stringstream ss;
998 ss <<
"Failed to initialize internal transfer file handle";
1001 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1004 std::string redirect_resource = req.
resource;
1005 if (query_header != req.
headers.end()) {
1006 redirect_resource = query_header->second;
1010 if ((overwrite_header == req.
headers.end()) || (overwrite_header->second ==
"T")) {
1016 if (streams_header != req.
headers.end()) {
1017 int stream_req = -1;
1019 stream_req = std::stol(streams_header->second);
1022 if (stream_req < 0 || stream_req > 100) {
1023 std::stringstream ss;
1024 ss <<
"Invalid request for number of streams";
1026 logTransferEvent(
LogMask::Info, rec,
"INVALID_REQUEST", ss.str());
1027 return req.
SendSimpleResp(rec.status, NULL, NULL, generateClientErr(ss, rec).c_str(), 0);
1029 streams = stream_req == 0 ? 1 : stream_req;
1032 rec.streams = streams;
1033 bool hasSetOpaque =
false;
1034 std::string full_url = prepareURL(req, hasSetOpaque);
1035 std::string authz = GetAuthz(req);
1036 curl_easy_setopt(curl, CURLOPT_URL, resource.c_str());
1037 ConfigureCurlCA(curl);
1038#ifdef XRD_CHUNK_RESP
1042 uint64_t sourceFileContentLength = 0;
1044 GetContentLengthTPCPull(curl, req, sourceFileContentLength, success, rec);
1048 full_url += hasSetOpaque ?
"&" :
"?";
1049 full_url +=
"oss.asize=" + std::to_string(sourceFileContentLength);
1053 int open_result = OpenWaitStall(*fh, full_url, mode|
SFS_O_WRONLY,
1057 int result = RedirectTransfer(curl, redirect_resource, req, fh->
error, rec);
1059 }
else if (
SFS_OK != open_result) {
1061 std::stringstream ss;
1063 if ((msg == NULL) || (*msg ==
'\0')) ss <<
"Failed to open local resource";
1066 if (code == EACCES) rec.status = 401;
1067 else if (code == EEXIST) rec.status = 412;
1070 generateClientErr(ss, rec).c_str(), 0);
1074 Stream stream(std::move(fh), streams * m_pipelining_multiplier, streams > 1 ? m_block_size : m_small_block_size, m_log);
1078#ifdef XRD_CHUNK_RESP
1080 return RunCurlWithStreams(req, state, streams, rec);
1082 return RunCurlWithUpdates(curl, req, state, rec);
1085 return RunCurlBasic(curl, req, state, rec);
1093void TPCHandler::logTransferEvent(
LogMask mask,
const TPCLogRecord &rec,
1094 const std::string &event,
const std::string &message)
1098 std::stringstream ss;
1099 ss <<
"event=" <<
event <<
", local=" << rec.local <<
", remote=" << rec.remote;
1100 if (rec.name.empty())
1101 ss <<
", user=(anonymous)";
1103 ss <<
", user=" << rec.name;
1104 if (rec.streams != 1)
1105 ss <<
", streams=" << rec.streams;
1106 if (rec.bytes_transferred >= 0)
1107 ss <<
", bytes_transferred=" << rec.bytes_transferred;
1108 if (rec.status >= 0)
1109 ss <<
", status=" << rec.status;
1110 if (rec.tpc_status >= 0)
1111 ss <<
", tpc_status=" << rec.tpc_status;
1112 if (!message.empty())
1113 ss <<
"; " << message;
1114 m_log.
Log(mask, rec.log_prefix.c_str(), ss.str().c_str());
1117std::string TPCHandler::generateClientErr(std::stringstream &err_ss,
const TPCLogRecord &rec, CURLcode cCode) {
1118 std::stringstream ssret;
1119 ssret <<
"failure: " << err_ss.str() <<
", local=" << rec.local <<
", remote=" << rec.remote;
1120 if(cCode != CURLcode::CURLE_OK) {
1121 ssret <<
", HTTP library failure=" << curl_easy_strerror(cCode);
1132 if (curl_global_init(CURL_GLOBAL_DEFAULT)) {
1133 log->
Emsg(
"TPCInitialize",
"libcurl failed to initialize");
1139 log->
Emsg(
"TPCInitialize",
"TPC handler requires a config filename in order to load");
1143 log->
Emsg(
"TPCInitialize",
"Will load configuration for the TPC handler from", config);
1145 }
catch (std::runtime_error &re) {
1146 log->
Emsg(
"TPCInitialize",
"Encountered a runtime failure when loading ", re.what());
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdHttpExtHandlerArgs)
char * quote(const char *str)
void getline(uchar *buff, int blen)
CURLMcode curl_multi_wait_impl(CURLM *multi_handle, int timeout_ms, int *numfds)
static std::string PrepareURL(const std::string &input)
XrdVERSIONINFO(XrdHttpGetExtHandler, HttpTPC)
std::string encode_xrootd_opaque_to_uri(CURL *curl, const std::string &opaque)
XrdHttpExtHandler * XrdHttpGetExtHandler(XrdSysError *log, const char *config, const char *, XrdOucEnv *myEnv)
int GetStatusCode() const
off_t BytesTransferred() const
void CopyHeaders(XrdHttpExtReq &req)
void SetErrorMessage(const std::string &error_msg)
std::string GetErrorMessage() const
std::string GetConnectionDescription()
off_t GetContentLength() const
void SetErrorCode(int error_code)
TPCHandler(XrdSysError *log, const char *config, XrdOucEnv *myEnv)
virtual int ProcessReq(XrdHttpExtReq &req)
virtual bool MatchesPath(const char *verb, const char *path)
Tells if the incoming path is recognized as one of the paths that have to be processed.
int ChunkResp(const char *body, long long bodylen)
Send a (potentially partial) body in a chunked response; invoking with NULL body.
void GetClientID(std::string &clid)
std::map< std::string, std::string > & headers
int StartChunkedResp(int code, const char *desc, const char *header_to_add)
Starts a chunked response; body of request is sent over multiple parts using the SendChunkResp.
const XrdSecEntity & GetSecEntity() const
int SendSimpleResp(int code, const char *desc, const char *header_to_add, const char *body, long long bodylen)
Sends a basic response. If the length is < 0 then it is calculated internally.
static const int noPort
Do not add port number.
int Format(char *bAddr, int bLen, fmtUse fmtType=fmtAuto, int fmtOpts=0)
@ fmtAddr
Address using suitable ipv4 or ipv6 format.
static const char * GetAddrs(const char *hSpec, XrdNetAddr *aListP[], int &aListN, AddrOpts opts=allIPMap, int pNum=PortInSpec)
void * GetPtr(const char *varname)
const char * getErrText()
void setUCap(int ucval)
Set user capabilities.
static std::map< std::string, T >::const_iterator caseInsensitiveFind(const std::map< std::string, T > &m, const std::string &lowerCaseSearchKey)
char * name
Entity's name.
virtual XrdSfsFile * newFile(char *user=0, int MonID=0)=0
virtual int open(const char *fileName, XrdSfsFileOpenMode openMode, mode_t createMode, const XrdSecEntity *client=0, const char *opaque=0)=0
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
XrdSysLogger * logger(XrdSysLogger *lp=0)
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
bool atLeastOneValidCRLFound() const
std::string CAFilename() const
std::string CRLFilename() const
static std::string prepareOpenURL(const std::string &reqResource, std::map< std::string, std::string > &reqHeaders, const std::map< std::string, std::string > &hdr2cgimap, bool &hasSetOpaque)
void Report(TpcInfo &info)
std::unique_ptr< CURL, CurlDeleter > ManagedCurlHandle
void operator()(CURL *curl)
static const int uIPv64
ucap: Supports only IPv4 info