XRootD
Loading...
Searching...
No Matches
XrdPfcFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfcIO.hh"
22#include "XrdPfcTrace.hh"
23#include <cstdio>
24#include <sstream>
25#include <fcntl.h>
26#include <assert.h>
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClFile.hh"
31#include "XrdSys/XrdSysTimer.hh"
32#include "XrdOss/XrdOss.hh"
33#include "XrdOuc/XrdOucEnv.hh"
35#include "XrdPfc.hh"
36
37
38using namespace XrdPfc;
39
namespace
{

// Maximum number of attempts to write a block to disk before giving up.
const int BLOCK_WRITE_MAX_ATTEMPTS = 4;

// Convenience accessor for the singleton Cache instance.
Cache* cache() { return &Cache::GetInstance(); }

}
48
// Trace-ID string used by the TRACE / TRACEF macros throughout this file.
const char *File::m_traceID = "File";
50
51//------------------------------------------------------------------------------
52
// Constructor -- only initializes members to their resting state.
// The data / cinfo files on disk are opened later, in Open(); until then
// m_block_size and m_num_blocks stay 0 and prefetching is kOff.
File::File(const std::string& path, long long iOffset, long long iFileSize) :
   m_ref_cnt(0),
   m_data_file(0),
   m_info_file(0),
   m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
   m_filename(path),
   m_offset(iOffset),
   m_file_size(iFileSize),
   m_current_io(m_io_set.end()),
   m_ios_in_detach(0),
   m_non_flushed_cnt(0),
   m_in_sync(false),
   m_detach_time_logged(false),
   m_in_shutdown(false),
   m_state_cond(0),
   m_block_size(0),
   m_num_blocks(0),
   m_prefetch_state(kOff),
   m_prefetch_read_cnt(0),
   m_prefetch_hit_cnt(0),
   m_prefetch_score(0)
{}
75
// Destructor -- closes and releases the cinfo and data OSS file handles,
// if they are still open, and logs the final prefetch score.
File::~File()
{
   if (m_info_file)
   {
      TRACEF(Debug, "~File() close info ");
      m_info_file->Close();
      delete m_info_file;
      m_info_file = NULL;
   }

   if (m_data_file)
   {
      TRACEF(Debug, "~File() close output ");
      m_data_file->Close();
      delete m_data_file;
      m_data_file = NULL;
   }

   TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
}
96
97//------------------------------------------------------------------------------
98
99File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
100{
101 File *file = new File(path, offset, fileSize);
102 if ( ! file->Open())
103 {
104 delete file;
105 file = 0;
106 }
107 return file;
108}
109
110//------------------------------------------------------------------------------
111
// NOTE(review): the signature line of this function is missing from this
// listing; judging from the comments below it is the emergency-shutdown
// entry point (presumably void File::initiate_emergency_shutdown()) --
// verify against the upstream source.
{
   // Called from Cache::Unlink() when the file is currently open.
   // Cache::Unlink is also called on FSync error and when wrong number of bytes
   // is received from a remote read.
   //
   // From this point onward the file will not be written to, cinfo file will
   // not be updated, and all new read requests will return -ENOENT.
   //
   // File's entry in the Cache's active map is set to nullptr and will be
   // removed from there shortly, in any case, well before this File object
   // shuts down. So we do not communicate to Cache about our destruction when
   // it happens.

   {
      XrdSysCondVarHelper _lck(m_state_cond);

      m_in_shutdown = true;

      // If prefetching is still running, stop it and unregister from the Cache.
      if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
      {
         m_prefetch_state = kStopped;
         cache()->DeRegisterPrefetchFile(this);
      }
   }

}
139
140//------------------------------------------------------------------------------
141
// NOTE(review): the signature line is missing from this listing; from the
// body this returns the statistics accumulated since the previous call
// (presumably Stats File::DeltaStatsFromLastCall()) -- verify upstream.
{
   // Not locked, only used from Cache / Purge thread.

   // Snapshot of the stats as they were at the previous call.
   Stats delta = m_last_stats;

   m_last_stats = m_stats.Clone();

   // Turn 'delta' into the difference between the new snapshot and the old one.
   delta.DeltaToReference(m_last_stats);

   return delta;
}
154
155//------------------------------------------------------------------------------
156
// NOTE(review): the signature line is missing from this listing; the trace
// message identifies this as File::BlockRemovedFromWriteQ(Block* b) --
// called when a single block leaves the write queue; drops the queue's
// reference on the block under the state lock.
{
   TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);

   XrdSysCondVarHelper _lck(m_state_cond);
   dec_ref_count(b);
}
164
165void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
166{
167 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
168
169 XrdSysCondVarHelper _lck(m_state_cond);
170
171 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
172 {
173 dec_ref_count(*i);
174 }
175}
176
177//------------------------------------------------------------------------------
178
// NOTE(review): the signature line is missing from this listing; the body
// takes an 'io' parameter and records its remote location (presumably
// void File::ioUpdated(IO *io)) -- verify against upstream.
{
   // Take the location string before acquiring the lock, then record it
   // in the set of remote locations under the state lock.
   std::string loc(io->GetLocation());
   XrdSysCondVarHelper _lck(m_state_cond);
   insert_remote_location(loc);
}
185
186//------------------------------------------------------------------------------
187
// NOTE(review): the signature line is missing from this listing; the trace
// messages identify this as File::ioActive(IO *io), returning bool --
// verify against upstream. Called when an IO wants to detach: returns true
// if the detach must be delayed because reads / prefetches / queued blocks
// are still outstanding for this IO.
{
   // Returns true if delay is needed.

   TRACEF(Debug, "ioActive start for io " << io);

   std::string loc(io->GetLocation());

   {
      XrdSysCondVarHelper _lck(m_state_cond);

      IoSet_i mi = m_io_set.find(io);

      if (mi != m_io_set.end())
      {
         unsigned int n_active_reads = io->m_active_read_reqs;

         TRACE(Info, "ioActive for io " << io <<
               ", active_reads " << n_active_reads <<
               ", active_prefetches " << io->m_active_prefetches <<
               ", allow_prefetching " << io->m_allow_prefetching <<
               ", ios_in_detach " << m_ios_in_detach);
         TRACEF(Info,
                "\tio_map.size() " << m_io_set.size() <<
                ", block_map.size() " << m_block_map.size() << ", file");

         insert_remote_location(loc);

         // This IO may no longer drive prefetching and is now detaching.
         io->m_allow_prefetching = false;
         io->m_in_detach = true;

         // Check if any IO is still available for prefetching. If not, stop it.
         if (m_prefetch_state == kOn || m_prefetch_state == kHold)
         {
            if ( ! select_current_io_or_disable_prefetching(false) )
            {
               TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
            }
         }

         // On last IO, consider write queue blocks. Note, this also contains
         // blocks being prefetched.

         bool io_active_result;

         if (n_active_reads > 0)
         {
            // Reads still in flight on this IO -- definitely active.
            io_active_result = true;
         }
         else if (m_io_set.size() - m_ios_in_detach == 1)
         {
            // This is the last non-detaching IO: any block still in the map
            // (including write-queue / prefetch blocks) keeps it active.
            io_active_result = ! m_block_map.empty();
         }
         else
         {
            io_active_result = io->m_active_prefetches > 0;
         }

         if ( ! io_active_result)
         {
            ++m_ios_in_detach;
         }

         TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");

         return io_active_result;
      }
      else
      {
         TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
         return false;
      }
   }
}
262
263//------------------------------------------------------------------------------
264
// NOTE(review): the signature line is missing from this listing; the body
// only clears m_detach_time_logged under the state lock, so this appears to
// re-arm the writing of detach-time statistics -- verify name upstream.
{
   XrdSysCondVarHelper _lck(m_state_cond);
   m_detach_time_logged = false;
}
270
// NOTE(review): the signature line is missing from this listing; the trace
// messages identify this as File::FinalizeSyncBeforeExit(), returning bool --
// verify against upstream.
{
   // Returns true if sync is required.
   // This method is called after corresponding IO is detached from PosixCache.

   XrdSysCondVarHelper _lck(m_state_cond);
   if ( ! m_in_shutdown)
   {
      // A final sync is needed if blocks were written during an ongoing sync,
      // there are unflushed blocks, or detach stats have not been logged yet.
      if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
      {
         Stats loc_stats = m_stats.Clone();
         m_cfi.WriteIOStatDetach(loc_stats);
         m_detach_time_logged = true;
         m_in_sync = true;
         TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
         return true;
      }
   }
   TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
   return false;
}
292
293//------------------------------------------------------------------------------
294
// NOTE(review): the signature line is missing from this listing; the trace
// messages identify this as File::AddIO(IO *io) -- verify against upstream.
{
   // Called from Cache::GetFile() when a new IO asks for the file.

   TRACEF(Debug, "AddIO() io = " << (void*)io);

   time_t now = time(0);
   std::string loc(io->GetLocation());

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi == m_io_set.end())
   {
      // New IO: register it, stamp attach time, record stats and location.
      m_io_set.insert(io);
      io->m_attach_time = now;
      m_stats.IoAttach();

      insert_remote_location(loc);

      // First IO after a stop re-engages prefetching.
      if (m_prefetch_state == kStopped)
      {
         m_prefetch_state = kOn;
         cache()->RegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
   }

   m_state_cond.UnLock();
}
329
330//------------------------------------------------------------------------------
331
// NOTE(review): the signature line is missing from this listing; the trace
// messages identify this as File::RemoveIO(IO *io) -- verify against upstream.
{
   // Called from Cache::ReleaseFile.

   TRACEF(Debug, "RemoveIO() io = " << (void*)io);

   time_t now = time(0);

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi != m_io_set.end())
   {
      // Keep the prefetch round-robin iterator valid when erasing its target.
      if (mi == m_current_io)
      {
         ++m_current_io;
      }

      m_stats.IoDetach(now - io->m_attach_time);
      m_io_set.erase(mi);
      --m_ios_in_detach;

      // Last IO gone: prefetching should already have been stopped by ioActive().
      if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
      {
         TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
         m_prefetch_state = kStopped;
         cache()->DeRegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
   }

   m_state_cond.UnLock();
}
369
370//------------------------------------------------------------------------------
371
// Open (or create) the local data file and its companion cinfo file,
// validating any pre-existing state. Returns true on success; on failure
// sets errno and leaves m_data_file / m_info_file null.
bool File::Open()
{
   // Sets errno accordingly.

   static const char *tpfx = "Open() ";

   TRACEF(Dump, tpfx << "open file for disk cache");

   // NOTE(review): a line is missing from this listing here -- the declaration
   // of 'conf' used below (presumably a reference to
   // Cache::GetInstance().RefConfiguration()) -- verify against upstream.

   XrdOss &myOss = * Cache::GetInstance().GetOss();
   const char *myUser = conf.m_username.c_str();
   XrdOucEnv myEnv;
   struct stat data_stat, info_stat;

   std::string ifn = m_filename + Info::s_infoExtension;

   bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
   bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);

   // Create the data file itself.
   char size_str[32]; sprintf(size_str, "%lld", m_file_size);
   myEnv.Put("oss.asize", size_str);
   myEnv.Put("oss.cgroup", conf.m_data_space.c_str());

   int res;

   if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      return false;
   }

   m_data_file = myOss.newFile(myUser);
   if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_data_file; m_data_file = 0;
      return false;
   }

   myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
   myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
   if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   m_info_file = myOss.newFile(myUser);
   if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_info_file; m_info_file = 0;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   bool initialize_info_file = true;

   if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
   {
      TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
             ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
             ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");

      // Check if data file exists and is of reasonable size.
      if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
      {
         initialize_info_file = false;
      } else {
         TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
      }
   }

   if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
   {
      // NOTE(review): the first line of this 'if' condition is missing from
      // this listing -- only its continuation below survives; verify upstream.
      conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
      {
         TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
         initialize_info_file = true;
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
      } else {
         // TODO: If the file is complete, we don't need to reset net cksums.
         m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
      }
   }

   if (initialize_info_file)
   {
      m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
      m_cfi.SetCkSumState(conf.get_cs_Chk());
      m_cfi.ResetNoCkSumTime();
      m_cfi.Write(m_info_file, ifn.c_str());
      m_info_file->Fsync();
      TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
   }

   m_cfi.WriteIOStatAttach();
   m_state_cond.Lock();
   m_block_size = m_cfi.GetBufferSize();
   m_num_blocks = m_cfi.GetNBlocks();
   m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
   m_state_cond.UnLock();

   return true;
}
488
489
490//==============================================================================
491// Read and helpers
492//==============================================================================
493
494bool File::overlap(int blk, // block to query
495 long long blk_size, //
496 long long req_off, // offset of user request
497 int req_size, // size of user request
498 // output:
499 long long &off, // offset in user buffer
500 long long &blk_off, // offset in block
501 int &size) // size to copy
502{
503 const long long beg = blk * blk_size;
504 const long long end = beg + blk_size;
505 const long long req_end = req_off + req_size;
506
507 if (req_off < end && req_end > beg)
508 {
509 const long long ovlp_beg = std::max(beg, req_off);
510 const long long ovlp_end = std::min(end, req_end);
511
512 off = ovlp_beg - req_off;
513 blk_off = ovlp_beg - beg;
514 size = (int) (ovlp_end - ovlp_beg);
515
516 assert(size <= blk_size);
517 return true;
518 }
519 else
520 {
521 return false;
522 }
523}
524
525//------------------------------------------------------------------------------
526
527Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
528{
529 // Must be called w/ state_cond locked.
530 // Checks on size etc should be done before.
531 //
532 // Reference count is 0 so increase it in calling function if you want to
533 // catch the block while still in memory.
534
535 const long long off = i * m_block_size;
536 const int last_block = m_num_blocks - 1;
537 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
538
539 int blk_size, req_size;
540 if (i == last_block) {
541 blk_size = req_size = m_file_size - off;
542 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
543 } else {
544 blk_size = req_size = m_block_size;
545 }
546
547 Block *b = 0;
548 char *buf = cache()->RequestRAM(req_size);
549
550 if (buf)
551 {
552 b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
553
554 if (b)
555 {
556 m_block_map[i] = b;
557
558 // Actual Read request is issued in ProcessBlockRequests().
559
560 if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
561 {
562 m_prefetch_state = kHold;
563 cache()->DeRegisterPrefetchFile(this);
564 }
565 }
566 else
567 {
568 TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
569 }
570 }
571
572 return b;
573}
574
// Issue the actual remote read (pgRead with checksums, or plain Read) for a
// single prepared block.
void File::ProcessBlockRequest(Block *b)
{
   // This *must not* be called with block_map locked.

   // NOTE(review): a line is missing from this listing here -- the declaration
   // of the response handler 'brh' used below (presumably a
   // BlockResponseHandler constructed for 'b') -- verify against upstream.

   if (XRD_TRACE What >= TRACE_Dump) {
      char buf[256];
      snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
               b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
      TRACEF(Dump, "ProcessBlockRequest() " << buf);
   }

   if (b->req_cksum_net())
   {
      // Network checksums requested -- use page-read with checksum vector.
      b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
                                      b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
   } else {
      b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
   }
}
596
597void File::ProcessBlockRequests(BlockList_t& blks)
598{
599 // This *must not* be called with block_map locked.
600
601 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
602 {
603 ProcessBlockRequest(*bi);
604 }
605}
606
607//------------------------------------------------------------------------------
608
609void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
610{
611 int n_chunks = ioVec.size();
612 int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
613
614 TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
615 ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);
616
617 DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);
618
619 int pos = 0;
620 while (n_chunks > XrdProto::maxRvecsz) {
621 io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
622 pos += XrdProto::maxRvecsz;
623 n_chunks -= XrdProto::maxRvecsz;
624 }
625 io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
626}
627
628//------------------------------------------------------------------------------
629
630int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
631{
632 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
633
634 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
635
636 if (rs < 0)
637 {
638 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
639 return rs;
640 }
641
642 if (rs != expected_size)
643 {
644 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
645 return -EIO;
646 }
647
648 return (int) rs;
649}
650
651//------------------------------------------------------------------------------
652
// Single-range read entry point. Returns bytes read, a negative errno value,
// or -EWOULDBLOCK when the request went asynchronous (completion will be
// delivered through the ReadReqRH handler).
int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
{
   // rrc_func is ONLY called from async processing.
   // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
   // This streamlines implementation of synchronous IO::Read().

   TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);

   m_state_cond.Lock();

   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
      if (ret > 0) m_stats.AddBytesHit(ret);
      return ret;
   }

   XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );

   // NOTE: m_state_cond is intentionally still locked here --
   // ReadOpusCoalescere() is entered under the lock and releases it internally.
   return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
}
683
684//------------------------------------------------------------------------------
685
// Vector-read entry point. Semantics match Read(): returns bytes read, a
// negative errno value, or -EWOULDBLOCK when the request went asynchronous.
int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
{
   TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");

   m_state_cond.Lock();

   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
      if (ret > 0) m_stats.AddBytesHit(ret);
      return ret;
   }

   // NOTE: m_state_cond is intentionally still locked here --
   // ReadOpusCoalescere() is entered under the lock and releases it internally.
   return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
}
710
711//------------------------------------------------------------------------------
712
// Core read machinery shared by Read() and ReadV(): classifies every block of
// the request as in-RAM / on-disk / to-be-fetched (or direct when RAM is
// exhausted), issues the remote work, then serves the RAM/disk parts
// synchronously. Entered with m_state_cond LOCKED; unlocks it internally.
int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
                             ReadReqRH *rh, const char *tpfx)
{
   // Non-trivial processing for Read and ReadV.
   // Entered under lock.
   //
   // loop over required blocks:
   // - if on disk, ok;
   // - if in ram or incoming, inc ref-count
   // - otherwise request and inc ref count (unless RAM full => request direct)
   // unlock

   int prefetch_cnt = 0;

   ReadRequest *read_req = nullptr;
   BlockList_t blks_to_request; // blocks we are issuing a new remote request for

   // Finished in-RAM blocks and the chunks to be copied out of each of them.
   std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;

   std::vector<XrdOucIOVec> iovec_disk;
   std::vector<XrdOucIOVec> iovec_direct;
   int iovec_disk_total = 0;
   int iovec_direct_total = 0;

   for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
   {
      const XrdOucIOVec &iov = readV[iov_idx];
      long long iUserOff = iov.offset;
      int iUserSize = iov.size;
      char *iUserBuff = iov.data;

      const int idx_first = iUserOff / m_block_size;
      const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;

      TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);

      // Tracks what the previous block resolved to, so adjacent disk / direct
      // chunks can be coalesced into a single IOVec entry.
      enum LastBlock_e { LB_other, LB_disk, LB_direct };

      LastBlock_e lbe = LB_other;

      for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
      {
         TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
         BlockMap_i bi = m_block_map.find(block_idx);

         // overlap and read
         long long off; // offset in user buffer
         long long blk_off; // offset in block
         int size; // size to copy

         overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);

         // In RAM or incoming?
         if (bi != m_block_map.end())
         {
            inc_ref_count(bi->second);
            TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);

            if (bi->second->is_finished())
            {
               // note, blocks with error should not be here !!!
               // they should be either removed or reissued in ProcessBlockResponse()
               assert(bi->second->is_ok());

               blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );

               if (bi->second->m_prefetch)
                  ++prefetch_cnt;
            }
            else
            {
               if ( ! read_req)
                  read_req = new ReadRequest(io, rh);

               // We have a lock on state_cond --> as we register the request before releasing the lock,
               // we are sure to get a call-in via the ChunkRequest handling when this block arrives.

               bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
               ++read_req->m_n_chunk_reqs;
            }

            lbe = LB_other;
         }
         // On disk?
         else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
         {
            TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);

            // Extend the previous disk chunk when contiguous, else start a new one.
            if (lbe == LB_disk)
               iovec_disk.back().size += size;
            else
               iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
            iovec_disk_total += size;

            if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
               ++prefetch_cnt;

            lbe = LB_disk;
         }
         // Neither ... then we have to go get it ...
         else
         {
            if ( ! read_req)
               read_req = new ReadRequest(io, rh);

            // Is there room for one more RAM Block?
            Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
            if (b)
            {
               TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
               inc_ref_count(b);
               blks_to_request.push_back(b);

               b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
               ++read_req->m_n_chunk_reqs;

               lbe = LB_other;
            }
            else // Nope ... read this directly without caching.
            {
               TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);

               iovec_direct_total += size;
               read_req->m_direct_done = false;

               // Make sure we do not issue a ReadV with chunk size above XrdProto::maxRVdsz.
               // Number of actual ReadVs issued so as to not exceed the XrdProto::maxRvecsz limit
               // is determined in the RequestBlocksDirect().
               if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
                  iovec_direct.back().size += size;
               } else {
                  long long in_offset = block_idx * m_block_size + blk_off;
                  char *out_pos = iUserBuff + off;
                  while (size > XrdProto::maxRVdsz) {
                     iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
                     in_offset += XrdProto::maxRVdsz;
                     out_pos += XrdProto::maxRVdsz;
                     size -= XrdProto::maxRVdsz;
                  }
                  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
               }

               lbe = LB_direct;
            }
         }
      } // end for over blocks in an IOVec
   } // end for over readV IOVec

   inc_prefetch_hit_cnt(prefetch_cnt);

   m_state_cond.UnLock();

   // First, send out remote requests for new blocks.
   if ( ! blks_to_request.empty())
   {
      ProcessBlockRequests(blks_to_request);
      blks_to_request.clear();
   }

   // Second, send out remote direct read requests.
   if ( ! iovec_direct.empty())
   {
      RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);

      TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
   }

   // Begin synchronous part where we process data that is already in RAM or on disk.

   long long bytes_read = 0;
   int error_cond = 0; // to be set to -errno

   // Third, process blocks that are available in RAM.
   if ( ! blks_ready.empty())
   {
      for (auto &bvi : blks_ready)
      {
         for (auto &cr : bvi.second)
         {
            TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
            memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
            bytes_read += cr.m_size;
         }
      }
   }

   // Fourth, read blocks from disk.
   if ( ! iovec_disk.empty())
   {
      int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
      TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
      if (rc >= 0)
      {
         bytes_read += rc;
      }
      else
      {
         error_cond = rc;
         TRACEF(Error, tpfx << "failed read from disk");
      }
   }

   // End synchronous part -- update with sync stats and determine actual state of this read.
   // Note: remote reads might have already finished during disk-read!

   m_state_cond.Lock();

   // Release the references taken on pre-finished blocks above.
   for (auto &bvi : blks_ready)
      dec_ref_count(bvi.first, (int) bvi.second.size());

   if (read_req)
   {
      read_req->m_bytes_read += bytes_read;
      read_req->update_error_cond(error_cond);
      read_req->m_stats.m_BytesHit += bytes_read;
      read_req->m_sync_done = true;

      if (read_req->is_complete())
      {
         // Almost like FinalizeReadRequest(read_req) -- but no callout!
         m_state_cond.UnLock();

         m_stats.AddReadStats(read_req->m_stats);

         int ret = read_req->return_value();
         delete read_req;
         return ret;
      }
      else
      {
         m_state_cond.UnLock();
         return -EWOULDBLOCK;
      }
   }
   else
   {
      // Fully satisfied from RAM / disk -- no asynchronous part was needed.
      m_stats.m_BytesHit += bytes_read;
      m_state_cond.UnLock();

      // !!! No callout.

      return error_cond ? error_cond : bytes_read;
   }
}
957
958
959//==============================================================================
960// WriteBlock and Sync
961//==============================================================================
962
// NOTE(review): the signature line is missing from this listing; the trace
// messages identify this as File::WriteToDisk(Block* b) -- verify against
// upstream. Writes one block to the local data file, updates cinfo bits and
// possibly schedules a file sync.
{
   // write block buffer into disk file
   long long offset = b->m_offset - m_offset;
   long long size = b->get_size();
   ssize_t retval;

   // Use page-write (with or without the block's checksum vector) when the
   // cache keeps checksums, plain write otherwise.
   if (m_cfi.IsCkSumCache())
      if (b->has_cksums())
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
      else
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
   else
      retval = m_data_file->Write(b->get_buff(), offset, size);

   if (retval < size)
   {
      // Write failed or was short -- log, drop our reference, and bail out
      // without marking the block as written.
      if (retval < 0)
      {
         GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
      }
      else
      {
         TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
      }

      XrdSysCondVarHelper _lck(m_state_cond);

      dec_ref_count(b);

      return;
   }

   const int blk_idx = (b->m_offset - m_offset) / m_block_size;

   // Set written bit.
   TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);

   bool schedule_sync = false;
   {
      XrdSysCondVarHelper _lck(m_state_cond);

      m_cfi.SetBitWritten(blk_idx);

      if (b->m_prefetch)
      {
         m_cfi.SetBitPrefetch(blk_idx);
      }
      if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
      {
         m_cfi.ResetCkSumNet();
      }

      dec_ref_count(b);

      // Set synced bit or stash block index if in actual sync.
      // Synced state is only written out to cinfo file when data file is synced.
      if (m_in_sync)
      {
         m_writes_during_sync.push_back(blk_idx);
      }
      else
      {
         m_cfi.SetBitSynced(blk_idx);
         ++m_non_flushed_cnt;
         if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
             ! m_in_shutdown)
         {
            schedule_sync = true;
            m_in_sync = true;
            m_non_flushed_cnt = 0;
         }
      }
   }

   // Schedule outside of the lock scope above.
   if (schedule_sync)
   {
      cache()->ScheduleFileSync(this);
   }
}
1043
1044//------------------------------------------------------------------------------
1045
// NOTE(review): the signature line is missing from this listing; the trace
// messages identify this as File::Sync() -- verify against upstream.
// Fsyncs the data file, then writes and fsyncs the cinfo file; on failure
// unlinks the local files and initiates shutdown of this File object.
{
   TRACEF(Dump, "Sync()");

   int ret = m_data_file->Fsync();
   bool errorp = false;
   if (ret == XrdOssOK)
   {
      Stats loc_stats = m_stats.Clone();
      m_cfi.WriteIOStat(loc_stats);
      m_cfi.Write(m_info_file, m_filename.c_str());
      int cret = m_info_file->Fsync();
      if (cret != XrdOssOK)
      {
         TRACEF(Error, "Sync cinfo file sync error " << cret);
         errorp = true;
      }
   }
   else
   {
      TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
      errorp = true;
   }

   if (errorp)
   {
      TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");

      // Unlink will also call this->initiate_emergency_shutdown()
      Cache::GetInstance().UnlinkFile(m_filename, false);

      XrdSysCondVarHelper _lck(&m_state_cond);

      m_writes_during_sync.clear();
      m_in_sync = false;

      return;
   }

   int written_while_in_sync;
   bool resync = false;
   {
      XrdSysCondVarHelper _lck(&m_state_cond);
      // Blocks written while the sync was running were stashed by WriteToDisk();
      // mark them synced now.
      for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
      {
         m_cfi.SetBitSynced(*i);
      }
      written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
      m_writes_during_sync.clear();

      // If there were writes during sync and the file is now complete,
      // let us call Sync again without resetting the m_in_sync flag.
      if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
         resync = true;
      else
         m_in_sync = false;
   }
   TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));

   if (resync)
      Sync();
}
1108
1109
1110//==============================================================================
1111// Block processing
1112//==============================================================================
1113
1114void File::free_block(Block* b)
1115{
1116 // Method always called under lock.
1117 int i = b->m_offset / m_block_size;
1118 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1119 size_t ret = m_block_map.erase(i);
1120 if (ret != 1)
1121 {
1122 // assert might be a better option than a warning
1123 TRACEF(Error, "free_block did not erase " << i << " from map");
1124 }
1125 else
1126 {
1127 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1128 delete b;
1129 }
1130
1131 if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1132 {
1133 m_prefetch_state = kOn;
1134 cache()->RegisterPrefetchFile(this);
1135 }
1136}
1137
1138//------------------------------------------------------------------------------
1139
1140bool File::select_current_io_or_disable_prefetching(bool skip_current)
1141{
1142 // Method always called under lock. It also expects prefetch to be active.
1143
1144 int io_size = (int) m_io_set.size();
1145 bool io_ok = false;
1146
1147 if (io_size == 1)
1148 {
1149 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1150 if (io_ok)
1151 {
1152 m_current_io = m_io_set.begin();
1153 }
1154 }
1155 else if (io_size > 1)
1156 {
1157 IoSet_i mi = m_current_io;
1158 if (skip_current && mi != m_io_set.end()) ++mi;
1159
1160 for (int i = 0; i < io_size; ++i)
1161 {
1162 if (mi == m_io_set.end()) mi = m_io_set.begin();
1163
1164 if ((*mi)->m_allow_prefetching)
1165 {
1166 m_current_io = mi;
1167 io_ok = true;
1168 break;
1169 }
1170 ++mi;
1171 }
1172 }
1173
1174 if ( ! io_ok)
1175 {
1176 m_current_io = m_io_set.end();
1177 m_prefetch_state = kStopped;
1178 cache()->DeRegisterPrefetchFile(this);
1179 }
1180
1181 return io_ok;
1182}
1183
1184//------------------------------------------------------------------------------
1185
1186void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1187{
1188 // Called from DirectResponseHandler.
1189 // NOT under lock.
1190
1191 if (error_cond)
1192 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1193
1194 m_state_cond.Lock();
1195
1196 if (error_cond)
1197 rreq->update_error_cond(error_cond);
1198 else {
1199 rreq->m_stats.m_BytesBypassed += bytes_read;
1200 rreq->m_bytes_read += bytes_read;
1201 }
1202
1203 rreq->m_direct_done = true;
1204
1205 bool rreq_complete = rreq->is_complete();
1206
1207 m_state_cond.UnLock();
1208
1209 if (rreq_complete)
1210 FinalizeReadRequest(rreq);
1211}
1212
1213void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1214{
1215 // Called from ProcessBlockResponse().
1216 // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1217 // Does not manage m_read_req.
1218 // Will not complete the request.
1219
1220 TRACEF(Error, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1221 " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1222
1223 rreq->update_error_cond(b->get_error());
1224 --rreq->m_n_chunk_reqs;
1225
1226 dec_ref_count(b);
1227}
1228
1229void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1230{
1231 // Called from ProcessBlockResponse().
1232 // NOT under lock as it does memcopy ofor exisf block data.
1233 // Acquires lock for block, m_read_req and rreq state update.
1234
1235 ReadRequest *rreq = creq.m_read_req;
1236
1237 TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1238 memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1239
1240 m_state_cond.Lock();
1241
1242 rreq->m_bytes_read += creq.m_size;
1243
1244 if (b->get_req_id() == (void*) rreq)
1245 rreq->m_stats.m_BytesMissed += creq.m_size;
1246 else
1247 rreq->m_stats.m_BytesHit += creq.m_size;
1248
1249 --rreq->m_n_chunk_reqs;
1250
1251 if (b->m_prefetch)
1252 inc_prefetch_hit_cnt(1);
1253
1254 dec_ref_count(b);
1255
1256 bool rreq_complete = rreq->is_complete();
1257
1258 m_state_cond.UnLock();
1259
1260 if (rreq_complete)
1261 FinalizeReadRequest(rreq);
1262}
1263
1264void File::FinalizeReadRequest(ReadRequest *rreq)
1265{
1266 // called from ProcessBlockResponse()
1267 // NOT under lock -- does callout
1268
1269 m_stats.AddReadStats(rreq->m_stats);
1270
1271 rreq->m_rh->Done(rreq->return_value());
1272 delete rreq;
1273}
1274
// Handle completion (success or failure) of a block download.
// Updates prefetch accounting, hands successful data to waiting chunk
// requests, and on failure either errors-out requests on the same IO or
// reissues the block on a different IO. Lock discipline: the state lock is
// taken mid-function and released before any handler callouts.
void File::ProcessBlockResponse(Block *b, int res)
{
   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // Incorrect number of bytes received, apparently size of the file on the remote
      // is different than what the cache expects it to be.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister block from IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If failed and IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
      }
      else
      {
         // IO already detached from this file; nothing to deregister.
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      // Success path: mark block downloaded and, unless shutting down,
      // queue it for writing to the local cache file.
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase ref-count for the writer.
         inc_ref_count(b);
         m_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         cache()->AddWriteTask(b, true);
      }

      // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      // Failure path: normalize a short-read into an error code.
      if (res < 0) {
         TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " error=" << res);
      } else {
         TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " incomplete, got " << res << " expected " << b->get_size());
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         // EREMOTEIO is not available on these platforms.
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over Block's chunk-reqs vector, error out ones with the same IO.
      // Collect others with a different IO, the first of them will be used to reissue the request.
      // This is then done outside of lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t creqs_to_keep;

      for(ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         // Requests from other IOs are still interested -- retry the block
         // through the first such IO.
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Info, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}
1413
1414//------------------------------------------------------------------------------
1415
1416const char* File::lPath() const
1417{
1418 return m_filename.c_str();
1419}
1420
1421//------------------------------------------------------------------------------
1422
1423int File::offsetIdx(int iIdx) const
1424{
1425 return iIdx - m_offset/m_block_size;
1426}
1427
1428
1429//------------------------------------------------------------------------------
1430
1432{
1433 // Check that block is not on disk and not in RAM.
1434 // TODO: Could prefetch several blocks at once!
1435 // blks_max could be an argument
1436
1437 BlockList_t blks;
1438
1439 TRACEF(DumpXL, "Prefetch() entering.");
1440 {
1441 XrdSysCondVarHelper _lck(m_state_cond);
1442
1443 if (m_prefetch_state != kOn)
1444 {
1445 return;
1446 }
1447
1448 if ( ! select_current_io_or_disable_prefetching(true) )
1449 {
1450 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1451 return;
1452 }
1453
1454 // Select block(s) to fetch.
1455 for (int f = 0; f < m_num_blocks; ++f)
1456 {
1457 if ( ! m_cfi.TestBitWritten(f))
1458 {
1459 int f_act = f + m_offset / m_block_size;
1460
1461 BlockMap_i bi = m_block_map.find(f_act);
1462 if (bi == m_block_map.end())
1463 {
1464 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1465 if (b)
1466 {
1467 TRACEF(Dump, "Prefetch take block " << f_act);
1468 blks.push_back(b);
1469 // Note: block ref_cnt not increased, it will be when placed into write queue.
1470
1471 inc_prefetch_read_cnt(1);
1472 }
1473 else
1474 {
1475 // This shouldn't happen as prefetching stops when RAM is 70% full.
1476 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1477 }
1478 break;
1479 }
1480 }
1481 }
1482
1483 if (blks.empty())
1484 {
1485 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1486 m_prefetch_state = kComplete;
1487 cache()->DeRegisterPrefetchFile(this);
1488 }
1489 else
1490 {
1491 (*m_current_io)->m_active_prefetches += (int) blks.size();
1492 }
1493 }
1494
1495 if ( ! blks.empty())
1496 {
1497 ProcessBlockRequests(blks);
1498 }
1499}
1500
1501
1502//------------------------------------------------------------------------------
1503
1505{
1506 return m_prefetch_score;
1507}
1508
1510{
1511 return Cache::GetInstance().GetLog();
1512}
1513
1518
1519void File::insert_remote_location(const std::string &loc)
1520{
1521 if ( ! loc.empty())
1522 {
1523 size_t p = loc.find_first_of('@');
1524 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1525 }
1526}
1527
1528std::string File::GetRemoteLocations() const
1529{
1530 std::string s;
1531 if ( ! m_remote_locations.empty())
1532 {
1533 size_t sl = 0;
1534 int nl = 0;
1535 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1536 {
1537 sl += i->size();
1538 }
1539 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1540 s = '[';
1541 int j = 1;
1542 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1543 {
1544 s += '"'; s += *i; s += '"';
1545 if (j < nl) s += ',';
1546 }
1547 s += ']';
1548 }
1549 else
1550 {
1551 s = "[]";
1552 }
1553 return s;
1554}
1555
1556//==============================================================================
1557//======================= RESPONSE HANDLERS ==============================
1558//==============================================================================
1559
1561{
1562 m_block->m_file->ProcessBlockResponse(m_block, res);
1563 delete this;
1564}
1565
1566//------------------------------------------------------------------------------
1567
1569{
1570 m_mutex.Lock();
1571
1572 int n_left = --m_to_wait;
1573
1574 if (res < 0) {
1575 if (m_errno == 0) m_errno = res; // store first reported error
1576 } else {
1577 m_bytes_read += res;
1578 }
1579
1580 m_mutex.UnLock();
1581
1582 if (n_left == 0)
1583 {
1584 m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1585 delete this;
1586 }
1587}
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define stat(a, b)
Definition XrdPosix.hh:96
#define XRD_TRACE
bool Debug
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Fsync()
Definition XrdOss.hh:144
virtual int Ftruncate(unsigned long long flen)
Definition XrdOss.hh:164
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:315
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:398
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:160
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1133
XrdOss * GetOss() const
Definition XrdPfc.hh:385
XrdSysError * GetLog()
Definition XrdPfc.hh:397
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
XrdSysTrace * GetTrace()
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition XrdPfcFile.cc:99
float GetPrefetchScore() const
friend class BlockResponseHandler
XrdSysError * GetLog()
std::string GetRemoteLocations() const
void AddIO(IO *io)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache inf o and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:30
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:72
const char * GetLocation()
Definition XrdPfcIO.hh:46
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
void ResetCkSumNet()
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
bool IsCkSumNet() const
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void ResetNoCkSumTime()
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::list< Block * > BlockList_t
XrdSysTrace * GetTrace()
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
Definition XProtocol.hh:688
static const int maxRvecsz
Definition XProtocol.hh:686
long long offset
ReadRequest * m_read_req
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:109
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:74
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:67
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:76
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
unsigned short m_seq_id
Definition XrdPfcFile.hh:64
void update_error_cond(int ec)
Definition XrdPfcFile.hh:91
bool is_complete() const
Definition XrdPfcFile.hh:93
int return_value() const
Definition XrdPfcFile.hh:94
long long m_bytes_read
Definition XrdPfcFile.hh:79