XRootD
Loading...
Searching...
No Matches
XrdPfc.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel, Brian Bockelman
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19#include <fcntl.h>
20#include <sstream>
21#include <algorithm>
22#include <sys/statvfs.h>
23
24#include "XrdCl/XrdClURL.hh"
27
28#include "XrdOuc/XrdOucEnv.hh"
29#include "XrdOuc/XrdOucUtils.hh"
31#include "XrdOuc/XrdOucJson.hh"
32
33#include "XrdSys/XrdSysTimer.hh"
34#include "XrdSys/XrdSysTrace.hh"
35#include "XrdSys/XrdSysXAttr.hh"
36
38
39#include "XrdOss/XrdOss.hh"
41
42#include "XrdPfc.hh"
43#include "XrdPfcTrace.hh"
44#include "XrdPfcFSctl.hh"
45#include "XrdPfcInfo.hh"
46#include "XrdPfcIOFile.hh"
47#include "XrdPfcIOFileBlock.hh"
49
51
52using namespace XrdPfc;
53
54Cache *Cache::m_instance = nullptr;
56
57
59{
61 return 0;
62}
63
65{
67 return 0;
68}
69
70void *PrefetchThread(void*)
71{
73 return 0;
74}
75
76//==============================================================================
77
78extern "C"
79{
81 const char *config_filename,
82 const char *parameters,
83 XrdOucEnv *env)
84{
85 XrdSysError err(logger, "");
86 err.Say("++++++ Proxy file cache initialization started.");
87
88 if ( ! env ||
89 ! (XrdPfc::Cache::schedP = (XrdScheduler*) env->GetPtr("XrdScheduler*")))
90 {
92 XrdPfc::Cache::schedP->Start();
93 }
94
95 Cache &instance = Cache::CreateInstance(logger, env);
96
97 if (! instance.Config(config_filename, parameters, env))
98 {
99 err.Say("Config Proxy file cache initialization failed.");
100 return 0;
101 }
102 err.Say("++++++ Proxy file cache initialization completed.");
103
104 {
105 pthread_t tid;
106
107 XrdSysThread::Run(&tid, ResourceMonitorThread, 0, 0, "XrdPfc ResourceMonitor");
108
109 for (int wti = 0; wti < instance.RefConfiguration().m_wqueue_threads; ++wti)
110 {
111 XrdSysThread::Run(&tid, ProcessWriteTaskThread, 0, 0, "XrdPfc WriteTasks ");
112 }
113
114 if (instance.is_prefetch_enabled())
115 {
116 XrdSysThread::Run(&tid, PrefetchThread, 0, 0, "XrdPfc Prefetch ");
117 }
118 }
119
120 XrdPfcFSctl* pfcFSctl = new XrdPfcFSctl(instance, logger);
121 env->PutPtr("XrdFSCtl_PC*", pfcFSctl);
122
123 return &instance;
124}
125}
126
127//==============================================================================
128
130{
131 assert (m_instance == 0);
132 m_instance = new Cache(logger, env);
133 return *m_instance;
134}
135
136 Cache& Cache::GetInstance() { return *m_instance; }
137const Cache& Cache::TheOne() { return *m_instance; }
138const Configuration& Cache::Conf() { return m_instance->RefConfiguration(); }
139 ResourceMonitor& Cache::ResMon() { return m_instance->RefResMon(); }
140
142{
143 if (! m_decisionpoints.empty())
144 {
145 XrdCl::URL url(io->Path());
146 std::string filename = url.GetPath();
147 std::vector<Decision*>::const_iterator it;
148 for (it = m_decisionpoints.begin(); it != m_decisionpoints.end(); ++it)
149 {
150 XrdPfc::Decision *d = *it;
151 if (! d) continue;
152 if (! d->Decide(filename, *m_oss))
153 {
154 return false;
155 }
156 }
157 }
158
159 return true;
160}
161
163 XrdOucCache("pfc"),
164 m_env(env),
165 m_log(logger, "XrdPfc_"),
166 m_trace(new XrdSysTrace("XrdPfc", logger)),
167 m_traceID("Cache"),
168 m_oss(0),
169 m_gstream(0),
170 m_purge_pin(0),
171 m_prefetch_condVar(0),
172 m_prefetch_enabled(false),
173 m_RAM_used(0),
174 m_RAM_write_queue(0),
175 m_RAM_std_size(0),
176 m_isClient(false),
177 m_active_cond(0)
178{
179 // Default log level is Warning.
180 m_trace->What = 2;
181}
182
184{
185 const char* tpfx = "Attach() ";
186
187 if (Options & XrdOucCache::optRW)
188 {
189 TRACE(Info, tpfx << "passing through write operation" << obfuscateAuth(io->Path()));
190 }
191 else if (Cache::GetInstance().Decide(io))
192 {
193 TRACE(Info, tpfx << obfuscateAuth(io->Path()));
194
195 IO *cio;
196
197 if (Cache::GetInstance().RefConfiguration().m_hdfsmode)
198 {
199 cio = new IOFileBlock(io, *this);
200 }
201 else
202 {
203 IOFile *iof = new IOFile(io, *this);
204
205 if ( ! iof->HasFile())
206 {
207 delete iof;
208 // TODO - redirect instead. But this is kind of an awkward place for it.
209 // errno is set during IOFile construction.
210 TRACE(Error, tpfx << "Failed opening local file, falling back to remote access " << io->Path());
211 return io;
212 }
213
214 cio = iof;
215 }
216
217 TRACE_PC(Debug, const char* loc = io->Location(), tpfx << io->Path() << " location: " <<
218 ((loc && loc[0] != 0) ? loc : "<deferred open>"));
219
220 return cio;
221 }
222 else
223 {
224 TRACE(Info, tpfx << "decision decline " << io->Path());
225 }
226 return io;
227}
228
229void Cache::AddWriteTask(Block* b, bool fromRead)
230{
231 TRACE(Dump, "AddWriteTask() offset=" << b->m_offset << ". file " << b->get_file()->GetLocalPath());
232
233 {
234 XrdSysMutexHelper lock(&m_RAM_mutex);
235 m_RAM_write_queue += b->get_size();
236 }
237
238 m_writeQ.condVar.Lock();
239 if (fromRead)
240 m_writeQ.queue.push_back(b);
241 else
242 m_writeQ.queue.push_front(b);
243 m_writeQ.size++;
244 m_writeQ.condVar.Signal();
245 m_writeQ.condVar.UnLock();
246}
247
249{
250 std::list<Block*> removed_blocks;
251 long long sum_size = 0;
252
253 m_writeQ.condVar.Lock();
254 std::list<Block*>::iterator i = m_writeQ.queue.begin();
255 while (i != m_writeQ.queue.end())
256 {
257 if ((*i)->m_file == file)
258 {
259 TRACE(Dump, "Remove entries for " << (void*)(*i) << " path " << file->lPath());
260 std::list<Block*>::iterator j = i++;
261 removed_blocks.push_back(*j);
262 sum_size += (*j)->get_size();
263 m_writeQ.queue.erase(j);
264 --m_writeQ.size;
265 }
266 else
267 {
268 ++i;
269 }
270 }
271 m_writeQ.condVar.UnLock();
272
273 {
274 XrdSysMutexHelper lock(&m_RAM_mutex);
275 m_RAM_write_queue -= sum_size;
276 }
277
278 file->BlocksRemovedFromWriteQ(removed_blocks);
279}
280
282{
283 std::vector<Block*> blks_to_write(m_configuration.m_wqueue_blocks);
284
285 while (true)
286 {
287 m_writeQ.condVar.Lock();
288 while (m_writeQ.size == 0)
289 {
290 m_writeQ.condVar.Wait();
291 }
292
293 // MT -- optimize to pop several blocks if they are available (or swap the list).
294 // This makes sense especially for smallish block sizes.
295
296 int n_pushed = std::min(m_writeQ.size, m_configuration.m_wqueue_blocks);
297 long long sum_size = 0;
298
299 for (int bi = 0; bi < n_pushed; ++bi)
300 {
301 Block* block = m_writeQ.queue.front();
302 m_writeQ.queue.pop_front();
303 m_writeQ.writes_between_purges += block->get_size();
304 sum_size += block->get_size();
305
306 blks_to_write[bi] = block;
307
308 TRACE(Dump, "ProcessWriteTasks for block " << (void*)(block) << " path " << block->m_file->lPath());
309 }
310 m_writeQ.size -= n_pushed;
311
312 m_writeQ.condVar.UnLock();
313
314 {
315 XrdSysMutexHelper lock(&m_RAM_mutex);
316 m_RAM_write_queue -= sum_size;
317 }
318
319 for (int bi = 0; bi < n_pushed; ++bi)
320 {
321 Block* block = blks_to_write[bi];
322
323 block->m_file->WriteBlockToDisk(block);
324 }
325 }
326}
327
329{
330 // Called from ResourceMonitor for an alternative estimation of disk writes.
331 XrdSysCondVarHelper lock(&m_writeQ.condVar);
332 long long ret = m_writeQ.writes_between_purges;
333 m_writeQ.writes_between_purges = 0;
334 return ret;
335}
336
337//==============================================================================
338
339char* Cache::RequestRAM(long long size)
340{
341 static const size_t s_block_align = sysconf(_SC_PAGESIZE);
342
343 bool std_size = (size == m_configuration.m_bufferSize);
344
345 m_RAM_mutex.Lock();
346
347 long long total = m_RAM_used + size;
348
349 if (total <= m_configuration.m_RamAbsAvailable)
350 {
351 m_RAM_used = total;
352 if (std_size && m_RAM_std_size > 0)
353 {
354 char *buf = m_RAM_std_blocks.back();
355 m_RAM_std_blocks.pop_back();
356 --m_RAM_std_size;
357
358 m_RAM_mutex.UnLock();
359
360 return buf;
361 }
362 else
363 {
364 m_RAM_mutex.UnLock();
365 char *buf;
366 if (posix_memalign((void**) &buf, s_block_align, (size_t) size))
367 {
368 // Report out of mem? Probably should report it at least the first time,
369 // then periodically.
370 return 0;
371 }
372 return buf;
373 }
374 }
375 m_RAM_mutex.UnLock();
376 return 0;
377}
378
379void Cache::ReleaseRAM(char* buf, long long size)
380{
381 bool std_size = (size == m_configuration.m_bufferSize);
382 {
383 XrdSysMutexHelper lock(&m_RAM_mutex);
384
385 m_RAM_used -= size;
386
387 if (std_size && m_RAM_std_size < m_configuration.m_RamKeepStdBlocks)
388 {
389 m_RAM_std_blocks.push_back(buf);
390 ++m_RAM_std_size;
391 return;
392 }
393 }
394 free(buf);
395}
396
397File* Cache::GetFile(const std::string& path, IO* io, long long off, long long filesize)
398{
399 // Called from virtual IOFile constructor.
400
401 TRACE(Debug, "GetFile " << path << ", io " << io);
402
403 ActiveMap_i it;
404
405 {
406 XrdSysCondVarHelper lock(&m_active_cond);
407
408 while (true)
409 {
410 it = m_active.find(path);
411
412 // File is not open or being opened. Mark it as being opened and
413 // proceed to opening it outside of while loop.
414 if (it == m_active.end())
415 {
416 it = m_active.insert(std::make_pair(path, (File*) 0)).first;
417 break;
418 }
419
420 if (it->second != 0)
421 {
422 it->second->AddIO(io);
423 inc_ref_cnt(it->second, false, true);
424
425 return it->second;
426 }
427 else
428 {
429 // Wait for some change in m_active, then recheck.
430 m_active_cond.Wait();
431 }
432 }
433 }
434
435 // This is always true, now that IOFileBlock is unsupported.
436
437 if (filesize == 0)
438 {
439 struct stat st;
440 int res = io->Fstat(st);
441 if (res < 0) {
442 errno = res;
443 TRACE(Error, "GetFile, could not get valid stat");
444 } else if (res > 0) {
445 errno = ENOTSUP;
446 TRACE(Error, "GetFile, stat returned positive value, this should NOT happen here");
447 } else {
448 filesize = st.st_size;
449 }
450 }
451
452 File *file = 0;
453
454 if (filesize >= 0)
455 {
456 file = File::FileOpen(path, off, filesize, io->GetInput());
457 }
458
459 {
460 XrdSysCondVarHelper lock(&m_active_cond);
461
462 if (file)
463 {
464 inc_ref_cnt(file, false, true);
465 it->second = file;
466
467 file->AddIO(io);
468 }
469 else
470 {
471 m_active.erase(it);
472 }
473
474 m_active_cond.Broadcast();
475 }
476
477 return file;
478}
479
481{
482 // Called from virtual IO::DetachFinalize.
483
484 TRACE(Debug, "ReleaseFile " << f->GetLocalPath() << ", io " << io);
485
486 {
487 XrdSysCondVarHelper lock(&m_active_cond);
488
489 f->RemoveIO(io);
490 }
491 dec_ref_cnt(f, true);
492}
493
494
495namespace
496{
497
498class DiskSyncer : public XrdJob
499{
500private:
501 File *m_file;
502 bool m_high_debug;
503
504public:
505 DiskSyncer(File *f, bool high_debug, const char *desc = "") :
506 XrdJob(desc),
507 m_file(f),
508 m_high_debug(high_debug)
509 {}
510
511 void DoIt()
512 {
513 m_file->Sync();
514 Cache::GetInstance().FileSyncDone(m_file, m_high_debug);
515 delete this;
516 }
517};
518
519
520class CommandExecutor : public XrdJob
521{
522private:
523 std::string m_command_url;
524
525public:
526 CommandExecutor(const std::string& command, const char *desc = "") :
527 XrdJob(desc),
528 m_command_url(command)
529 {}
530
531 void DoIt()
532 {
533 Cache::GetInstance().ExecuteCommandUrl(m_command_url);
534 delete this;
535 }
536};
537
538}
539
540//==============================================================================
541
542void Cache::schedule_file_sync(File* f, bool ref_cnt_already_set, bool high_debug)
543{
544 DiskSyncer* ds = new DiskSyncer(f, high_debug);
545
546 if ( ! ref_cnt_already_set) inc_ref_cnt(f, true, high_debug);
547
548 schedP->Schedule(ds);
549}
550
551void Cache::FileSyncDone(File* f, bool high_debug)
552{
553 dec_ref_cnt(f, high_debug);
554}
555
556void Cache::inc_ref_cnt(File* f, bool lock, bool high_debug)
557{
558 // called from GetFile() or SheduleFileSync();
559
560 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
561
562 if (lock) m_active_cond.Lock();
563 int rc = f->inc_ref_cnt();
564 if (lock) m_active_cond.UnLock();
565
566 TRACE_INT(tlvl, "inc_ref_cnt " << f->GetLocalPath() << ", cnt at exit = " << rc);
567}
568
569void Cache::dec_ref_cnt(File* f, bool high_debug)
570{
571 // NOT under active lock.
572 // Called from ReleaseFile(), DiskSync callback and stat-like functions.
573
574 int tlvl = high_debug ? TRACE_Debug : TRACE_Dump;
575 int cnt;
576
577 bool emergency_close = false;
578 {
579 XrdSysCondVarHelper lock(&m_active_cond);
580
581 cnt = f->get_ref_cnt();
582 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt at entry = " << cnt);
583
585 {
586 // In this case file has been already removed from m_active map and
587 // does not need to be synced.
588
589 if (cnt == 1)
590 {
591 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
592 << " -- closing and deleting File object without further ado");
593 emergency_close = true;
594 }
595 else
596 {
597 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << " is in shutdown, ref_cnt = " << cnt
598 << " -- waiting");
599 f->dec_ref_cnt();
600 return;
601 }
602 }
603 if (cnt > 1)
604 {
605 f->dec_ref_cnt();
606 return;
607 }
608 }
609 if (emergency_close)
610 {
611 f->Close();
612 delete f;
613 return;
614 }
615
616 if (cnt == 1)
617 {
618 if (f->FinalizeSyncBeforeExit())
619 {
620 // Note, here we "reuse" the existing reference count for the
621 // final sync.
622
623 TRACE(Debug, "dec_ref_cnt " << f->GetLocalPath() << ", scheduling final sync");
624 schedule_file_sync(f, true, true);
625 return;
626 }
627 }
628
629 bool finished_p = false;
630 ActiveMap_i act_it;
631 {
632 XrdSysCondVarHelper lock(&m_active_cond);
633
634 cnt = f->dec_ref_cnt();
635 TRACE_INT(tlvl, "dec_ref_cnt " << f->GetLocalPath() << ", cnt after sync_check and dec_ref_cnt = " << cnt);
636 if (cnt == 0)
637 {
638 act_it = m_active.find(f->GetLocalPath());
639 act_it->second = 0;
640
641 finished_p = true;
642 }
643 }
644 if (finished_p)
645 {
646 f->Close();
647 {
648 XrdSysCondVarHelper lock(&m_active_cond);
649 m_active.erase(act_it);
650 m_active_cond.Broadcast();
651 }
652
653 if (m_gstream)
654 {
655 const Stats &st = f->RefStats();
656 const Info::AStat *as = f->GetLastAccessStats();
657
658 char buf[4096];
659 int len = snprintf(buf, 4096, "{\"event\":\"file_close\","
660 "\"lfn\":\"%s\",\"size\":%lld,\"blk_size\":%d,\"n_blks\":%d,\"n_blks_done\":%d,"
661 "\"access_cnt\":%lu,\"attach_t\":%lld,\"detach_t\":%lld,\"remotes\":%s,"
662 "\"b_hit\":%lld,\"b_miss\":%lld,\"b_bypass\":%lld,"
663 "\"b_todisk\":%lld,\"b_prefetch\":%lld,\"n_cks_errs\":%d}",
664 f->GetLocalPath().c_str(), f->GetFileSize(), f->GetBlockSize(),
666 (unsigned long) f->GetAccessCnt(), (long long) as->AttachTime, (long long) as->DetachTime,
667 f->GetRemoteLocations().c_str(),
668 as->BytesHit, as->BytesMissed, as->BytesBypassed,
670 );
671 bool suc = false;
672 if (len < 4096)
673 {
674 suc = m_gstream->Insert(buf, len + 1);
675 }
676 if ( ! suc)
677 {
678 TRACE(Error, "Failed g-stream insertion of file_close record, len=" << len);
679 }
680 }
681
682 delete f;
683 }
684}
685
686bool Cache::IsFileActiveOrPurgeProtected(const std::string& path) const
687{
688 XrdSysCondVarHelper lock(&m_active_cond);
689
690 return m_active.find(path) != m_active.end() ||
691 m_purge_delay_set.find(path) != m_purge_delay_set.end();
692}
693
695{
696 XrdSysCondVarHelper lock(&m_active_cond);
697 m_purge_delay_set.clear();
698}
699
700//==============================================================================
701//=== PREFETCH
702//==============================================================================
703
705{
706 // Can be called with other locks held.
707
708 if ( ! m_prefetch_enabled)
709 {
710 return;
711 }
712
713 m_prefetch_condVar.Lock();
714 m_prefetchList.push_back(file);
715 m_prefetch_condVar.Signal();
716 m_prefetch_condVar.UnLock();
717}
718
719
721{
722 // Can be called with other locks held.
723
724 if ( ! m_prefetch_enabled)
725 {
726 return;
727 }
728
729 m_prefetch_condVar.Lock();
730 for (PrefetchList::iterator it = m_prefetchList.begin(); it != m_prefetchList.end(); ++it)
731 {
732 if (*it == file)
733 {
734 m_prefetchList.erase(it);
735 break;
736 }
737 }
738 m_prefetch_condVar.UnLock();
739}
740
741
743{
744 m_prefetch_condVar.Lock();
745 while (m_prefetchList.empty())
746 {
747 m_prefetch_condVar.Wait();
748 }
749
750 // std::sort(m_prefetchList.begin(), m_prefetchList.end(), myobject);
751
752 size_t l = m_prefetchList.size();
753 int idx = rand() % l;
754 File* f = m_prefetchList[idx];
755
756 m_prefetch_condVar.UnLock();
757 return f;
758}
759
760
762{
763 const long long limit_RAM = m_configuration.m_RamAbsAvailable * 7 / 10;
764
765 while (true)
766 {
767 m_RAM_mutex.Lock();
768 bool doPrefetch = (m_RAM_used < limit_RAM);
769 m_RAM_mutex.UnLock();
770
771 if (doPrefetch)
772 {
774 f->Prefetch();
775 }
776 else
777 {
779 }
780 }
781}
782
783
784//==============================================================================
785//=== Virtuals from XrdOucCache
786//==============================================================================
787
788//------------------------------------------------------------------------------
802
803int Cache::LocalFilePath(const char *curl, char *buff, int blen,
804 LFP_Reason why, bool forall)
805{
806 static const mode_t groupReadable = S_IRUSR | S_IWUSR | S_IRGRP;
807 static const mode_t worldReadable = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;
808 static const char *lfpReason[] = { "ForAccess", "ForInfo", "ForPath" };
809
810 TRACE(Debug, "LocalFilePath '" << curl << "', why=" << lfpReason[why]);
811
812 if (buff && blen > 0) buff[0] = 0;
813
814 XrdCl::URL url(curl);
815 std::string f_name = url.GetPath();
816 std::string i_name = f_name + Info::s_infoExtension;
817
818 if (why == ForPath)
819 {
820 int ret = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
821 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> " << ret);
822 return ret;
823 }
824
825 {
826 XrdSysCondVarHelper lock(&m_active_cond);
827 m_purge_delay_set.insert(f_name);
828 }
829
830 struct stat sbuff, sbuff2;
831 if (m_oss->Stat(f_name.c_str(), &sbuff) == XrdOssOK &&
832 m_oss->Stat(i_name.c_str(), &sbuff2) == XrdOssOK)
833 {
834 if (S_ISDIR(sbuff.st_mode))
835 {
836 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> EISDIR");
837 return -EISDIR;
838 }
839 else
840 {
841 bool read_ok = false;
842 bool is_complete = false;
843
844 // Lock and check if the file is active. If NOT, keep the lock
845 // and add dummy access after successful reading of info file.
846 // If it IS active, just release the lock, this ongoing access will
847 // assure the file continues to exist.
848
849 // XXXX How can I just loop over the cinfo file when active?
850 // Can I not get is_complete from the existing file?
851 // Do I still want to inject access record?
852 // Oh, it writes only if not active .... still let's try to use existing File.
853
854 m_active_cond.Lock();
855
856 bool is_active = m_active.find(f_name) != m_active.end();
857
858 if (is_active) m_active_cond.UnLock();
859
860 XrdOssDF* infoFile = m_oss->newFile(m_configuration.m_username.c_str());
861 XrdOucEnv myEnv;
862 int res = infoFile->Open(i_name.c_str(), O_RDWR, 0600, myEnv);
863 if (res >= 0)
864 {
865 Info info(m_trace, 0);
866 if (info.Read(infoFile, i_name.c_str()))
867 {
868 read_ok = true;
869
870 is_complete = info.IsComplete();
871
872 // Add full-size access if reason is for access.
873 if ( ! is_active && is_complete && why == ForAccess)
874 {
875 info.WriteIOStatSingle(info.GetFileSize());
876 info.Write(infoFile, i_name.c_str());
877 }
878 }
879 infoFile->Close();
880 }
881 delete infoFile;
882
883 if ( ! is_active) m_active_cond.UnLock();
884
885 if (read_ok)
886 {
887 if ((is_complete || why == ForInfo) && buff != 0)
888 {
889 int res2 = m_oss->Lfn2Pfn(f_name.c_str(), buff, blen);
890 if (res2 < 0)
891 return res2;
892
893 // Normally, files are owned by us but when direct cache access
894 // is wanted and possible, make sure the file is world readable.
895 if (why == ForAccess)
896 {mode_t mode = (forall ? worldReadable : groupReadable);
897 if (((sbuff.st_mode & worldReadable) != mode)
898 && (m_oss->Chmod(f_name.c_str(), mode) != XrdOssOK))
899 {is_complete = false;
900 *buff = 0;
901 }
902 }
903 }
904
905 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] <<
906 (is_complete ? " -> FILE_COMPLETE_IN_CACHE" : " -> EREMOTE"));
907
908 return is_complete ? 0 : -EREMOTE;
909 }
910 }
911 }
912
913 TRACE(Info, "LocalFilePath '" << curl << "', why=" << lfpReason[why] << " -> ENOENT");
914 return -ENOENT;
915}
916
917//______________________________________________________________________________
918// If supported, write Cache-Control as xattr to cinfo file.
919// One can use file descriptor or full path interchangeably
920//------------------------------------------------------------------------------
921void Cache::WriteCacheControlXAttr(int cinfo_fd, const char* path, const std::string& cc)
922{
923 if (m_metaXattr) {
924 int res = XrdSysXAttrActive->Set("pfc.cache-control", cc.c_str(), cc.size(), path, cinfo_fd, 0);
925 if (res != 0) {
926 TRACE(Error, "WritecacheControlXAttr error setting xattr " << res);
927 }
928 }
929}
930
931//______________________________________________________________________________
932// If supported, write file_size as xattr to cinfo file.
933//------------------------------------------------------------------------------
934void Cache::WriteFileSizeXAttr(int cinfo_fd, long long file_size)
935{
936 if (m_metaXattr) {
937 int res = XrdSysXAttrActive->Set("pfc.fsize", &file_size, sizeof(long long), 0, cinfo_fd, 0);
938 if (res != 0) {
939 TRACE(Debug, "WriteFileSizeXAttr error setting xattr " << res);
940 }
941 }
942}
943
944//______________________________________________________________________________
945// Determine full size of the data file from the corresponding cinfo-file name.
946// Attempts to read xattr first and falls back to reading of the cinfo file.
947// Returns -error on failure.
948//------------------------------------------------------------------------------
949long long Cache::DetermineFullFileSize(const std::string &cinfo_fname)
950{
951 if (m_metaXattr) {
952 char pfn[4096];
953 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
954 long long fsize = -1ll;
955 int res = XrdSysXAttrActive->Get("pfc.fsize", &fsize, sizeof(long long), pfn);
956 if (res == sizeof(long long))
957 {
958 return fsize;
959 }
960 else
961 {
962 TRACE(Debug, "DetermineFullFileSize error getting xattr " << res);
963 }
964 }
965
966 XrdOssDF *infoFile = m_oss->newFile(m_configuration.m_username.c_str());
967 XrdOucEnv env;
968 long long ret;
969 int res = infoFile->Open(cinfo_fname.c_str(), O_RDONLY, 0600, env);
970 if (res < 0) {
971 ret = res;
972 } else {
973 Info info(m_trace, 0);
974 if ( ! info.Read(infoFile, cinfo_fname.c_str())) {
975 ret = -EBADF;
976 } else {
977 ret = info.GetFileSize();
978 }
979 infoFile->Close();
980 }
981 delete infoFile;
982 return ret;
983}
984
985//______________________________________________________________________________
986// Get cache control attributes from the corresponding cinfo-file name.
987// Returns -error on failure.
988//------------------------------------------------------------------------------
989int Cache::GetCacheControlXAttr(const std::string &cinfo_fname, std::string& ival) const
990{
991 if (m_metaXattr) {
992
993 char pfn[4096];
994 m_oss->Lfn2Pfn(cinfo_fname.c_str(), pfn, 4096);
995
996 char cc[512];
997 int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, pfn, -1);
998 if (res > 0)
999 {
1000 std::string tmp(cc, res);
1001 ival = tmp;
1002 }
1003 return res;
1004 }
1005 return 0;
1006}
1007
1008//______________________________________________________________________________
1009// Get cache control attributes from the corresponding cinfo-file name.
1010// Returns -error on failure.
1011//------------------------------------------------------------------------------
1012int Cache::GetCacheControlXAttr(int fd, std::string& ival) const
1013{
1014 if (m_metaXattr) {
1015 char cc[512];
1016 int res = XrdSysXAttrActive->Get("pfc.cache-control", &cc, 512, nullptr, fd);
1017 if (res > 0)
1018 {
1019 ival = std::string(cc, res);
1020 return res;
1021 }
1022 }
1023 return 0;
1024}
1025
1026//______________________________________________________________________________
1027// Calculate if the file is to be considered cached for the purposes of
1028// only-if-cached and setting of atime of the Stat() calls.
1029// Returns true if the file is to be conidered cached.
1030//------------------------------------------------------------------------------
1031bool Cache::DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
1032{
1033 if (file_size == 0 || bytes_on_disk >= file_size)
1034 return true;
1035
1036 double frac_on_disk = (double) bytes_on_disk / file_size;
1037
1038 if (file_size <= m_configuration.m_onlyIfCachedMinSize)
1039 {
1040 if (frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1041 return true;
1042 }
1043 else
1044 {
1045 if (bytes_on_disk >= m_configuration.m_onlyIfCachedMinSize &&
1046 frac_on_disk >= m_configuration.m_onlyIfCachedMinFrac)
1047 return true;
1048 }
1049 return false;
1050}
1051
1052//______________________________________________________________________________
1053// Check if the file is cached including m_onlyIfCachedMinSize and m_onlyIfCachedMinFrac
1054// pfc configuration parameters. The logic of accessing the Info file is the same
1055// as in Cache::LocalFilePath.
1063//------------------------------------------------------------------------------
1064int Cache::ConsiderCached(const char *curl)
1065{
1066 static const char* tpfx = "ConsiderCached ";
1067
1068 TRACE(Debug, tpfx << curl);
1069
1070 XrdCl::URL url(curl);
1071 std::string f_name = url.GetPath();
1072
1073 File *file = nullptr;
1074 {
1075 XrdSysCondVarHelper lock(&m_active_cond);
1076 auto it = m_active.find(f_name);
1077 if (it != m_active.end()) {
1078 file = it->second;
1079 // If the file-open is in progress, `file` is a nullptr
1080 // so we cannot increase the reference count. For now,
1081 // simply treat it as if the file open doesn't exist instead
1082 // of trying to wait and see if it succeeds.
1083 if (file) {
1084 inc_ref_cnt(file, false, false);
1085 }
1086 }
1087 }
1088 if (file) {
1089 struct stat sbuff;
1090 int res = file->Fstat(sbuff);
1091 dec_ref_cnt(file, false);
1092 if (res)
1093 return res;
1094 // DecideIfConsideredCached() already called in File::Fstat().
1095 return sbuff.st_atime > 0 ? 0 : -EREMOTE;
1096 }
1097
1098 struct stat sbuff;
1099 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1100 if (res != XrdOssOK) {
1101 TRACE(Debug, tpfx << curl << " -> " << res);
1102 return res;
1103 }
1104 if (S_ISDIR(sbuff.st_mode))
1105 {
1106 TRACE(Debug, tpfx << curl << " -> EISDIR");
1107 return -EISDIR;
1108 }
1109
1110 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1111 if (file_size < 0) {
1112 TRACE(Debug, tpfx << curl << " -> " << file_size);
1113 return (int) file_size;
1114 }
1115 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1116
1117 return is_cached ? 0 : -EREMOTE;
1118}
1119
1120//______________________________________________________________________________
1128//------------------------------------------------------------------------------
1129
1130int Cache::Prepare(const char *curl, int oflags, mode_t mode)
1131{
1132 XrdCl::URL url(curl);
1133 std::string f_name = url.GetPath();
1134 std::string i_name = f_name + Info::s_infoExtension;
1135
1136 // Do not allow write access.
1137 if ((oflags & O_ACCMODE) != O_RDONLY)
1138 {
1139 if (Cache::GetInstance().RefConfiguration().m_write_through)
1140 {
1141 return 0;
1142 }
1143 TRACE(Warning, "Prepare write access requested on file " << f_name << ". Denying access.");
1144 return -EROFS;
1145 }
1146
1147 // Intercept xrdpfc_command requests.
1148 if (m_configuration.m_allow_xrdpfc_command && strncmp("/xrdpfc_command/", f_name.c_str(), 16) == 0)
1149 {
1150 // Schedule a job to process command request.
1151 {
1152 CommandExecutor *ce = new CommandExecutor(f_name, "CommandExecutor");
1153
1154 schedP->Schedule(ce);
1155 }
1156
1157 return -EAGAIN;
1158 }
1159
1160 {
1161 XrdSysCondVarHelper lock(&m_active_cond);
1162 m_purge_delay_set.insert(f_name);
1163 }
1164
1165 struct stat sbuff;
1166 if (m_oss->Stat(i_name.c_str(), &sbuff) == XrdOssOK)
1167 {
1168
1169 if (m_configuration.m_httpcc && !is_http_cache_valid(f_name, i_name, url))
1170 {
1171 TRACE(Info, "Http cache not valid " << f_name);
1172 UnlinkFile(f_name, false);
1173 return 0;
1174 }
1175
1176 TRACE(Dump, "Prepare defer open " << f_name);
1177 return 1;
1178 }
1179 else
1180 {
1181 return 0;
1182 }
1183}
1184
1185//______________________________________________________________________________
1186// virtual method of XrdOucCache.
1191//------------------------------------------------------------------------------
1192
1193int Cache::Stat(const char *curl, struct stat &sbuff)
1194{
1195 const char *tpfx = "Stat ";
1196
1197 XrdCl::URL url(curl);
1198 std::string f_name = url.GetPath();
1199
1200 File *file = nullptr;
1201 {
1202 XrdSysCondVarHelper lock(&m_active_cond);
1203 auto it = m_active.find(f_name);
1204 if (it != m_active.end()) {
1205 file = it->second;
1206 // If `file` is nullptr, the file-open is in progress; instead
1207 // of waiting for the file-open to finish, simply treat it as if
1208 // the file-open doesn't exist.
1209 if (file) {
1210 inc_ref_cnt(file, false, false);
1211 }
1212 }
1213 }
1214 if (file) {
1215 int res = file->Fstat(sbuff);
1216 dec_ref_cnt(file, false);
1217 TRACE(Debug, tpfx << "from active file " << curl << " -> " << res);
1218 return res;
1219 }
1220
1221 int res = m_oss->Stat(f_name.c_str(), &sbuff);
1222 if (res != XrdOssOK) {
1223 TRACE(Debug, tpfx << curl << " -> " << res);
1224 return 1; // res; -- for only-if-cached
1225 }
1226 if (S_ISDIR(sbuff.st_mode))
1227 {
1228 TRACE(Debug, tpfx << curl << " -> EISDIR");
1229 return -EISDIR;
1230 }
1231
1232 long long file_size = DetermineFullFileSize(f_name + Info::s_infoExtension);
1233 if (file_size < 0) {
1234 TRACE(Debug, tpfx << curl << " -> " << file_size);
1235 return 1; // (int) file_size; -- for only-if-cached
1236 }
1237 sbuff.st_size = file_size;
1238 bool is_cached = DecideIfConsideredCached(file_size, sbuff.st_blocks * 512ll);
1239 if ( ! is_cached)
1240 sbuff.st_atime = 0;
1241
1242 TRACE(Debug, tpfx << "from disk " << curl << " -> " << res);
1243
1244 return 0;
1245}
1246
1247//______________________________________________________________________________
1248// virtual method of XrdOucCache.
1252//------------------------------------------------------------------------------
1253
1254int Cache::Unlink(const char *curl)
1255{
1256 XrdCl::URL url(curl);
1257 std::string f_name = url.GetPath();
1258
1259 // printf("Unlink url=%s\n\t fname=%s\n", curl, f_name.c_str());
1260
1261 return UnlinkFile(f_name, false);
1262}
1263
// Remove the data file `f_name` and its companion cinfo file
// (`f_name` + Info::s_infoExtension) through m_oss.
//
// While the two files are being unlinked, `f_name` is kept in m_active with a
// null File*; per the comment at the -EAGAIN check below, a null entry marks
// an ongoing operation. The entry is erased and m_active_cond is broadcast
// before the function returns.
//
// Returns:
//   -EBUSY   if `f_name` is present in m_active and `fail_if_open` is true;
//   -EAGAIN  if the m_active entry for `f_name` holds a null File*;
//   otherwise the smaller of the two m_oss->Unlink() return codes.
1264int Cache::UnlinkFile(const std::string& f_name, bool fail_if_open)
1265{
1266 static const char* trc_pfx = "UnlinkFile ";
1267 ActiveMap_i it;
1268 File *file = 0;
1269 long long st_blocks_to_purge = 0;
1270 {
1271 XrdSysCondVarHelper lock(&m_active_cond);
1272
1273 it = m_active.find(f_name);
1274
1275 if (it != m_active.end())
1276 {
1277 if (fail_if_open)
1278 {
1279 TRACE(Info, trc_pfx << f_name << ", file currently open and force not requested - denying request");
1280 return -EBUSY;
1281 }
1282
1283 // Null File* in m_active map means an operation is ongoing, probably
1284 // Attach() with possible File::Open(). Ask for retry.
1285 if (it->second == 0)
1286 {
1287 TRACE(Info, trc_pfx << f_name << ", an operation on this file is ongoing - denying request");
1288 return -EAGAIN;
1289 }
1290
      // Active file: start its emergency shutdown and keep the value it
      // returns to report as the purged block count further down. The map
      // entry is then nulled while the File pointer stays in `file`.
1291 file = it->second;
1292 st_blocks_to_purge = file->initiate_emergency_shutdown();
1293 it->second = 0;
1294 }
1295 else
1296 {
      // Not active: insert a null placeholder entry for the duration of the
      // unlink; `it` is reused at the end to erase it.
1297 it = m_active.insert(std::make_pair(f_name, (File*) 0)).first;
1298 }
1299 }
1300
   // NOTE(review): the embedded listing numbers jump from 1301 to 1303 and the
   // `if (file)` branch below is empty -- a statement for the active-file case
   // appears to have been lost from this listing. Confirm against the
   // upstream source before relying on this block.
1301 if (file) {
1303 } else {
   // File was not active: take the block count from an OSS stat of the data
   // file; it stays 0 if the stat does not return XrdOssOK.
1304 struct stat f_stat;
1305 if (m_oss->Stat(f_name.c_str(), &f_stat) == XrdOssOK)
1306 st_blocks_to_purge = f_stat.st_blocks;
1307 }
1308
1309 std::string i_name = f_name + Info::s_infoExtension;
1310
1311 // Unlink file & cinfo
1312 int f_ret = m_oss->Unlink(f_name.c_str());
1313 int i_ret = m_oss->Unlink(i_name.c_str());
1314
   // The purge is reported to the resource monitor only for a non-zero block
   // count, and regardless of the two Unlink() return codes.
1315 if (st_blocks_to_purge)
1316 m_res_mon->register_file_purge(f_name, st_blocks_to_purge);
1317
1318 TRACE(Debug, trc_pfx << f_name << ", f_ret=" << f_ret << ", i_ret=" << i_ret);
1319
   // Drop the m_active entry for this file and broadcast on m_active_cond.
1320 {
1321 XrdSysCondVarHelper lock(&m_active_cond);
1322 m_active.erase(it);
1323 m_active_cond.Broadcast();
1324 }
1325
1326 return std::min(f_ret, i_ret);
1327}
1328
1329//---------------------------------------------------------------------
1333//---------------------------------------------------------------------
1334bool Cache::is_http_cache_valid(const std::string& f_name,const std::string& i_name, XrdCl::URL& url)
1335{
1336 std::string icc;
1337
1338 // return true if nothing is written in cinfo extended file attributes
1339 if (GetCacheControlXAttr(i_name, icc) <= 0)
1340 return true;
1341
1342 bool ccIsValid = true;
1343 using namespace nlohmann;
1344 json cc_json = json::parse(icc);
1345
1346 bool mustRevalidate = cc_json.contains("revalidate") && (cc_json["revalidate"] == true);
1347 bool hasExpired = false;
1348 if (cc_json.contains("expire"))
1349 {
1350 time_t current_time;
1351 current_time = time(NULL);
1352 if (current_time > cc_json["expire"])
1353 hasExpired = true;
1354 }
1355
1356 if (cc_json.contains("ETag") && (mustRevalidate || hasExpired))
1357 {
1358 // Compare cinfo etag and the etag from file system query response
1359 // Add XrdCl:URL parameter as additional XrdOucCacheOp::Code::QFSinfo sub-command
1360 url.SetParam("code", "head");
1361 std::string response;
1362 std::string fsctlarg = url.GetURL();
1363 int st = XrdPosixExtra::FSctl(XrdOucCacheOp::Code::QFSinfo, fsctlarg, response, false, m_configuration.m_qfsredir);
1364
1365 if (st >= 0)
1366 {
1367 // client response keeps the \0 at the end of the string
1368 std::string etag = response.substr(0, response.find('\0'));
1369 std::string jval = cc_json["ETag"].get<std::string>();
1370 ccIsValid = (etag == jval);
1371
1372 TRACE(Info, "Prepare " << i_name << ", ETag valid res: " << ccIsValid);
1373
1374 // update expiration time if Etag is valid
1375 if (cc_json.contains("max-age"))
1376 {
1377 time_t ma = cc_json["max-age"];
1378 cc_json["expire"] = ma + time(NULL);
1379 char pfn[4096];
1380 m_oss->Lfn2Pfn(i_name.c_str(), pfn, 4096);
1381 WriteCacheControlXAttr(-1, pfn, cc_json.dump());
1382 }
1383 }
1384 else
1385 {
1386 // Message has a status because we are in the block condition for cache-control xattr
1387 TRACE(Error, "Prepare() XrdCl::FileSystem::Query failed " << f_name.c_str());
1388 ccIsValid = false;
1389 }
1390 }
1391
1392 return ccIsValid;
1393}
int DoIt(int argpnt, int argc, char **argv, bool singleshot)
#define TRACE_Debug
nlohmann::json json
#define XrdOssOK
Definition XrdOss.hh:54
std::string obfuscateAuth(const std::string &input)
#define TRACE_Dump
#define TRACE_PC(act, pre_code, x)
#define TRACE_INT(act, x)
void * ProcessWriteTaskThread(void *)
Definition XrdPfc.cc:64
XrdSysXAttr * XrdSysXAttrActive
void * ResourceMonitorThread(void *)
Definition XrdPfc.cc:58
void * PrefetchThread(void *)
Definition XrdPfc.cc:70
XrdOucCache * XrdOucGetCache(XrdSysLogger *logger, const char *config_filename, const char *parameters, XrdOucEnv *env)
Definition XrdPfc.cc:80
#define stat(a, b)
Definition XrdPosix.hh:101
bool Debug
#define TRACE(act, x)
Definition XrdTrace.hh:63
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
std::string GetURL() const
Get the URL.
Definition XrdClURL.hh:86
void SetParam(const std::string &name, const std::string &value)
Set a single param.
Definition XrdClURL.hh:283
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:228
virtual int Lfn2Pfn(const char *Path, char *buff, int blen)
Definition XrdOss.hh:954
virtual int Fstat(struct stat &sbuff)
virtual const char * Path()=0
virtual const char * Location(bool refresh=false)
static const int optRW
File is read/write (o/w read/only).
XrdOucCache(const char *ctype)
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
void PutPtr(const char *varname, void *value)
Definition XrdOucEnv.cc:298
int get_size() const
long long m_offset
File * get_file() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:169
long long DetermineFullFileSize(const std::string &cinfo_fname)
Definition XrdPfc.cc:949
void FileSyncDone(File *, bool high_debug)
Definition XrdPfc.cc:551
File * GetFile(const std::string &, IO *, long long off=0, long long filesize=0)
Definition XrdPfc.cc:397
static const Configuration & Conf()
Definition XrdPfc.cc:138
virtual int LocalFilePath(const char *url, char *buff=0, int blen=0, LFP_Reason why=ForAccess, bool forall=false)
Definition XrdPfc.cc:803
virtual int Stat(const char *url, struct stat &sbuff)
Definition XrdPfc.cc:1193
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:225
static ResourceMonitor & ResMon()
Definition XrdPfc.cc:139
bool IsFileActiveOrPurgeProtected(const std::string &) const
Definition XrdPfc.cc:686
void ClearPurgeProtectedSet()
Definition XrdPfc.cc:694
void ReleaseRAM(char *buf, long long size)
Definition XrdPfc.cc:379
virtual int ConsiderCached(const char *url)
Definition XrdPfc.cc:1064
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:136
bool Config(const char *config_filename, const char *parameters, XrdOucEnv *env)
Parse configuration file.
int GetCacheControlXAttr(const std::string &cinfo_fname, std::string &res) const
Definition XrdPfc.cc:989
void DeRegisterPrefetchFile(File *)
Definition XrdPfc.cc:720
void ExecuteCommandUrl(const std::string &command_url)
void RegisterPrefetchFile(File *)
Definition XrdPfc.cc:704
void WriteFileSizeXAttr(int cinfo_fd, long long file_size)
Definition XrdPfc.cc:934
void Prefetch()
Definition XrdPfc.cc:761
void ReleaseFile(File *, IO *)
Definition XrdPfc.cc:480
void AddWriteTask(Block *b, bool from_read)
Add downloaded block in write queue.
Definition XrdPfc.cc:229
Cache(XrdSysLogger *logger, XrdOucEnv *env)
Constructor.
Definition XrdPfc.cc:162
bool Decide(XrdOucCacheIO *)
Makes decision if the original XrdOucCacheIO should be cached.
Definition XrdPfc.cc:141
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1264
static XrdScheduler * schedP
Definition XrdPfc.hh:312
File * GetNextFileToPrefetch()
Definition XrdPfc.cc:742
long long WritesSinceLastCall()
Definition XrdPfc.cc:328
void ProcessWriteTasks()
Separate task which writes blocks from ram to disk.
Definition XrdPfc.cc:281
virtual int Unlink(const char *url)
Definition XrdPfc.cc:1254
void WriteCacheControlXAttr(int cinfo_fd, const char *path, const std::string &cc)
Definition XrdPfc.cc:921
void RemoveWriteQEntriesFor(File *f)
Remove blocks from write queue which belong to given prefetch. This method is used at the time of Fil...
Definition XrdPfc.cc:248
virtual XrdOucCacheIO * Attach(XrdOucCacheIO *, int Options=0)
Definition XrdPfc.cc:183
static const Cache & TheOne()
Definition XrdPfc.cc:137
char * RequestRAM(long long size)
Definition XrdPfc.cc:339
virtual int Prepare(const char *url, int oflags, mode_t mode)
Definition XrdPfc.cc:1130
bool DecideIfConsideredCached(long long file_size, long long bytes_on_disk)
Definition XrdPfc.cc:1031
static Cache & CreateInstance(XrdSysLogger *logger, XrdOucEnv *env)
Singleton creation.
Definition XrdPfc.cc:129
bool is_prefetch_enabled() const
Definition XrdPfc.hh:317
Base class for selecting which files should be cached.
virtual bool Decide(const std::string &, XrdOss &) const =0
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
void WriteBlockToDisk(Block *b)
int GetNBlocks() const
std::string GetRemoteLocations() const
size_t GetAccessCnt() const
int Fstat(struct stat &sbuff)
void AddIO(IO *io)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
long long GetPrefetchedBytes() const
int GetBlockSize() const
int GetNDownloadedBlocks() const
const Info::AStat * GetLastAccessStats() const
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
int inc_ref_cnt()
const Stats & RefStats() const
void Sync()
Sync file cache inf o and output data with disk.
int dec_ref_cnt()
int get_ref_cnt()
long long initiate_emergency_shutdown()
long long GetFileSize() const
const std::string & GetLocalPath() const
void RemoveIO(IO *io)
bool is_in_emergency_shutdown()
Downloads original file into multiple files, chunked into blocks. Only blocks that are asked for are ...
Downloads original file into a single file on local disk. Handles read requests as they come along.
bool HasFile() const
Check if File was opened successfully.
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:16
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:31
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:41
static const char * s_infoExtension
void WriteIOStatSingle(long long bytes_disk)
Write single open/close time for given bytes read from disk.
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
bool IsComplete() const
Get complete status.
long long GetFileSize() const
Get file size.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
int m_NCksumErrors
number of checksum errors while getting data from remote
long long m_BytesWritten
number of bytes written to disk
static int FSctl(XrdOucCacheOp::Code opc, const std::string &args, std::string &resp, bool viaCache=false, bool viaRedir=false)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static void Wait(int milliseconds)
XrdPosixStats Stats
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:66
bool m_qfsredir
redirect file system query to the origin
Definition XrdPfc.hh:140
long long BytesHit
read from cache
Definition XrdPfcInfo.hh:64
long long BytesBypassed
read from remote and dropped
Definition XrdPfcInfo.hh:66
time_t DetachTime
close time
Definition XrdPfcInfo.hh:59
long long BytesMissed
read from remote and cached
Definition XrdPfcInfo.hh:65
time_t AttachTime
open time
Definition XrdPfcInfo.hh:58