42 channel(0), callBack(0), readEnabled(false), writeEnabled(false),
43 readTimeout(0), writeTimeout(0)
45 XrdSys::IOEvents::Channel *channel;
46 XrdSys::IOEvents::CallBack *callBack;
69 DisableControl() : pFlags( 0 ), pCnd( 0 ) { }
76 void DisableCallBack()
78 const int flags = pFlags.fetch_or( kWantDisable );
79 if( !(flags & kIdSet) )
return;
80 if( !(flags & kRunningCallBack) )
return;
81 XrdSysCondVarHelper lck( pCnd );
83 while( !(pFlags.load() & kDisabled) ) pCnd.Wait();
86 std::atomic<int> pFlags;
91 SocketCallBack( XrdCl::Socket *sock, XrdCl::SocketHandler *sh ):
92 pSocket( sock ), pHandler( sh )
94 pControl = std::make_shared<DisableControl>();
97 virtual ~SocketCallBack() {};
99 virtual bool Event( XrdSys::IOEvents::Channel *chP,
103 using namespace XrdCl;
106 if( evFlags & ReadyToRead ) ev |= SocketHandler::ReadyToRead;
107 if( evFlags & ReadTimeOut ) ev |= SocketHandler::ReadTimeOut;
108 if( evFlags & ReadyToWrite ) ev |= SocketHandler::ReadyToWrite;
109 if( evFlags & WriteTimeOut ) ev |= SocketHandler::WriteTimeOut;
111 Log *log = DefaultEnv::GetLog();
114 log->
Dump( PollerMsg,
"%s Got an event: %s",
115 pSocket->GetName().c_str(),
116 SocketHandler::EventTypeToString( ev ).c_str() );
119 int flags = pControl->pFlags.fetch_or( kRunningCallBack );
120 if( !( flags & kIdSet ) )
122 XrdSysCondVarHelper lck( pControl->pCnd );
124 flags = pControl->pFlags.fetch_or( kIdSet );
126 if( flags & kWantDisable )
128 XrdSysCondVarHelper lck( pControl->pCnd );
129 pControl->pFlags &= ~kRunningCallBack;
130 pControl->pFlags |= kDisabled;
131 pControl->pCnd.Broadcast();
139 auto control = pControl;
140 pHandler->Event( ev, pSocket );
142 flags = control->pFlags.fetch_and( ~kRunningCallBack );
143 if( flags & kWantDisable )
145 XrdSysCondVarHelper lck( control->pCnd );
146 control->pFlags |= kDisabled;
147 control->pCnd.Broadcast();
153 std::shared_ptr<DisableControl> GetControl()
159 XrdCl::Socket *pSocket;
160 XrdCl::SocketHandler *pHandler;
161 std::shared_ptr<DisableControl> pControl;
184 SocketMap::iterator it;
185 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
187 PollerHelper *helper = (PollerHelper*)it->second;
188 if( helper->channel ) helper->channel->
Delete();
189 delete helper->callBack;
208 log->
Debug(
PollerMsg,
"Creating and starting the built-in poller..." );
211 const char *errMsg = 0;
213 for(
int i = 0; i < pNbPoller; ++i )
218 log->
Error(
PollerMsg,
"Unable to create the internal poller object: "
222 pPollerPool.push_back( poller );
225 pNext = pPollerPool.begin();
233 SocketMap::iterator it;
234 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
236 PollerHelper *helper = (PollerHelper*)it->second;
237 Socket *socket = it->first;
240 if( helper->readEnabled )
243 helper->readTimeout, &errMsg );
247 "while re-starting %s (%s)",
XrdSysE2T( errno ), errMsg );
253 if( helper->writeEnabled )
256 helper->writeTimeout, &errMsg );
260 "while re-starting %s (%s)",
XrdSysE2T( errno ), errMsg );
281 if( pPollerPool.empty() )
283 log->
Debug(
PollerMsg,
"Stopping a poller that has not been started" );
287 while( !pPollerPool.empty() )
290 if( *pNext == poller )
291 pNext = pPollerPool.begin();
292 pPollerPool.pop_back();
294 if( !poller )
continue;
299 scopedLock.
Lock( &pMutex );
301 pNext = pPollerPool.end();
304 SocketMap::iterator it;
305 const char *errMsg = 0;
307 for( it = pSocketMap.begin(); it != pSocketMap.end(); ++it )
309 PollerHelper *helper = (PollerHelper*)it->second;
310 if( !helper->channel )
continue;
314 Socket *socket = it->first;
315 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
316 socket->
GetName().c_str(), errMsg );
318 helper->channel->
Delete();
343 log->
Error(
PollerMsg,
"Socket is not in a state valid for polling" );
347 log->
Debug(
PollerMsg,
"Adding socket %p to the poller", (
void*)socket );
352 SocketMap::const_iterator it = pSocketMap.find( socket );
353 if( it != pSocketMap.end() )
367 log->
Error(
PollerMsg,
"No poller available, can not add socket" );
371 PollerHelper *helper =
new PollerHelper();
372 helper->callBack = new ::SocketCallBack( socket, handler );
382 pSocketMap[socket] = helper;
395 SocketMap::iterator it = pSocketMap.find( socket );
396 if( it == pSocketMap.end() )
399 PollerHelper *helper = (PollerHelper*)it->second;
400 if( !helper )
return;
403 SocketCallBack *scb =
dynamic_cast<SocketCallBack*
>( cb );
405 auto dc = scb->GetControl();
407 dc->DisableCallBack();
422 SocketMap::iterator it = pSocketMap.find( socket );
423 if( it == pSocketMap.end() )
430 UnregisterFromPoller( socket );
435 PollerHelper *helper = (PollerHelper*)it->second;
436 pSocketMap.erase( it );
439 if( helper->channel )
445 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
446 socket->
GetName().c_str(), errMsg );
449 helper->channel->
Delete();
451 delete helper->callBack;
468 log->
Error(
PollerMsg,
"Invalid socket, read events unavailable" );
476 SocketMap::const_iterator it = pSocketMap.find( socket );
477 if( it == pSocketMap.end() )
484 PollerHelper *helper = (PollerHelper*)it->second;
492 if( helper->readEnabled )
494 helper->readTimeout = timeout;
496 log->
Dump(
PollerMsg,
"%s Enable read notifications, timeout: %lld",
497 socket->
GetName().c_str(), (
long long)timeout );
506 log->
Error(
PollerMsg,
"%s Unable to enable read notifications: %s",
507 socket->
GetName().c_str(), errMsg );
511 helper->readEnabled =
true;
519 if( !helper->readEnabled )
531 log->
Error(
PollerMsg,
"%s Unable to disable read notifications: %s",
532 socket->
GetName().c_str(), errMsg );
536 helper->readEnabled =
false;
553 log->
Error(
PollerMsg,
"Invalid socket, write events unavailable" );
561 SocketMap::const_iterator it = pSocketMap.find( socket );
562 if( it == pSocketMap.end() )
569 PollerHelper *helper = (PollerHelper*)it->second;
577 if( helper->writeEnabled )
580 helper->writeTimeout = timeout;
582 log->
Dump(
PollerMsg,
"%s Enable write notifications, timeout: %lld",
583 socket->
GetName().c_str(), (
long long)timeout );
592 log->
Error(
PollerMsg,
"%s Unable to enable write notifications: %s",
593 socket->
GetName().c_str(), errMsg );
597 helper->writeEnabled =
true;
605 if( !helper->writeEnabled )
616 log->
Error(
PollerMsg,
"%s Unable to disable write notifications: %s",
617 socket->
GetName().c_str(), errMsg );
621 helper->writeEnabled =
false;
632 SocketMap::iterator it = pSocketMap.find( socket );
633 return it != pSocketMap.end();
641 if( pPollerPool.empty() )
return 0;
643 PollerPool::iterator ret = pNext;
645 if( pNext == pPollerPool.end() )
646 pNext = pPollerPool.begin();
655 PollerMap::iterator itr = pPollerMap.find( socket->
GetFD() );
657 if( itr == pPollerMap.end() )
661 pPollerMap[socket->
GetFD()] = poller;
668 void PollerBuiltIn::UnregisterFromPoller(
const Socket *socket )
670 PollerMap::iterator itr = pPollerMap.find( socket->
GetFD() );
671 if( itr == pPollerMap.end() )
return;
672 pPollerMap.erase( itr );
675 XrdSys::IOEvents::Poller* PollerBuiltIn::GetPoller(
const Socket * socket)
677 PollerMap::iterator itr = pPollerMap.find( socket->
GetFD() );
678 if( itr == pPollerMap.end() )
return 0;
685 int PollerBuiltIn::GetNbPollerInit()
689 env->
GetInt(
"ParallelEvtLoop", ret);
const char * XrdSysE2T(int errcode)
static Log * GetLog()
Get default log.
static Env * GetEnv()
Get default client environment.
bool GetInt(const std::string &key, int &value)
void Error(uint64_t topic, const char *format,...)
Report an error.
LogLevel GetLevel() const
Get the log level.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
virtual bool AddSocket(Socket *socket, SocketHandler *handler)
virtual bool EnableReadNotification(Socket *socket, bool notify, time_t timeout=60)
virtual bool RemoveSocket(Socket *socket)
Remove the socket.
virtual bool Stop()
Stop polling.
virtual void ShutdownEvents(Socket *socket)
virtual bool EnableWriteNotification(Socket *socket, bool notify, time_t timeout=60)
virtual bool IsRegistered(Socket *socket)
Check whether the socket is registered with the poller.
virtual bool Finalize()
Finalize the poller.
virtual bool Initialize()
Initialize the poller.
virtual bool Start()
Start polling.
virtual void Initialize(Poller *)
Initializer.
std::string GetName() const
Get the string representation of the socket.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
int GetFD() const
Get the file descriptor.
SocketStatus GetStatus() const
Get the socket status.
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static pthread_t ID(void)
@ allEvents
All of the above.
@ writeEvents
Write and Write Timeouts.
@ readEvents
Read and Read Timeouts.
bool Enable(int events, int timeout=0, const char **eText=0)
bool Disable(int events, const char **eText=0)
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
const int DefaultParallelEvtLoop