XRootD
Loading...
Searching...
No Matches
XrdOssThrottleFile.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* */
3/* (c) 2025 by the Morgridge Institute for Research */
4/* */
5/* This file is part of the XRootD software suite. */
6/* */
7/* XRootD is free software: you can redistribute it and/or modify it under */
8/* the terms of the GNU Lesser General Public License as published by the */
9/* Free Software Foundation, either version 3 of the License, or (at your */
10/* option) any later version. */
11/* */
12/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
13/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
14/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
15/* License for more details. */
16/* */
17/* You should have received a copy of the GNU Lesser General Public License */
18/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
19/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
20/* */
21/* The copyright holder's institutional names and contributor's names may not */
22/* be used to endorse or promote products derived from this software without */
23/* specific prior written permission of the institution or contributor. */
24/******************************************************************************/
25
26#include "XrdOuc/XrdOucEnv.hh"
28#include "XrdOss/XrdOss.hh"
30#include "XrdSfs/XrdSfsAio.hh"
35#include "XrdVersion.hh"
36
37#include <functional>
38
39namespace {
40
41class File final : public XrdOssWrapDF {
42public:
43 File(std::unique_ptr<XrdOssDF> wrapDF, XrdThrottleManager &throttle, XrdSysError *lP, XrdOucTrace *tP)
44 : XrdOssWrapDF(*wrapDF), m_log(lP), m_throttle(throttle), m_trace(tP), m_wrapped(std::move(wrapDF)) {}
45
46virtual ~File() {}
47
48virtual int Open(const char *path, int Oflag, mode_t Mode,
49 XrdOucEnv &env) override {
50
51 std::tie(m_user, m_uid) = m_throttle.GetUserInfo(env.secEnv());
52
53 std::string open_error_message;
54 if (!m_throttle.OpenFile(m_user, open_error_message)) {
55 TRACE(DEBUG, open_error_message);
56 return -EMFILE;
57 }
58
59 auto rval = wrapDF.Open(path, Oflag, Mode, env);
60
61 if (rval < 0) {
62 m_throttle.CloseFile(m_user);
63 }
64
65 return rval;
66}
67
68virtual int Close(long long *retsz) override {
69 m_throttle.CloseFile(m_user);
70 return wrapDF.Close(retsz);
71}
72
73virtual int getFD() override {return -1;}
74
75virtual off_t getMmap(void **addr) override {*addr = 0; return 0;}
76
77virtual ssize_t pgRead (void* buffer, off_t offset, size_t rdlen,
78 uint32_t* csvec, uint64_t opts) override {
79
80 return DoThrottle(rdlen, 1,
81 static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgRead),
82 buffer, offset, rdlen, csvec, opts);
83}
84
85virtual int pgRead(XrdSfsAio *aioparm, uint64_t opts) override
86{ // We disable all AIO-based reads.
87 aioparm->Result = pgRead((char *)aioparm->sfsAio.aio_buf,
88 aioparm->sfsAio.aio_offset,
89 aioparm->sfsAio.aio_nbytes,
90 aioparm->cksVec, opts);
91 aioparm->doneRead();
92 return 0;
93}
94
95virtual ssize_t pgWrite(void* buffer, off_t offset, size_t wrlen,
96 uint32_t* csvec, uint64_t opts) override {
97
98 return DoThrottle(wrlen, 1,
99 static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t, uint32_t*, uint64_t)>(&XrdOssDF::pgWrite),
100 buffer, offset, wrlen, csvec, opts);
101}
102
103virtual int pgWrite(XrdSfsAio *aioparm, uint64_t opts) override
104{ // We disable all AIO-based writes.
105 aioparm->Result = this->pgWrite((char *)aioparm->sfsAio.aio_buf,
106 aioparm->sfsAio.aio_offset,
107 aioparm->sfsAio.aio_nbytes,
108 aioparm->cksVec, opts);
109 aioparm->doneWrite();
110 return 0;
111}
112
113virtual ssize_t Read(off_t offset, size_t size) override {
114 return DoThrottle(size, 1,
115 static_cast<ssize_t (XrdOssDF::*)(off_t, size_t)>(&XrdOssDF::Read),
116 offset, size);
117}
118virtual ssize_t Read(void* buffer, off_t offset, size_t size) override {
119 return DoThrottle(size, 1,
120 static_cast<ssize_t (XrdOssDF::*)(void*, off_t, size_t)>(&XrdOssDF::Read),
121 buffer, offset, size);
122}
123
124virtual int Read(XrdSfsAio *aiop) override {
125 aiop->Result = this->Read((char *)aiop->sfsAio.aio_buf,
126 aiop->sfsAio.aio_offset,
127 aiop->sfsAio.aio_nbytes);
128 aiop->doneRead();
129 return 0;
130}
131
132virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt) override {
133 off_t sum = 0;
134 for (int i = 0; i < rdvcnt; ++i) {
135 sum += readV[i].size;
136 }
137 return DoThrottle(sum, rdvcnt, &XrdOssDF::ReadV, readV, rdvcnt);
138}
139
140
141virtual ssize_t Write(const void* buffer, off_t offset, size_t size) override {
142 return DoThrottle(size, 1,
143 static_cast<ssize_t (XrdOssDF::*)(const void*, off_t, size_t)>(&XrdOssDF::Write),
144 buffer, offset, size);
145}
146
147virtual int Write(XrdSfsAio *aiop) override {
148 aiop->Result = this->Write((char *)aiop->sfsAio.aio_buf,
149 aiop->sfsAio.aio_offset,
150 aiop->sfsAio.aio_nbytes);
151 aiop->doneWrite();
152 return 0;
153}
154
155private:
156
157 template <class Fn, class... Args>
158 int DoThrottle(size_t rdlen, size_t ops, Fn &&fn, Args &&... args) {
159 m_throttle.Apply(rdlen, ops, m_uid);
160 bool ok = true;
161 XrdThrottleTimer timer = m_throttle.StartIOTimer(m_uid, ok);
162 if (!ok) {
163 TRACE(DEBUG, "Throttling in progress");
164 return -EMFILE;
165 }
166 return std::invoke(fn, wrapDF, std::forward<Args>(args)...);
167 }
168
169 XrdSysError *m_log{nullptr};
170 XrdThrottleManager &m_throttle;
171 XrdOucTrace *m_trace{nullptr};
172 std::unique_ptr<XrdOssDF> m_wrapped;
173 std::string m_user;
174 uint16_t m_uid;
175
176 static constexpr char TraceID[] = "XrdThrottleFile";
177};
178
179class FileSystem final : public XrdOssWrapper {
180public:
181 FileSystem(XrdOss *oss, XrdSysLogger *log, XrdOucEnv *envP)
182 : XrdOssWrapper(*oss),
183 m_env(envP),
184 m_oss(oss),
185 m_log(new XrdSysError(log)),
186 m_trace(new XrdOucTrace(m_log.get())),
187 m_throttle(m_log.get(), m_trace.get())
188 {
189
190 m_throttle.Init();
191 if (envP)
192 {
193 auto gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("Throttle.gStream*"));
194 m_log->Say("Config", "Throttle g-stream has", gstream ? "" : " NOT", " been configured via xrootd.mongstream directive");
195 m_throttle.SetMonitor(gstream);
196 }
197 }
198
199 int Configure(const std::string &config_filename) {
200 XrdThrottle::Configuration config(*m_log, m_env);
201 if (config.Configure(config_filename)) {
202 m_log->Emsg("Config", "Unable to load configuration file", config_filename.c_str());
203 return 1;
204 }
205 m_throttle.FromConfig(config);
206 return 0;
207 }
208
209 virtual ~FileSystem() {}
210
211 virtual XrdOssDF *newFile(const char *user = 0) override {
212 std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
213 return new File(std::move(wrapped), m_throttle, m_log.get(), m_trace.get());
214 }
215
216private:
217 XrdOucEnv *m_env{nullptr};
218 std::unique_ptr<XrdOss> m_oss;
219 std::unique_ptr<XrdSysError> m_log{nullptr};
220 std::unique_ptr<XrdOucTrace> m_trace{nullptr};
221 XrdThrottleManager m_throttle;
222};
223
224} // namespace
225
226extern "C" {
227
229 const char *config_fn, const char *parms,
230 XrdOucEnv *envP) {
231 std::unique_ptr<FileSystem> fs(new FileSystem(curr_oss, logger, envP));
232 if (fs->Configure(config_fn)) {
233 XrdSysError(logger, "XrdThrottle").Say("Config", "Unable to load configuration file", config_fn);
234 return nullptr;
235 }
236 // Note the throttle is set up as an OSS.
237 // This will prevent the throttle from being layered on top of the OFS; to keep backward
238 // compatibility with old configurations, we do not cause the server to fail.
239 //
240 // Originally, XrdThrottle was used as an OFS because the loadshed code required the ability
241 // to redirect the client to a different server. This is rarely (never?) used in practice.
242 // By putting the throttle in the OSS, we benefit from the fact the OFS has first run the
243 // authorization code and has made a user name available for fairshare of the throttle.
244 envP->PutInt("XrdOssThrottle", 1);
245 return fs.release();
246}
247
249
250} // extern "C"
251
#define DEBUG(x)
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *Logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
Definition XrdOssCsi.cc:455
XrdOss * XrdOssAddStorageSystem2(XrdOss *curr_oss, XrdSysLogger *logger, const char *config_fn, const char *parms, XrdOucEnv *envP)
XrdVERSIONINFO(XrdOssAddStorageSystem2, throttle)
int Mode
XrdOucString File
struct myOpts opts
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
void * aio_buf
Definition XrdSfsAio.hh:47
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual ssize_t Read(off_t offset, size_t size)
Definition XrdOss.hh:310
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition XrdOss.cc:198
virtual ssize_t pgRead(void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
Definition XrdOss.cc:160
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition XrdOss.cc:252
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:385
void PutInt(const char *varname, long value)
Definition XrdOucEnv.cc:250
const XrdSecEntity * secEnv() const
Definition XrdOucEnv.hh:107
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
uint32_t * cksVec
Definition XrdSfsAio.hh:63
ssize_t Result
Definition XrdSfsAio.hh:65
virtual void doneRead()=0
struct aiocb sfsAio
Definition XrdSfsAio.hh:62
virtual void doneWrite()=0
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, time_t timeout=0)
Factory for creating ReadImpl objects.
CloseImpl< false > Close(Ctx< File > file, time_t timeout=0)
Factory for creating CloseImpl objects.
OpenImpl< false > Open(Ctx< File > file, Arg< std::string > url, Arg< OpenFlags::Flags > flags, Arg< Access::Mode > mode=Access::None, time_t timeout=0)
Factory for creating ReadImpl objects.
WriteImpl< false > Write(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< const void * > buffer, time_t timeout=0)
Factory for creating WriteImpl objects.
XrdOucEnv * envP
Definition XrdPss.cc:110