25#ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26#define __XRD_CL_XROOTD_MSG_HANDLER_HH__
93 const std::string tostr =
to.GetLocation();
94 const std::string fromstr =
from.GetLocation();
100 case EntryRedirect:
return "Redirected from: " + fromstr +
" to: "
104 "Falling back to virtual redirector: " + tostr;
108 case EntryWait:
return "Waited at server request. Resending: "
112 return "Failed at: " + fromstr +
", retrying at: " + tostr;
137 std::shared_ptr<SIDManager> sidMgr,
140 pResponseHandler( respHandler ),
142 pEffectiveDataServerUrl( 0 ),
144 pLFileHandler( lFileHandler ),
146 pRedirectAsAnswer( false ),
147 pOksofarAsAnswer( false ),
148 pHasLoadBalancer( false ),
149 pHasSessionId( false ),
152 pRedirectCounter( 0 ),
153 pNotAuthorizedCounter( 0 ),
156 pAsyncChunkIndex( 0 ),
158 pPgWrtCksumBuff( 4 ),
159 pPgWrtCurrentPageOffset( 0 ),
160 pPgWrtCurrentPageNb( 0 ),
162 pOtherRawStarted( false ),
164 pFollowMetalink( false ),
168 pAggregatedWaitTime( 0 ),
172 pTimeoutFence( false ),
174 pDirListStarted( false ),
175 pDirListWithStat( false ),
183 pHasSessionId =
true;
186 log->
Debug(
ExDbgMsg,
"[%s] MsgHandler created: %p (message: %s ).",
187 pUrl.GetHostId().c_str(), (
void*)
this,
188 pRequest->GetObfuscatedDescription().c_str() );
195 ntohl( pgrdreq->
rlen ) ) );
217 DumpRedirectTraceBack();
221 delete pEffectiveDataServerUrl;
223 pRequest =
reinterpret_cast<Message*
>( 0xDEADBEEF );
225 pPostMaster =
reinterpret_cast<PostMaster*
>( 0xDEADBEEF );
227 pChunkList =
reinterpret_cast<ChunkList*
>( 0xDEADBEEF );
228 pEffectiveDataServerUrl =
reinterpret_cast<URL*
>( 0xDEADBEEF );
232 pUrl.GetHostId().c_str(), (
void*)
this );
242 virtual uint16_t
Examine( std::shared_ptr<Message> &msg )
override;
261 virtual uint16_t
GetSid()
const override;
268 virtual void Process()
override;
283 uint32_t &bytesRead )
override;
303 virtual bool IsRaw()
const override;
316 uint32_t &bytesWritten )
override;
330 pExpiration = expiration;
347 pRedirectAsAnswer = redirectAsAnswer;
356 pOksofarAsAnswer = oksofarAsAnswer;
374 pLoadBalancer = loadBalancer;
375 pHasLoadBalancer =
true;
383 pHosts.reset( hostList );
391 pChunkList = chunkList;
393 pBodyReader->SetChunkList( chunkList );
395 pChunkStatus.resize( chunkList->size() );
397 pChunkStatus.clear();
402 pCrc32cDigests = std::move( crc32cDigests );
418 pRedirectCounter = redirectCounter;
423 pFollowMetalink = followMetalink;
428 pStateful = stateful;
440 pSendingState |= kSawReadySend;
451 static constexpr int kSendDone = 0x0001;
452 static constexpr int kSawResp = 0x0002;
453 static constexpr int kFinalResp = 0x0004;
454 static constexpr int kSawReadySend = 0x0008;
455 static constexpr int kRetryAtSrv = 0x0010;
456 static constexpr int kInFlyDone = 0x0020;
471 void HandleResponse();
488 Status ParseXAttrResponse(
char *data,
size_t len,
AnyObject *&response );
494 Status RewriteRequestRedirect(
const URL &newUrl );
499 Status RewriteRequestWait();
504 void UpdateTriedCGI(uint32_t errNo=0);
509 void SwitchOnRefreshFlag();
515 void HandleRspOrQueue();
520 void HandleLocalRedirect(
URL *url );
538 bool OmitWait(
Message &request,
const URL &url );
547 bool RetriableErrorResponse(
const Status &status );
552 void DumpRedirectTraceBack();
562 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, T& result );
572 Status ReadFromBuffer(
char *&buffer,
size_t &buflen, std::string &result );
583 Status ReadFromBuffer(
char *&buffer,
size_t &buflen,
size_t size,
584 std::string &result );
591 ChunkStatus(): sizeError( false ), done( false ) {}
596 typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
598 static const size_t CksumSize =
sizeof( uint32_t );
600 static const size_t MaxSslErrRetry = 3;
602 inline static size_t NbPgPerRsp( uint64_t offset, uint32_t dlen )
612 if( _1stpg + CksumSize > dlen )
613 _1stpg = dlen - CksumSize;
614 dlen -= _1stpg + CksumSize;
616 pgcnt += dlen / PageWithCksum;
617 if( dlen % PageWithCksum )
625 inline bool IsInFly()
const
627 const int sst = pSendingState;
628 if ( ( sst & ( kSawResp|kSendDone ) ) && !( sst & kInFlyDone ) )
634 std::shared_ptr<Message> pResponse;
635 std::vector<std::shared_ptr<Message>> pPartialResps;
636 ResponseHandler *pResponseHandler;
638 URL *pEffectiveDataServerUrl;
639 PostMaster *pPostMaster;
640 std::shared_ptr<SIDManager> pSidMgr;
641 LocalFileHandler *pLFileHandler;
642 XRootDStatus pStatus;
645 bool pRedirectAsAnswer;
646 bool pOksofarAsAnswer;
647 std::unique_ptr<HostList> pHosts;
648 bool pHasLoadBalancer;
649 HostInfo pLoadBalancer;
651 std::string pRedirectUrl;
653 std::vector<uint32_t> pCrc32cDigests;
654 XrdSys::KernelBuffer *pKBuff;
655 std::vector<ChunkStatus> pChunkStatus;
656 uint16_t pRedirectCounter;
657 uint16_t pNotAuthorizedCounter;
659 uint32_t pAsyncOffset;
660 uint32_t pAsyncChunkIndex;
662 std::unique_ptr<AsyncPageReader> pPageReader;
663 std::unique_ptr<AsyncRawReaderIntfc> pBodyReader;
665 Buffer pPgWrtCksumBuff;
666 uint32_t pPgWrtCurrentPageOffset;
667 uint32_t pPgWrtCurrentPageNb;
669 bool pOtherRawStarted;
671 bool pFollowMetalink;
674 int pAggregatedWaitTime;
676 std::unique_ptr<RedirectEntry> pRdirEntry;
677 RedirectTraceBack pRedirectTraceBack;
679 std::atomic<int> pSendingState;
686 std::atomic<bool> pTimeoutFence;
693 bool pDirListStarted;
694 bool pDirListWithStat;
Object for discarding data.
Object for reading out data from the kXR_read response.
Object for reading out data from the VectorRead response.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
The message representation used throughout the system.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
StreamEvent
Events that may have occurred to the stream.
A hub for dispatching and receiving messages.
Handle an async response.
Handle XRootD stream IDs.
bool IsValid() const
Is the url valid.
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
void SetFollowMetalink(bool followMetalink)
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
void SetHostList(HostList *hostList)
Set host list.
virtual uint16_t InspectStatusRsp() override
friend class HandleRspJob
virtual void OnStatusReady(const Message *message, XRootDStatus status) override
The requested action has been performed and the status is available.
void SetCrc32cDigests(std::vector< uint32_t > &&crc32cDigests)
const Message * GetRequest() const
Get the request pointer.
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
virtual uint16_t Examine(std::shared_ptr< Message > &msg) override
void OnReadyToSend(Message *msg) override
void OnWaitingToSend(Message *msg) override
Called to indicate the message is waiting to be sent.
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, std::shared_ptr< SIDManager > sidMgr, LocalFileHandler *lFileHandler)
~XRootDMsgHandler()
Destructor.
void WaitDone(time_t now)
virtual void Process() override
Process the message if it was "taken" by the examine action.
virtual XRootDStatus ReadMessageBody(Message *msg, Socket *socket, uint32_t &bytesRead) override
void SetStateful(bool stateful)
void SetOksofarAsAnswer(bool oksofarAsAnswer)
time_t GetExpiration() override
Get a timestamp after which we give up.
XRootDStatus WriteMessageBody(Socket *socket, uint32_t &bytesWritten) override
void SetKernelBuffer(XrdSys::KernelBuffer *kbuff)
Set the kernel buffer.
virtual uint8_t OnStreamEvent(StreamEvent event, XRootDStatus status) override
virtual uint16_t GetSid() const override
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
virtual bool IsRaw() const override
Are we a raw writer or not?
void SetRedirectAsAnswer(bool redirectAsAnswer)
static int csNum(off_t offs, int count)
Compute the required size of a checksum vector based on offset & length.
std::vector< HostInfo > HostList
std::vector< ChunkInfo > ChunkList
List of chunks.
static const int PageSize
RedirectEntry(const URL &from, const URL &to, Type type)
std::string ToString(bool prevok=true)
Procedure execution status.