XRootD
Loading...
Searching...
No Matches
XrdClStream.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClStream.hh"
26#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClChannel.hh"
29#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClMonitor.hh"
39
40#include <sys/types.h>
41#include <algorithm>
42#include <sys/socket.h>
43#include <sys/time.h>
44
45namespace XrdCl
46{
47 //----------------------------------------------------------------------------
48 // Statics
49 //----------------------------------------------------------------------------
// Counter shared by all Stream instances (static member). OnConnect
// pre-increments it to obtain the pSessionId assigned whenever substream 0
// connects.
50 RAtomic_uint64_t Stream::sSessCntGen{0};
51
52 //----------------------------------------------------------------------------
53 // Incoming message helper
54 //----------------------------------------------------------------------------
56 {
57 InMessageHelper( Message *message = 0,
58 MsgHandler *hndlr = 0,
59 time_t expir = 0,
60 uint16_t actio = 0 ):
61 msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62 void Reset()
63 {
64 msg = 0; handler = 0; expires = 0; action = 0;
65 }
68 time_t expires;
69 uint16_t action;
70 };
71
72 //----------------------------------------------------------------------------
73 // Sub stream helper
74 //----------------------------------------------------------------------------
92
93
94 //----------------------------------------------------------------------------
95 // Notify the mutex a close (which may block waiting for the Poller) is
96 // about to happen for one of the subStreams.
97 //----------------------------------------------------------------------------
// Registers one more pending close for subStream and wakes all waiters.
// subStream: index of the substream that is about to be closed.
98 void StreamMutex::AddClosing( uint16_t subStream )
99 {
// NOTE(review): no lock on mcv is visible in this listing before mclosing is
// modified -- confirm one is held at this point.
101 mclosing[subStream]++;
// Wake every thread blocked in mcv.Wait() so that
// Lock( subStream, isclosing ) re-evaluates mclosing.count( subStream ).
102 mcv.Broadcast();
103 }
104
105 //----------------------------------------------------------------------------
106 // Notify the mutex a close has completed.
107 //----------------------------------------------------------------------------
// Drops one pending-close registration for subStream and wakes all waiters.
// subStream: index of the substream whose close has finished.
108 void StreamMutex::RemoveClosing( uint16_t subStream )
109 {
// NOTE(review): no lock on mcv is visible in this listing before mclosing is
// modified -- confirm one is held at this point.
111 mclosing[subStream]--;
// Erase the key once its count reaches zero: Lock( subStream, isclosing )
// tests mclosing.count( subStream ), so a zero-valued entry left behind
// would still read as "closing".
112 if( mclosing[subStream]==0 ) mclosing.erase( subStream );
113 mcv.Broadcast();
114 }
115
116 //----------------------------------------------------------------------------
117 // Lock
118 //----------------------------------------------------------------------------
120 {
122 if( mlist.empty() )
123 {
124 mlist.emplace_front();
125 ++mlist.front().cnt;
126 mthmap[XrdSysThread::ID()] = mlist.begin();
127 return;
128 }
129 while( 1 )
130 {
131 auto mit = mthmap.find( XrdSysThread::ID() );
132 if( mit == mthmap.end() )
133 {
134 mlist.emplace_back();
135 bool ins;
136 std::tie( mit, ins ) = mthmap.insert(
137 std::make_pair( XrdSysThread::ID(), std::prev( mlist.end() ) ) );
138 }
139 if( mit->second == mlist.begin() )
140 {
141 ++mlist.front().cnt;
142 return;
143 }
144 mcv.Wait();
145 }
146 }
147
148 //----------------------------------------------------------------------------
149 // Lock, notifying the mutex that we're acquiring it from within a callback
150 // for the given subStream. Failure to acquire is reported by setting isclosing.
151 //----------------------------------------------------------------------------
// Acquires the mutex on behalf of a callback for subStream, or gives up.
// subStream: substream the caller is working for; checked against mclosing.
// isclosing: out-parameter. false when the lock was acquired; true when the
//            call returned WITHOUT the lock because a func-carrying Lock is
//            registered (hasfn) or a close is pending on subStream.
152 void StreamMutex::Lock( uint16_t subStream, bool &isclosing )
153 {
154 isclosing = false;
// NOTE(review): no lock on mcv is visible in this listing before the queue
// state is inspected -- confirm one is taken here.
// Nobody is queued: this thread becomes the owner (front entry) with a
// hold count of 1.
156 if( mlist.empty() ) {
157 mlist.emplace_front();
158 ++mlist.front().cnt;
159 mthmap[XrdSysThread::ID()] = mlist.begin();
160 return;
161 }
162 while( 1 )
163 {
// Look up this thread's queue entry; if it has none, append one at the tail
// and record the thread -> entry mapping.
164 auto mit = mthmap.find( XrdSysThread::ID() );
165 if( mit == mthmap.end() )
166 {
167 mlist.emplace_back();
168 bool ins;
169 std::tie( mit, ins ) = mthmap.insert(
170 std::make_pair( XrdSysThread::ID(), std::prev( mlist.end() ) ) );
171 }
// The front entry is the current owner. If that is us (first acquisition
// after reaching the front, or a nested acquisition), bump the hold count.
172 if( mit->second == mlist.begin() )
173 {
174 ++mlist.front().cnt;
175 return;
176 }
// Not the owner, and either a func is registered or our substream is being
// closed: withdraw from the queue instead of waiting and tell the caller.
177 if( hasfn || mclosing.count( subStream ) )
178 {
179 isclosing = true;
180 mlist.erase( mit->second );
181 mthmap.erase( mit );
182 return;
183 }
// Sleep until some Broadcast() on mcv, then re-check from the top.
184 mcv.Wait();
185 }
186 }
187
188 //----------------------------------------------------------------------------
189 // Lock, notifying the mutex that any thread waiting for a lock from within a
190 // Poller callback should abort. We either acquire the mutex immediately or
191 // indicate that we could not, by setting isclosing. In that case the supplied
192 // func will be executed by the last thread to release the lock.
193 // Only one func may be registered at a time. Others will not be called.
194 //----------------------------------------------------------------------------
// Acquires the mutex immediately if possible; otherwise registers func in
// the queue (at most one registration at a time) and returns without the
// lock.
// func:      callable stored in this thread's queue entry.
// isclosing: out-parameter. false when the lock was acquired; true when the
//            call returned WITHOUT the lock.
195 void StreamMutex::Lock( const std::function<void()> &func, bool &isclosing )
196 {
197 isclosing = false;
// NOTE(review): no lock on mcv is visible in this listing before the queue
// state is inspected -- confirm one is taken here.
// Nobody is queued: become the owner, keep func in the front entry and
// remember which entry carries it (fnlistit / hasfn).
199 if( mlist.empty() )
200 {
201 mlist.emplace_front( func );
202 ++mlist.front().cnt;
203 auto lit = mlist.begin();
204 mthmap[XrdSysThread::ID()] = lit;
205 fnlistit = lit;
206 hasfn = true;
207 return;
208 }
209 while( 1 )
210 {
211 auto mit = mthmap.find( XrdSysThread::ID() );
212 if( mit == mthmap.end() )
213 {
// A func is already registered: this func is dropped and the caller only
// learns that the lock was not acquired.
214 if( hasfn )
215 {
216 isclosing = true;
217 return;
218 }
// Queue an entry that carries func. No mthmap entry is added for this
// thread here. Broadcast so that threads waiting in
// Lock( subStream, isclosing ) wake up and observe hasfn.
219 mlist.emplace_back( func );
220 hasfn = true;
221 fnlistit = std::prev( mlist.end() );
222 mcv.Broadcast();
223 isclosing = true;
224 return;
225 }
// This thread already has an entry and it is at the front: it owns the
// lock, so just bump the hold count.
226 if( mit->second == mlist.begin() )
227 {
228 ++mlist.front().cnt;
229 return;
230 }
// This thread has an entry that is not yet at the front: wait for a
// Broadcast() and re-check.
231 mcv.Wait();
232 }
233 }
234
235 //----------------------------------------------------------------------------
236 // UnLock
237 //----------------------------------------------------------------------------
239 {
240 // keep any fn callback until return in case it holds a ref count
241 std::function<void()> keepfn;
242
244 auto mit = mthmap.find( XrdSysThread::ID() );
245 if( mit == mthmap.end() ) return;
246
247 // we must have held the lock
248 assert( mit->second == mlist.begin() );
249
250 const size_t cnt = --mlist.front().cnt;
251 if( cnt ) return;
252
253 if( hasfn && fnlistit == mit->second )
254 {
255 hasfn = false;
256 std::swap( keepfn, mlist.front().fn );
257 }
258
259 mlist.erase( mit->second );
260 mthmap.erase( mit );
261
262 // next up should have zero count
263 assert( mlist.empty() || mlist.front().cnt == 0 );
264
265 if( hasfn && fnlistit == mlist.begin() )
266 {
267 auto &lfn = mlist.front().fn;
268 ++mlist.front().cnt;
269 mthmap[XrdSysThread::ID()] = mlist.begin();
270 lck.UnLock();
271 lfn();
272 UnLock();
273 return;
274 }
275 mcv.Broadcast();
276 }
277
278 //------------------------------------------------------------------------
279 // Job to handle disposing of socket outside of a poller callback
280 //------------------------------------------------------------------------
// Job that deletes an AsyncSocketHandler when it is run, then deletes itself.
281 class SocketDestroyJob: public Job
282 {
283 public:
// NOTE(review): the constructor's signature line is not visible in this
// listing; only its initializer, which stores `socket` in pSock, is.
285 pSock( socket ) { }
286 virtual ~SocketDestroyJob() {}
// Deletes the socket handler passed at construction and then this job
// object; the job must not be touched after Run() returns. The void*
// argument is unused.
287 virtual void Run( void* )
288 {
289 delete pSock;
290 delete this;
291 }
292 private:
// Socket handler deleted by Run().
293 AsyncSocketHandler *pSock;
294 };
295
296 //----------------------------------------------------------------------------
297 // Constructor
298 //----------------------------------------------------------------------------
// Builds a stream for the given URL. All collaborator pointers (transport,
// poller, task/job managers, incoming queue, channel data) start out null
// and all counters at zero.
// url:    target URL; stored as a pointer (not copied) and dereferenced
//         here, so it must be non-null.
// prefer: URL stored by copy in pPrefer.
299 Stream::Stream( const URL *url, const URL &prefer ):
300 pUrl( url ),
301 pPrefer( prefer ),
302 pTransport( 0 ),
303 pPoller( 0 ),
304 pTaskManager( 0 ),
305 pJobManager( 0 ),
306 pIncomingQueue( 0 ),
307 pChannelData( 0 ),
308 pLastStreamError( 0 ),
309 pConnectionCount( 0 ),
310 pConnectionInitTime( 0 ),
311 pAddressType( Utils::IPAll ),
312 pSessionId( 0 ),
313 pBytesSent( 0 ),
314 pBytesReceived( 0 )
315 {
316 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
317 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
318
// The stream name used in log messages is the URL's host id.
319 std::ostringstream o;
320 o << pUrl->GetHostId();
321 pStreamName = o.str();
322
// Read the connection tunables by name.
// NOTE(review): the trailing argument(s) of each Get*Parameter call below
// are not visible in this listing -- confirm against the full source.
323 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
325 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
327 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
329
330 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
332
// Translate the configured network stack name. For IPAuto, narrow the
// address type to IPv4 or IPv6 when the hasIP64 bit is not set in `stacks`;
// otherwise IPAuto is kept.
// NOTE(review): the declaration of `stacks` is not visible in this listing.
333 pAddressType = Utils::String2AddressType( netStack );
334 if( pAddressType == Utils::AddressType::IPAuto )
335 {
337 if( !( stacks & XrdNetUtils::hasIP64 ) )
338 {
339 if( stacks & XrdNetUtils::hasIPv4 )
340 pAddressType = Utils::AddressType::IPv4;
341 else if( stacks & XrdNetUtils::hasIPv6 )
342 pAddressType = Utils::AddressType::IPv6;
343 }
344 }
345
346 Log *log = DefaultEnv::GetLog();
347 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
348 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
349 "Window: %d", pStreamName.c_str(), netStack.c_str(),
350 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
351 }
352
353 //----------------------------------------------------------------------------
354 // Destructor
355 //----------------------------------------------------------------------------
357 {
358 // Used to disconnect substreams here, but since we are reference counted
359 // and connected substreams hold a count, if we're here they're closed.
360
361 Log *log = DefaultEnv::GetLog();
362 log->Debug( PostMasterMsg, "[%s] Destroying stream",
363 pStreamName.c_str() );
364
365 MonitorDisconnection( XRootDStatus() );
366
367 SubStreamList::iterator it;
368 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
369 delete *it;
370 }
371
372 //----------------------------------------------------------------------------
373 // Initializer
374 //----------------------------------------------------------------------------
376 {
377 if( !pTransport || !pPoller || !pChannelData )
379
380 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
381 pChannelData, 0, this );
382 pSubStreams.push_back( new SubStreamData() );
383 pSubStreams[0]->socket = s;
384 return XRootDStatus();
385 }
386
387 //------------------------------------------------------------------------
388 // Make sure that the underlying socket handler gets write readiness
389 // events
390 //------------------------------------------------------------------------
392 {
393 StreamMutexHelper scopedLock( pMutex );
394
395 //--------------------------------------------------------------------------
396 // We are in the process of connecting the main stream, so we do nothing
397 // because when the main stream connection is established it will connect
398 // all the other streams
399 //--------------------------------------------------------------------------
400 if( pSubStreams[0]->status == Socket::Connecting )
401 return XRootDStatus();
402
403 //--------------------------------------------------------------------------
404 // The main stream is connected, so we can verify whether we have
405 // the up and the down stream connected and ready to handle data.
406 // If anything is not right we fall back to stream 0.
407 //--------------------------------------------------------------------------
408 if( pSubStreams[0]->status == Socket::Connected )
409 {
410 if( pSubStreams[path.down]->status != Socket::Connected )
411 path.down = 0;
412
413 if( pSubStreams[path.up]->status == Socket::Disconnected )
414 {
415 path.up = 0;
416 return pSubStreams[0]->socket->EnableUplink();
417 }
418
419 if( pSubStreams[path.up]->status == Socket::Connected )
420 return pSubStreams[path.up]->socket->EnableUplink();
421
422 return XRootDStatus();
423 }
424
425 //--------------------------------------------------------------------------
426 // The main stream is not connected, we need to check whether enough time
427 // has passed since we last encountered an error (if any) so that we could
428 // re-attempt the connection
429 //--------------------------------------------------------------------------
430 Log *log = DefaultEnv::GetLog();
431 time_t now = ::time(0);
432
433 if( now-pLastStreamError < pStreamErrorWindow )
434 return pLastFatalError;
435
436 gettimeofday( &pConnectionStarted, 0 );
437 ++pConnectionCount;
438
439 //--------------------------------------------------------------------------
440 // Resolve all the addresses of the host we're supposed to connect to
441 //--------------------------------------------------------------------------
442 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
443 if( !st.IsOK() )
444 {
445 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
446 "the host", pStreamName.c_str() );
447 pLastStreamError = now;
448 st.status = stFatal;
449 pLastFatalError = st;
450 return st;
451 }
452
453 if( pPrefer.IsValid() )
454 {
455 std::vector<XrdNetAddr> addrresses;
456 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
457 if( !st.IsOK() )
458 {
459 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
460 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
461 }
462 else
463 {
464 std::vector<XrdNetAddr> tmp;
465 tmp.reserve( pAddresses.size() );
466 // first add all remaining addresses
467 auto itr = pAddresses.begin();
468 for( ; itr != pAddresses.end() ; ++itr )
469 {
470 if( !HasNetAddr( *itr, addrresses ) )
471 tmp.push_back( *itr );
472 }
473 // then copy all 'preferred' addresses
474 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
475 // and keep the result
476 pAddresses.swap( tmp );
477 }
478 }
479
480 Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
481 pAddresses );
482
483 while( !pAddresses.empty() )
484 {
485 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
486 pAddresses.pop_back();
487 pConnectionInitTime = ::time( 0 );
488 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
489 if( st.IsOK() )
490 {
491 pSubStreams[0]->status = Socket::Connecting;
492 break;
493 }
494 }
495 return st;
496 }
497
498 //----------------------------------------------------------------------------
499 // Queue the message for sending
500 //----------------------------------------------------------------------------
502 MsgHandler *handler,
503 bool stateful,
504 time_t expires )
505 {
506 StreamMutexHelper scopedLock( pMutex );
507 Log *log = DefaultEnv::GetLog();
508
509 //--------------------------------------------------------------------------
510 // Check the session ID and bounce if needed
511 //--------------------------------------------------------------------------
512 if( msg->GetSessionId() &&
513 (pSubStreams[0]->status != Socket::Connected ||
514 pSessionId != msg->GetSessionId()) )
516
517 //--------------------------------------------------------------------------
518 // Decide on the path to send the message
519 //--------------------------------------------------------------------------
520 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
521 if( pSubStreams.size() <= path.up )
522 {
523 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
524 "substream %d, using 0 instead", pStreamName.c_str(),
525 msg->GetObfuscatedDescription().c_str(), path.up );
526 path.up = 0;
527 }
528
529 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
530 "substream %d expecting answer at %d", pStreamName.c_str(),
531 msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
532
533 //--------------------------------------------------------------------------
534 // Enable *a* path and insert the message to the right queue
535 //--------------------------------------------------------------------------
536 XRootDStatus st = EnableLink( path );
537 if( st.IsOK() )
538 {
539 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
540 handler->OnWaitingToSend( msg );
541 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
542 expires, stateful );
543 }
544 else
545 st.status = stFatal;
546 return st;
547 }
548
549 //----------------------------------------------------------------------------
550 // Force connection
551 //----------------------------------------------------------------------------
553 {
554 StreamMutexHelper scopedLock( pMutex );
555 if( pSubStreams[0]->status == Socket::Connecting )
556 {
557 pSubStreams[0]->status = Socket::Disconnected;
558 XrdCl::PathID path( 0, 0 );
559 XrdCl::XRootDStatus st = EnableLink( path );
560 if( !st.IsOK() )
561 OnConnectError( 0, st );
562 }
563 }
564
565 //----------------------------------------------------------------------------
566 // Disconnect the stream
567 //----------------------------------------------------------------------------
569 {
570 auto channel = GetChannel();
571 StreamMutexHelper scopedLock( pMutex );
572 SubStreamList::iterator it;
573 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
574 {
575 (*it)->socket->Close();
576 (*it)->status = Socket::Disconnected;
577 }
578 pSessionId = 0;
579 }
580
581 //----------------------------------------------------------------------------
582 // Handle a clock event
583 //----------------------------------------------------------------------------
// Periodic housekeeping: collects expired outgoing messages from every
// substream and lets the incoming queue report timed-out handlers.
// now: current time used as the expiry cut-off.
584 void Stream::Tick( time_t now )
585 {
586 //--------------------------------------------------------------------------
587 // Check for timed-out requests and incoming handlers
588 //--------------------------------------------------------------------------
589 StreamMutexHelper scopedLock( pMutex );
// Move the expired items of each substream's out-queue into the local queue
// q while the stream mutex is held ...
590 OutQueue q;
591 SubStreamList::iterator it;
592 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
593 q.GrabExpired( *(*it)->outQueue, now );
// ... and release the mutex before any reporting takes place.
594 scopedLock.UnLock();
595
// NOTE(review): what is done with q after unlocking is not visible in this
// listing -- confirm against the full source.
597 pIncomingQueue->ReportTimeout( now );
598 }
599}
600
601//------------------------------------------------------------------------------
602// Handle message timeouts and reconnection in the future
603//------------------------------------------------------------------------------
604namespace
605{
// Task that Stream::OnConnectError registers with the task manager to run at
// the end of the current connection window.
606 class StreamConnectorTask: public XrdCl::Task
607 {
608 public:
609 //------------------------------------------------------------------------
610 // Constructor
611 // url: copied into the task; n: stream name appended to the task name
612 //------------------------------------------------------------------------
612 StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
613 url( url )
614 {
615 std::string name = "StreamConnectorTask for ";
616 name += n;
617 SetName( name );
618 }
619
620 //------------------------------------------------------------------------
621 // Run the task
622 //------------------------------------------------------------------------
623 time_t Run( time_t )
624 {
// NOTE(review): the statement that performs the actual work is not visible
// in this listing. The return value 0 presumably tells the task manager not
// to reschedule -- confirm.
626 return 0;
627 }
628
629 private:
// Private copy of the URL passed to the constructor.
630 XrdCl::URL url;
631 };
632}
633
634namespace XrdCl
635{
// Builds and sends a kXR_close request whose file handle is taken from the
// first four body bytes of the given server response.
// response: server response carrying the file handle in its body.
// Returns XRootDStatus( stError ) when the response body is shorter than
// four bytes, otherwise the status of MessageUtils::SendMessage.
636 XRootDStatus Stream::RequestClose( Message &response )
637 {
638 ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
// The four-byte file handle must be present in the body.
639 if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
640 Message *msg;
641 ClientCloseRequest *req;
642 MessageUtils::CreateRequest( msg, req );
643 req->requestid = kXR_close;
644 memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
// NOTE(review): a statement between the memcpy and SetSessionId is not
// visible in this listing.
// Tag the request with the current session id.
646 msg->SetSessionId( pSessionId );
// The reply is of no interest, hence a NullResponseHandler.
// NOTE(review): ownership of handler and msg after SendMessage is not
// visible here -- confirm they are released by the callee.
647 NullResponseHandler *handler = new NullResponseHandler();
648 MessageSendParams params;
649 params.timeout = 0;
650 params.followRedirects = false;
651 params.stateful = true;
// NOTE(review): a statement just before the send is not visible in this
// listing.
653 return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
654 }
655
656 //------------------------------------------------------------------------
657 // Check if message is a partial response
658 //------------------------------------------------------------------------
// Tells whether msg is a partial response, judged from its response header.
// Returns true for status kXR_oksofar, and for kXR_status as shown below;
// false for anything else.
659 bool Stream::IsPartial( Message &msg )
660 {
661 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
662 if( rsphdr->status == kXR_oksofar )
663 return true;
664
665 if( rsphdr->status == kXR_status )
666 {
667 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
// NOTE(review): the line between the cast and this return is not visible in
// this listing. As shown, rspst is unused and every kXR_status response
// counts as partial; presumably a condition on rspst guards the return --
// confirm against the full source.
669 return true;
670 }
671
672 return false;
673 }
674
675 //----------------------------------------------------------------------------
676 // Call back when a message has been reconstructed
677 //----------------------------------------------------------------------------
// Dispatches a fully received message: gives the transport a chance to
// consume it, then hands it to the handler that was recorded for the
// substream, or discards it when there is none.
// subStream:     index of the substream the message arrived on.
// msg:           the received message.
// bytesReceived: number of bytes added to the pBytesReceived counter.
678 void Stream::OnIncoming( uint16_t subStream,
679 std::shared_ptr<Message> msg,
680 uint32_t bytesReceived )
681 {
682 msg->SetSessionId( pSessionId );
683 pBytesReceived += bytesReceived;
684
// Take the handler and action recorded in the substream's inMsgHelper and
// clear the helper.
685 MsgHandler *handler = nullptr;
686 uint16_t action = 0;
687 {
688 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
689 handler = mh.handler;
690 action = mh.action;
691 mh.Reset();
692 }
693
// Only non-partial responses are shown to the transport. The transport may
// consume the message (DigestMsg) or ask for a close request to be sent
// (RequestClose); in both cases no handler is invoked here.
694 if( !IsPartial( *msg ) )
695 {
696 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
697 *pChannelData );
698 if( streamAction & TransportHandler::DigestMsg )
699 return;
700
701 if( streamAction & TransportHandler::RequestClose )
702 {
// The status returned by RequestClose is not checked.
703 RequestClose( *msg );
704 return;
705 }
706 }
707
708 Log *log = DefaultEnv::GetLog();
709
710 //--------------------------------------------------------------------------
711 // No handler, we discard the message ...
712 //--------------------------------------------------------------------------
713 if( !handler )
714 {
715 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
716 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
717 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
718 pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
719 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
720 return;
721 }
722
723 //--------------------------------------------------------------------------
724 // We have a handler, so we call the callback
725 //--------------------------------------------------------------------------
726 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
727 pStreamName.c_str(), (void*)msg.get() );
728
// NOTE(review): the condition opening the following block is not visible in
// this listing; from the log text it presumably tests `action` for an
// "ignore" flag -- confirm against the full source.
730 {
731 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
732 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
733
734 // if we are handling partial response we have to take down the timeout fence
735 if( IsPartial( *msg ) )
736 {
737 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
738 if( xrdHandler ) xrdHandler->PartialReceived();
739 }
740
741 return;
742 }
743
// Processing is deferred to the job manager rather than done inline.
// NOTE(review): who deletes the job is not visible here.
744 Job *job = new HandleIncMsgJob( handler );
745 pJobManager->QueueJob( job );
746 }
747
748 //----------------------------------------------------------------------------
749 // Call when one of the sockets is ready to accept a new message
750 //----------------------------------------------------------------------------
751 std::pair<Message *, MsgHandler *>
752 Stream::OnReadyToWrite( uint16_t subStream )
753 {
754 bool closing;
755 StreamMutexHelper scopedLock( pMutex, subStream, closing );
756 if( closing ) return std::make_pair( (Message *)0, (MsgHandler *)0 );
757 Log *log = DefaultEnv::GetLog();
758 if( pSubStreams[subStream]->outQueue->IsEmpty() )
759 {
760 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
761 pSubStreams[subStream]->socket->GetStreamName().c_str() );
762
763 pSubStreams[subStream]->socket->DisableUplink();
764 return std::make_pair( (Message *)0, (MsgHandler *)0 );
765 }
766
767 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
768 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
769 h.expires,
770 h.stateful );
771
772 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
773 "from out-queue to in-queue, starting to send outgoing.",
774 pUrl->GetHostId().c_str(), (void*)h.handler,
775 h.msg->GetObfuscatedDescription().c_str() );
776
777 scopedLock.UnLock();
778
779 if( h.handler )
780 {
781 bool rmMsg = false;
782 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
783 if( rmMsg )
784 {
785 Log *log = DefaultEnv::GetLog();
786 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
787 pStreamName.c_str() );
788 }
789 h.handler->OnReadyToSend( h.msg );
790 }
791 return std::make_pair( h.msg, h.handler );
792 }
793
794 void Stream::DisableIfEmpty( uint16_t subStream )
795 {
796 bool closing;
797 StreamMutexHelper scopedLock( pMutex, subStream, closing );
798 if( closing ) return;
799 Log *log = DefaultEnv::GetLog();
800
801 if( pSubStreams[subStream]->outQueue->IsEmpty() )
802 {
803 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
804 pSubStreams[subStream]->socket->GetStreamName().c_str() );
805 pSubStreams[subStream]->socket->DisableUplink();
806 }
807 }
808
809 //----------------------------------------------------------------------------
810 // Call when a message is written to the socket
811 //----------------------------------------------------------------------------
// Bookkeeping after a message has been written to a substream's socket.
// subStream: index of the substream the message went out on.
// msg:       the message that was sent.
// bytesSent: number of bytes added to the pBytesSent counter.
812 void Stream::OnMessageSent( uint16_t subStream,
813 Message *msg,
814 uint32_t bytesSent )
815 {
// Let the transport account for the sent message first.
816 pTransport->MessageSent( msg, subStream, bytesSent,
817 *pChannelData );
818 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
819 pBytesSent += bytesSent;
820 if( h.handler )
821 {
822 // ensure expiration time is assigned if still in queue
823 pIncomingQueue->AssignTimeout( h.handler );
824 // OnStatusReady may cause the handler to delete itself, in
825 // which case the handler or the user callback may also delete msg
// NOTE(review): the call the comment above refers to is not visible in this
// listing -- confirm against the full source.
827 }
// Clear the helper that held the in-flight message for this substream.
828 pSubStreams[subStream]->outMsgHelper.Reset();
829 }
830
831 //----------------------------------------------------------------------------
832 // Call back when a substream has been connected
833 //----------------------------------------------------------------------------
// Marks the given substream as connected. For substream 0 it additionally
// clears the stored error state, assigns a new session id, creates and
// connects the extra substreams, resets the byte counters and reports the
// connection to the monitor. For any other substream it queues
// pOnDataConnJob when one is set.
// subStream: index of the substream that has just connected.
834 void Stream::OnConnect( uint16_t subStream )
835 {
// NOTE(review): `channel` is not used below; presumably it is held only to
// keep the channel alive for the duration of the call -- confirm.
836 auto channel = GetChannel();
837 bool closing;
838 StreamMutexHelper scopedLock( pMutex, subStream, closing );
// The lock was not acquired because the substream is closing: nothing to do.
839 if( closing ) return;
840 pSubStreams[subStream]->status = Socket::Connected;
841
// The logged IP stack is always taken from substream 0's socket, whichever
// substream connected.
842 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
843 Log *log = DefaultEnv::GetLog();
844 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
845 subStream, ipstack.c_str() );
846
847 if( subStream == 0 )
848 {
// Fresh main connection: forget earlier errors and retry counts, and take a
// new session id from the shared counter.
849 pLastStreamError = 0;
850 pLastFatalError = XRootDStatus();
851 pConnectionCount = 0;
852 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
853 pSessionId = ++sSessCntGen;
854
855 //------------------------------------------------------------------------
856 // Create the streams if they don't exist yet
857 //------------------------------------------------------------------------
858 if( pSubStreams.size() == 1 && numSub > 1 )
859 {
860 for( uint16_t i = 1; i < numSub; ++i )
861 {
862 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
863 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
864 pChannelData, i, this );
865 pSubStreams.push_back( new SubStreamData() );
866 pSubStreams[i]->socket = s;
867 }
868 }
869
870 //------------------------------------------------------------------------
871 // Connect the extra streams, if we fail we move all the outgoing items
872 // to stream 0, we don't need to enable the uplink here, because it
873 // should be already enabled after the handshaking process is completed.
874 //------------------------------------------------------------------------
875 if( pSubStreams.size() > 1 )
876 {
877 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
878 pStreamName.c_str(), pSubStreams.size() - 1 );
879 for( size_t i = 1; i < pSubStreams.size(); ++i )
880 {
// A substream that is not Disconnected is drained into stream 0 and closed
// before it is connected again.
881 if( pSubStreams[i]->status != Socket::Disconnected )
882 {
883 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
884 SockHandlerClose( i );
885 }
// Extra substreams connect to the same address as substream 0.
886 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
887 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
888 if( !st.IsOK() )
889 {
890 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
891 SockHandlerClose( i );
892 }
893 else
894 {
895 pSubStreams[i]->status = Socket::Connecting;
896 }
897 }
898 }
899
900 //------------------------------------------------------------------------
901 // Inform monitoring
902 //------------------------------------------------------------------------
903 pBytesSent = 0;
904 pBytesReceived = 0;
905 gettimeofday( &pConnectionDone, 0 );
// NOTE(review): the declarations of `mon` and `i` are not visible in this
// listing -- confirm against the full source.
907 if( mon )
908 {
910 i.server = pUrl->GetHostId();
911 i.sTOD = pConnectionStarted;
912 i.eTOD = pConnectionDone;
913 i.streams = pSubStreams.size();
914
// Ask the transport for the authentication description. The string obtained
// from the query result is deleted here once it has been copied.
915 AnyObject qryResult;
916 std::string *qryResponse = nullptr;
917 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
918 qryResult.Get( qryResponse );
919
920 if (qryResponse) {
921 i.auth = *qryResponse;
922 delete qryResponse;
923 } else {
924 i.auth = "";
925 }
926
927 mon->Event( Monitor::EvConnect, &i );
928 }
929
930 //------------------------------------------------------------------------
931 // For every connected control-stream call the global on-connect handler
932 //------------------------------------------------------------------------
// NOTE(review): the call announced by the comment above is not visible in
// this listing.
934 }
935 else if( pOnDataConnJob )
936 {
937 //------------------------------------------------------------------------
938 // For every connected data-stream call the on-connect handler
939 //------------------------------------------------------------------------
940 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
941 }
942 }
943
944 //----------------------------------------------------------------------------
945 // On connect error
// Called when connecting the given sub-stream failed. The socket handler of
// that sub-stream is closed first; afterwards:
//  * subStream > 0: its queued messages are handed to sub-stream 0 and the
//    outcome depends on the state of sub-stream 0,
//  * subStream == 0: try the remaining addresses, else schedule a retry at
//    the end of the connection window, else re-resolve the host through
//    EnableLink, else fail with OnFatalError.
// @param subStream index of the sub-stream whose connection failed
// @param status    the connection error
946 //----------------------------------------------------------------------------
947 void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
948 {
949 auto channel = GetChannel(); // presumably keeps the Channel alive during this call - TODO confirm
950 bool closing;
951 StreamMutexHelper scopedLock( pMutex, subStream, closing );
952 if( closing ) return; // the lock helper flagged the call as not to be processed
953 Log *log = DefaultEnv::GetLog();
954 SockHandlerClose( subStream );
955 time_t now = ::time(0);
956
957 //--------------------------------------------------------------------------
958 // For every connection error call the global connection error handler
// NOTE(review): listing line 960, which should hold that call, is absent
// from this listing.
959 //--------------------------------------------------------------------------
961
962 //--------------------------------------------------------------------------
963 // If we connected subStream == 0 and cannot connect >0 then we just give
964 // up and move the outgoing messages to the queue of subStream 0
965 //--------------------------------------------------------------------------
966 if( subStream > 0 )
967 {
968 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
969 if( pSubStreams[0]->status == Socket::Connected )
970 {
971 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
972 if( !st.IsOK() )
973 OnFatalError( 0, st, scopedLock );
974 return;
975 }
976
// Stream 0 is still connecting: leave the moved messages in its queue
977 if( pSubStreams[0]->status == Socket::Connecting )
978 return;
979
980 OnFatalError( subStream, status, scopedLock );
981 return;
982 }
983
984 //--------------------------------------------------------------------------
985 // Check if we still have time to try and do something in the current window
// NOTE(review): this informational message is emitted at Error level.
986 //--------------------------------------------------------------------------
987 time_t elapsed = now-pConnectionInitTime;
988 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
989 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
990
991 //------------------------------------------------------------------------
992 // If we have some IP addresses left we try them, taking them from the back
// of pAddresses until one Connect() call succeeds or the list is exhausted
993 //------------------------------------------------------------------------
994 if( !pAddresses.empty() )
995 {
996 XRootDStatus st;
997 do
998 {
999 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
1000 pAddresses.pop_back();
1001 pConnectionInitTime = ::time( 0 ); // a new connection window starts with every attempt
1002 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
1003 }
1004 while( !pAddresses.empty() && !st.IsOK() );
1005
1006 if( !st.IsOK() )
1007 OnFatalError( subStream, st, scopedLock );
1008 else
1009 pSubStreams[0]->status = Socket::Connecting;
1010
1011 return;
1012 }
1013 //------------------------------------------------------------------------
1014 // If we still can retry with the same host name, we register a connector
1015 // task that is due at the end of the connection window
1016 //------------------------------------------------------------------------
1017 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
1018 && !status.IsFatal() )
1019 {
1020 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
1021 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
1022
1023 pSubStreams[0]->status = Socket::Connecting;
1024 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
1025 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
1026 return;
1027 }
1028 //--------------------------------------------------------------------------
1029 // We are out of the connection window, the only thing we can do here
1030 // is re-resolving the host name and retrying if we still can
1031 //--------------------------------------------------------------------------
1032 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
1033 {
1034 pAddresses.clear();
1035 pSubStreams[0]->status = Socket::Disconnected;
1036 PathID path( 0, 0 );
1037 XRootDStatus st = EnableLink( path );
1038 if( !st.IsOK() )
1039 OnFatalError( subStream, st, scopedLock );
1040 return;
1041 }
1042
1043 //--------------------------------------------------------------------------
1044 // Else, we fail
1045 //--------------------------------------------------------------------------
1046 OnFatalError( subStream, status, scopedLock );
1047 }
1048
1049 //----------------------------------------------------------------------------
1050 // Call back when an error has occurred
// The socket handler of the sub-stream is closed and the message that was
// in flight is put back (Reinsert). An error on a peripheral sub-stream
// (> 0) tries to move its queue to sub-stream 0; an error on sub-stream 0
// resets the session id, re-enables the link if stateless messages are
// outstanding and reports the disconnection to the handlers.
// @param subStream index of the sub-stream the error occurred on
// @param status    the error
1051 //----------------------------------------------------------------------------
1052 void Stream::OnError( uint16_t subStream, XRootDStatus status )
1053 {
1054 auto channel = GetChannel(); // presumably keeps the Channel alive during this call - TODO confirm
1055 bool closing;
1056 StreamMutexHelper scopedLock( pMutex, subStream, closing );
1057 if( closing ) return; // the lock helper flagged the call as not to be processed
1058 Log *log = DefaultEnv::GetLog();
1059 SockHandlerClose( subStream );
1060
1061 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
1062 pStreamName.c_str(), subStream, status.ToString().c_str() );
1063
1064 //--------------------------------------------------------------------------
1065 // Reinsert the stuff that we have failed to send
1066 //--------------------------------------------------------------------------
1067 Reinsert( subStream );
1068
1069 //--------------------------------------------------------------------------
1070 // We are dealing with an error of a peripheral stream. If we don't have
1071 // anything to send don't bother recovering. Otherwise move the requests
1072 // to stream 0 if possible.
1073 //--------------------------------------------------------------------------
1074 if( subStream > 0 )
1075 {
1076 if( pSubStreams[subStream]->outQueue->IsEmpty() )
1077 return;
1078
1079 if( pSubStreams[0]->status != Socket::Disconnected )
1080 {
1081 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
1082 if( pSubStreams[0]->status == Socket::Connected )
1083 {
1084 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
1085 if( !st.IsOK() )
1086 OnFatalError( 0, st, scopedLock );
1087 return;
1088 }
1089 }
// NOTE(review): when stream 0 is Connecting the items were moved above but
// control still reaches OnFatalError here, whereas OnConnectError returns
// in that state - confirm this is intended.
1090 OnFatalError( subStream, status, scopedLock );
1091 return;
1092 }
1093
1094 //--------------------------------------------------------------------------
1095 // If we lost the stream 0 we have lost the session, we re-enable the
1096 // stream if we still have things in one of the outgoing queues, otherwise
1097 // there is no point to recover at this point.
1098 //--------------------------------------------------------------------------
1099 if( subStream == 0 )
1100 {
1101 MonitorDisconnection( status );
1102 pSessionId = 0;
1103
// Only what GetSizeStateless() reports counts as outstanding here
1104 SubStreamList::iterator it;
1105 size_t outstanding = 0;
1106 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1107 outstanding += (*it)->outQueue->GetSizeStateless();
1108
1109 if( outstanding )
1110 {
1111 PathID path( 0, 0 );
1112 XRootDStatus st = EnableLink( path );
1113 if( !st.IsOK() )
1114 {
1115 OnFatalError( 0, st, scopedLock );
1116 return;
1117 }
1118 }
1119
1120 //------------------------------------------------------------------------
1121 // We're done here, unlock the stream mutex to avoid deadlocks and
1122 // report the disconnection event to the handlers. Only the items taken
// by GrabStateful are collected and reported with the error status.
1123 //------------------------------------------------------------------------
1124 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1125 "message handlers.", pStreamName.c_str() );
1126 OutQueue q;
1127 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1128 q.GrabStateful( *(*it)->outQueue );
1129 scopedLock.UnLock();
1130
1131 q.Report( status );
1132 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1133 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1134 return;
1135 }
1136 }
1137
1138 //------------------------------------------------------------------------
1139 // Force error
// Closes the socket handler of every connected sub-stream, puts in-flight
// messages back, resets the connection count and session id, and reports
// all queued messages and handlers as broken with the given status.
// @param status the status reported to the handlers
// @param hush   when true the per-sub-stream debug message is not logged
// @param sess   when non-zero the call is ignored unless it equals the
//               current session id
1140 //------------------------------------------------------------------------
1141 void Stream::ForceError( XRootDStatus status, const bool hush, const uint64_t sess )
1142 {
1143 auto channel = GetChannel();
1144 bool closing;
// The helper receives a callback that repeats this call with the same
// arguments; the captured `channel` travels with it. Presumably the helper
// runs it later when the call cannot proceed now - TODO confirm against
// StreamMutexHelper.
1145 StreamMutexHelper scopedLock( pMutex,
1146 [this, channel, status, hush, sess]()
1147 {
1148 this->ForceError(status, hush, sess);
1149 }, closing );
1150 if( closing ) return;
1151 if( sess && sess != pSessionId ) return; // request was meant for another session
1152
1153 Log *log = DefaultEnv::GetLog();
1154 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1155 {
1156 if( pSubStreams[substream]->status != Socket::Connected ) continue;
1157 SockHandlerClose( substream );
1158
1159 if( !hush )
1160 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1161 pStreamName.c_str(), status.ToString().c_str() );
1162
1163 //--------------------------------------------------------------------
1164 // Reinsert the stuff that we have failed to send
1165 //--------------------------------------------------------------------
1166 Reinsert( substream );
1167 }
1168
1169 pConnectionCount = 0;
1170 pSessionId = 0;
1171
1172 //------------------------------------------------------------------------
1173 // We're done here, unlock the stream mutex to avoid deadlocks and
1174 // report the disconnection event to the handlers
1175 //------------------------------------------------------------------------
1176 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1177 "message handlers.", pStreamName.c_str() );
1178
// The items taken by GrabItems from every out-queue are reported
1179 SubStreamList::iterator it;
1180 OutQueue q;
1181 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1182 q.GrabItems( *(*it)->outQueue );
1183 scopedLock.UnLock();
1184
1185 q.Report( status );
1186
1187 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1188 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1189 }
1190
1191 //----------------------------------------------------------------------------
1192 // On fatal error
1193 //----------------------------------------------------------------------------
1194 void Stream::OnFatalError( uint16_t subStream,
1195 XRootDStatus status,
1196 StreamMutexHelper &lock )
1197 {
1198 Log *log = DefaultEnv::GetLog();
1199 SockHandlerClose( subStream );
1200 log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
1201 pStreamName.c_str(), status.ToString().c_str() );
1202
1203 //--------------------------------------------------------------------------
1204 // Don't set the stream error windows for authentication errors as the user
1205 // may refresh his credential at any time
1206 //--------------------------------------------------------------------------
1207 if( status.code != errAuthFailed )
1208 {
1209 pConnectionCount = 0;
1210 pLastStreamError = ::time(0);
1211 pLastFatalError = status;
1212 }
1213
1214 SubStreamList::iterator it;
1215 OutQueue q;
1216 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1217 q.GrabItems( *(*it)->outQueue );
1218 lock.UnLock();
1219
1220 status.status = stFatal;
1221 q.Report( status );
1222 pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1223 pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1224
1225 }
1226
1227 //----------------------------------------------------------------------------
1228 // Inform monitoring about disconnection
1229 //----------------------------------------------------------------------------
1230 void Stream::MonitorDisconnection( XRootDStatus status )
1231 {
1232 Monitor *mon = DefaultEnv::GetMonitor();
1233 if( mon )
1234 {
1235 Monitor::DisconnectInfo i;
1236 i.server = pUrl->GetHostId();
1237 i.rBytes = pBytesReceived;
1238 i.sBytes = pBytesSent;
1239 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1240 i.status = status;
1241 mon->Event( Monitor::EvDisconnect, &i );
1242 }
1243 }
1244
1245 //----------------------------------------------------------------------------
1246 // Call back on a read timeout
// Only sub-stream 0 is examined. With no outgoing messages queued the
// transport is asked whether the stream TTL has elapsed; in any case it is
// asked whether the stream is broken.
// @param substream index of the sub-stream that timed out
// @return true for substream != 0 and when nothing had to be done; false
//         when the lock helper flagged the call, after a TTL disconnect
//         and after a broken stream was passed to OnError
1247 //----------------------------------------------------------------------------
1248 bool Stream::OnReadTimeout( uint16_t substream )
1249 {
1250 //--------------------------------------------------------------------------
1251 // We only take the main stream into account
1252 //--------------------------------------------------------------------------
1253 if( substream != 0 )
1254 return true;
1255
1256 //--------------------------------------------------------------------------
1257 // Check if there are no outgoing messages and if the stream TTL is elapsed.
1258 // It is assumed that the underlying transport makes sure that there are no
1259 // pending requests that are not answered, ie. all possible virtual streams
1260 // are de-allocated
1261 //--------------------------------------------------------------------------
1262 Log *log = DefaultEnv::GetLog();
1263 SubStreamList::iterator it;
1264 time_t now = time(0);
1265
1266 bool closing;
1267 StreamMutexHelper scopedLock( pMutex, substream, closing );
1268 if( closing ) return false;
// Sum the queue sizes and find the most recent socket activity
1269 uint32_t outgoingMessages = 0;
1270 time_t lastActivity = 0;
1271 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1272 {
1273 outgoingMessages += (*it)->outQueue->GetSize();
1274 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1275 if( lastActivity < sockLastActivity )
1276 lastActivity = sockLastActivity;
1277 }
1278
1279 if( !outgoingMessages )
1280 {
1281 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1282 *pChannelData );
1283 if( disconnect )
1284 {
1285 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1286 pStreamName.c_str() );
1287 const uint64_t sess = pSessionId; // read while the lock is still held
1288 scopedLock.UnLock();
1289 //----------------------------------------------------------------------
1290 // Important note!
1291 //
1292 // This destroys the Stream object itself, the underlying
1293 // AsyncSocketHandler object (that called this method) and the Channel
1294 // object that aggregates this Stream.
// NOTE(review): the statement this note refers to (listing line 1296) is
// absent from this listing; `sess` is presumably consumed there.
1295 //----------------------------------------------------------------------
1297 return false;
1298 }
1299 }
1300
1301 //--------------------------------------------------------------------------
1302 // Check if the stream is broken; the lock is released before OnError is
// called, which acquires it again through its own helper
1303 //--------------------------------------------------------------------------
1304 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1305 *pChannelData );
1306 if( !st.IsOK() )
1307 {
1308 scopedLock.UnLock();
1309 OnError( substream, st );
1310 return false;
1311 }
1312 return true;
1313 }
1314
1315 //----------------------------------------------------------------------------
1316 // Call back on a write timeout; nothing is done and true is always returned
1317 //----------------------------------------------------------------------------
1318 bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1319 {
1320 return true;
1321 }
1322
1323 //----------------------------------------------------------------------------
1324 // Register channel event handler: `handler` is added to pChannelEvHandlers
// NOTE(review): the signature (listing line 1326) is absent from this listing.
1325 //----------------------------------------------------------------------------
1327 {
1328 pChannelEvHandlers.AddHandler( handler );
1329 }
1330
1331 //----------------------------------------------------------------------------
1332 // Remove a channel event handler: `handler` is removed from
// pChannelEvHandlers
// NOTE(review): the signature (listing line 1334) is absent from this listing.
1333 //----------------------------------------------------------------------------
1335 {
1336 pChannelEvHandlers.RemoveHandler( handler );
1337 }
1338
1339 //----------------------------------------------------------------------------
1340 // Install a incoming message handler
1341 //----------------------------------------------------------------------------
1342 MsgHandler*
1343 Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1344 {
1345 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1346 if( !mh.handler )
1347 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1348 mh.expires,
1349 mh.action );
1350
1351 if( !mh.handler )
1352 return nullptr;
1353
1354 if( mh.action & MsgHandler::Raw )
1355 return mh.handler;
1356 return nullptr;
1357 }
1358
1359 //----------------------------------------------------------------------------
// Ask the handler installed on the given sub-stream to inspect a status
// response and translate the action flags it returns.
// NOTE(review): the original banner text (listing lines 1360-1362) is
// absent from this listing.
// @param stream     index of the sub-stream
// @param incHandler set to the handler when the Raw flag is returned
// @return Raw, Corrupted, More or None, checked in that order
1363 //----------------------------------------------------------------------------
1364 uint16_t Stream::InspectStatusRsp( uint16_t stream,
1365 MsgHandler *&incHandler )
1366 {
1367 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
// NOTE(review): the statement guarded by this check (listing line 1369) is
// absent from this listing.
1368 if( !mh.handler )
1370
1371 uint16_t action = mh.handler->InspectStatusRsp();
1372 mh.action |= action; // the new flags are accumulated in the helper
1373
1374 if( action & MsgHandler::RemoveHandler )
1375 pIncomingQueue->RemoveMessageHandler( mh.handler );
1376
1377 if( action & MsgHandler::Raw )
1378 {
1379 incHandler = mh.handler;
1380 return MsgHandler::Raw;
1381 }
1382
1383 if( action & MsgHandler::Corrupted )
1384 return MsgHandler::Corrupted;
1385
1386 if( action & MsgHandler::More )
1387 return MsgHandler::More;
1388
1389 return MsgHandler::None;
1390 }
1391
1392 //----------------------------------------------------------------------------
1393 // Check if channel can be collapsed using given URL
1394 //----------------------------------------------------------------------------
1395 bool Stream::CanCollapse( const URL &url )
1396 {
1397 Log *log = DefaultEnv::GetLog();
1398
1399 //--------------------------------------------------------------------------
1400 // Resolve all the addresses of the host we're supposed to connect to
1401 //--------------------------------------------------------------------------
1402 std::vector<XrdNetAddr> prefaddrs;
1403 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1404 if( !st.IsOK() )
1405 {
1406 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1407 , pStreamName.c_str(), url.GetHostName().c_str() );
1408 return false;
1409 }
1410
1411 //--------------------------------------------------------------------------
1412 // Resolve all the addresses of the alias
1413 //--------------------------------------------------------------------------
1414 std::vector<XrdNetAddr> aliasaddrs;
1415 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1416 if( !st.IsOK() )
1417 {
1418 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1419 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1420 return false;
1421 }
1422
1423 //--------------------------------------------------------------------------
1424 // Now check if the preferred host is part of the alias
1425 //--------------------------------------------------------------------------
1426 auto itr = prefaddrs.begin();
1427 for( ; itr != prefaddrs.end() ; ++itr )
1428 {
1429 auto itr2 = aliasaddrs.begin();
1430 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1431 if( itr->Same( &*itr2 ) ) return true;
1432 }
1433
1434 return false;
1435 }
1436
1437 //------------------------------------------------------------------------
1438 // Query the stream
// Each visible branch stores a newly allocated std::string, built from the
// socket of sub-stream 0, in `result` and returns a default Status.
// The string is handed to AnyObject::Set with own=false; presumably the
// caller is then responsible for deleting it - TODO confirm.
// NOTE(review): the `case` labels (listing lines 1444, 1450, 1456) and the
// statement of the default branch (1463) are absent from this listing.
// @param query  the query code
// @param result receives the answer
1439 //------------------------------------------------------------------------
1440 Status Stream::Query( uint16_t query, AnyObject &result )
1441 {
1442 switch( query )
1443 {
// IP address of the socket of sub-stream 0
1445 {
1446 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1447 return Status();
1448 }
1449
// IP stack of the socket of sub-stream 0
1451 {
1452 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1453 return Status();
1454 }
1455
// Host name of the socket of sub-stream 0
1457 {
1458 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1459 return Status();
1460 }
1461
1462 default:
1464 }
1465 }
1466
1467 //----------------------------------------------------------------------------
1468 // Used under error conditions to move handlers from the out & in queue
1469 // helpers back to main out queue for the subStream or the in queue.
// Both helpers of the sub-stream are reset once they have been dealt with.
// @param subStream index of the sub-stream whose helpers are emptied
1470 //----------------------------------------------------------------------------
1471 void Stream::Reinsert( uint16_t subStream )
1472 {
1473 //--------------------------------------------------------------------------
1474 // Out MsgHelper
1475 //--------------------------------------------------------------------------
1476 if( pSubStreams[subStream]->outMsgHelper.msg )
1477 {
1478 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
1479 if( pIncomingQueue->HasUnsetTimeout( h.handler ) )
1480 {
// The message goes back to the front of the out-queue, its handler leaves
// the in-queue and is told that the message is waiting to be sent again
1481 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
1482 h.stateful );
1483 pIncomingQueue->RemoveMessageHandler(h.handler);
1484 h.handler->OnWaitingToSend( h.msg );
1485 }
1486 else
1487 {
1488 // Since the handler has been removed from the in-queue or had its
1489 // timeout assigned it must have been sent.
1490 h.handler->OnStatusReady( h.msg, XRootDStatus() );
1491 }
1492 pSubStreams[subStream]->outMsgHelper.Reset();
1493 }
1494
1495 //--------------------------------------------------------------------------
1496 // In MsgHelper. The handler is re-added to the incoming queue with its
// expiration time and, if it is an XRootDMsgHandler, PartialReceived() is
// called on it.
1497 //--------------------------------------------------------------------------
1498 if( pSubStreams[subStream]->inMsgHelper.handler )
1499 {
1500 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
1501 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
1502 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
1503 if( xrdHandler ) xrdHandler->PartialReceived();
1504 h.Reset();
1505 }
1506 }
1507
1508 //----------------------------------------------------------------------------
1509 // Marks subStream as disconnected and closes the sockethandler.
1510 // pMutex should be locked throughout.
// The current handler is pre-closed and handed to a SocketDestroyJob that
// is queued on the job manager; a new AsyncSocketHandler constructed from
// the old one takes its place in the sub-stream.
// NOTE(review): listing line 1515, presumably the statement that marks the
// sub-stream as disconnected, is absent from this listing.
// @param subStream index of the sub-stream to close
1511 //----------------------------------------------------------------------------
1512 void Stream::SockHandlerClose( uint16_t subStream )
1513 {
1514 SubStreamData *sd = pSubStreams[subStream];
1516 pMutex.AddClosing(subStream); // notifies the mutex that this sub-stream will be closed
1517 sd->socket->PreClose();
1518 AsyncSocketHandler *s = new AsyncSocketHandler( *sd->socket );
1519 Job *job = new SocketDestroyJob( sd->socket ); // the job receives the old handler
1520 pJobManager->QueueJob( job );
1521 sd->socket = s;
1522 pMutex.RemoveClosing(subStream); // notifies the mutex that the close has completed
1523 }
1524}
union ServerResponse::@040373375333017131300127053271011057331004327334 body
kXR_char streamid[2]
Definition XProtocol.hh:956
kXR_unt16 requestid
Definition XProtocol.hh:257
@ kXR_oksofar
Definition XProtocol.hh:942
@ kXR_status
Definition XProtocol.hh:949
struct ServerResponseBody_Status bdy
kXR_char fhandle[4]
Definition XProtocol.hh:258
@ kXR_close
Definition XProtocol.hh:116
ServerResponseHeader hdr
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
bool HasUnsetTimeout(MsgHandler *handler)
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
virtual void OnWaitingToSend(Message *msg)
Called to indicate the message is waiting to be sent.
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
Status ForceDisconnect(const URL &url)
Shut down a channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
virtual void Run(void *)
The job logic.
SocketDestroyJob(AsyncSocketHandler *socket)
A network socket.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
void UnLock()
UnLock.
void AddClosing(uint16_t subStream)
AddClosing. Notified that subStream will be closed.
std::map< pthread_t, std::list< MtxInfo >::iterator > mthmap
std::list< MtxInfo >::iterator fnlistit
void Lock()
Lock. Regular, non-subStream aware recursive lock.
XrdSysCondVar mcv
void RemoveClosing(uint16_t subStream)
RemoveClosing. Notified that subStream close has completed.
std::map< uint16_t, size_t > mclosing
std::list< MtxInfo > mlist
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
XRootDStatus EnableLink(PathID &path)
Stream(const URL *url, const URL &prefer=URL())
Constructor.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void Tick(time_t now)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
std::shared_ptr< Channel > GetChannel()
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
void ForceError(XRootDStatus status, const bool hush, const uint64_t sess)
Force error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Interface for a task to be run by the TaskManager.
@ RequestClose
Send a close request.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
Random utilities.
Definition XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
static pthread_t ID(void)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errInvalidSession
const uint16_t errAuthFailed
@ kXR_PartialResult
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.