XRootD
Loading...
Searching...
No Matches
XrdThrottleManager.hh
Go to the documentation of this file.
1
2/*
3 * XrdThrottleManager
4 *
5 * This class provides an implementation of a throttle manager.
 * The throttle manager purposely pauses I/O if the bandwidth, IOPS
 * rate, or number of outstanding I/O requests is sustained above
 * a certain level.
9 *
10 * The XrdThrottleManager is user-aware and provides fairshare.
11 *
 * This works by having a separate thread periodically refill
 * each user's shares.
14 *
15 * Note that we do not actually keep close track of users, but rather
16 * put them into a hash. This way, we can pretend there's a constant
17 * number of users and use a lock-free algorithm.
18 */
19
20#ifndef __XrdThrottleManager_hh_
21#define __XrdThrottleManager_hh_
22
23#ifdef __GNUC__
24#define likely(x) __builtin_expect(!!(x), 1)
25#define unlikely(x) __builtin_expect(!!(x), 0)
26#else
27#define likely(x) x
28#define unlikely(x) x
29#endif
30
#include <array>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <ctime>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <tuple>
#include <unordered_map>
#include <vector>
40
43
44class XrdSecEntity;
45class XrdSysError;
46class XrdOucTrace;
49
50namespace XrdThrottle {
51 class Configuration;
52}
53
55{
56
57friend class XrdThrottleTimer;
58
59public:
60
61void Init();
62
63bool OpenFile(const std::string &entity, std::string &open_error_message);
64bool CloseFile(const std::string &entity);
65
66void Apply(int reqsize, int reqops, int uid);
67
69
70bool IsThrottling() {return (m_ops_per_second > 0) || (m_bytes_per_second > 0);}
71
72// Returns the user name and UID for the given client.
73//
74// The UID is a hash of the user name; it is not guaranteed to be unique.
75std::tuple<std::string, uint16_t> GetUserInfo(const XrdSecEntity *client);
76
77void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
78 {m_interval_length_seconds = interval_length; m_bytes_per_second = reqbyterate;
79 m_ops_per_second = reqoprate; m_concurrency_limit = concurrency;}
80
81void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
82 {m_loadshed_host = hostname; m_loadshed_port = port; m_loadshed_frequency = frequency;}
83
84void SetMaxOpen(unsigned long max_open) {m_max_open = max_open;}
85
86void SetMaxConns(unsigned long max_conns) {m_max_conns = max_conns;}
87
88void SetMaxWait(unsigned long max_wait) {m_max_wait_time = std::chrono::seconds(max_wait);}
89
90// Load per-user limits from configuration file
91// Returns 0 on success, non-zero on failure
92int LoadUserLimits(const std::string &config_file);
93
94// Reload per-user limits (for runtime reloading)
96
97// Get per-user connection limit for a given username
98// Returns 0 if no per-user limit is set (use global), otherwise returns the limit
99unsigned long GetUserMaxConn(const std::string &username);
100
101void SetMonitor(XrdXrootdGStream *gstream) {m_gstream = gstream;}
102
103//int Stats(char *buff, int blen, int do_sync=0) {return m_pool.Stats(buff, blen, do_sync);}
104
105// Notify that an I/O operation has started for a given user.
106//
107// If we are at the maximum concurrency limit then this will block;
// if we block for too long, the `ok` output parameter is set to false.
109XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok);
110
111void PrepLoadShed(const char *opaque, std::string &lsOpaque);
112
113bool CheckLoadShed(const std::string &opaque);
114
115void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port);
116
118
119 ~XrdThrottleManager() {} // The buffmanager is never deleted
120
121protected:
122
123// Notify the manager an I/O operation has completed for a given user.
124// This is used to update the I/O wait time for the user and, potentially,
125// wake up a waiting thread.
126void StopIOTimer(std::chrono::steady_clock::duration & event_duration, uint16_t uid);
127
128private:
129
130// Determine the UID for a given user name.
131// This is a hash of the username; it is not guaranteed to be unique.
// The UID is used to index into the waiters array, so it must be less than m_max_users.
133uint16_t GetUid(const std::string &);
134
135void Recompute();
136
137void RecomputeInternal();
138
139static
140void * RecomputeBootstrap(void *pp);
141
142// Compute the order of wakeups for the existing waiters.
143void ComputeWaiterOrder();
144
145// Walk through the outstanding IO operations and compute the per-user
146// IO time.
147//
148// Meant to be done periodically as part of the Recompute interval. Used
149// to make sure we have a better estimate of the concurrency for each user.
150void UserIOAccounting();
151
152int WaitForShares();
153
154void GetShares(int &shares, int &request);
155
156void StealShares(int uid, int &reqsize, int &reqops);
157
158// Return the timer hash list ID to use for the current request.
159//
160// When on Linux, this will hash across the CPU ID; the goal is to distribute
161// the different timers across several lists to avoid mutex contention.
162static unsigned GetTimerListHash();
163
164// Notify a single waiter thread that it can proceed.
165void NotifyOne();
166
167XrdOucTrace * m_trace;
168XrdSysError * m_log;
169
170XrdSysCondVar m_compute_var;
171
172// Controls for the various rates.
173float m_interval_length_seconds;
174float m_bytes_per_second;
175float m_ops_per_second;
176int m_concurrency_limit;
177
178// Maintain the shares
179
180static constexpr int m_max_users = 1024; // Maximum number of users we can have; used for various fixed-size arrays.
181std::vector<int> m_primary_bytes_shares;
182std::vector<int> m_secondary_bytes_shares;
183std::vector<int> m_primary_ops_shares;
184std::vector<int> m_secondary_ops_shares;
185int m_last_round_allocation;
186
187// Waiter counts for each user
188struct alignas(64) Waiter
189{
190 std::condition_variable m_cv; // Condition variable for waiters of this user.
191 std::mutex m_mutex; // Mutex for this structure
192 unsigned m_waiting{0}; // Number of waiting operations for this user.
193
194 // EWMA of the concurrency for this user. This is used to determine how much
195 // above / below the user's concurrency share they've been recently. This subsequently
196 // will affect the likelihood of being woken up.
197 XrdSys::RAtomic<float> m_concurrency{0};
198
199 // I/O time for this user since the last recompute interval. The value is used
200 // to compute the EWMA of the concurrency (m_concurrency).
201 XrdSys::RAtomic<std::chrono::steady_clock::duration::rep> m_io_time{0};
202
203 // Pointer to the XrdThrottleManager object that owns this waiter.
204 XrdThrottleManager *m_manager{nullptr};
205
206 // Causes the current thread to wait until it's the user's turn to wake up.
207 bool Wait();
208
209 // Wakes up one I/O operation for this user.
210 void NotifyOne(std::unique_lock<std::mutex> lock)
211 {
212 m_cv.notify_one();
213 }
214};
215std::array<Waiter, m_max_users> m_waiter_info;
216
217// Array with the wake up ordering of the waiter users.
218// Every recompute interval, we compute how much over the concurrency limit
219// each user is, quantize this to an integer number of shares and then set the
220// array value to the user ID (so if user ID 5 has two shares, then there are two
221// entries with value 5 in the array). The array is then shuffled to randomize the
222// order of the wakeup.
223//
224// All reads and writes to the wake order array are meant to be relaxed atomics; if a thread
225// has an outdated view of the array, it simply means that a given user might get slightly
226// incorrect random probability of being woken up. That's seen as acceptable to keep
227// the selection algorithm lock and fence-free.
228std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_0;
229std::array<XrdSys::RAtomic<int16_t>, m_max_users> m_wake_order_1; // A second wake order array; every recompute interval, we will swap the active array, avoiding locks.
230XrdSys::RAtomic<char> m_wake_order_active; // The current active wake order array; 0 or 1
231std::atomic<size_t> m_waiter_offset{0}; // Offset inside the wake order array; this is used to wake up the next potential user in line. Cannot be relaxed atomic as offsets need to be seen in order.
232std::chrono::steady_clock::time_point m_last_waiter_recompute_time; // Last time we recomputed the wait ordering.
233XrdSys::RAtomic<unsigned> m_waiting_users{0}; // Number of users waiting behind the throttle as of the last recompute time.
234
235std::atomic<uint32_t> m_io_active; // Count of in-progress IO operations: cannot be a relaxed atomic as ordering of inc/dec matters.
236XrdSys::RAtomic<std::chrono::steady_clock::duration::rep> m_io_active_time; // Total IO wait time recorded since the last recompute interval; reset to zero about every second.
237XrdSys::RAtomic<uint64_t> m_io_total{0}; // Monotonically increasing count of IO operations; reset to zero about every second.
238
239int m_stable_io_active{0}; // Number of IO operations in progress as of the last recompute interval; must hold m_compute_var lock when reading/writing.
240uint64_t m_stable_io_total{0}; // Total IO operations since startup. Recomputed every second; must hold m_compute_var lock when reading/writing.
241
242std::chrono::steady_clock::duration m_stable_io_wait; // Total IO wait time as of the last recompute interval.
243
244// Load shed details
245std::string m_loadshed_host;
246unsigned m_loadshed_port;
247unsigned m_loadshed_frequency;
248
249// The number of times we have an I/O operation that hit the concurrency limit.
250// This is monotonically increasing and is "relaxed" because it's purely advisory;
251// ordering of the increments between threads is not important.
252XrdSys::RAtomic<int> m_loadshed_limit_hit;
253
254// Maximum number of open files
255unsigned long m_max_open{0};
256unsigned long m_max_conns{0};
257std::unordered_map<std::string, unsigned long> m_file_counters;
258std::unordered_map<std::string, unsigned long> m_conn_counters;
259std::unordered_map<std::string, std::unique_ptr<std::unordered_map<pid_t, unsigned long>>> m_active_conns;
260std::mutex m_file_mutex;
261
262// Per-user connection limits
// One configured per-user connection limit.
struct UserLimit {
   // Connection cap for the user; 0 means "no per-user limit, use the global one".
   unsigned long max_conn = 0;
   // Set when this entry is a wildcard pattern rather than an exact user name.
   bool is_wildcard = false;
};
267std::unordered_map<std::string, UserLimit> m_user_limits;
268std::shared_mutex m_user_limits_mutex;
269std::string m_user_config_file;
270
271// Track the ongoing I/O operations. We have several linked lists (hashed on the
272// CPU ID) of I/O operations that are in progress. This way, we can periodically sum
273// up the time spent in ongoing operations - which is important for operations that
274// last longer than the recompute interval.
275struct TimerList {
276 std::mutex m_mutex;
277 XrdThrottleTimer *m_first{nullptr};
278 XrdThrottleTimer *m_last{nullptr};
279};
280#if defined(__linux__)
281static constexpr size_t m_timer_list_size = 32;
282#else
283static constexpr size_t m_timer_list_size = 1;
284#endif
285std::array<TimerList, m_timer_list_size> m_timer_list; // A vector of linked lists of I/O operations. We keep track of multiple instead of a single one to avoid a global mutex.
286
287// Maximum wait time for a user to perform an I/O operation before failing.
288// Most clients have some sort of operation timeout; after that point, if we go
289// ahead and do the work, it's wasted effort as the client has gone.
290std::chrono::steady_clock::duration m_max_wait_time{std::chrono::seconds(30)};
291
292// Monitoring handle, if configured
293XrdXrootdGStream* m_gstream{nullptr};
294
295static const char *TraceID;
296
297};
298
300{
301
303
304public:
305
307{
308 if (m_manager) {
309 StopTimer();
310 }
311}
312
313protected:
314
316 m_start_time(std::chrono::steady_clock::time_point::min())
317{}
318
320 m_owner(uid),
321 m_timer_list_entry(XrdThrottleManager::GetTimerListHash()),
322 m_manager(manager),
323 m_start_time(std::chrono::steady_clock::now())
324{
325 if (!m_manager) {
326 return;
327 }
328 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
329 std::lock_guard<std::mutex> lock(timerList.m_mutex);
330 if (timerList.m_first == nullptr) {
331 timerList.m_first = this;
332 } else {
333 m_prev = timerList.m_last;
334 m_prev->m_next = this;
335 }
336 timerList.m_last = this;
337}
338
339std::chrono::steady_clock::duration Reset() {
340 auto now = std::chrono::steady_clock::now();
341 auto last_start = m_start_time.exchange(now);
342 return now - last_start;
343}
344
345private:
346
347 void StopTimer()
348 {
349 if (!m_manager) return;
350
351 auto event_duration = Reset();
352 auto &timerList = m_manager->m_timer_list[m_timer_list_entry];
353 {
354 std::unique_lock<std::mutex> lock(timerList.m_mutex);
355 if (m_prev) {
356 m_prev->m_next = m_next;
357 if (m_next) {
358 m_next->m_prev = m_prev;
359 } else {
360 timerList.m_last = m_prev;
361 }
362 } else {
363 timerList.m_first = m_next;
364 if (m_next) {
365 m_next->m_prev = nullptr;
366 } else {
367 timerList.m_last = nullptr;
368 }
369 }
370 }
371 m_manager->StopIOTimer(event_duration, m_owner);
372 }
373
374 const uint16_t m_owner{0};
375 const uint16_t m_timer_list_entry{0};
376 XrdThrottleManager *m_manager{nullptr};
377 XrdThrottleTimer *m_prev{nullptr};
378 XrdThrottleTimer *m_next{nullptr};
379 XrdSys::RAtomic<std::chrono::steady_clock::time_point> m_start_time;
380
381};
382
383#endif
void StopIOTimer(std::chrono::steady_clock::duration &event_duration, uint16_t uid)
void SetThrottles(float reqbyterate, float reqoprate, int concurrency, float interval_length)
void SetMaxOpen(unsigned long max_open)
void FromConfig(XrdThrottle::Configuration &config)
void Apply(int reqsize, int reqops, int uid)
std::tuple< std::string, uint16_t > GetUserInfo(const XrdSecEntity *client)
XrdThrottleTimer StartIOTimer(uint16_t uid, bool &ok)
void SetLoadShed(std::string &hostname, unsigned port, unsigned frequency)
friend class XrdThrottleTimer
void SetMonitor(XrdXrootdGStream *gstream)
void PrepLoadShed(const char *opaque, std::string &lsOpaque)
bool CheckLoadShed(const std::string &opaque)
int LoadUserLimits(const std::string &config_file)
void SetMaxWait(unsigned long max_wait)
unsigned long GetUserMaxConn(const std::string &username)
void SetMaxConns(unsigned long max_conns)
XrdThrottleManager(XrdSysError *lP, XrdOucTrace *tP)
void PerformLoadShed(const std::string &opaque, std::string &host, unsigned &port)
bool CloseFile(const std::string &entity)
bool OpenFile(const std::string &entity, std::string &open_error_message)
XrdThrottleTimer(XrdThrottleManager *manager, int uid)
friend class XrdThrottleManager
std::chrono::steady_clock::duration Reset()