XRootD
Loading...
Searching...
No Matches
XrdSysIOEvents.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* X r d S y s I O E v e n t s . c c */
4/* */
5/* (c) 2012 by the Board of Trustees of the Leland Stanford, Jr., University */
6/* All Rights Reserved */
7/* Produced by Andrew Hanushevsky for Stanford University under contract */
8/* DE-AC02-76-SFO0515 with the Department of Energy */
9/* */
10/* This file is part of the XRootD software suite. */
11/* */
12/* XRootD is free software: you can redistribute it and/or modify it under */
13/* the terms of the GNU Lesser General Public License as published by the */
14/* Free Software Foundation, either version 3 of the License, or (at your */
15/* option) any later version. */
16/* */
17/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
18/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
19/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
20/* License for more details. */
21/* */
22/* You should have received a copy of the GNU Lesser General Public License */
23/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
24/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
25/* */
26/* The copyright holder's institutional names and contributor's names may not */
27/* be used to endorse or promote products derived from this software without */
28/* specific prior written permission of the institution or contributor. */
29/******************************************************************************/
30
31#include <cstdio>
32#include <cstdlib>
33
34#include "XrdSys/XrdSysE2T.hh"
35#include "XrdSys/XrdSysFD.hh"
40
41/******************************************************************************/
42/* L o c a l D a t a */
43/******************************************************************************/
44
namespace
{
// Printable names for the channel status codes, indexed by the numeric value
// of:
//   enum Status {isClear = 0, isCBMode, isDead};
// The order here must stay in step with that enum because the STATUS and
// STATUSOF macros index this table directly with the status value.
//
// Both the strings and the table slots are const so that tracing code cannot
// accidentally repoint an entry.
//
   const char *const statName[] = {"isClear", "isCBMode", "isDead"};
}
52
53/******************************************************************************/
54/* L o c a l D e f i n e s */
55/******************************************************************************/
56
// Name of the current status of the channel whose method is executing (relies
// on a chStat member being in scope).
//
#define STATUS       statName[(int)chStat]

// Name of the status of the channel pointed to by x.
//
#define STATUSOF(x)  statName[(int)((x)->chStat)]

// True when theitem is the only element of the circular list threaded through
// its dlvar link member, i.e. its next pointer refers back to itself.
//
#define SINGLETON(dlvar, theitem)\
               ((theitem) ->dlvar .next == (theitem))

// Link newitem into the circular list immediately ahead of curitem.
// NOTE: expands to several statements; only use it inside its own braces.
//
#define INSERT(dlvar, curitem, newitem) \
               newitem ->dlvar .next = curitem; \
               newitem ->dlvar .prev = curitem ->dlvar .prev; \
               curitem ->dlvar .prev-> dlvar .next = newitem; \
               curitem ->dlvar .prev = newitem

// Unlink curitem from the circular list anchored at dlbase. If curitem is the
// anchor, the anchor moves to the next element (or becomes 0 when the list is
// now empty). The removed item is left linked to itself.
// NOTE: expands to several statements; only use it inside its own braces.
//
#define REMOVE(dlbase, dlvar, curitem) \
               if (dlbase == curitem) dlbase = (SINGLETON(dlvar,curitem) \
                                             ? 0   : curitem ->dlvar .next);\
               curitem ->dlvar .prev-> dlvar .next = curitem ->dlvar .next;\
               curitem ->dlvar .next-> dlvar .prev = curitem ->dlvar .prev;\
               curitem ->dlvar .next = curitem;\
               curitem ->dlvar .prev = curitem

// The read-side / write-side bits of an event mask. Argument and result are
// parenthesized so the macros compose safely with other operators.
//
#define REVENTS(x) ((x) & Channel:: readEvents)

#define WEVENTS(x) ((x) & Channel::writeEvents)

// True when the calling thread is the poller thread (relies on pollTid being
// in scope).
//
#define ISPOLLER XrdSysThread::Same(XrdSysThread::ID(),pollTid)

// "true" or "false" for any expression usable as a condition.
//
#define BOOLNAME(x) ((x) ? "true" : "false")

// Emit one trace line to std::cerr while holding the trace mutex so that lines
// from different threads do not interleave. x is stringized as the tag, fd is
// the descriptor and y is a stream expression.
//
#define DO_TRACE(x,fd,y) \
                {PollerInit::traceMTX.Lock(); \
                 std::cerr <<"IOE fd "<<fd<<' '<<#x <<": "<<y<<'\n'<< std::flush; \
                 PollerInit::traceMTX.UnLock();}

#define TRACING PollerInit::doTrace

// Trace only when tracing is switched on.
// NOTE: expands to a bare "if"; wrap the use in braces when an "else" follows.
//
#define IF_TRACE(x,fd,y) if (TRACING) DO_TRACE(x,fd,y)

// Stream fragment reporting the lock state (relies on a local isLocked).
//
#define TRACE_LOK " channel now " <<(isLocked ? "locked" : "unlocked")

// Trace the outcome of a Modify() call (relies on locals retval and isLocked).
//
#define TRACE_MOD(x,fd,y) \
                 IF_TRACE(x,fd,"Modify(" <<y <<") == " \
                          <<BOOLNAME(retval) <<TRACE_LOK)

// Trace that Modify() was not needed because the event mask did not change.
//
#define TRACE_NOD(x,fd,y) \
                 IF_TRACE(x,fd,"Modify(" <<y <<") skipped; no events changed")
103
104/******************************************************************************/
105/* G l o b a l D a t a */
106/******************************************************************************/
107
109 = (sizeof(time_t) == 8 ? 0x7fffffffffffffffLL : 0x7fffffff);
110
112
113/******************************************************************************/
114/* L o c a l C l a s s e s */
115/******************************************************************************/
116/******************************************************************************/
117/* T h r e a d S t a r t u p I n t e r f a c e */
118/******************************************************************************/
119
120namespace XrdSys
121{
122namespace IOEvents
123{
126 const char *retMsg;
129
131 {pollSync = new XrdSysSemaphore(0, "poll sync");}
133 };
134
136{
137public:
138
139static void *Start(void *parg);
140};
141
142void *BootStrap::Start(void *parg)
143{
144 struct pollArg *pollArg = (struct pollArg *)parg;
145 Poller *thePoller = pollArg->pollP;
147 thePoller->pollTid = XrdSysThread::ID();
148
149 thePoller->Begin(theSem, pollArg->retCode, &(pollArg->retMsg));
150 delete theSem;
151
152 return (void *)0;
153}
154
155/******************************************************************************/
156/* P o l l e r E r r 1 */
157/******************************************************************************/
158
159// This class is set in the channel when an error occurs but cannot be reflected
160// immediately because either there is no callback function or all events are
161// disabled. We need to do this because error events can be physically presented
162// by the kernel even when logical events are disabled. Note that the error
163// number and text will have been set and remain set as the channel was actually
164// disabled preventing any new operation on the channel.
165//
166class PollerErr1 : public Poller
167{
168public:
169
// Built with -1 for both descriptors, so this poller has no command/request
// pipe of its own.
170 PollerErr1() : Poller(-1, -1) {}
172
173protected:
// Never runs a poll loop; all arguments are ignored.
174void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
175 {(void)syncp; (void)rc; (void)eTxt;}
176
// Nothing to remove; all arguments are ignored.
177void Exclude(Channel *cP, bool &isLocked, bool dover=1)
178 {(void)cP; (void)isLocked; (void)dover;}
179
// Always fails. eNum is set to whatever GetFault(cP) returns, or EPROTO when
// that is zero (presumably GetFault reports the fault saved on the channel --
// confirm against the Poller declaration). eTxt, when supplied, is pointed at
// a fixed description of the operation.
180bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
181 {(void)isLocked;
182 if (!(eNum = GetFault(cP))) eNum = EPROTO;
183 if (eTxt) *eTxt = "initializing channel";
184 return false;
185 }
186
// Always fails, reporting the error in the same way as Include().
187bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
188 {(void)isLocked;
189 if (!(eNum = GetFault(cP))) eNum = EPROTO;
190 if (eTxt) *eTxt = "modifying channel";
191 return false;
192 }
193
// Nothing to shut down.
194void Shutdown() {}
195};
196
197/******************************************************************************/
198/* P o l l e r I n i t */
199/******************************************************************************/
200
201// This class is used as the initial poller on a channel. It is responsible
202// for adding the file descriptor to the poll set upon the initial enable. This
203// avoids enabling a channel prior to it receiving a call back function.
204//
205class PollerInit : public Poller
206{
207public:
208
// Built with -1 for both descriptors, so this poller has no command/request
// pipe of its own.
209 PollerInit() : Poller(-1, -1) {}
211
// Global trace switch tested by the TRACING/IF_TRACE macros.
213static bool doTrace;
214
215protected:
216
// Never runs a poll loop.
217void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
218
// Nothing to remove.
219void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
220
// Always fails with EPROTO; eTxt, when supplied, is pointed at a fixed
// description of the operation.
221bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
222 {eNum = EPROTO;
223 if (eTxt) *eTxt = "initializing channel";
224 return false;
225 }
226
// The first event change on a channel lands here: hand the channel to Init()
// and pass its result back, tracing the outcome when tracing is on.
227bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
228 {bool rc = Init(cP, eNum, eTxt, isLocked);
229 IF_TRACE(Modify,cP->GetFD(), "Init() returned " <<BOOLNAME(rc));
230 return rc;
231 }
232
// Nothing to shut down.
233void Shutdown() {}
234};
235
// Tracing is enabled for the whole process when the XrdSysIOE_TRACE
// environment variable exists (any value) at static initialization time.
236bool PollerInit::doTrace = (getenv("XrdSysIOE_TRACE") != 0);
238
239/******************************************************************************/
240/* P o l l e r W a i t */
241/******************************************************************************/
242
243// This class is set in the channel when we need to serialize access to the
244// channel. Channel methods (as some others) check for this to see if they need
245// to defer the current operation. We need to do this because some poller
246// implementations must release the channel lock to avoid a deadlock.
247//
248class PollerWait : public Poller
249{
250public:
251
// Built with -1 for both descriptors, so this poller has no command/request
// pipe of its own.
252 PollerWait() : Poller(-1, -1) {}
254
255protected:
// Never runs a poll loop.
256void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt) {}
257
// Nothing to remove.
258void Exclude(Channel *cP, bool &isLocked, bool dover=1) {}
259
// Always fails with EIDRM; eTxt, when supplied, is pointed at a fixed
// description of the operation.
260bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
261 {eNum = EIDRM;
262 if (eTxt) *eTxt = "initializing channel";
263 return false;
264 }
265
// Forwards to Init() and returns its result unchanged.
266bool Modify (Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
267 {return Init(cP, eNum, eTxt, isLocked);}
268
// Nothing to shut down.
269void Shutdown() {}
270};
271
275};
276};
277
278/******************************************************************************/
279/* C l a s s C h a n n e l M e t h o d s */
280/******************************************************************************/
281/******************************************************************************/
282/* C o n s t r u c t o r */
283/******************************************************************************/
284
286 CallBack *cbP, void *cbArg)
287 : chPollXQ(pollP), chCB(cbP), chCBA(cbArg)
288{
289 attList.next = attList.prev = this;
290 tmoList.next = tmoList.prev = this;
291 inTOQ = 0;
292 pollEnt = 0;
293 chStat = isClear;
294 Reset(&pollInit, fd);
295
296 pollP->Attach(this);
297}
298
299/******************************************************************************/
300/* D e l e t e */
301/******************************************************************************/
302
304{
305 Poller *myPoller;
306 bool isLocked = true;
307
308// Do some tracing
309//
310 IF_TRACE(Delete,chFD,"status="<<STATUS);
311
312// Lock ourselves during the delete process. If the channel is disassociated
313// or the real poller is set to the error poller then this channel is clean
314// and can be deleted (i.e. the channel ran through Detach()).
315//
316 chMutex.Lock();
317 if (!chPollXQ || chPollXQ == &pollErr1)
318 {chMutex.UnLock();
319 delete this;
320 return;
321 }
322
323// Disable and remove ourselves from all queues
324//
325 myPoller = chPollXQ;
326 chPollXQ->Detach(this,isLocked,false);
327 if (!isLocked) chMutex.Lock();
328
329// If we are in callback mode then we will need to delay the destruction until
330// after the callback completes unless this is the poller thread. In that case,
331// we need to tell the poller that we have been destroyed in a shelf-stable way.
332//
333 if (chStat)
335 {myPoller->chDead = true;
336 chMutex.UnLock();
337 } else {
338 XrdSysSemaphore cbDone(0);
339 IF_TRACE(Delete,chFD,"waiting for callback");
340 chStat = isDead;
341 chCBA = (void *)&cbDone;
342 chMutex.UnLock();
343 cbDone.Wait();
344 }
345 } else chMutex.UnLock();
346
347// It is now safe to release the storage
348//
349 IF_TRACE(Delete,chFD,"chan="<< std::hex<<(void *)this<< std::dec);
350 delete this;
351}
352
353/******************************************************************************/
354/* D i s a b l e */
355/******************************************************************************/
356
// Turn off one or more events on this channel.
//
// events - mask of the events to disable; bits outside allEvents are ignored.
// eText  - handed through to the poller's Modify(); this method itself never
//          dereferences it.
//
// Returns true on success, including when the mask did not change and the
// poller was therefore not called. Returns false when the poller's Modify()
// fails; errno is then set to the error number Modify() reported.
//
357bool XrdSys::IOEvents::Channel::Disable(int events, const char **eText)
358{
359 int eNum = 0, newev, curev;
360 bool retval = true, isLocked = true;
361
362// Lock this channel
363//
364 chMutex.Lock();
365
366// Get correct current events; depending on the state of the channel. While the
// channel is parked on pollWait the requested mask is held in reMod rather
// than in chEvents.
367//
368 if (chPoller == &pollWait) curev = static_cast<int>(reMod);
369 else curev = static_cast<int>(chEvents);
370
371// Trace this entry
372//
373 IF_TRACE(Disable,chFD,"->Disable(" <<events <<") chev=" <<curev);
374
375// Calculate new event mask: the current mask minus the (valid) requested bits
376//
377 events &= allEvents;
378 newev = curev & ~events;
379
380// If something has changed, then modify the event mask in the poller. The
381// poller may or may not unlock this channel during the process; it reports
// the resulting lock state through isLocked.
382//
383 if (newev != curev)
384 {chEvents = newev;
385 retval = chPoller->Modify(this, eNum, eText, isLocked);
386 TRACE_MOD(Disable,chFD,newev);
387 } else {
388 TRACE_NOD(Disable,chFD,newev);
389 }
// Only release the lock if the poller did not already do so.
390 if (isLocked) chMutex.UnLock();
391
392// All done
393//
394 if (!retval) errno = eNum;
395 return retval;
396}
397
398/******************************************************************************/
399/* E n a b l e */
400/******************************************************************************/
401
// Turn on one or more events on this channel and optionally adjust timeouts.
//
// events  - mask of the events to enable; bits outside allEvents are ignored.
// timeout - applied to the read and/or write side, whichever is present in
//           events: a value > 0 replaces the stored timeout, a value < 0
//           clears it, and 0 leaves the stored timeout unchanged.
// eText   - handed through to the poller's Modify(); this method itself never
//           dereferences it.
//
// Returns true on success, including when no new event bits were added and
// the poller was therefore not called. Returns false when the poller's
// Modify() fails; errno is then set to the error number Modify() reported.
//
402bool XrdSys::IOEvents::Channel::Enable(int events, int timeout,
403 const char **eText)
404{
405 int eNum = 0, newev, curev, tmoSet = 0;
406 bool retval, setTO, isLocked = true;
407
408// Lock ourselves against any changes (this is a recursive mutex)
409//
410 chMutex.Lock();
411
412// Get correct current events; depending on the state of the channel. While the
// channel is parked on pollWait the requested mask is held in reMod rather
// than in chEvents.
413//
414 if (chPoller == &pollWait) curev = static_cast<int>(reMod);
415 else curev = static_cast<int>(chEvents);
416
417// Trace this entry
418//
419 IF_TRACE(Enable,chFD,"->Enable("<<events<<','<<timeout<<") chev="<<curev);
420
421// Establish events that should be enabled. newev holds only the requested bits
// that are not already on; chEvents becomes the union of old and requested.
422//
423 events &= allEvents;
424 newev = (curev ^ events) & events;
425 chEvents = curev | events;
426
427// Handle timeout changes now. A side is flagged in tmoSet when it already has
// a deadline or now has a non-zero timeout.
428//
429 if (REVENTS(events))
430 { if (timeout > 0) chRTO = timeout;
431 else if (timeout < 0) chRTO = 0;
432 if (rdDL != Poller::maxTime || chRTO) tmoSet |= CallBack::ReadyToRead;
433 }
434
435 if (WEVENTS(events))
436 { if (timeout > 0) chWTO = timeout;
437 else if (timeout < 0) chWTO = 0;
438 if (wrDL != Poller::maxTime || chWTO) tmoSet |= CallBack::ReadyToWrite;
439 }
440
441// Check if we have to reset the timeout. We need to hold the channel lock here.
// Nothing is queued when the channel has been switched to the error poller.
442//
443 if (tmoSet && chPoller != &pollErr1)
444 setTO = chPollXQ->TmoAdd(this, tmoSet);
445 else setTO = false;
446
447// Check if any modifications needed here. If so, invoke the modifier. Note that
448// the modify will unlock the channel if the operation causes a wait. So,
449// we cannot depend on the channel being locked upon return. The reason we do
450// not unlock here is because we must ensure the channel doesn't change while
451// we call modify. We let modify determine what to do.
452//
453 if (newev)
454 {retval = chPoller->Modify(this, eNum, eText, isLocked);
455 TRACE_MOD(Enable,chFD,(curev | events));
456 } else {
457 retval = true;
458 TRACE_NOD(Enable,chFD,(curev | events));
459 }
460
461// We need to notify the poller thread if the added deadline is the first in the
462// queue and the poller is waiting. We also optimize for the case where the
463// poller thread is always woken up to perform an action in which case it
464// doesn't need a separate wakeup. We only do this if the enable succeeded. Note
465// that we cannot hold the channel mutex for this call because it may wait.
// The wakeup is also skipped when one is already pending or when Modify()
// handed the channel back unlocked.
466//
467 if (isLocked) chMutex.UnLock();
468 bool isWakePend = CPP_ATOMIC_LOAD(chPollXQ->wakePend, std::memory_order_consume);
469 if (retval && !isWakePend && setTO && isLocked) chPollXQ->WakeUp();
470
471// All done
472//
473 if (!retval) errno = eNum;
474 return retval;
475}
476
477/******************************************************************************/
478/* G e t C a l l B a c k */
479/******************************************************************************/
480
482{
483 chMutex.Lock();
484 *cbP = chCB;
485 *cbArg = chCBA;
486 chMutex.UnLock();
487}
488
489/******************************************************************************/
490/* Private: R e s e t */
491/******************************************************************************/
492
493void XrdSys::IOEvents::Channel::Reset(XrdSys::IOEvents::Poller *thePoller,
494 int fd, int eNum)
495{
496 chPoller = thePoller;
497 chFD = fd;
498 chFault = eNum;
499 chRTO = 0;
500 chWTO = 0;
501 chEvents = 0;
502 dlType = 0;
503 inPSet = 0;
504 reMod = 0;
505 rdDL = Poller::maxTime;
506 wrDL = Poller::maxTime;
507 deadLine = Poller::maxTime;
508}
509
510/******************************************************************************/
511/* S e t C a l l B a c k */
512/******************************************************************************/
513
515{
516
517// We only need to have the channel lock to set the callback. If the object
518// is in the process of being destroyed, we do nothing.
519//
520 chMutex.Lock();
521 if (chStat != isDead)
522 {chCB = cbP;
523 chCBA = cbArg;
524 }
525 chMutex.UnLock();
526}
527
528/******************************************************************************/
529/* S e t F D */
530/******************************************************************************/
531
533{
534 bool isLocked = true;
535
536// Obtain the channel lock. If the object is in callback mode we have some
537// extra work to do. If normal callback then indicate the channel transitioned
538// to prevent it being automatically re-enabled. If it's being destroyed, then
539// do nothing. Otherwise, this is a stupid double setFD call.
540//
541 chMutex.Lock();
542 if (chStat == isDead)
543 {chMutex.UnLock();
544 return;
545 }
546
547// This is a tricky deal here because we need to protect ourselves from other
548// threads as well as the poller trying to do a callback. We first, set the
549// poller target. This means the channel is no longer ready and callbacks will
550// be skipped. We then remove the current file descriptor. This may unlock the
551// channel but at this point that's ok.
552//
553 if (inPSet)
554 {chPoller = &pollWait;
555 chPollXQ->Detach(this, isLocked, true);
556 if (!isLocked) chMutex.Lock();
557 }
558
559// Indicate channel needs to be re-enabled then unlock the channel
560//
561 Reset(&pollInit, fd);
562 chMutex.UnLock();
563}
564
565/******************************************************************************/
566/* C l a s s P o l l e r */
567/******************************************************************************/
568/******************************************************************************/
569/* C o n s t r u c t o r */
570/******************************************************************************/
571
573{
574
575// Now initialize local class members
576//
577 attBase = 0;
578 tmoBase = 0;
579 cmdFD = cFD;
580 reqFD = rFD;
581 wakePend = false;
582 pipeBuff = 0;
583 pipeBlen = 0;
584 pipePoll.fd = rFD;
585 pipePoll.events = POLLIN | POLLRDNORM;
586 tmoMask = 255;
587}
588
589/******************************************************************************/
590/* A t t a c h */
591/******************************************************************************/
592
593void XrdSys::IOEvents::Poller::Attach(XrdSys::IOEvents::Channel *cP)
594{
595 Channel *pcP;
596
597// We allow only one attach at a time to simplify the processing
598//
599 adMutex.Lock();
600
601// Chain this channel into the list of attached channels
602//
603 if ((pcP = attBase)) {INSERT(attList, pcP, cP);}
604 else attBase = cP;
605
606// All done
607//
608 adMutex.UnLock();
609}
610
611/******************************************************************************/
612/* C b k T M O */
613/******************************************************************************/
614
616{
617 Channel *cP;
618
619// Process each element in the timeout queue, calling the callback function
620// if the timeout has passed. As this method can be called with a lock on the
621// channel mutex, we need to drop it prior to calling the callback.
622//
623 toMutex.Lock();
624 while((cP = tmoBase) && cP->deadLine <= time(0))
625 {int dlType = cP->dlType;
626 toMutex.UnLock();
627 CbkXeq(cP, dlType, 0, 0);
628 toMutex.Lock();
629 }
630 toMutex.UnLock();
631}
632
633/******************************************************************************/
634/* C b k X e q */
635/******************************************************************************/
636
638 int eNum, const char *eTxt)
639{
640 XrdSysMutexHelper cbkMHelp(cP->chMutex);
641 char oldEvents;
642 bool cbok, retval, isRead, isWrite, isLocked = true;
643
644// Perform any required tracing
645//
646 if (TRACING)
647 {const char *cbtype = (cP->chPoller == cP->chPollXQ ? "norm" :
648 (cP->chPoller == &pollInit ? "init" :
649 (cP->chPoller == &pollWait ? "wait" : "err")));
650 DO_TRACE(CbkXeq,cP->chFD,"callback events=" <<events
651 <<" chev=" <<static_cast<int>(cP->chEvents)
652 <<" toq=" <<(cP->inTOQ != 0) <<" erc=" <<eNum
653 <<" callback " <<(cP->chCB ? "present" : "missing")
654 <<" poller=" <<cbtype);
655 }
656
657// Remove this from the timeout queue if there and reset the deadlines based
658// on the event we are reflecting. This separates read and write deadlines
659//
660 if (cP->inTOQ)
661 {TmoDel(cP);
662 cP->dlType |= (events & CallBack::ValidEvents) << 4;
663 isRead = events & (CallBack::ReadyToRead | CallBack:: ReadTimeOut);
664 if (isRead) cP->rdDL = maxTime;
666 if (isWrite) cP->wrDL = maxTime;
667 } else {
668 cP->dlType &= CallBack::ValidEvents;
669 isRead = isWrite = false;
670 }
671
672// Verify that there is a callback here and the channel is ready. If not,
673// disable this channel for the events being reflected unless the event is a
674// fatal error. In this case we need to abandon the channel since error events
675// may continue to be generated as we can't always disable them.
676//
677 if (!(cP->chCB) || cP->chPoller != cP->chPollXQ)
678 {if (eNum)
679 {cP->chPoller = &pollErr1; cP->chFault = eNum;
680 cP->inPSet = 0;
681 return false;
682 }
683 oldEvents = cP->chEvents;
684 cP->chEvents = 0;
685 retval = cP->chPoller->Modify(cP, eNum, 0, isLocked);
686 TRACE_MOD(CbkXeq,cP->chFD,0);
687 if (!isLocked) cP->chMutex.Lock();
688 cP->chEvents = oldEvents;
689 return true;
690 }
691
692// Resolve the problem where we get an error event but the channel wants them
693// presented as a read or write event. If neither is possible then defer the
694// error until the channel is enabled again.
695//
696 if (eNum)
697 {if (cP->chEvents & Channel::errorEvents)
698 {cP->chPoller = &pollErr1; cP->chFault = eNum;
699 cP->chStat = Channel::isCBMode;
700 chDead = false;
701 cbkMHelp.UnLock();
702 cP->chCB->Fatal(cP,cP->chCBA, eNum, eTxt);
703 if (chDead) return true;
704 cbkMHelp.Lock(&(cP->chMutex));
705 cP->inPSet = 0;
706 return false;
707 }
708 if (REVENTS(cP->chEvents)) events = CallBack::ReadyToRead;
709 else if (WEVENTS(cP->chEvents)) events = CallBack::ReadyToWrite;
710 else {cP->chPoller = &pollErr1; cP->chFault = eNum; cP->inPSet = 0;
711 return false;
712 }
713 }
714
715// Indicate that we are in callback mode then drop the channel lock and effect
716// the callback. This allows the callback to freely manage locks.
717//
718 cP->chStat = Channel::isCBMode;
719 chDead = false;
720 // Detach() may be called after unlocking the channel and would zero the
721 // callback pointer and argument. So keep a copy.
722 CallBack *cb = cP->chCB;
723 void *cba = cP->chCBA;
724 cbkMHelp.UnLock();
725 IF_TRACE(CbkXeq,cP->chFD,"invoking callback; events=" <<events);
726 cbok = cb->Event(cP,cba, events);
727 IF_TRACE(CbkXeq,cP->chFD,"callback returned " <<BOOLNAME(cbok));
728
729// If channel destroyed by the callback, bail really fast. Otherwise, regain
730// the channel lock.
731//
732 if (chDead) return true;
733 cbkMHelp.Lock(&(cP->chMutex));
734
735// If the channel is being destroyed; then another thread must have done so.
736// Tell it the callback has finished and just return.
737//
738 if (cP->chStat != Channel::isCBMode)
739 {if (cP->chStat == Channel::isDead)
740 {XrdSysSemaphore *theSem = (XrdSysSemaphore *)cP->chCBA;
741 // channel will be destroyed shortly after post, unlock mutex before
742 cbkMHelp.UnLock();
743 theSem->Post();
744 }
745 return true;
746 }
747 cP->chStat = Channel::isClear;
748
749// Handle enable or disable here. If we keep the channel enabled then reset
750// the timeout if it hasn't been handled via a call from the callback.
751//
752 if (!cbok) Detach(cP,isLocked,false);
753 else if ((isRead || isWrite) && !(cP->inTOQ) && (cP->chRTO || cP->chWTO))
754 TmoAdd(cP, 0);
755
756// All done. While the mutex should not have been unlocked, we relock it if
757// it has to keep the mutex helper from croaking.
758//
759 if (!isLocked) cP->chMutex.Lock();
760 return true;
761}
762
763/******************************************************************************/
764/* Static: C r e a t e */
765/******************************************************************************/
766
768 const char **eTxt,
769 int crOpts)
770{
771 int fildes[2];
772 struct pollArg pArg;
773 pthread_t tid;
774
775// Create a pipe used to break the poll wait loop
776//
777 if (XrdSysFD_Pipe(fildes))
778 {eNum = errno;
779 if (eTxt) *eTxt = "creating poll pipe";
780 return 0;
781 }
782
783// Create an actual implementation of a poller
784//
785 if (!(pArg.pollP = newPoller(fildes, eNum, eTxt)))
786 {close(fildes[0]);
787 close(fildes[1]);
788 return 0;
789 }
790
791// Now start a thread to handle this poller object
792//
794 (void *)&pArg, XRDSYSTHREAD_BIND, "Poller")))
795 {if (eTxt) *eTxt = "creating poller thread"; return 0;}
796
797// Now wait for the thread to finish initializing before we allow use
798// Note that the bootstrap takes ownership of the semaphore and will delete it
799// once the thread posting the semaphore actually ends. This is to avoid
800// semaphore bugs present in certain (e.g. Linux) kernels.
801//
802 pArg.pollSync->Wait();
803
804// Check if all went well
805//
806 if (pArg.retCode)
807 {if (eTxt) *eTxt = (pArg.retMsg ? pArg.retMsg : "starting poller");
808 eNum = pArg.retCode;
809 delete pArg.pollP;
810 return 0;
811 }
812
813// Set creation options in the new poller
814//
815 if (crOpts & optTOM)
817
818// All done
819//
820 eNum = 0;
821 if (eTxt) *eTxt = "";
822 return pArg.pollP;
823}
824
825/******************************************************************************/
826/* D e t a c h */
827/******************************************************************************/
828
// Take a channel off this poller's queues and out of its poll set.
//
// cP       - the channel; the caller must already hold its lock.
// isLocked - handed to Exclude(), which may change it; callers in this file
//            re-take the channel lock when it comes back false.
// keep     - true:  the channel stays on the attached list and keeps its
//                   callback (only queue/poll-set membership is dropped).
//            false: the channel is reset onto the error poller, its callback
//                   and argument are cleared, and it is unlinked from the
//                   attached list.
//
829void XrdSys::IOEvents::Poller::Detach(XrdSys::IOEvents::Channel *cP,
830 bool &isLocked, bool keep)
831{
832// The caller must hold the channel lock!
// Remember poll-set membership now; Reset() below clears inPSet.
833//
834 bool detFD = (cP->inPSet != 0);
835
836// First remove the channel from the timeout queue
// NOTE(review): inTOQ is not cleared here, so the flag stays set although the
// channel is no longer queued -- confirm that this is intended.
837//
838 if (cP->inTOQ)
839 {toMutex.Lock();
840 REMOVE(tmoBase, tmoList, cP);
841 toMutex.UnLock();
842 }
843
844// Allow only one detach at a time
845//
846 adMutex.Lock();
847
848// Preset channel to prohibit callback if we are not keeping this channel
849//
850 if (!keep)
851 {cP->Reset(&pollErr1, cP->chFD);
852 cP->chPollXQ = &pollErr1;
853 cP->chCB = 0;
854 cP->chCBA = 0;
// A channel linked only to itself is either the sole list element (clear the
// base) or not on the list at all (nothing to do).
855 if (cP->attList.next != cP) {REMOVE(attBase, attList, cP);}
856 else if (attBase == cP) attBase = 0;
857 }
858
859// Exclude this channel from the associated poll set, don't hold the ad lock.
// Exclude() is skipped when this poller has no command pipe (cmdFD < 0); its
// last argument is true when the caller is not the poller thread.
860//
861 adMutex.UnLock();
862 if (detFD)
863 {cP->inPSet = 0;
864 if (cmdFD >= 0) Exclude(cP, isLocked, !ISPOLLER);
865 }
866}
867
868/******************************************************************************/
869/* Protected: G e t R e q u e s t */
870/******************************************************************************/
871
872// Warning: This method runs unlocked. The caller must have exclusive use of
873// the reqBuff otherwise unpredictable results will occur.
874
876{
877 ssize_t rlen;
878 int rc;
879
880// See if we are to resume a read or start a fresh one
881//
882 if (!pipeBlen)
883 {pipeBuff = (char *)&reqBuff; pipeBlen = sizeof(reqBuff);}
884
885// Wait for the next request. Some OS's (like Linux) don't support non-blocking
886// pipes. So, we must front the read with a poll.
887//
888 do {rc = poll(&pipePoll, 1, 0);}
889 while(rc < 0 && (errno == EAGAIN || errno == EINTR));
890 if (rc < 1) return 0;
891
892// Now we can put up a read without a delay. Normally a full command will be
893// present. Under some heavy conditions, this may not be the case.
894//
895 do {rlen = read(reqFD, pipeBuff, pipeBlen);}
896 while(rlen < 0 && errno == EINTR);
897 if (rlen <= 0)
898 {std::cerr <<"Poll: "<<XrdSysE2T(errno)<<" reading from request pipe\n"<< std::flush;
899 return 0;
900 }
901
902// Check if all the data has arrived. If not all the data is present, defer
903// this request until more data arrives.
904//
905 if (!(pipeBlen -= rlen)) return 1;
906 pipeBuff += rlen;
907 return 0;
908}
909
910/******************************************************************************/
911/* Protected: I n i t */
912/******************************************************************************/
913
915 const char **eTxt, bool &isLocked)
916{
917// The channel must be locked upon entry!
918//
919 bool retval;
920
921
922// If we are already in progress then simply update the shadow events and
923// resuppress all current events.
924//
925 if (cP->chPoller == &pollWait)
926 {cP->reMod = cP->chEvents;
927 cP->chEvents = 0;
928 IF_TRACE(Init,cP->chFD,"defer events=" <<cP->reMod);
929 return true;
930 }
931
932// Trace this entry
933//
934 IF_TRACE(Init,cP->chFD,"begin events=" <<int(cP->chEvents));
935
936// If no events are enabled at this point, just return
937//
938 if (!(cP->chEvents)) return true;
939
940// Refuse to enable a channel without a callback function
941//
942 if (!(cP->chCB))
943 {eNum = EDESTADDRREQ;
944 if (eTxt) *eTxt = "enabling without a callback";
945 return false;
946 }
947
948// So, now we can include the channel in the poll set. We will include it
949// with no events enabled to prevent callbacks prior to completion here.
950//
951 cP->chPoller = &pollWait; cP->reMod = cP->chEvents; cP->chEvents = 0;
952 retval = cP->chPollXQ->Include(cP, eNum, eTxt, isLocked);
953 IF_TRACE(Init,cP->chFD,"Include() returned " <<BOOLNAME(retval) <<TRACE_LOK);
954 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
955
956// Determine what future poller to use. If we can use the regular poller then
957// set the correct event mask for the channel. Note that we could have lost
958// control but the correct events will be reflected in the "reMod" member.
959//
960 if (!retval) {cP->chPoller = &pollErr1; cP->chFault = eNum;}
961 else {cP->chPoller = cP->chPollXQ;
962 cP->inPSet = 1;
963 if (cP->reMod)
964 {cP->chEvents = cP->reMod;
965 retval = cP->chPoller->Modify(cP, eNum, eTxt, isLocked);
966 TRACE_MOD(Init,cP->chFD,int(cP->reMod));
967 if (!isLocked) {cP->chMutex.Lock(); isLocked = true;}
968 } else {
969 TRACE_NOD(Init,cP->chFD,0);
970 }
971 }
972
973// All done
974//
975 cP->reMod = 0;
976 return retval;
977}
978
979/******************************************************************************/
980/* P o l l 2 E n u m */
981/******************************************************************************/
982
984{
985 if (events & POLLERR) return EPIPE;
986
987 if (events & POLLHUP) return ECONNRESET;
988
989 if (events & POLLNVAL) return EBADF;
990
991 return EOPNOTSUPP;
992}
993
994/******************************************************************************/
995/* S e n d C m d */
996/******************************************************************************/
997
999{
1000 int wlen;
1001
1002// Pipe writes are atomic so we don't need locks. Some commands require
1003// confirmation. We handle that here based on the command. Note that pipes
1004// guarantee that all of the data will be written or we will block.
1005//
1006 if (cmd.req >= PipeData::Post)
1007 {XrdSysSemaphore mySem(0);
1008 cmd.theSem = &mySem;
1009 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1010 while (wlen < 0 && errno == EINTR);
1011 if (wlen > 0) mySem.Wait();
1012 } else {
1013 do {wlen = write(cmdFD, (char *)&cmd, sizeof(PipeData));}
1014 while (wlen < 0 && errno == EINTR);
1015 }
1016
1017// All done
1018//
1019 return (wlen >= 0 ? 0 : errno);
1020}
1021
1022/******************************************************************************/
1023/* Protected: S e t P o l l E n t */
1024/******************************************************************************/
1025
// SetPollEnt: store the supplied value in the pollEnt member of channel cP.
// No locking is done here.
//
1027{
1028 cP->pollEnt = pe;
1029}
1030
1031/******************************************************************************/
1032/* S t o p */
1033/******************************************************************************/
1034
// Stop: shut this poller down. The steps are:
//   1. send a PipeData::Stop request through SendCmd(),
//   2. close both pipe descriptors and mark them as -1,
//   3. detach every channel still on the attach list, resetting it to the
//      pollErr1 poller with EIDRM and, when the channel has a callback and
//      enabled Channel::stopEvent, invoking that callback's Stop() method,
//   4. call the poller specific Shutdown().
// When cmdFD is already -1 the call returns immediately.
// NOTE(review): adMutex is released around SendCmd(), so two concurrent
// callers could both get past the cmdFD test - confirm callers serialize.
//
1036{
1037 PipeData cmdbuff;
1038 CallBack *theCB;
1039 Channel *cP;
1040 void *cbArg;
1041 int doCB;
1042
1043// Initialize the pipe data structure
1044//
1045 memset(static_cast<void*>( &cmdbuff ), 0, sizeof(cmdbuff));
1046 cmdbuff.req = PipeData::Stop;
1047
1048// Lock all of this
1049//
1050 adMutex.Lock();
1051
1052// If we are already shutdown then we are done
1053//
1054 if (cmdFD == -1) {adMutex.UnLock(); return;}
1055
1056// First we must stop the poller thread in an orderly fashion.
1057//
1058 adMutex.UnLock();
1059 SendCmd(cmdbuff);
1060 adMutex.Lock();
1061
1062// Close the pipe communication mechanism
1063//
1064 close(cmdFD); cmdFD = -1;
1065 close(reqFD); reqFD = -1;
1066
1067// Run through cleaning up the channels. While there should not be any other
1068// operations happening on this poller, we take the conservative approach.
1069//
// Each pass takes the channel off the attach list while adMutex is held, then
// drops adMutex and works on the channel under its own chMutex. adMutex is
// taken again before the list head is examined for the next pass.
1070 while((cP = attBase))
1071 {REMOVE(attBase, attList, cP);
1072 adMutex.UnLock();
1073 cP->chMutex.Lock();
1074 doCB = cP->chCB != 0 && (cP->chEvents & Channel::stopEvent);
1075 if (cP->inTOQ) TmoDel(cP);
1076 cP->Reset(&pollErr1, cP->chFD, EIDRM);
1077 cP->chPollXQ = &pollErr1;
// The callback pointer and argument are copied and chMutex is released before
// the callback runs, so the callback is not invoked with the channel locked.
1078 if (doCB)
1079 {cP->chStat = Channel::isClear;
1080 theCB = cP->chCB; cbArg = cP->chCBA;
1081 cP->chMutex.UnLock();
1082 theCB->Stop(cP, cbArg);
1083 } else cP->chMutex.UnLock();
1084 adMutex.Lock();
1085 }
1086
1087// Now invoke the poller specific shutdown
1088//
1089 Shutdown();
1090 adMutex.UnLock();
1091}
1092
1093/******************************************************************************/
1094/* T m o A d d */
1095/******************************************************************************/
1096
// TmoAdd: (re)compute the read/write deadlines of channel cP and place the
// channel in the timeout queue at the position matching its deadline.
// Returns true when cP ends up at the head of the queue (the caller then needs
// to wake the poller), and false otherwise, including the case where no
// deadline applies (deadLine == maxTime) and the channel is not queued.
// NOTE(review): mHelper presumably keeps toMutex locked until return - confirm
// against XrdSysMutexHelper.
//
1098{
1099 XrdSysMutexHelper mHelper(toMutex);
1100 time_t tNow;
1101 Channel *ncP;
1102 bool setRTO, setWTO;
1103
1104// Do some tracing
1105//
1106 IF_TRACE(TmoAdd,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1107 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1108
1109// Remove element from timeout queue if it is there
1110//
1111 if (cP->inTOQ)
1112 {REMOVE(tmoBase, tmoList, cP);
1113 cP->inTOQ = 0;
1114 }
1115
1116// Determine which timeouts need to be reset
1117//
// NOTE(review): listing lines 1119-1120 are absent from this excerpt. setRTO
// and setWTO are read below but their assignments are not visible here.
1118 tmoSet|= cP->dlType >> 4;
1121
1122// Reset the required deadlines
1123//
1124 tNow = time(0);
1125 if (setRTO && REVENTS(cP->chEvents) && cP->chRTO)
1126 cP->rdDL = cP->chRTO + tNow;
1127 if (setWTO && WEVENTS(cP->chEvents) && cP->chWTO)
1128 cP->wrDL = cP->chWTO + tNow;
1129
1130// Calculate the closest enabled deadline
1131//
// When both deadlines are equal, dlType carries both the write and the read
// timeout flags.
1132 if (cP->rdDL < cP->wrDL)
1133 {cP->deadLine = cP->rdDL; cP->dlType = CallBack:: ReadTimeOut;
1134 } else {
1135 cP->deadLine = cP->wrDL; cP->dlType = CallBack::WriteTimeOut;
1136 if (cP->rdDL == cP->wrDL) cP->dlType |= CallBack:: ReadTimeOut;
1137 }
1138 IF_TRACE(TmoAdd, cP->chFD, "t=" <<tNow <<" rdDL=" <<setRTO <<' ' <<cP->rdDL
1139 <<" wrDL=" <<setWTO <<' ' <<cP->wrDL);
1140
1141// If no timeout really applies, we are done
1142//
1143 if (cP->deadLine == maxTime) return false;
1144
1145// Add the channel to the timeout queue in correct deadline position.
1146//
// The scan starts at tmoBase and stops at the first queued channel whose
// deadline is later than cP's, or once it has come back around to tmoBase.
// tmoBase is moved to cP when cP's deadline is earlier than the current head.
1147 if ((ncP = tmoBase))
1148 {do {if (cP->deadLine < ncP->deadLine) break;
1149 ncP = ncP->tmoList.next;
1150 } while(ncP != tmoBase);
1151 INSERT(tmoList, ncP, cP);
1152 if (cP->deadLine < tmoBase->deadLine) tmoBase = cP;
1153 } else tmoBase = cP;
1154 cP->inTOQ = 1;
1155
1156// Indicate to the caller whether or not a wakeup is required
1157//
1158 return (tmoBase == cP);
1159}
1160
1161/******************************************************************************/
1162/* T m o D e l */
1163/******************************************************************************/
1164
// TmoDel: take channel cP off the timeout queue and clear its inTOQ flag,
// holding toMutex for the removal. The inTOQ flag is not tested here; the
// caller visible in Stop() checks it before calling.
//
1166{
1167
1168// Do some tracing
1169//
1170 IF_TRACE(TmoDel,cP->chFD,"chan="<< std::hex<<(void*)cP<< std::dec
1171 <<" inTOQ="<<BOOLNAME(cP->inTOQ)<<" status="<<STATUSOF(cP));
1172
1173// Get the timeout queue lock and remove the channel from the queue
1174//
1175 toMutex.Lock();
1176 REMOVE(tmoBase, tmoList, cP);
1177 cP->inTOQ = 0;
1178 toMutex.UnLock();
1179}
1180
1181/******************************************************************************/
1182/* T m o G e t */
1183/******************************************************************************/
1184
// TmoGet: return how long the poller may wait before the earliest queued
// deadline, in milliseconds (seconds remaining times 1000), or -1 when the
// timeout queue is empty. While the head deadline has already been reached,
// CbkTMO() is called and the head is re-examined. The wakePend flag is
// cleared before returning.
// NOTE(review): the product is stored in an int; a deadline more than about
// 24 days away would not fit - confirm deadlines are bounded.
//
1186{
1187 int wtval;
1188
1189// Lock the timeout queue
1190//
1191 toMutex.Lock();
1192
1193// Calculate wait time. If the deadline passed, invoke the timeout callback.
1194// we will need to drop the timeout lock as we don't have the channel lock.
1195//
1196 do {if (!tmoBase) {wtval = -1; break;}
1197 wtval = (tmoBase->deadLine - time(0)) * 1000;
1198 if (wtval > 0) break;
1199 toMutex.UnLock();
1200 CbkTMO();
1201 toMutex.Lock();
1202 } while(1);
1203
1204// Return the value
1205//
1206 CPP_ATOMIC_STORE(wakePend, false, std::memory_order_release);
1207 toMutex.UnLock();
1208 return wtval;
1209}
1210
1211/******************************************************************************/
1212/* W a k e U p */
1213/******************************************************************************/
1214
// WakeUp: send a NoOp command down the command pipe unless a wakeup is already
// pending. wakePend is tested and set while toMutex is held; the command
// itself is sent after the mutex has been released.
//
1215void XrdSys::IOEvents::Poller::WakeUp()
1216{
1217 static PipeData cmdbuff(PipeData::NoOp);
1218
1219// Send it off to wakeup the poller thread, but only if there is no wakeup in
1220// progress.
1221//
1222// We use a mutex here because we want to produce a synchronization point - all
1223// threads that might be interested in timeouts and wakeups are going to incur a
1224// cache bounce for the page where wakePend resides; they will see a consistent
1225// view of the wakePend flag. For those threads, this is equivalent to
1226// an atomic with memory_order std::memory_order_seq_cst (the strongest ordering).
1227// However, the threads that are not interested in timeouts will not get a flush
1228// for their copy of the wakePend page. They will still have the weaker memory
1229// ordering of consume/release (which is guaranteed anyway on all current architectures
1230// except for DEC Alpha).
1231 toMutex.Lock();
1232 bool isWakePend = CPP_ATOMIC_LOAD(wakePend, std::memory_order_consume);
1233 if (isWakePend) {toMutex.UnLock();}
1234 else {CPP_ATOMIC_STORE(wakePend, true, std::memory_order_release);
1235 toMutex.UnLock();
1236 SendCmd(cmdbuff);
1237 }
1238}
1239
1240/******************************************************************************/
1241/* I m p l e m e n t a t i o n S p e c i f i c s */
1242/******************************************************************************/
1243
1244#if defined( __solaris__ )
1246#elif defined( __linux__ )
1248#elif defined(__APPLE__)
1250#else
1252#endif
#define close(a)
Definition XrdPosix.hh:48
#define write(a, b, c)
Definition XrdPosix.hh:115
#define read(a, b, c)
Definition XrdPosix.hh:82
#define REMOVE(dlbase, dlvar, curitem)
#define INSERT(dlvar, curitem, newitem)
#define CPP_ATOMIC_LOAD(x, order)
#define CPP_ATOMIC_STORE(x, val, order)
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:104
#define IF_TRACE(x, fd, y)
#define TRACE_LOK
#define STATUS
#define TRACE_NOD(x, fd, y)
#define STATUSOF(x)
#define DO_TRACE(x, fd, y)
#define REVENTS(x)
#define BOOLNAME(x)
#define TRACE_MOD(x, fd, y)
#define ISPOLLER
#define WEVENTS(x)
#define XRDSYSTHREAD_BIND
#define TRACING(x)
Definition XrdTrace.hh:70
void Lock(XrdSysMutex *Mutex)
static int Same(pthread_t t1, pthread_t t2)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
static pthread_t ID(void)
static void * Start(void *parg)
virtual void Fatal(Channel *chP, void *cbArg, int eNum, const char *eTxt)
virtual bool Event(Channel *chP, void *cbArg, int evFlags)=0
virtual void Stop(Channel *chP, void *cbArg)
@ ReadyToWrite
Writing won't block.
@ ReadyToRead
New data has arrived.
@ ValidEvents
Mask to test for valid events.
void SetCallBack(CallBack *cbP, void *cbArg=0)
void GetCallBack(CallBack **cbP, void **cbArg)
@ allEvents
All of the above.
@ errorEvents
Error event non-r/w specific.
@ stopEvent
Poller stop event.
bool Enable(int events, int timeout=0, const char **eText=0)
Channel(Poller *pollP, int fd, CallBack *cbP=0, void *cbArg=0)
bool Disable(int events, const char **eText=0)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)
void Exclude(Channel *cP, bool &isLocked, bool dover=1)
void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)
virtual bool Modify(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
virtual bool Include(Channel *cP, int &eNum, const char **eTxt, bool &isLocked)=0
static Poller * Create(int &eNum, const char **eTxt=0, int crOpts=0)
virtual void Begin(XrdSysSemaphore *syncp, int &rc, const char **eTxt)=0
bool CbkXeq(Channel *cP, int events, int eNum, const char *eTxt)
int SendCmd(PipeData &cmd)
virtual void Shutdown()=0
int Poll2Enum(short events)
bool TmoAdd(Channel *cP, int tmoSet)
void SetPollEnt(Channel *cP, int ptEnt)
bool Init(Channel *cP, int &eNum, const char **eTxt, bool &isLockd)