XRootD
Loading...
Searching...
No Matches
XrdClClassicCopyJob.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClFile.hh"
30#include "XrdCl/XrdClMonitor.hh"
31#include "XrdCl/XrdClUtils.hh"
33#include "XrdCks/XrdCksCalc.hh"
40#include "XrdClXCpCtx.hh"
42#include "XrdSys/XrdSysE2T.hh"
44
45#include <memory>
46#include <mutex>
47#include <queue>
48#include <algorithm>
49#include <chrono>
50#include <thread>
51#include <vector>
52
53#include <sys/types.h>
54#include <sys/stat.h>
55#include <fcntl.h>
56#include <cerrno>
57#include <unistd.h>
58
59#if __cplusplus < 201103L
60#include <ctime>
61#endif
62
63namespace
64{
65 //----------------------------------------------------------------------------
67 //----------------------------------------------------------------------------
68 template<typename U = std::ratio<1, 1>>
69 class mytimer_t
70 {
71 public:
72 mytimer_t() : start( clock_t::now() ){ }
73 void reset(){ start = clock_t::now(); }
74 time_t elapsed() const
75 {
76 return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
77 }
78 private:
79 typedef std::chrono::high_resolution_clock clock_t;
80 typedef std::chrono::duration<int, U> unit_t;
81 std::chrono::time_point<clock_t> start;
82 };
83
84 using timer_sec_t = mytimer_t<>;
85 using timer_nsec_t = mytimer_t<std::nano>;
86
87
88 inline XrdCl::XRootDStatus Translate( std::vector<XrdCl::XAttr> &in,
89 std::vector<XrdCl::xattr_t> &out )
90 {
91 std::vector<XrdCl::xattr_t> ret;
92 ret.reserve( in.size() );
93 std::vector<XrdCl::XAttr>::iterator itr = in.begin();
94 for( ; itr != in.end() ; ++itr )
95 {
96 if( !itr->status.IsOK() ) return itr->status;
97 XrdCl::xattr_t xa( itr->name, itr->value );
98 ret.push_back( std::move( xa ) );
99 }
100 out.swap( ret );
101 return XrdCl::XRootDStatus();
102 }
103
104 //----------------------------------------------------------------------------
106 //----------------------------------------------------------------------------
108 std::vector<XrdCl::xattr_t> &xattrs )
109 {
110 std::vector<XrdCl::XAttr> rsp;
111 XrdCl::XRootDStatus st = file.ListXAttr( rsp );
112 if( !st.IsOK() ) return st;
113 return Translate( rsp, xattrs );
114 }
115
116 //----------------------------------------------------------------------------
118 //----------------------------------------------------------------------------
119 inline XrdCl::XRootDStatus GetXAttr( const std::string &url,
120 std::vector<XrdCl::xattr_t> &xattrs )
121 {
122 XrdCl::URL u( url );
123 XrdCl::FileSystem fs( u );
124 std::vector<XrdCl::XAttr> rsp;
125 XrdCl::XRootDStatus st = fs.ListXAttr( u.GetPath(), rsp );
126 if( !st.IsOK() ) return st;
127 return Translate( rsp, xattrs );
128 }
129
131 const std::vector<XrdCl::xattr_t> &xattrs )
132 {
133 std::vector<XrdCl::XAttrStatus> rsp;
134 file.SetXAttr( xattrs, rsp );
135 std::vector<XrdCl::XAttrStatus>::iterator itr = rsp.begin();
136 for( ; itr != rsp.end() ; ++itr )
137 if( !itr->status.IsOK() ) return itr->status;
138 return XrdCl::XRootDStatus();
139 }
140
141 //----------------------------------------------------------------------------
143 //----------------------------------------------------------------------------
144 class Source
145 {
146 public:
147 //------------------------------------------------------------------------
148 // Destructor
149 //------------------------------------------------------------------------
150 Source( const std::string &checkSumType = "",
151 const std::vector<std::string> &addcks = std::vector<std::string>() ) :
152 pCkSumHelper( 0 ),
153 pContinue( false )
154 {
155 if( !checkSumType.empty() )
156 pCkSumHelper = new XrdCl::CheckSumHelper( "source", checkSumType );
157
158 for( auto &type : addcks )
159 pAddCksHelpers.push_back( new XrdCl::CheckSumHelper( "source", type ) );
160 };
161
162 virtual ~Source()
163 {
164 delete pCkSumHelper;
165 for( auto ptr : pAddCksHelpers )
166 delete ptr;
167 }
168
169 //------------------------------------------------------------------------
171 //------------------------------------------------------------------------
172 virtual XrdCl::XRootDStatus Initialize() = 0;
173
174 //------------------------------------------------------------------------
176 //------------------------------------------------------------------------
177 virtual int64_t GetSize() = 0;
178
179 //------------------------------------------------------------------------
181 //------------------------------------------------------------------------
182 virtual XrdCl::XRootDStatus StartAt( uint64_t offset ) = 0;
183
184 //------------------------------------------------------------------------
191 //------------------------------------------------------------------------
192 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci ) = 0;
193
194 //------------------------------------------------------------------------
196 //------------------------------------------------------------------------
197 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
198 std::string &checkSumType ) = 0;
199
200 //------------------------------------------------------------------------
202 //------------------------------------------------------------------------
203 virtual std::vector<std::string> GetAddCks() = 0;
204
205 //------------------------------------------------------------------------
207 //------------------------------------------------------------------------
208 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs ) = 0;
209
210 //------------------------------------------------------------------------
212 //------------------------------------------------------------------------
213 virtual XrdCl::XRootDStatus TryOtherServer()
214 {
215 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotImplemented );
216 }
217
218 protected:
219
220 XrdCl::CheckSumHelper *pCkSumHelper;
221 std::vector<XrdCl::CheckSumHelper*> pAddCksHelpers;
222 bool pContinue;
223 };
224
225 //----------------------------------------------------------------------------
227 //----------------------------------------------------------------------------
228 class Destination
229 {
230 public:
231 //------------------------------------------------------------------------
233 //------------------------------------------------------------------------
234 Destination( const std::string &checkSumType = "" ):
235 pPosc( false ), pForce( false ), pCoerce( false ), pMakeDir( false ),
236 pContinue( false ), pCkSumHelper( 0 )
237 {
238 if( !checkSumType.empty() )
239 pCkSumHelper = new XrdCl::CheckSumHelper( "destination", checkSumType );
240 }
241
242 //------------------------------------------------------------------------
244 //------------------------------------------------------------------------
245 virtual ~Destination()
246 {
247 delete pCkSumHelper;
248 }
249
250 //------------------------------------------------------------------------
252 //------------------------------------------------------------------------
253 virtual XrdCl::XRootDStatus Initialize() = 0;
254
255 //------------------------------------------------------------------------
257 //------------------------------------------------------------------------
258 virtual XrdCl::XRootDStatus Finalize() = 0;
259
260 //------------------------------------------------------------------------
265 //------------------------------------------------------------------------
266 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci ) = 0;
267
268 //------------------------------------------------------------------------
270 //------------------------------------------------------------------------
271 virtual XrdCl::XRootDStatus Flush() = 0;
272
273 //------------------------------------------------------------------------
275 //------------------------------------------------------------------------
276 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
277 std::string &checkSumType ) = 0;
278
279 //------------------------------------------------------------------------
281 //------------------------------------------------------------------------
282 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs ) = 0;
283
284 //------------------------------------------------------------------------
286 //------------------------------------------------------------------------
287 virtual int64_t GetSize() = 0;
288
289 //------------------------------------------------------------------------
291 //------------------------------------------------------------------------
292 void SetPOSC( bool posc )
293 {
294 pPosc = posc;
295 }
296
297 //------------------------------------------------------------------------
299 //------------------------------------------------------------------------
300 void SetForce( bool force )
301 {
302 pForce = force;
303 }
304
305 //------------------------------------------------------------------------
307 //------------------------------------------------------------------------
308 void SetContinue( bool continue_ )
309 {
310 pContinue = continue_;
311 }
312
313 //------------------------------------------------------------------------
315 //------------------------------------------------------------------------
316 void SetCoerce( bool coerce )
317 {
318 pCoerce = coerce;
319 }
320
321 //------------------------------------------------------------------------
323 //------------------------------------------------------------------------
324 void SetMakeDir( bool makedir )
325 {
326 pMakeDir = makedir;
327 }
328
329 //------------------------------------------------------------------------
331 //------------------------------------------------------------------------
332 virtual const std::string& GetLastURL() const
333 {
334 static const std::string empty;
335 return empty;
336 }
337
338 //------------------------------------------------------------------------
340 //------------------------------------------------------------------------
341 virtual const std::string& GetWrtRecoveryRedir() const
342 {
343 static const std::string empty;
344 return empty;
345 }
346
347 protected:
348 bool pPosc;
349 bool pForce;
350 bool pCoerce;
351 bool pMakeDir;
352 bool pContinue;
353
354 XrdCl::CheckSumHelper *pCkSumHelper;
355 };
356
357 //----------------------------------------------------------------------------
359 //----------------------------------------------------------------------------
360 class StdInSource: public Source
361 {
362 public:
363 //------------------------------------------------------------------------
365 //------------------------------------------------------------------------
366 StdInSource( const std::string &ckSumType, uint32_t chunkSize, const std::vector<std::string> &addcks ):
367 Source( ckSumType, addcks ),
368 pCurrentOffset(0),
369 pChunkSize( chunkSize )
370 {
371
372 }
373
374 //------------------------------------------------------------------------
376 //------------------------------------------------------------------------
377 virtual ~StdInSource()
378 {
379
380 }
381
382 //------------------------------------------------------------------------
384 //------------------------------------------------------------------------
385 virtual XrdCl::XRootDStatus Initialize()
386 {
387 if( pCkSumHelper )
388 {
389 auto st = pCkSumHelper->Initialize();
390 if( !st.IsOK() ) return st;
391 for( auto cksHelper : pAddCksHelpers )
392 {
393 st = cksHelper->Initialize();
394 if( !st.IsOK() ) return st;
395 }
396 }
397 return XrdCl::XRootDStatus();
398 }
399
400 //------------------------------------------------------------------------
402 //------------------------------------------------------------------------
403 virtual int64_t GetSize()
404 {
405 return -1;
406 }
407
408 //------------------------------------------------------------------------
410 //------------------------------------------------------------------------
411 virtual XrdCl::XRootDStatus StartAt( uint64_t )
412 {
413 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported, ENOTSUP,
414 "Cannot continue from stdin!" );
415 }
416
417 //------------------------------------------------------------------------
419 //------------------------------------------------------------------------
420 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
421 {
422 using namespace XrdCl;
423 Log *log = DefaultEnv::GetLog();
424
425 uint32_t toRead = pChunkSize;
426 char *buffer = new char[toRead];
427
428 int64_t bytesRead = 0;
429 uint32_t offset = 0;
430 while( toRead )
431 {
432 int64_t bRead = read( 0, buffer+offset, toRead );
433 if( bRead == -1 )
434 {
435 log->Debug( UtilityMsg, "Unable to read from stdin: %s",
436 XrdSysE2T( errno ) );
437 delete [] buffer;
438 return XRootDStatus( stError, errOSError, errno );
439 }
440
441 if( bRead == 0 )
442 break;
443
444 bytesRead += bRead;
445 offset += bRead;
446 toRead -= bRead;
447 }
448
449 if( bytesRead == 0 )
450 {
451 delete [] buffer;
452 return XRootDStatus( stOK, suDone );
453 }
454
455 if( pCkSumHelper )
456 pCkSumHelper->Update( buffer, bytesRead );
457
458 for( auto cksHelper : pAddCksHelpers )
459 cksHelper->Update( buffer, bytesRead );
460
461 ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
462 pCurrentOffset += bytesRead;
463 return XRootDStatus( stOK, suContinue );
464 }
465
466 //------------------------------------------------------------------------
468 //------------------------------------------------------------------------
469 virtual XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
470 std::string &checkSum,
471 std::string &checkSumType )
472 {
473 using namespace XrdCl;
474 if( cksHelper )
475 return cksHelper->GetCheckSum( checkSum, checkSumType );
476 return XRootDStatus( stError, errCheckSumError );
477 }
478
479 //------------------------------------------------------------------------
481 //------------------------------------------------------------------------
482 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
483 std::string &checkSumType )
484 {
485 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
486 }
487
488 //------------------------------------------------------------------------
490 //------------------------------------------------------------------------
491 std::vector<std::string> GetAddCks()
492 {
493 std::vector<std::string> ret;
494 for( auto cksHelper : pAddCksHelpers )
495 {
496 std::string type = cksHelper->GetType();
497 std::string cks;
498 GetCheckSumImpl( cksHelper, cks, type );
499 ret.push_back( type + ":" + cks );
500 }
501 return ret;
502 }
503
504 //------------------------------------------------------------------------
506 //------------------------------------------------------------------------
507 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
508 {
509 return XrdCl::XRootDStatus();
510 }
511
512 private:
513 StdInSource(const StdInSource &other);
514 StdInSource &operator = (const StdInSource &other);
515
516 uint64_t pCurrentOffset;
517 uint32_t pChunkSize;
518 };
519
520 //----------------------------------------------------------------------------
522 //----------------------------------------------------------------------------
523 class XRootDSource: public Source
524 {
525 struct CancellableJob : public XrdCl::Job
526 {
527 virtual void Cancel() = 0;
528
529 std::mutex mtx;
530 };
531
532 //----------------------------------------------------------------------------
533 // On-connect callback job, a lambda would be more elegant, but we still have
534 // to support SLC6
535 //----------------------------------------------------------------------------
536 template<typename READER>
537 struct OnConnJob : public CancellableJob
538 {
539 OnConnJob( XRootDSource *self, READER *reader ) : self( self ), reader( reader )
540 {
541 }
542
543 void Run( void* )
544 {
545 std::unique_lock<std::mutex> lck( mtx );
546 if( !self || !reader ) return;
547 // add new chunks to the queue
548 if( self->pNbConn < self->pMaxNbConn )
549 self->FillQueue( reader );
550 }
551
552 void Cancel()
553 {
554 std::unique_lock<std::mutex> lck( mtx );
555 self = 0;
556 reader = 0;
557 }
558
559 private:
560 XRootDSource *self;
561 READER *reader;
562
563 };
564
565 public:
566
567 //------------------------------------------------------------------------
569 //------------------------------------------------------------------------
570 XrdCl::XRootDStatus TryOtherServer()
571 {
572 return pFile->TryOtherServer();
573 }
574
575 //------------------------------------------------------------------------
577 //------------------------------------------------------------------------
578 XRootDSource( const XrdCl::URL *url,
579 uint32_t chunkSize,
580 uint8_t parallelChunks,
581 const std::string &ckSumType,
582 const std::vector<std::string> &addcks,
583 bool doserver ):
584 Source( ckSumType, addcks ),
585 pUrl( url ), pFile( new XrdCl::File() ), pSize( -1 ),
586 pCurrentOffset( 0 ), pChunkSize( chunkSize ),
587 pParallel( parallelChunks ),
588 pNbConn( 0 ), pUsePgRead( false ),
589 pDoServer( doserver )
590 {
592 XrdCl::DefaultEnv::GetEnv()->GetInt( "SubStreamsPerChannel", val );
593 pMaxNbConn = val - 1; // account for the control stream
594 }
595
596 //------------------------------------------------------------------------
598 //------------------------------------------------------------------------
599 virtual ~XRootDSource()
600 {
601 if( pDataConnCB )
602 pDataConnCB->Cancel();
603
604 CleanUpChunks();
605 if( pFile->IsOpen() )
606 XrdCl::XRootDStatus status = pFile->Close();
607 delete pFile;
608 }
609
610 //------------------------------------------------------------------------
612 //------------------------------------------------------------------------
613 virtual XrdCl::XRootDStatus Initialize()
614 {
615 using namespace XrdCl;
616 Log *log = DefaultEnv::GetLog();
617 log->Debug( UtilityMsg, "Opening %s for reading",
618 pUrl->GetObfuscatedURL().c_str() );
619
620 std::string value;
621 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
622 pFile->SetProperty( "ReadRecovery", value );
623
624 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
625 if( !st.IsOK() )
626 return st;
627
628 StatInfo *statInfo;
629 st = pFile->Stat( false, statInfo );
630 if( !st.IsOK() )
631 return st;
632
633 pSize = statInfo->GetSize();
634 delete statInfo;
635
636 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
637 {
638 st = pCkSumHelper->Initialize();
639 if( !st.IsOK() ) return st;
640
641 for( auto cksHelper : pAddCksHelpers )
642 {
643 st = cksHelper->Initialize();
644 if( !st.IsOK() ) return st;
645 }
646 }
647
648 //----------------------------------------------------------------------
649 // Figere out the actual data server we are talking to
650 //----------------------------------------------------------------------
651 if( !pUrl->IsLocalFile() ||
652 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
653 {
654 pFile->GetProperty( "LastURL", pDataServer );
655 }
656
657
658 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
659 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
660 {
661 //--------------------------------------------------------------------
662 // Decide whether we can use PgRead
663 //--------------------------------------------------------------------
665 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
666 pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
667 }
668
669 //----------------------------------------------------------------------
670 // Print the IPv4/IPv6 stack to the stderr if we are running in server
671 // mode
672 //----------------------------------------------------------------------
673 if( pDoServer && !pUrl->IsLocalFile() )
674 {
675 AnyObject obj;
676 DefaultEnv::GetPostMaster()->QueryTransport( pDataServer, StreamQuery::IpStack, obj );
677 std::string *ipstack = nullptr;
678 obj.Get( ipstack );
679 std::cerr << "!-!" << *ipstack << std::endl;
680 delete ipstack;
681 }
682
683 SetOnDataConnectHandler( pFile );
684
685 return XRootDStatus();
686 }
687
688 //------------------------------------------------------------------------
690 //------------------------------------------------------------------------
691 virtual int64_t GetSize()
692 {
693 return pSize;
694 }
695
696 //------------------------------------------------------------------------
698 //------------------------------------------------------------------------
699 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
700 {
701 pCurrentOffset = offset;
702 pContinue = true;
703 return XrdCl::XRootDStatus();
704 }
705
706 //------------------------------------------------------------------------
713 //------------------------------------------------------------------------
714 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
715 {
716 return GetChunkImpl( pFile, ci );
717 }
718
719 //------------------------------------------------------------------------
721 //------------------------------------------------------------------------
722 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
723 {
724 return ::GetXAttr( *pFile, xattrs );
725 }
726
727 //------------------------------------------------------------------------
728 // Clean up the chunks that are flying
729 //------------------------------------------------------------------------
730 void CleanUpChunks()
731 {
732 while( !pChunks.empty() )
733 {
734 ChunkHandler *ch = pChunks.front();
735 pChunks.pop();
736 ch->sem->Wait();
737 delete [] (char *)ch->chunk.GetBuffer();
738 delete ch;
739 }
740 }
741
742 //------------------------------------------------------------------------
743 // Get check sum
744 //------------------------------------------------------------------------
745 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
746 std::string &checkSumType )
747 {
748 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
749 }
750
751 XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
752 std::string &checkSum,
753 std::string &checkSumType )
754 {
755 if( pUrl->IsMetalink() )
756 {
757 XrdCl::RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
758 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
759 checkSum = redirector->GetCheckSum( checkSumType );
760 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
761 }
762
763 if( pUrl->IsLocalFile() )
764 {
765 if( pContinue )
766 // in case of --continue option we have to calculate the checksum from scratch
767 return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
768
769 if( cksHelper )
770 return cksHelper->GetCheckSum( checkSum, checkSumType );
771
772 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errCheckSumError );
773 }
774
775 std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
776 std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
777 return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
778 }
779
780 //------------------------------------------------------------------------
782 //------------------------------------------------------------------------
783 std::vector<std::string> GetAddCks()
784 {
785 std::vector<std::string> ret;
786 for( auto cksHelper : pAddCksHelpers )
787 {
788 std::string type = cksHelper->GetType();
789 std::string cks;
790 GetCheckSumImpl( cksHelper, cks, type );
791 ret.push_back( cks );
792 }
793 return ret;
794 }
795
796 private:
797 XRootDSource(const XRootDSource &other);
798 XRootDSource &operator = (const XRootDSource &other);
799
800 protected:
801
802 //------------------------------------------------------------------------
803 // Fill the queue with in-the-fly read requests
804 //------------------------------------------------------------------------
805 template<typename READER>
806 inline void FillQueue( READER *reader )
807 {
808 //----------------------------------------------------------------------
809 // Get the number of connected streams
810 //----------------------------------------------------------------------
811 uint16_t parallel = pParallel;
812 if( pNbConn < pMaxNbConn )
813 {
815 NbConnectedStrm( pDataServer );
816 }
817 if( pNbConn ) parallel *= pNbConn;
818
819 while( pChunks.size() < parallel && pCurrentOffset < pSize )
820 {
821 uint64_t chunkSize = pChunkSize;
822 if( pCurrentOffset + chunkSize > (uint64_t)pSize )
823 chunkSize = pSize - pCurrentOffset;
824
825 char *buffer = new char[chunkSize];
826 ChunkHandler *ch = new ChunkHandler();
827 auto st = pUsePgRead
828 ? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
829 : reader->Read( pCurrentOffset, chunkSize, buffer, ch );
830 pChunks.push( ch );
831 pCurrentOffset += chunkSize;
832 if( !st.IsOK() )
833 {
834 ch->status = st;
835 ch->sem->Post();
836 break;
837 }
838 }
839 }
840
841 //------------------------------------------------------------------------
842 // Set the on-connect handler for data streams
843 //------------------------------------------------------------------------
844 template<typename READER>
845 void SetOnDataConnectHandler( READER *reader )
846 {
847 // we need to create the object anyway as it contains our mutex now
848 pDataConnCB.reset( new OnConnJob<READER>( this, reader ) );
849
850 // check if it is a local file
851 if( pDataServer.empty() ) return;
852
853 XrdCl::DefaultEnv::GetPostMaster()->SetOnDataConnectHandler( pDataServer, pDataConnCB );
854 }
855
856 //------------------------------------------------------------------------
864 //------------------------------------------------------------------------
865 template<typename READER>
866 XrdCl::XRootDStatus GetChunkImpl( READER *reader, XrdCl::PageInfo &ci )
867 {
868 //----------------------------------------------------------------------
869 // Sanity check
870 //----------------------------------------------------------------------
871 using namespace XrdCl;
872 Log *log = DefaultEnv::GetLog();
873
874 //----------------------------------------------------------------------
875 // Fill the queue
876 //----------------------------------------------------------------------
877 std::unique_lock<std::mutex> lck( pDataConnCB->mtx );
878 FillQueue( reader );
879
880 //----------------------------------------------------------------------
881 // Pick up a chunk from the front and wait for status
882 //----------------------------------------------------------------------
883 if( pChunks.empty() )
884 return XRootDStatus( stOK, suDone );
885
886 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
887 pChunks.pop();
888 lck.unlock();
889
890 ch->sem->Wait();
891
892 if( !ch->status.IsOK() )
893 {
894 log->Debug( UtilityMsg, "Unable read %d bytes at %llu from %s: %s",
895 ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
896 pUrl->GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
897 delete [] (char *)ch->chunk.GetBuffer();
898 CleanUpChunks();
899 return ch->status;
900 }
901
902 ci = std::move( ch->chunk );
903 // if it is a local file update the checksum
904 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
905 {
906 if( pCkSumHelper )
907 pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
908
909 for( auto cksHelper : pAddCksHelpers )
910 cksHelper->Update( ci.GetBuffer(), ci.GetLength() );
911 }
912
913 return XRootDStatus( stOK, suContinue );
914 }
915
916 //------------------------------------------------------------------------
917 // Asynchronous chunk handler
918 //------------------------------------------------------------------------
919 class ChunkHandler: public XrdCl::ResponseHandler
920 {
921 public:
922 ChunkHandler(): sem( new XrdSysSemaphore(0) ) {}
923 virtual ~ChunkHandler() { delete sem; }
924 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
925 XrdCl::AnyObject *response )
926 {
927 this->status = *statusval;
928 delete statusval;
929 if( response )
930 {
931 chunk = ToChunk( response );
932 delete response;
933 }
934 sem->Post();
935 }
936
937 XrdCl::PageInfo ToChunk( XrdCl::AnyObject *response )
938 {
939 if( response->Has<XrdCl::PageInfo>() )
940 {
941 XrdCl::PageInfo *resp = nullptr;
942 response->Get( resp );
943 return std::move( *resp );
944 }
945 else
946 {
947 XrdCl::ChunkInfo *resp = nullptr;
948 response->Get( resp );
949 return XrdCl::PageInfo( resp->GetOffset(), resp->GetLength(),
950 resp->GetBuffer() );
951 }
952 }
953
954 XrdSysSemaphore *sem;
955 XrdCl::PageInfo chunk;
956 XrdCl::XRootDStatus status;
957 };
958
959 const XrdCl::URL *pUrl;
960 XrdCl::File *pFile;
961 int64_t pSize;
962 int64_t pCurrentOffset;
963 uint32_t pChunkSize;
964 uint16_t pParallel;
965 std::queue<ChunkHandler*> pChunks;
966 std::string pDataServer;
967 uint16_t pNbConn;
968 uint16_t pMaxNbConn;
969 bool pUsePgRead;
970 bool pDoServer;
971
972 std::shared_ptr<CancellableJob> pDataConnCB;
973 };
974
975 //----------------------------------------------------------------------------
977 //----------------------------------------------------------------------------
978 class XRootDSourceZip: public XRootDSource
979 {
980 public:
981 //------------------------------------------------------------------------
983 //------------------------------------------------------------------------
984 XRootDSourceZip( const std::string &filename,
985 const XrdCl::URL *archive,
986 uint32_t chunkSize,
987 uint8_t parallelChunks,
988 const std::string &ckSumType,
989 const std::vector<std::string> &addcks,
990 bool doserver ):
991 XRootDSource( archive, chunkSize, parallelChunks, ckSumType,
992 addcks, doserver ),
993 pFilename( filename ),
994 pZipArchive( new XrdCl::ZipArchive() )
995 {
996 }
997
998 //------------------------------------------------------------------------
1000 //------------------------------------------------------------------------
1001 virtual ~XRootDSourceZip()
1002 {
1003 CleanUpChunks();
1004
1005 XrdCl::WaitFor( XrdCl::CloseArchive( pZipArchive ) );
1006 delete pZipArchive;
1007 }
1008
1009 //------------------------------------------------------------------------
1011 //------------------------------------------------------------------------
1012 virtual XrdCl::XRootDStatus Initialize()
1013 {
1014 using namespace XrdCl;
1015 Log *log = DefaultEnv::GetLog();
1016 log->Debug( UtilityMsg, "Opening %s for reading",
1017 pUrl->GetObfuscatedURL().c_str() );
1018
1019 std::string value;
1020 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1021 pZipArchive->SetProperty( "ReadRecovery", value );
1022
1023 XRootDStatus st = XrdCl::WaitFor( XrdCl::OpenArchive( pZipArchive, pUrl->GetURL(), XrdCl::OpenFlags::Read ) );
1024 if( !st.IsOK() )
1025 return st;
1026
1027 st = pZipArchive->OpenFile( pFilename );
1028 if( !st.IsOK() )
1029 return st;
1030
1031 XrdCl::StatInfo *info = 0;
1032 st = pZipArchive->Stat( info );
1033 if( st.IsOK() )
1034 {
1035 pSize = info->GetSize();
1036 delete info;
1037 }
1038 else
1039 return st;
1040
1041 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper )
1042 {
1043 auto st = pCkSumHelper->Initialize();
1044 if( !st.IsOK() ) return st;
1045 for( auto cksHelper : pAddCksHelpers )
1046 {
1047 st = cksHelper->Initialize();
1048 if( !st.IsOK() ) return st;
1049 }
1050 }
1051
1052 if( ( !pUrl->IsLocalFile() && !pZipArchive->IsSecure() ) ||
1053 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1054 {
1055 pZipArchive->GetProperty( "DataServer", pDataServer );
1056 //--------------------------------------------------------------------
1057 // Decide whether we can use PgRead
1058 //--------------------------------------------------------------------
1060 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1061 pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
1062 }
1063
1064 SetOnDataConnectHandler( pZipArchive );
1065
1066 return XrdCl::XRootDStatus();
1067 }
1068
1069 //------------------------------------------------------------------------
1077 //------------------------------------------------------------------------
1078 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1079 {
1080 return GetChunkImpl( pZipArchive, ci );
1081 }
1082
1083 //------------------------------------------------------------------------
1084 // Get check sum
1085 //------------------------------------------------------------------------
1086 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1087 std::string &checkSumType )
1088 {
1089 return GetCheckSumImpl( checkSum, checkSumType, pCkSumHelper );
1090 }
1091
1092 //------------------------------------------------------------------------
1093 // Get check sum implementation
1094 //------------------------------------------------------------------------
1095 virtual XrdCl::XRootDStatus GetCheckSumImpl( std::string &checkSum,
1096 std::string &checkSumType,
1097 XrdCl::CheckSumHelper *cksHelper )
1098 {
1099 // The ZIP archive by default contains a ZCRC32 checksum
1100 if( checkSumType == "zcrc32" )
1101 {
1102 uint32_t cksum = 0;
1103 auto st = pZipArchive->GetCRC32( pFilename, cksum );
1104 if( !st.IsOK() ) return st;
1105
1106 XrdCksData ckSum;
1107 ckSum.Set( "zcrc32" );
1108 ckSum.Set( reinterpret_cast<void*>( &cksum ), sizeof( uint32_t ) );
1109 char cksBuffer[265];
1110 ckSum.Get( cksBuffer, 256 );
1111 checkSum = "zcrc32:";
1112 checkSum += XrdCl::Utils::NormalizeChecksum( "zcrc32", cksBuffer );
1113 return st;
1114 }
1115
1116 int useMtlnCksum = XrdCl::DefaultZipMtlnCksum;
1117 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
1118 env->GetInt( "ZipMtlnCksum", useMtlnCksum );
1119 if( useMtlnCksum && pUrl->IsMetalink() )
1120 {
1121 XrdCl::RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
1122 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1123 checkSum = redirector->GetCheckSum( checkSumType );
1124 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1125 }
1126
1127 // if it is a local file we can calculate the checksum ourself
1128 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && cksHelper && !pContinue )
1129 return cksHelper->GetCheckSum( checkSum, checkSumType );
1130
1131 // if it is a remote file other types of checksum are not supported
1132 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported );
1133 }
1134
1135 //------------------------------------------------------------------------
1137 //------------------------------------------------------------------------
1138 std::vector<std::string> GetAddCks()
1139 {
1140 std::vector<std::string> ret;
1141 for( auto cksHelper : pAddCksHelpers )
1142 {
1143 std::string type = cksHelper->GetType();
1144 std::string cks;
1145 GetCheckSumImpl( cks, type, cksHelper );
1146 ret.push_back( cks );
1147 }
1148 return ret;
1149 }
1150
1151 //------------------------------------------------------------------------
1153 //------------------------------------------------------------------------
1154 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1155 {
1156 return XrdCl::XRootDStatus();
1157 }
1158
1159 private:
1160
1161 XRootDSourceZip(const XRootDSourceZip &other);
1162 XRootDSourceZip &operator = (const XRootDSourceZip &other);
1163
1164 const std::string pFilename;
1165 XrdCl::ZipArchive *pZipArchive;
1166 };
1167
1168 //----------------------------------------------------------------------------
1170 //----------------------------------------------------------------------------
1171 class XRootDSourceDynamic: public Source
1172 {
1173 public:
1174
1175 //------------------------------------------------------------------------
1177 //------------------------------------------------------------------------
1178 XrdCl::XRootDStatus TryOtherServer()
1179 {
1180 return pFile->TryOtherServer();
1181 }
1182
1183 //------------------------------------------------------------------------
1185 //------------------------------------------------------------------------
1186 XRootDSourceDynamic( const XrdCl::URL *url,
1187 uint32_t chunkSize,
1188 const std::string &ckSumType,
1189 const std::vector<std::string> &addcks ):
1190 Source( ckSumType, addcks ),
1191 pUrl( url ), pFile( new XrdCl::File() ), pCurrentOffset( 0 ),
1192 pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
1193 {
1194 }
1195
1196 //------------------------------------------------------------------------
1198 //------------------------------------------------------------------------
1199 virtual ~XRootDSourceDynamic()
1200 {
1201 XrdCl::XRootDStatus status = pFile->Close();
1202 delete pFile;
1203 }
1204
1205 //------------------------------------------------------------------------
1207 //------------------------------------------------------------------------
1208 virtual XrdCl::XRootDStatus Initialize()
1209 {
1210 using namespace XrdCl;
1211 Log *log = DefaultEnv::GetLog();
1212 log->Debug( UtilityMsg, "Opening %s for reading",
1213 pUrl->GetObfuscatedURL().c_str() );
1214
1215 std::string value;
1216 DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1217 pFile->SetProperty( "ReadRecovery", value );
1218
1219 XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
1220 if( !st.IsOK() )
1221 return st;
1222
1223 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
1224 {
1225 auto st = pCkSumHelper->Initialize();
1226 if( !st.IsOK() ) return st;
1227 for( auto cksHelper : pAddCksHelpers )
1228 {
1229 st = cksHelper->Initialize();
1230 if( !st.IsOK() ) return st;
1231 }
1232 }
1233
1234 if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
1235 ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1236 {
1237 std::string datasrv;
1238 pFile->GetProperty( "DataServer", datasrv );
1239 //--------------------------------------------------------------------
1240 // Decide whether we can use PgRead
1241 //--------------------------------------------------------------------
1243 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1244 pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1245 }
1246
1247 return XRootDStatus();
1248 }
1249
1250 //------------------------------------------------------------------------
1252 //------------------------------------------------------------------------
1253 virtual int64_t GetSize()
1254 {
1255 return -1;
1256 }
1257
1258 //------------------------------------------------------------------------
1260 //------------------------------------------------------------------------
1261 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1262 {
1263 pCurrentOffset = offset;
1264 pContinue = true;
1265 return XrdCl::XRootDStatus();
1266 }
1267
1268 //------------------------------------------------------------------------
1276 //------------------------------------------------------------------------
1277 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1278 {
1279 //----------------------------------------------------------------------
1280 // Sanity check
1281 //----------------------------------------------------------------------
1282 using namespace XrdCl;
1283
1284 if( pDone )
1285 return XRootDStatus( stOK, suDone );
1286
1287 //----------------------------------------------------------------------
1288 // Fill the queue
1289 //----------------------------------------------------------------------
1290 char *buffer = new char[pChunkSize];
1291 uint32_t bytesRead = 0;
1292
1293 std::vector<uint32_t> cksums;
1294 XRootDStatus st = pUsePgRead
1295 ? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
1296 : pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );
1297
1298 if( !st.IsOK() )
1299 {
1300 delete [] buffer;
1301 return st;
1302 }
1303
1304 if( !bytesRead )
1305 {
1306 delete [] buffer;
1307 return XRootDStatus( stOK, suDone );
1308 }
1309
1310 if( bytesRead < pChunkSize )
1311 pDone = true;
1312
1313 // if it is a local file update the checksum
1314 if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
1315 {
1316 if( pCkSumHelper )
1317 pCkSumHelper->Update( buffer, bytesRead );
1318
1319 for( auto cksHelper : pAddCksHelpers )
1320 cksHelper->Update( buffer, bytesRead );
1321 }
1322
1323 ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
1324 pCurrentOffset += bytesRead;
1325
1326 return XRootDStatus( stOK, suContinue );
1327 }
1328
1329 //------------------------------------------------------------------------
1330 // Get check sum
1331 //------------------------------------------------------------------------
1332 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1333 std::string &checkSumType )
1334 {
1335 return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
1336 }
1337
1338 XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
1339 std::string &checkSum,
1340 std::string &checkSumType )
1341 {
1342 if( pUrl->IsMetalink() )
1343 {
1344 XrdCl::RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
1345 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1346 checkSum = redirector->GetCheckSum( checkSumType );
1347 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1348 }
1349
1350 if( pUrl->IsLocalFile() )
1351 {
1352 if( pContinue)
1353 // in case of --continue option we have to calculate the checksum from scratch
1354 return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
1355
1356 if( cksHelper )
1357 return cksHelper->GetCheckSum( checkSum, checkSumType );
1358
1359 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errCheckSumError );
1360 }
1361
1362 std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
1363 std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1364 return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
1365 }
1366
1367 //------------------------------------------------------------------------
1369 //------------------------------------------------------------------------
1370 std::vector<std::string> GetAddCks()
1371 {
1372 std::vector<std::string> ret;
1373 for( auto cksHelper : pAddCksHelpers )
1374 {
1375 std::string type = cksHelper->GetType();
1376 std::string cks;
1377 GetCheckSumImpl( cksHelper, cks, type );
1378 ret.push_back( cks );
1379 }
1380 return ret;
1381 }
1382
1383 //------------------------------------------------------------------------
1385 //------------------------------------------------------------------------
1386 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1387 {
1388 return ::GetXAttr( *pFile, xattrs );
1389 }
1390
1391 private:
1392 XRootDSourceDynamic(const XRootDSourceDynamic &other);
1393 XRootDSourceDynamic &operator = (const XRootDSourceDynamic &other);
1394 const XrdCl::URL *pUrl;
1395 XrdCl::File *pFile;
1396 int64_t pCurrentOffset;
1397 uint32_t pChunkSize;
1398 bool pDone;
1399 bool pUsePgRead;
1400 };
1401
1402 //----------------------------------------------------------------------------
1404 //----------------------------------------------------------------------------
1405 class XRootDSourceXCp: public Source
1406 {
1407 public:
1408 //------------------------------------------------------------------------
1410 //------------------------------------------------------------------------
1411 XRootDSourceXCp( const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
1412 pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
1413 {
1414 }
1415
1416 ~XRootDSourceXCp()
1417 {
1418 if( pXCpCtx )
1419 pXCpCtx->Delete();
1420 }
1421
1422 //------------------------------------------------------------------------
1424 //------------------------------------------------------------------------
1425 virtual XrdCl::XRootDStatus Initialize()
1426 {
1427 XrdCl::Log *log = XrdCl::DefaultEnv::GetLog();
1428 int64_t fileSize = -1;
1429
1430 if( pUrl->IsMetalink() )
1431 {
1432 XrdCl::RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
1433 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1434 fileSize = redirector->GetSize();
1435 pReplicas = redirector->GetReplicas();
1436 }
1437 else
1438 {
1439 XrdCl::LocationInfo *li = 0;
1440 XrdCl::FileSystem fs( *pUrl );
1441 XrdCl::XRootDStatus st = fs.DeepLocate( pUrl->GetPath(), XrdCl::OpenFlags::Compress | XrdCl::OpenFlags::PrefName, li );
1442 if( !st.IsOK() ) return st;
1443
1445 for( itr = li->Begin(); itr != li->End(); ++itr)
1446 {
1447 std::string url = "root://" + itr->GetAddress() + "/" + pUrl->GetPath();
1448 pReplicas.push_back( url );
1449 }
1450
1451 delete li;
1452 }
1453
1454 std::stringstream ss;
1455 ss << "XCp sources: ";
1456
1457 std::vector<std::string>::iterator itr;
1458 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1459 {
1460 ss << *itr << ", ";
1461 }
1462 log->Debug( XrdCl::UtilityMsg, "%s", ss.str().c_str() );
1463
1464 pXCpCtx = new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );
1465
1466 return pXCpCtx->Initialize();
1467 }
1468
1469 //------------------------------------------------------------------------
1471 //------------------------------------------------------------------------
1472 virtual int64_t GetSize()
1473 {
1474 return pXCpCtx->GetSize();
1475 }
1476
1477 //------------------------------------------------------------------------
1479 //------------------------------------------------------------------------
1480 virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1481 {
1482 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotImplemented );
1483 }
1484
1485 //------------------------------------------------------------------------
1493 //------------------------------------------------------------------------
1494 virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1495 {
1496 XrdCl::XRootDStatus st;
1497 do
1498 {
1499 st = pXCpCtx->GetChunk( ci );
1500 }
1501 while( st.IsOK() && st.code == XrdCl::suRetry );
1502 return st;
1503 }
1504
1505 //------------------------------------------------------------------------
1506 // Get check sum
1507 //------------------------------------------------------------------------
1508 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1509 std::string &checkSumType )
1510 {
1511 if( pUrl->IsMetalink() )
1512 {
1513 XrdCl::RedirectorRegistry &registry = XrdCl::RedirectorRegistry::Instance();
1514 XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1515 checkSum = redirector->GetCheckSum( checkSumType );
1516 if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1517 }
1518
1519 std::vector<std::string>::iterator itr;
1520 for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1521 {
1522 XrdCl::URL url( *itr );
1523 XrdCl::XRootDStatus st = XrdCl::Utils::GetRemoteCheckSum( checkSum,
1524 checkSumType, url );
1525 if( st.IsOK() ) return st;
1526 }
1527
1528 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNoMoreReplicas );
1529 }
1530
1531 //------------------------------------------------------------------------
1533 //------------------------------------------------------------------------
1534 std::vector<std::string> GetAddCks()
1535 {
1536 return std::vector<std::string>();
1537 }
1538
1539 //------------------------------------------------------------------------
1541 //------------------------------------------------------------------------
1542 virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1543 {
1544 XrdCl::XRootDStatus st;
1545 std::vector<std::string>::iterator itr = pReplicas.begin();
1546 for( ; itr < pReplicas.end() ; ++itr )
1547 {
1548 st = ::GetXAttr( *itr, xattrs );
1549 if( st.IsOK() ) return st;
1550 }
1551 return st;
1552 }
1553
1554 private:
1555
1556
1557 XrdCl::XCpCtx *pXCpCtx;
1558 const XrdCl::URL *pUrl;
1559 std::vector<std::string> pReplicas;
1560 uint32_t pChunkSize;
1561 uint16_t pParallelChunks;
1562 int32_t pNbSrc;
1563 uint64_t pBlockSize;
1564 };
1565
1566 //----------------------------------------------------------------------------
1568 //----------------------------------------------------------------------------
1569 class StdOutDestination: public Destination
1570 {
1571 public:
1572 //------------------------------------------------------------------------
1574 //------------------------------------------------------------------------
1575 StdOutDestination( const std::string &ckSumType ):
1576 Destination( ckSumType ), pCurrentOffset(0)
1577 {
1578 }
1579
1580 //------------------------------------------------------------------------
1582 //------------------------------------------------------------------------
1583 virtual ~StdOutDestination()
1584 {
1585 }
1586
1587 //------------------------------------------------------------------------
1589 //------------------------------------------------------------------------
1590 virtual XrdCl::XRootDStatus Initialize()
1591 {
1592 if( pContinue )
1593 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported,
1594 ENOTSUP, "Cannot continue to stdout." );
1595
1596 if( pCkSumHelper )
1597 return pCkSumHelper->Initialize();
1598 return XrdCl::XRootDStatus();
1599 }
1600
1601 //------------------------------------------------------------------------
1603 //------------------------------------------------------------------------
1604 virtual XrdCl::XRootDStatus Finalize()
1605 {
1606 return XrdCl::XRootDStatus();
1607 }
1608
1609 //------------------------------------------------------------------------
1614 //------------------------------------------------------------------------
1615 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1616 {
1617 using namespace XrdCl;
1618 Log *log = DefaultEnv::GetLog();
1619
1620 if( pCurrentOffset != ci.GetOffset() )
1621 {
1622 log->Error( UtilityMsg, "Got out-of-bounds chunk, expected offset:"
1623 " %llu, got %llu", (unsigned long long) pCurrentOffset, (unsigned long long) ci.GetOffset() );
1624 return XRootDStatus( stError, errInternal );
1625 }
1626
1627 int64_t wr = 0;
1628 uint32_t length = ci.GetLength();
1629 char *cursor = (char*)ci.GetBuffer();
1630 do
1631 {
1632 wr = write( 1, cursor, length );
1633 if( wr == -1 )
1634 {
1635 log->Debug( UtilityMsg, "Unable to write to stdout: %s",
1636 XrdSysE2T( errno ) );
1637 delete [] (char*)ci.GetBuffer();
1638 return XRootDStatus( stError, errOSError, errno );
1639 }
1640 pCurrentOffset += wr;
1641 cursor += wr;
1642 length -= wr;
1643 }
1644 while( length );
1645
1646 if( pCkSumHelper )
1647 pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1648 delete [] (char*)ci.GetBuffer();
1649 return XRootDStatus();
1650 }
1651
1652 //------------------------------------------------------------------------
1654 //------------------------------------------------------------------------
1655 virtual XrdCl::XRootDStatus Flush()
1656 {
1657 return XrdCl::XRootDStatus();
1658 }
1659
1660 //------------------------------------------------------------------------
1662 //------------------------------------------------------------------------
1663 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1664 std::string &checkSumType )
1665 {
1666 if( pCkSumHelper )
1667 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1668 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errCheckSumError );
1669 }
1670
1671 //------------------------------------------------------------------------
1673 //------------------------------------------------------------------------
1674 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1675 {
1676 return XrdCl::XRootDStatus();
1677 }
1678
1679 //------------------------------------------------------------------------
1681 //------------------------------------------------------------------------
1682 virtual int64_t GetSize()
1683 {
1684 return -1;
1685 }
1686
1687 private:
1688 StdOutDestination(const StdOutDestination &other);
1689 StdOutDestination &operator = (const StdOutDestination &other);
1690 uint64_t pCurrentOffset;
1691 };
1692
1693 //----------------------------------------------------------------------------
1695 //----------------------------------------------------------------------------
1696 class XRootDDestination: public Destination
1697 {
1698 public:
1699 //------------------------------------------------------------------------
1701 //------------------------------------------------------------------------
1702 XRootDDestination( const XrdCl::URL &url, uint8_t parallelChunks,
1703 const std::string &ckSumType, const XrdCl::ClassicCopyJob &cpjob ):
1704 Destination( ckSumType ),
1705 pUrl( url ), pFile( new XrdCl::File( XrdCl::File::DisableVirtRedirect ) ),
1706 pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false ), cpjob( cpjob )
1707 {
1708 }
1709
1710 //------------------------------------------------------------------------
1712 //------------------------------------------------------------------------
1713 virtual ~XRootDDestination()
1714 {
1715 CleanUpChunks();
1716 delete pFile;
1717
1718 XrdCl::Log *log = XrdCl::DefaultEnv::GetLog();
1719
1720 //----------------------------------------------------------------------
1721 // Make sure we clean up the cp-target symlink
1722 //----------------------------------------------------------------------
1723 std::string cptarget = XrdCl::DefaultCpTarget;
1724 XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1725 if( !cptarget.empty() )
1726 {
1727 XrdCl::FileSystem fs( "file://localhost" );
1728 XrdCl::XRootDStatus st = fs.Rm( cptarget );
1729 if( !st.IsOK() )
1730 log->Warning( XrdCl::UtilityMsg, "Could not delete cp-target symlink: %s",
1731 st.ToString().c_str() );
1732 }
1733
1734 //----------------------------------------------------------------------
1735 // If the copy failed and user requested posc and we are dealing with
1736 // a local destination, remove the file
1737 //----------------------------------------------------------------------
1738 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
1739 {
1740 XrdCl::FileSystem fs( pUrl );
1741 XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
1742 if( !st.IsOK() )
1743 log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
1744 " on failure: %s", st.ToString().c_str() );
1745 }
1746 }
1747
1748 //------------------------------------------------------------------------
1750 //------------------------------------------------------------------------
1751 virtual XrdCl::XRootDStatus Initialize()
1752 {
1753 using namespace XrdCl;
1754 Log *log = DefaultEnv::GetLog();
1755 log->Debug( UtilityMsg, "Opening %s for writing",
1756 pUrl.GetObfuscatedURL().c_str() );
1757
1758 std::string value;
1759 DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
1760 pFile->SetProperty( "WriteRecovery", value );
1761
1762 OpenFlags::Flags flags = OpenFlags::Update;
1763 if( pForce )
1764 flags |= OpenFlags::Delete;
1765 else if( !pContinue )
1766 flags |= OpenFlags::New;
1767
1768 if( pPosc )
1769 flags |= OpenFlags::POSC;
1770
1771 if( pCoerce )
1772 flags |= OpenFlags::Force;
1773
1774 if( pMakeDir)
1775 flags |= OpenFlags::MakePath;
1776
1777 Access::Mode mode = Access::UR|Access::UW|Access::GR|Access::OR;
1778
1779 XrdCl::XRootDStatus st = pFile->Open( pUrl.GetURL(), flags, mode );
1780 if( !st.IsOK() )
1781 return st;
1782
1783 if( ( !pUrl.IsLocalFile() && !pFile->IsSecure() ) ||
1784 ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
1785 {
1786 std::string datasrv;
1787 pFile->GetProperty( "DataServer", datasrv );
1788 //--------------------------------------------------------------------
1789 // Decide whether we can use PgRead
1790 //--------------------------------------------------------------------
1792 XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1793 pUsePgWrt = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1794 }
1795
1796 std::string cptarget = XrdCl::DefaultCpTarget;
1797 XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1798 if( !cptarget.empty() )
1799 {
1800 std::string targeturl;
1801 pFile->GetProperty( "LastURL", targeturl );
1802 targeturl = URL( targeturl ).GetLocation();
1803 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
1804 log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
1805 XrdSysE2T( errno ) );
1806 else
1807 log->Info( UtilityMsg, "Created cp-target symlink: %s -> %s",
1808 cptarget.c_str(), targeturl.c_str() );
1809 }
1810
1811 StatInfo *info = 0;
1812 st = pFile->Stat( false, info );
1813 if( !st.IsOK() )
1814 return st;
1815 pSize = info->GetSize();
1816 delete info;
1817
1818 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1819 return pCkSumHelper->Initialize();
1820
1821 return XRootDStatus();
1822 }
1823
1824 //------------------------------------------------------------------------
1826 //------------------------------------------------------------------------
1827 virtual XrdCl::XRootDStatus Finalize()
1828 {
1829 return pFile->Close();
1830 }
1831
1832 //------------------------------------------------------------------------
1837 //------------------------------------------------------------------------
1838 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1839 {
1840 using namespace XrdCl;
1841 if( !pFile->IsOpen() )
1842 {
1843 delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1844 return XRootDStatus( stError, errUninitialized );
1845 }
1846
1847 //----------------------------------------------------------------------
1848 // If there is still place for this chunk to be sent send it
1849 //----------------------------------------------------------------------
1850 if( pChunks.size() < pParallel )
1851 return QueueChunk( std::move( ci ) );
1852
1853 //----------------------------------------------------------------------
1854 // We wait for a chunk to be sent so that we have space for the current
1855 // one
1856 //----------------------------------------------------------------------
1857 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
1858 pChunks.pop();
1859 ch->sem->Wait();
1860 delete [] (char*)ch->chunk.GetBuffer();
1861 if( !ch->status.IsOK() )
1862 {
1863 Log *log = DefaultEnv::GetLog();
1864 log->Debug( UtilityMsg, "Unable write %d bytes at %llu from %s: %s",
1865 ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
1866 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
1867 delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1868 CleanUpChunks();
1869
1870 //--------------------------------------------------------------------
1871 // Check if we should re-try the transfer from scratch at a different
1872 // data server
1873 //--------------------------------------------------------------------
1874 return CheckIfRetriable( ch->status );
1875 }
1876
1877 return QueueChunk( std::move( ci ) );
1878 }
1879
1880 //------------------------------------------------------------------------
1882 //------------------------------------------------------------------------
1883 virtual int64_t GetSize()
1884 {
1885 return pSize;
1886 }
1887
1888 //------------------------------------------------------------------------
1890 //------------------------------------------------------------------------
1891 void CleanUpChunks()
1892 {
1893 while( !pChunks.empty() )
1894 {
1895 ChunkHandler *ch = pChunks.front();
1896 pChunks.pop();
1897 ch->sem->Wait();
1898 delete [] (char *)ch->chunk.GetBuffer();
1899 delete ch;
1900 }
1901 }
1902
1903 //------------------------------------------------------------------------
1905 //------------------------------------------------------------------------
1906 XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
1907 {
1908 // we are writing chunks in order so we can calc the checksum
1909 // in case of local files
1910 if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1911 pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1912
1913 ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
1914 XrdCl::XRootDStatus st;
1915 st = pUsePgWrt
1916 ? pFile->PgWrite(ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch->chunk.GetCksums(), ch)
1917 : pFile->Write( ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
1918 if( !st.IsOK() )
1919 {
1920 CleanUpChunks();
1921 delete [] (char*)ch->chunk.GetBuffer();
1922 delete ch;
1923 return st;
1924 }
1925 pChunks.push( ch );
1926 return XrdCl::XRootDStatus();
1927 }
1928
1929 //------------------------------------------------------------------------
1931 //------------------------------------------------------------------------
1932 virtual XrdCl::XRootDStatus Flush()
1933 {
1934 XrdCl::XRootDStatus st;
1935 while( !pChunks.empty() )
1936 {
1937 ChunkHandler *ch = pChunks.front();
1938 pChunks.pop();
1939 ch->sem->Wait();
1940 if( !ch->status.IsOK() )
1941 {
1942 //--------------------------------------------------------------------
1943 // Check if we should re-try the transfer from scratch at a different
1944 // data server
1945 //--------------------------------------------------------------------
1946 st = CheckIfRetriable( ch->status );
1947 }
1948 delete [] (char *)ch->chunk.GetBuffer();
1949 delete ch;
1950 }
1951 return st;
1952 }
1953
1954 //------------------------------------------------------------------------
1956 //------------------------------------------------------------------------
1957 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1958 std::string &checkSumType )
1959 {
1960 if( pUrl.IsLocalFile() )
1961 {
1962 if( pContinue )
1963 // in case of --continue option we have to calculate the checksum from scratch
1964 return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl.GetPath() );
1965
1966 if( pCkSumHelper )
1967 return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1968
1969 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errCheckSumError );
1970 }
1971
1972 std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1973 return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType,
1974 XrdCl::URL( lastUrl ) );
1975 }
1976
1977 //------------------------------------------------------------------------
1979 //------------------------------------------------------------------------
1980 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1981 {
1982 return ::SetXAttr( *pFile, xattrs );
1983 }
1984
1985 //------------------------------------------------------------------------
1987 //------------------------------------------------------------------------
1988 const std::string& GetLastURL() const
1989 {
1990 return pLastURL;
1991 }
1992
1993 //------------------------------------------------------------------------
1995 //------------------------------------------------------------------------
1996 const std::string& GetWrtRecoveryRedir() const
1997 {
1998 return pWrtRecoveryRedir;
1999 }
2000
2001 private:
2002 XRootDDestination(const XRootDDestination &other);
2003 XRootDDestination &operator = (const XRootDDestination &other);
2004
2005 //------------------------------------------------------------------------
2006 // Asynchronous chunk handler
2007 //------------------------------------------------------------------------
2008 class ChunkHandler: public XrdCl::ResponseHandler
2009 {
2010 public:
2011 ChunkHandler( XrdCl::PageInfo &&ci ):
2012 sem( new XrdSysSemaphore(0) ),
2013 chunk(std::move( ci ) ) {}
2014 virtual ~ChunkHandler() { delete sem; }
2015 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2016 XrdCl::AnyObject */*response*/ )
2017 {
2018 this->status = *statusval;
2019 delete statusval;
2020 sem->Post();
2021 }
2022
2023 XrdSysSemaphore *sem;
2024 XrdCl::PageInfo chunk;
2025 XrdCl::XRootDStatus status;
2026 };
2027
2028 inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2029 {
2030 if( status.IsOK() ) return status;
2031
2032 //--------------------------------------------------------------------
2033 // Check if we should re-try the transfer from scratch at a different
2034 // data server
2035 //--------------------------------------------------------------------
2036 std::string value;
2037 if( pFile->GetProperty( "WrtRecoveryRedir", value ) )
2038 {
2039 pWrtRecoveryRedir = value;
2040 if( pFile->GetProperty( "LastURL", value ) ) pLastURL = value;
2041 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errRetry );
2042 }
2043
2044 return status;
2045 }
2046
2047 const XrdCl::URL pUrl;
2048 XrdCl::File *pFile;
2049 uint8_t pParallel;
2050 std::queue<ChunkHandler *> pChunks;
2051 int64_t pSize;
2052
2053 std::string pWrtRecoveryRedir;
2054 std::string pLastURL;
2055 bool pUsePgWrt;
2056 const XrdCl::ClassicCopyJob &cpjob;
2057 };
2058
2059 //----------------------------------------------------------------------------
2061 //----------------------------------------------------------------------------
2062 class XRootDZipDestination: public Destination
2063 {
2064 public:
2065 //------------------------------------------------------------------------
2067 //------------------------------------------------------------------------
2068 XRootDZipDestination( const XrdCl::URL &url, const std::string &fn,
2069 int64_t size, uint8_t parallelChunks, XrdCl::ClassicCopyJob &cpjob ):
2070 Destination( "zcrc32" ),
2071 pUrl( url ), pFilename( fn ), pZip( new XrdCl::ZipArchive() ),
2072 pParallel( parallelChunks ), pSize( size ), cpjob( cpjob )
2073 {
2074 }
2075
2076 //------------------------------------------------------------------------
2078 //------------------------------------------------------------------------
2079 virtual ~XRootDZipDestination()
2080 {
2081 CleanUpChunks();
2082 delete pZip;
2083
2084 //----------------------------------------------------------------------
2085 // If the copy failed and user requested posc and we are dealing with
2086 // a local destination, remove the file
2087 //----------------------------------------------------------------------
2088 if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
2089 {
2090 XrdCl::FileSystem fs( pUrl );
2091 XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
2092 if( !st.IsOK() )
2093 {
2094 XrdCl::Log *log = XrdCl::DefaultEnv::GetLog();
2095 log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
2096 " on failure: %s", st.ToString().c_str() );
2097 }
2098 }
2099 }
2100
2101 //------------------------------------------------------------------------
2103 //------------------------------------------------------------------------
2104 virtual XrdCl::XRootDStatus Initialize()
2105 {
2106 using namespace XrdCl;
2107 Log *log = DefaultEnv::GetLog();
2108 log->Debug( UtilityMsg, "Opening %s for writing",
2109 pUrl.GetObfuscatedURL().c_str() );
2110
2111 std::string value;
2112 DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
2113 pZip->SetProperty( "WriteRecovery", value );
2114
2115 OpenFlags::Flags flags = OpenFlags::Update;
2116
2117 FileSystem fs( pUrl );
2118 StatInfo *info = nullptr;
2119 auto st = fs.Stat( pUrl.GetPath(), info );
2120 if( !st.IsOK() && st.code == errErrorResponse && st.errNo == kXR_NotFound )
2121 flags |= OpenFlags::New;
2122
2123 if( pPosc )
2124 flags |= OpenFlags::POSC;
2125
2126 if( pCoerce )
2127 flags |= OpenFlags::Force;
2128
2129 if( pMakeDir)
2130 flags |= OpenFlags::MakePath;
2131
2132 st = XrdCl::WaitFor( XrdCl::OpenArchive( pZip, pUrl.GetURL(), flags ) );
2133 if( !st.IsOK() )
2134 return st;
2135
2136 std::string cptarget = XrdCl::DefaultCpTarget;
2137 XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
2138 if( !cptarget.empty() )
2139 {
2140 std::string targeturl;
2141 pZip->GetProperty( "LastURL", targeturl );
2142 if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
2143 log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
2144 XrdSysE2T( errno ) );
2145 }
2146
2147 st = pZip->OpenFile( pFilename, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write, pSize );
2148 if( !st.IsOK() )
2149 return st;
2150
2151 return pCkSumHelper->Initialize();
2152 }
2153
2154 //------------------------------------------------------------------------
2156 //------------------------------------------------------------------------
2157 virtual XrdCl::XRootDStatus Finalize()
2158 {
2159 uint32_t crc32 = 0;
2160 auto st = pCkSumHelper->GetRawCheckSum( "zcrc32", crc32 );
2161 if( !st.IsOK() ) return st;
2162 pZip->UpdateMetadata( crc32 );
2163 pZip->CloseFile();
2164 return XrdCl::WaitFor( XrdCl::CloseArchive( pZip ) );
2165 }
2166
2167 //------------------------------------------------------------------------
2172 //------------------------------------------------------------------------
2173 virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
2174 {
2175 using namespace XrdCl;
2176
2177 //----------------------------------------------------------------------
2178 // If there is still place for this chunk to be sent send it
2179 //----------------------------------------------------------------------
2180 if( pChunks.size() < pParallel )
2181 return QueueChunk( std::move( ci ) );
2182
2183 //----------------------------------------------------------------------
2184 // We wait for a chunk to be sent so that we have space for the current
2185 // one
2186 //----------------------------------------------------------------------
2187 std::unique_ptr<ChunkHandler> ch( pChunks.front() );
2188 pChunks.pop();
2189 ch->sem->Wait();
2190 delete [] (char*)ch->chunk.GetBuffer();
2191 if( !ch->status.IsOK() )
2192 {
2193 Log *log = DefaultEnv::GetLog();
2194 log->Debug( UtilityMsg, "Unable write %d bytes at %llu from %s: %s",
2195 ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
2196 pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
2197 CleanUpChunks();
2198
2199 //--------------------------------------------------------------------
2200 // Check if we should re-try the transfer from scratch at a different
2201 // data server
2202 //--------------------------------------------------------------------
2203 return CheckIfRetriable( ch->status );
2204 }
2205
2206 return QueueChunk( std::move( ci ) );
2207 }
2208
2209 //------------------------------------------------------------------------
2211 //------------------------------------------------------------------------
2212 virtual int64_t GetSize()
2213 {
2214 return -1;
2215 }
2216
2217 //------------------------------------------------------------------------
2219 //------------------------------------------------------------------------
2220 void CleanUpChunks()
2221 {
2222 while( !pChunks.empty() )
2223 {
2224 ChunkHandler *ch = pChunks.front();
2225 pChunks.pop();
2226 ch->sem->Wait();
2227 delete [] (char *)ch->chunk.GetBuffer();
2228 delete ch;
2229 }
2230 }
2231
2232 //------------------------------------------------------------------------
2233 // Queue a chunk
2234 //------------------------------------------------------------------------
2235 XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
2236 {
2237 // we are writing chunks in order so we can calc the checksum
2238 // in case of local files
2239 if( pCkSumHelper ) pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
2240
2241 ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
2242 XrdCl::XRootDStatus st;
2243
2244 //----------------------------------------------------------------------
2245 // TODO
2246 // In order to use PgWrite with ZIP append we need first to implement
2247 // PgWriteV!!!
2248 //----------------------------------------------------------------------
2249 st = pZip->Write( ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
2250 if( !st.IsOK() )
2251 {
2252 CleanUpChunks();
2253 delete [] (char*)ch->chunk.GetBuffer();
2254 delete ch;
2255 return st;
2256 }
2257 pChunks.push( ch );
2258 return XrdCl::XRootDStatus();
2259 }
2260
2261 //------------------------------------------------------------------------
2263 //------------------------------------------------------------------------
2264 virtual XrdCl::XRootDStatus Flush()
2265 {
2266 XrdCl::XRootDStatus st;
2267 while( !pChunks.empty() )
2268 {
2269 ChunkHandler *ch = pChunks.front();
2270 pChunks.pop();
2271 ch->sem->Wait();
2272 if( !ch->status.IsOK() )
2273 {
2274 //--------------------------------------------------------------------
2275 // Check if we should re-try the transfer from scratch at a different
2276 // data server
2277 //--------------------------------------------------------------------
2278 st = CheckIfRetriable( ch->status );
2279 }
2280 delete [] (char *)ch->chunk.GetBuffer();
2281 delete ch;
2282 }
2283 return st;
2284 }
2285
2286 //------------------------------------------------------------------------
2288 //------------------------------------------------------------------------
2289 virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
2290 std::string &checkSumType )
2291 {
2292 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported );
2293 }
2294
2295 //------------------------------------------------------------------------
2297 //------------------------------------------------------------------------
2298 virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
2299 {
2300 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errNotSupported );
2301 }
2302
2303 //------------------------------------------------------------------------
2305 //------------------------------------------------------------------------
2306 const std::string& GetLastURL() const
2307 {
2308 return pLastURL;
2309 }
2310
2311 //------------------------------------------------------------------------
2313 //------------------------------------------------------------------------
2314 const std::string& GetWrtRecoveryRedir() const
2315 {
2316 return pWrtRecoveryRedir;
2317 }
2318
2319 private:
2320 XRootDZipDestination(const XRootDDestination &other);
2321 XRootDZipDestination &operator = (const XRootDDestination &other);
2322
2323 //------------------------------------------------------------------------
2324 // Asynchronous chunk handler
2325 //------------------------------------------------------------------------
2326 class ChunkHandler: public XrdCl::ResponseHandler
2327 {
2328 public:
2329 ChunkHandler( XrdCl::PageInfo &&ci ):
2330 sem( new XrdSysSemaphore(0) ),
2331 chunk( std::move( ci ) ) {}
2332 virtual ~ChunkHandler() { delete sem; }
2333 virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2334 XrdCl::AnyObject */*response*/ )
2335 {
2336 this->status = *statusval;
2337 delete statusval;
2338 sem->Post();
2339 }
2340
2341 XrdSysSemaphore *sem;
2342 XrdCl::PageInfo chunk;
2343 XrdCl::XRootDStatus status;
2344 };
2345
2346 inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2347 {
2348 if( status.IsOK() ) return status;
2349
2350 //--------------------------------------------------------------------
2351 // Check if we should re-try the transfer from scratch at a different
2352 // data server
2353 //--------------------------------------------------------------------
2354 std::string value;
2355 if( pZip->GetProperty( "WrtRecoveryRedir", value ) )
2356 {
2357 pWrtRecoveryRedir = value;
2358 if( pZip->GetProperty( "LastURL", value ) ) pLastURL = value;
2359 return XrdCl::XRootDStatus( XrdCl::stError, XrdCl::errRetry );
2360 }
2361
2362 return status;
2363 }
2364
2365 const XrdCl::URL pUrl;
2366 std::string pFilename;
2367 XrdCl::ZipArchive *pZip;
2368 uint8_t pParallel;
2369 std::queue<ChunkHandler *> pChunks;
2370 int64_t pSize;
2371
2372 std::string pWrtRecoveryRedir;
2373 std::string pLastURL;
2374 XrdCl::ClassicCopyJob &cpjob;
2375 };
2376}
2377
2378//------------------------------------------------------------------------------
2379// Get current time in nanoseconds
2380//------------------------------------------------------------------------------
2381inline std::chrono::nanoseconds time_nsec()
2382{
2383 using namespace std::chrono;
2384 auto since_epoch = high_resolution_clock::now().time_since_epoch();
2385 return duration_cast<nanoseconds>( since_epoch );
2386}
2387
2388//------------------------------------------------------------------------------
2389// Convert seconds to nanoseconds
2390//------------------------------------------------------------------------------
2391inline long long to_nsec( long long sec )
2392{
2393 return sec * 1000000000;
2394}
2395
2396//------------------------------------------------------------------------------
2397// Sleep for # nanoseconds
2398//------------------------------------------------------------------------------
2399inline void sleep_nsec( long long nsec )
2400{
2401#if __cplusplus >= 201103L
2402 using namespace std::chrono;
2403 std::this_thread::sleep_for( nanoseconds( nsec ) );
2404#else
2405 timespec req;
2406 req.tv_sec = nsec / to_nsec( 1 );
2407 req.tv_nsec = nsec % to_nsec( 1 );
2408 nanosleep( &req, 0 );
2409#endif
2410}
2411
2412namespace XrdCl
2413{
2414 //----------------------------------------------------------------------------
2415 // Constructor
2416 //----------------------------------------------------------------------------
2418 PropertyList *jobProperties,
2419 PropertyList *jobResults ):
2420 CopyJob( jobId, jobProperties, jobResults )
2421 {
2422 Log *log = DefaultEnv::GetLog();
2423 log->Debug( UtilityMsg, "Creating a classic copy job, from %s to %s",
2424 GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
2425 }
2426
2427 //----------------------------------------------------------------------------
2428 // Run the copy job
2429 //----------------------------------------------------------------------------
2431 {
2432 Log *log = DefaultEnv::GetLog();
2433
2434 std::string checkSumMode;
2435 std::string checkSumType;
2436 std::string checkSumPreset;
2437 std::string zipSource;
2438 uint16_t parallelChunks;
2439 uint32_t chunkSize;
2440 uint64_t blockSize;
2441 bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2442 rmOnBadCksum, continue_, zipappend, doserver;
2443 int32_t nbXcpSources;
2444 long long xRate;
2445 long long xRateThreshold;
2446 time_t cpTimeout;
2447 std::vector<std::string> addcksums;
2448
2449 pProperties->Get( "checkSumMode", checkSumMode );
2450 pProperties->Get( "checkSumType", checkSumType );
2451 pProperties->Get( "checkSumPreset", checkSumPreset );
2452 pProperties->Get( "parallelChunks", parallelChunks );
2453 pProperties->Get( "chunkSize", chunkSize );
2454 pProperties->Get( "posc", posc );
2455 pProperties->Get( "force", force );
2456 pProperties->Get( "coerce", coerce );
2457 pProperties->Get( "makeDir", makeDir );
2458 pProperties->Get( "dynamicSource", dynamicSource );
2459 pProperties->Get( "zipArchive", zip );
2460 pProperties->Get( "xcp", xcp );
2461 pProperties->Get( "xcpBlockSize", blockSize );
2462 pProperties->Get( "preserveXAttr", preserveXAttr );
2463 pProperties->Get( "xrate", xRate );
2464 pProperties->Get( "xrateThreshold", xRateThreshold );
2465 pProperties->Get( "rmOnBadCksum", rmOnBadCksum );
2466 pProperties->Get( "continue", continue_ );
2467 pProperties->Get( "cpTimeout", cpTimeout );
2468 pProperties->Get( "zipAppend", zipappend );
2469 pProperties->Get( "addcksums", addcksums );
2470 pProperties->Get( "doServer", doserver );
2471
2472 if( zip )
2473 pProperties->Get( "zipSource", zipSource );
2474
2475 if( xcp )
2476 pProperties->Get( "nbXcpSources", nbXcpSources );
2477
2478 if( force && continue_ )
2479 return SetResult( stError, errInvalidArgs, EINVAL,
2480 "Invalid argument combination: continue + force." );
2481
2482 if( zipappend && ( continue_ || force ) )
2483 return SetResult( stError, errInvalidArgs, EINVAL,
2484 "Invalid argument combination: ( continue | force ) + zip-append." );
2485
2486 //--------------------------------------------------------------------------
2487 // Start the cp t/o timer if necessary
2488 //--------------------------------------------------------------------------
2489 std::unique_ptr<timer_sec_t> cptimer;
2490 if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2491
2492 //--------------------------------------------------------------------------
2493 // Remove on bad checksum implies that POSC semantics has to be enabled
2494 //--------------------------------------------------------------------------
2495 if( rmOnBadCksum ) posc = true;
2496
2497 //--------------------------------------------------------------------------
2498 // Resolve the 'auto' checksum type.
2499 //--------------------------------------------------------------------------
2500 if( checkSumType == "auto" )
2501 {
2502 checkSumType = Utils::InferChecksumType( GetSource(), GetTarget(), zip );
2503 if( checkSumType.empty() )
2504 return SetResult( stError, errCheckSumError, ENOTSUP, "Could not infer checksum type." );
2505 else
2506 log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
2507 }
2508
2509 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2510 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2511
2512 //--------------------------------------------------------------------------
2513 // Initialize the source and the destination
2514 //--------------------------------------------------------------------------
2515 std::unique_ptr<Source> src;
2516 if( xcp )
2517 src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2518 else if( zip ) // TODO make zip work for xcp
2519 src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks,
2520 checkSumType, addcksums , doserver) );
2521 else if( GetSource().GetProtocol() == "stdio" )
2522 src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2523 else
2524 {
2525 if( dynamicSource )
2526 src.reset( new XRootDSourceDynamic( &GetSource(), chunkSize, checkSumType, addcksums ) );
2527 else
2528 src.reset( new XRootDSource( &GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2529 }
2530
2531 XRootDStatus st = src->Initialize();
2532 if( !st.IsOK() ) return SourceError( st );
2533 uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2534
2535 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2536 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2537
2538 std::unique_ptr<Destination> dest;
2539 URL newDestUrl( GetTarget() );
2540
2541 if( GetTarget().GetProtocol() == "stdio" )
2542 dest.reset( new StdOutDestination( checkSumType ) );
2543 else if( zipappend )
2544 {
2545 std::string fn = GetSource().GetPath();
2546 size_t pos = fn.rfind( '/' );
2547 if( pos != std::string::npos )
2548 fn = fn.substr( pos + 1 );
2549 int64_t size = src->GetSize();
2550 dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2551 }
2552 //--------------------------------------------------------------------------
2553 // For xrootd destination build the oss.asize hint
2554 //--------------------------------------------------------------------------
2555 else
2556 {
2557 if( src->GetSize() >= 0 )
2558 {
2559 URL::ParamsMap params = newDestUrl.GetParams();
2560 std::ostringstream o; o << src->GetSize();
2561 params["oss.asize"] = o.str();
2562 newDestUrl.SetParams( params );
2563 // makeDir = true; // Backward compatibility for xroot destinations!!!
2564 }
2565 dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2566 }
2567
2568 dest->SetForce( force );
2569 dest->SetPOSC( posc );
2570 dest->SetCoerce( coerce );
2571 dest->SetMakeDir( makeDir );
2572 dest->SetContinue( continue_ );
2573 st = dest->Initialize();
2574 if( !st.IsOK() ) return DestinationError( st );
2575
2576 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2577 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2578
2579 //--------------------------------------------------------------------------
2580 // Copy the chunks
2581 //--------------------------------------------------------------------------
2582 if( continue_ )
2583 {
2584 size -= dest->GetSize();
2585 XrdCl::XRootDStatus st = src->StartAt( dest->GetSize() );
2586 if( !st.IsOK() ) return SetResult( st );
2587 }
2588
2589 PageInfo pageInfo;
2590 uint64_t total_processed = 0;
2591 uint64_t processed = 0;
2592 auto start = time_nsec();
2593 uint16_t threshold_interval = parallelChunks;
2594 bool threshold_draining = false;
2595 timer_nsec_t threshold_timer;
2596 while( 1 )
2597 {
2598 st = src->GetChunk( pageInfo );
2599 if( !st.IsOK() )
2600 return SourceError( st);
2601
2602 if( st.IsOK() && st.code == suDone )
2603 break;
2604
2605 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2606 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2607
2608 if( xRate )
2609 {
2610 auto elapsed = ( time_nsec() - start ).count();
2611 double transferred = total_processed + pageInfo.GetLength();
2612 double expected = double( xRate ) / to_nsec( 1 ) * elapsed;
2613 //----------------------------------------------------------------------
2614 // check if our transfer rate didn't exceeded the limit
2615 // (we are too fast)
2616 //----------------------------------------------------------------------
2617 if( elapsed && // make sure elapsed time is greater than 0
2618 transferred > expected )
2619 {
2620 auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
2621 sleep_nsec( nsec );
2622 }
2623 }
2624
2625 if( xRateThreshold )
2626 {
2627 auto elapsed = threshold_timer.elapsed();
2628 double transferred = processed + pageInfo.GetLength();
2629 double expected = double( xRateThreshold ) / to_nsec( 1 ) * elapsed;
2630 //----------------------------------------------------------------------
2631 // check if our transfer rate dropped below the threshold
2632 // (we are too slow)
2633 //----------------------------------------------------------------------
2634 if( elapsed && // make sure elapsed time is greater than 0
2635 transferred < expected &&
2636 threshold_interval == 0 ) // we check every # parallelChunks
2637 {
2638 if( !threshold_draining )
2639 {
2640 log->Warning( UtilityMsg, "Transfer rate dropped below requested ehreshold,"
2641 " trying different source!" );
2642 XRootDStatus st = src->TryOtherServer();
2643 if( !st.IsOK() ) return SetResult( stError, errThresholdExceeded, 0,
2644 "The transfer rate dropped below "
2645 "requested threshold!" );
2646 threshold_draining = true; // before the next measurement we need to drain
2647 // all the chunks that will come from the old server
2648 }
2649 else // now that all the chunks from the old server have
2650 { // been received we can start another measurement
2651 processed = 0;
2652 threshold_timer.reset();
2653 threshold_interval = parallelChunks;
2654 threshold_draining = false;
2655 }
2656 }
2657
2658 threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2659 }
2660
2661 total_processed += pageInfo.GetLength();
2662 processed += pageInfo.GetLength();
2663
2664 st = dest->PutChunk( std::move( pageInfo ) );
2665 if( !st.IsOK() )
2666 {
2667 if( st.code == errRetry )
2668 {
2669 pResults->Set( "LastURL", dest->GetLastURL() );
2670 pResults->Set( "WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2671 return SetResult( st );
2672 }
2673 return DestinationError( st );
2674 }
2675
2676 if( progress )
2677 {
2678 progress->JobProgress( pJobId, total_processed, size );
2679 if( progress->ShouldCancel( pJobId ) )
2680 return SetResult( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" );
2681 }
2682 }
2683
2684 st = dest->Flush();
2685 if( !st.IsOK() )
2686 return DestinationError( st );
2687
2688 //--------------------------------------------------------------------------
2689 // Copy extended attributes
2690 //--------------------------------------------------------------------------
2691 if( preserveXAttr && Utils::HasXAttr( GetSource() ) && Utils::HasXAttr( GetTarget() ) )
2692 {
2693 std::vector<xattr_t> xattrs;
2694 st = src->GetXAttr( xattrs );
2695 if( !st.IsOK() ) return SourceError( st );
2696 st = dest->SetXAttr( xattrs );
2697 if( !st.IsOK() ) return DestinationError( st );
2698 }
2699
2700 //--------------------------------------------------------------------------
2701 // The size of the source is known and not enough data has been transferred
2702 // to the destination
2703 //--------------------------------------------------------------------------
2704 if( src->GetSize() >= 0 && size != total_processed )
2705 {
2706 log->Error( UtilityMsg, "The declared source size is %llu bytes, but "
2707 "received %llu bytes.", (unsigned long long) size, (unsigned long long) total_processed );
2708 return SetResult( stError, errDataError );
2709 }
2710 pResults->Set( "size", total_processed );
2711
2712 //--------------------------------------------------------------------------
2713 // Finalize the destination
2714 //--------------------------------------------------------------------------
2715 st = dest->Finalize();
2716 if( !st.IsOK() )
2717 return DestinationError( st );
2718
2719 //--------------------------------------------------------------------------
2720 // Verify the checksums if needed
2721 //--------------------------------------------------------------------------
2722 if( checkSumMode != "none" )
2723 {
2724 log->Debug( UtilityMsg, "Attempting checksum calculation, mode: %s.",
2725 checkSumMode.c_str() );
2726 std::string sourceCheckSum;
2727 std::string targetCheckSum;
2728
2729 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2730 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2731
2732 //------------------------------------------------------------------------
2733 // Get the check sum at source
2734 //------------------------------------------------------------------------
2735 timeval oStart, oEnd;
2736 XRootDStatus st;
2737
2738 if( checkSumMode == "end2end" || checkSumMode == "source" ||
2739 !checkSumPreset.empty() )
2740 {
2741 gettimeofday( &oStart, 0 );
2742 if( !checkSumPreset.empty() )
2743 {
2744 sourceCheckSum = checkSumType + ":";
2745 sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
2746 checkSumPreset );
2747 }
2748 else
2749 {
2750 st = src->GetCheckSum( sourceCheckSum, checkSumType );
2751 }
2752 gettimeofday( &oEnd, 0 );
2753
2754 if( !st.IsOK() )
2755 return SourceError( st );
2756
2757 pResults->Set( "sourceCheckSum", sourceCheckSum );
2758 }
2759
2760 if( !addcksums.empty() )
2761 pResults->Set( "additionalCkeckSum", src->GetAddCks() );
2762
2763 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2764 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2765
2766 //------------------------------------------------------------------------
2767 // Get the check sum at destination
2768 //------------------------------------------------------------------------
2769 timeval tStart, tEnd;
2770
2771 if( checkSumMode == "end2end" || checkSumMode == "target" )
2772 {
2773 gettimeofday( &tStart, 0 );
2774 st = dest->GetCheckSum( targetCheckSum, checkSumType );
2775 if( !st.IsOK() )
2776 return DestinationError( st );
2777 gettimeofday( &tEnd, 0 );
2778 pResults->Set( "targetCheckSum", targetCheckSum );
2779 }
2780
2781 if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2782 return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2783
2784 //------------------------------------------------------------------------
2785 // Make sure the checksums are both lower case
2786 //------------------------------------------------------------------------
2787 auto sanitize_cksum = []( char c )
2788 {
2789 std::locale loc;
2790 if( std::isalpha( c ) ) return std::tolower( c, loc );
2791 return c;
2792 };
2793
2794 std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2795 sourceCheckSum.begin(), sanitize_cksum );
2796
2797 std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2798 targetCheckSum.begin(), sanitize_cksum );
2799
2800 //------------------------------------------------------------------------
2801 // Compare and inform monitoring
2802 //------------------------------------------------------------------------
2803 if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2804 {
2805 bool match = false;
2806 if( sourceCheckSum == targetCheckSum )
2807 match = true;
2808
2810 if( mon )
2811 {
2813 i.transfer.origin = &GetSource();
2814 i.transfer.target = &GetTarget();
2815 i.cksum = sourceCheckSum;
2816 i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
2817 i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
2818 i.isOK = match;
2819 mon->Event( Monitor::EvCheckSum, &i );
2820 }
2821
2822 if( !match )
2823 {
2824 if( rmOnBadCksum )
2825 {
2826 FileSystem fs( newDestUrl );
2827 st = fs.Rm( newDestUrl.GetPath() );
2828 if( !st.IsOK() )
2829 log->Error( UtilityMsg, "Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2830 else
2831 log->Info( UtilityMsg, "Target file removed due to bad checksum!" );
2832 }
2833
2834 st = dest->Finalize();
2835 if( !st.IsOK() )
2836 log->Error( UtilityMsg, "Failed to finalize the destination: %s", st.ToString().c_str() );
2837
2838 return SetResult( stError, errCheckSumError, 0 );
2839 }
2840
2841 log->Info( UtilityMsg, "Checksum verification: succeeded." );
2842 }
2843 }
2844
2845 return SetResult();
2846 }
2847}
@ kXR_NotFound
@ kXR_Cancelled
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
#define write(a, b, c)
Definition XrdPosix.hh:115
#define read(a, b, c)
Definition XrdPosix.hh:82
XrdOucString File
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
int Set(const char *csName)
Definition XrdCksData.hh:81
int Get(char *Buff, int Blen)
Definition XrdCksData.hh:69
void Get(Type &object)
Retrieve the object being held.
XRootDStatus Initialize()
Initialize.
const std::string & GetType()
XRootDStatus GetCheckSum(std::string &checkSum, std::string &checkSumType)
void Update(const void *buffer, uint32_t size)
ClassicCopyJob(uint32_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
PropertyList * pResults
CopyJob(uint32_t jobId, PropertyList *jobProperties, PropertyList *jobResults)
Constructor.
const URL & GetSource() const
Get source.
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
virtual bool ShouldCancel(uint32_t jobNum)
Determine whether the job should be canceled.
virtual void JobProgress(uint32_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus Rm(const std::string &path, ResponseHandler *handler, time_t timeout=0) XRD_WARN_UNUSED_RESULT
A file.
Definition XrdClFile.hh:52
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, time_t timeout=0)
Definition XrdClFile.cc:781
XRootDStatus ListXAttr(ResponseHandler *handler, time_t timeout=0)
Definition XrdClFile.cc:880
Iterator Begin()
Get the location begin iterator.
LocationList::iterator Iterator
Iterator over locations.
Iterator End()
Get the location end iterator.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
An abstract class to describe the client-side monitoring plugin interface.
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
A key-value pair map storing both keys and values as strings.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
uint64_t GetSize() const
Get size (in bytes).
URL representation.
Definition XrdClURL.hh:31
const std::string & GetPath() const
Get the path.
Definition XrdClURL.hh:217
std::map< std::string, std::string > ParamsMap
Definition XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition XrdClURL.cc:402
const ParamsMap & GetParams() const
Get the URL params.
Definition XrdClURL.hh:244
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
static XRootDStatus GetLocalCheckSum(std::string &checkSum, const std::string &checkSumType, const std::string &path)
Get a checksum from local file.
static bool HasXAttr(const XrdCl::URL &url)
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
static bool HasPgRW(const XrdCl::URL &url)
virtual long long GetSize() const =0
virtual std::string GetCheckSum(const std::string &type) const =0
virtual const std::vector< std::string > & GetReplicas()=0
Returns a vector with replicas as given in the meatlink file.
const uint16_t suRetry
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
const char *const DefaultCpTarget
const uint16_t errOperationExpired
const uint16_t errNotImplemented
Operation is not implemented.
const uint16_t stError
An error occurred that could potentially be retried.
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, time_t timeout=0)
Factory for creating OpenArchiveImpl objects.
const uint16_t errDataError
data is corrupted
const int DefaultSubStreamsPerChannel
const int DefaultCpUsePgWrtRd
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
std::tuple< std::string, std::string > xattr_t
Extended attribute key - value pair.
XRootDStatus WaitFor(Pipeline pipeline, time_t timeout=0)
const uint16_t errNotSupported
const uint16_t errRetry
Try again for whatever reason.
const uint16_t errCheckSumError
const uint16_t suDone
const uint16_t errThresholdExceeded
const uint16_t errOperationInterrupted
const uint16_t errNoMoreReplicas
No more replicas to try.
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, time_t timeout=0)
Factory for creating CloseFileImpl objects.
const int DefaultZipMtlnCksum
XrdSysError Log
Definition XrdConfig.cc:113
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
@ Read
Open only for reading.
@ Write
Open only for writing.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
uint16_t code
Error type, or additional hints on what to do.
bool IsOK() const
We're fine.
std::string ToString() const
Create a string representation.
uint32_t errNo
Errno, if any.