XRootD
Loading...
Searching...
No Matches
XrdClHttpOps.hh
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClHttp client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
21#ifndef XRDCLHTTP_CURLOPS_HH
22#define XRDCLHTTP_CURLOPS_HH
23
27#include "XrdClHttpUtil.hh"
28
29#include <XrdCl/XrdClBuffer.hh>
31
32#include <atomic>
33#include <memory>
34#include <string>
35#include <utility>
36#include <vector>
37
38#include <curl/curl.h>
39
40namespace XrdCl {
41
42class Log;
43class ResponseHandler;
44class URL;
45
46}
47
48class TiXmlElement;
49
50namespace XrdClHttp {
51
52class CurlWorker;
53class File;
54class ResponseInfo;
55
57public:
58 using HeaderList = std::vector<std::pair<std::string, std::string>>;
59
71
72 // Operation constructor when the timeout is given as an offset from now.
73 CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
74 XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout);
75
76
77 // Operation constructor when the timeout is given as an absolute time.
78 CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, std::chrono::steady_clock::time_point expiry,
79 XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout);
80
81 virtual ~CurlOperation();
82
83 CurlOperation(const CurlOperation &) = delete;
84
85 // Finish the setup of the curl handle
86 //
87 // Used for configuring any extra headers
88 bool FinishSetup(CURL *curl);
89
90 virtual bool Setup(CURL *curl, CurlWorker &);
91
92 virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &);
93
94 virtual void ReleaseHandle();
95
96 virtual void Success() = 0;
97
98 // Returns the connection callout function for this operation
99 CreateConnCalloutType GetConnCalloutFunc() const {return m_conn_callout;}
100
101 // Return the HTTP verb to use with this operation.
102 virtual HttpVerb GetVerb() const = 0;
103
104 // Return a string version of the HTTP operation
105 static const std::string GetVerbString(HttpVerb);
106
107 // Returns when the curl header timeout expires.
108 //
109 // The first byte of the header must be received before this time.
110 std::chrono::steady_clock::time_point GetHeaderExpiry() const {return m_header_expiry;}
111
112 // Returns when the curl operation expires
113 std::chrono::steady_clock::time_point GetOperationExpiry() {
    // m_last_xfer still holds its default-constructed value (presumably no
    // transfer time has been recorded yet), so fall back to the header expiry.
114 if (m_last_xfer == std::chrono::steady_clock::time_point()) {
115 return GetHeaderExpiry();
116 }
    // Otherwise the deadline is one stall interval past the last recorded transfer time.
117 return m_last_xfer + m_stall_interval;
118 }
119
120 // Clean up the thread-local DNS cache for fake lookups associated with the
121 // connection callback cache.
122 static void CleanupDnsCache();
123
124 // Invoked when the worker thread is ready to resume a request after a pause.
125 //
126 // Pauses occur when a PUT request has started but is waiting on more data
127 // from the client; when additional data has arrived, the operation will
128 // be continued and this function called by the worker thread.
129 virtual bool ContinueHandle() {return true;}
130
131 // Set the continue queue to use for when a paused handle is ready to
132 // be re-run.
133 virtual void SetContinueQueue(std::shared_ptr<XrdClHttp::HandlerQueue> queue) {}
134
135 enum class RedirectAction {
136 Fail, // The redirect parsing failed and Fail() was called
137 Reinvoke, // Reinvoke the curl handle, following redirect
138 ReinvokeAfterAllow, // Reinvoke the Redirect function once the allowed verbs are known.
139 };
140 // Handle a redirect to a different URL.
141 // Returns Reinvoke if the curl handle should be invoked again immediately.
142 // Returns ReinvokeAfterAllow if the redirect should be invoked after the allowed verbs are known.
143 // In this case, the operation will set the target to the redirect target.
144 // Implementations must call Fail() if the handler should not re-invoke the curl handle.
145 virtual RedirectAction Redirect(std::string &target);
146
147 // Indicate whether the result of the operation is a redirect.
148 //
149 // This relies on the response headers having been parsed and available; anything in
150 // the 30X range is considered a redirect.
151 bool IsRedirect() const {return m_headers.GetStatusCode() >= 300 && m_headers.GetStatusCode() < 400;}
152
153 // If this returns a non-negative value, the result is an FD that should be waited on after a broker connection request.
154 virtual int WaitSocket() {return m_conn_callout_listener;}
155 // Callback when the `WaitSocket` is active for read.
156 virtual int WaitSocketCallback(std::string &err);
157
158 // Connection broker-related functionality.
159 // When the broker URL is set, the operation will use the connection broker to get a TCP socket
160 // to the remote server. Note that we will try the operation initially without the broker in case the curl
161 // handle has an existing socket it can reuse. If reuse fails, then the operation is going to fail
162 // with CURLE_COULDNT_CONNECT and we will retry (once) to connect via the broker. This is all
163 // done outside curl's open socket callback to ensure the event loop stays non-blocking.
164
165 bool StartConnectionCallout(std::string &err); // Start the connection callout process for a URL.
166 bool UseConnectionCallout() {return m_callout.get();} // Returns true if the callout should be tried.
167 bool GetTriedBoker() const {return m_tried_broker;} // Returns true if the connection broker has been tried.
168 void SetTriedBoker() {m_tried_broker = true;} // Note that the connection broker has been attempted.
169
170 // Returns whether the OPTIONS call needs to be made before the operation is started.
171 bool virtual RequiresOptions() const {return false;}
172
173 // Invoked after the OPTIONS request is done and results are available
174 void virtual OptionsDone() {}
175
176 // Returns the URL that was used for the operation.
177 const std::string &GetUrl() const {return m_url;}
178
179 // Returns the response info for the operation
180 std::unique_ptr<ResponseInfo> GetResponseInfo();
181
182 // Returns true if the header timeout has expired.
183 //
184 // The "header timeout" fires if the remote service has not returned any
185 // headers or data within the specified time.
186 // If the header timeout has expired - and no error has already been set -
187 // the m_error will be set
188 bool HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now);
189
190 // Returns true if the operation timeout has expired.
191 //
192 // Some operations (HEAD, PROPFIND for open) return nearly no data and thus have
193 // no need for adaptive timeouts. Instead, we use a fixed timeout.
194 // If the operation timeout has expired - and no error has already been set -
195 // the m_error will be set
196 bool OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now);
197
198 // Returns true if the body timeout has expired.
199 //
200 // The "body timeout" fires if the remote service has not returned any
201 // data within the specified time.
202 // If the body timeout has expired - and no error has already been set -
203 // the m_error will be set
204 bool TransferStalled(uint64_t xfer_bytes, const std::chrono::steady_clock::time_point &now);
205
    // Errors generated by the operation itself, as opposed to errors reported
    // by libcurl; the current value is exposed through GetError().
206 enum OpError {
207 ErrNone, // No error
208 ErrHeaderTimeout, // Header was not sent back in time
209 ErrCallback, // Error in the read/write callback (e.g., response too large for propfind)
210 ErrOperationTimeout, // Entire curl request operation has timed out
211 ErrTransferClientStall, // Transfer stalled while client had paused it (no data was available)
212 ErrTransferStall, // Transfer has stalled, not receiving any data within 60 seconds
    // NOTE(review): 60 seconds matches m_default_stall_interval, but the stall
    // interval is adjustable through SetStallTimeout() - confirm which one applies.
213 ErrTransferSlow, // Average transfer rate is below the minimum
214 };
215
216 // Return the libcurl handle owned by this operation.
217 CURL *GetCurlHandle() const {return m_curl.get();}
218
219 // Return the error generated by the operation itself (separate from a curl error)
220 OpError GetError() const {return m_error;}
221
222 // Move response info to the caller.
223 std::unique_ptr<ResponseInfo> MoveResponseInfo() {return std::move(m_response_info);}
224
225 // Return the error generated by the callback (e.g., server has incorrect multipart framing)
226 std::pair<XErrorCode, std::string> GetCallbackError() const {return std::make_pair(m_callback_error_code, m_callback_error_str);}
227
228 // Returns the HTTP status code (-1 if the response has not been parsed)
229 int GetStatusCode() const {return m_headers.GetStatusCode();}
230
231 // Returns the HTTP status message (empty if the response has not been parsed)
232 std::string GetStatusMessage() const {return m_headers.GetStatusMessage();}
233
234 // Return true if the transfer is done
235 bool IsDone() const {return m_done;}
236
237 // Return true if the operation is paused in libcurl
238 bool IsPaused() const {return m_is_paused;}
239
240 // Returns true if the operation has been marked as failed.
241 bool HasFailed() const {return m_has_failed.load(std::memory_order_acquire);}
242
243 // Resets the statistics for the operation and returns a tuple of:
244 // - bytes transferred,
245 // - duration between operation start and header receipt.
246 // - duration between header receipt and now.
247 // - duration the operation has spent on pause in libcurl (waiting for client data)
248 // These numbers are reset to zero each time the `StatisticsReset` function is called.
249 std::tuple<uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration> StatisticsReset();
250
251
    // Returns a copy of the text in m_curl_error_buffer (the buffer libcurl
    // populates via CURLOPT_ERRORBUFFER), or an empty string when the buffer
    // starts with a NUL byte, i.e. holds no message.
252 std::string GetCurlErrorMessage() const {
253 if (m_curl_error_buffer[0] != '\0')
254 return m_curl_error_buffer;
255 return "";
256 }
257
258 // Sets the stall timeout for the operation in seconds.
259 static void SetStallTimeout(int stall_interval)
260 {
    // Interpret the integer as whole seconds, then convert it to the steady
    // clock's native duration type before storing it.
261 std::chrono::seconds seconds{stall_interval};
    // m_stall_interval is a static member, so the new value is shared by every
    // operation rather than applying to a single instance.
262 m_stall_interval = std::chrono::duration_cast<std::chrono::steady_clock::duration>(seconds);
263 }
264
265 // Sets the stall timeout for the operation
266 static void SetStallTimeout(const std::chrono::steady_clock::duration &stall_interval)
267 {
    // Stores the duration as given; m_stall_interval is static, so the value
    // is shared by every operation.
268 m_stall_interval = stall_interval;
269 }
270
271 // Gets the code's default stall timeout in seconds
273 {
274 return std::chrono::duration_cast<std::chrono::seconds>(m_default_stall_interval).count();
275 }
276
277 // Gets the code's default slow transfer rate
279 {
281 }
282
283 // Sets the slow transfer rate for transfer operations.
284 static void SetSlowRateBytesSec(int rate)
285 {
287 }
288
289protected:
290
291 // Update the count of bytes transferred
292 void UpdateBytes(uint64_t bytes) {m_bytes += bytes;}
293
294 // Set failure from a callback function.
295 // The Fail() function may invoke libcurl functions and hence cannot be invoked from a
296 // libcurl callback. This stores the failure in the object itself and the worker
297 // thread will invoke the `Fail()` after libcurl fails the handle.
298 int FailCallback(XErrorCode ecode, const std::string &emsg);
299
300 // Set the pause status
301 void SetPaused(bool paused);
302
303 // The default minimum transfer rate for the operation, in bytes / sec
304 static constexpr int m_default_minimum_rate{1024 * 256}; // 256 KB/sec
305
306 // The current global instance's minimum transfer rate for "transfer type"
307 // operations (GET, PUT). Defaults to the m_default_minimum_rate but can be
308 // overridden by configuration.
310
311 // The minimum transfer rate for this operation, in bytes / sec
313
314 // The expiration of the entire operation.
315 std::chrono::steady_clock::time_point m_operation_expiry;
316
317 // The expiration time for receiving the first header.
318 std::chrono::steady_clock::time_point m_header_expiry;
319
320 // Any additional headers to send with the request.
322
323private:
324 bool Header(const std::string &header);
325 static size_t HeaderCallback(char *buffer, size_t size, size_t nitems, void *data);
326
327 // Information about the responses received for this operation.
328 std::unique_ptr<ResponseInfo> m_response_info;
329
330 // The "stall time" for the body transfer.
331 // If the body transfer has not been updated in this time, the operation
332 // will be marked as expired.
333 //
334 // This is also used for the calculation of the interval of the EMA rate
335 static constexpr std::chrono::steady_clock::duration m_default_stall_interval{std::chrono::seconds(60)};
336 static std::chrono::steady_clock::duration m_stall_interval;
337
338 OpError m_error{ErrNone};
339 XErrorCode m_callback_error_code{kXR_noErrorYet}; // Stored error that occurred in a callback.
340 std::string m_callback_error_str; // Stored error message that occurred in a callback.
341 bool m_tried_broker{false};
342 bool m_received_header{false};
343 bool m_done{false};
344 std::atomic<bool> m_has_failed{false};
345 bool m_is_paused{false};
346 int m_conn_callout_result{-1}; // The result of the connection callout
347 int m_conn_callout_listener{-1}; // The listener socket for the connection callout
348 uint64_t m_bytes{0}; // Count of bytes transferred by operation since last StatisticsReset()
349 std::chrono::steady_clock::time_point m_last_reset{}; // Time of last StatisticsReset()
350 std::chrono::steady_clock::time_point m_last_header_reset{}; // Time of last StatisticsReset() for header statistics
351 std::chrono::steady_clock::time_point m_start_op{}; // Time when the entire operation was started.
352 std::chrono::steady_clock::time_point m_header_start{}; // Time when the first header was received.
353 std::chrono::steady_clock::time_point m_pause_start{}; // Time of the last pause start/reset
354 std::chrono::steady_clock::duration m_pause_duration{}; // Accumulated pause time since last statistics update.
355
356 // List of custom headers for the operation.
357 std::unique_ptr<struct curl_slist, void(*)(struct curl_slist *)> m_header_slist{nullptr, &curl_slist_free_all};
358
359 // The callout class for connection creation.
360 CreateConnCalloutType m_conn_callout{nullptr};
361
362 // The last time header data was received.
363 std::chrono::steady_clock::time_point m_header_lastop;
364
365 // The last time data was transferred.
366 std::chrono::steady_clock::time_point m_last_xfer;
367
368 // The last recorded number of bytes that had been transferred.
369 uint64_t m_last_xfer_count{0};
370
371 // The exponential moving average of the transfer rate
372 double m_ema_rate{-1.0};
373
374 // Detailed error message populated by libcurl via CURLOPT_ERRORBUFFER.
375 char m_curl_error_buffer[CURL_ERROR_SIZE]{};
376
377 // Object representing the state of the callout for a connected socket.
378 std::unique_ptr<ConnectionCallout> m_callout;
379 std::unique_ptr<XrdCl::URL> m_parsed_url{nullptr};
380
381 // A map of endpoints to IP addresses for the CURLOPT_CONNECT_TO option.
382 std::unique_ptr<struct curl_slist, void(*)(struct curl_slist *)> m_resolve_slist{nullptr, &curl_slist_free_all};
383
384 static curl_socket_t OpenSocketCallback(void *clientp, curlsocktype purpose, struct curl_sockaddr *address);
385 static int SockOptCallback(void *clientp, curl_socket_t curlfd, curlsocktype purpose);
386 static curl_socket_t CloseSocketCallback(void *clientp, curl_socket_t item);
387
388 // Periodic transfer info callback function invoked by curl; used for more fine-grained timeouts.
389 static int XferInfoCallback(void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow);
390
391protected:
    // Marks the operation as done and records whether it failed. The failure
    // flag is stored with release ordering; HasFailed() reads it with acquire
    // ordering. m_done itself is a plain (non-atomic) bool.
392 void SetDone(bool has_failed) {m_done = true; m_has_failed.store(has_failed, std::memory_order_release);}
393 const std::string m_url;
395 std::unique_ptr<CURL, void(*)(CURL *)> m_curl;
397 std::vector<std::pair<std::string, std::string>> m_headers_list;
399};
400
401// Query the remote service using the OPTIONS verb.
402//
403// This is used to determine the capabilities of the remote service,
404// such as whether it supports the PROPFIND verb.
405// Note this does not take an XrdCl::ResponseHandler callback but is meant to be
406// invoked directly by a libcurl worker which, based on the response, will use
407// it to invoke the original operation.
408class CurlOptionsOp final : public CurlOperation {
409public:
    // Builds the OPTIONS request on behalf of a parent operation.
    //
    // - curl: the parent operation's curl handle (returned by GetParentCurlHandle()).
    // - op: the parent operation (returned by GetOperation()).
    // - url, log, callout: forwarded to the CurlOperation base class.
    //
    // No response handler is given to the base class (nullptr is passed), and the
    // parent's header expiry is reused as this operation's absolute expiry time.
410 CurlOptionsOp(CURL *curl, std::shared_ptr<CurlOperation> op, const std::string &url,
411 XrdCl::Log *log, CreateConnCalloutType callout) :
412 CurlOperation(nullptr, url, op->GetHeaderExpiry(), log, callout, {}),
413 m_parent(op),
414 m_parent_curl(curl)
415 {
417 }
418
419 virtual ~CurlOptionsOp() {}
420
    // Overrides of the CurlOperation interface; the implementations are not
    // part of this header.
421 bool Setup(CURL *curl, CurlWorker &) override;
422 void Success() override;
423 void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override;
424 void ReleaseHandle() override;
425
426 // Returns the parent operation that has been paused while waiting for the
427 // OPTIONS response.
428 std::shared_ptr<CurlOperation> GetOperation() const {return m_parent;}
429
430 // Returns the parent operation's curl handle that has been paused while
431 // waiting for the OPTIONS response.
432 CURL *GetParentCurlHandle() const {return m_parent_curl;}
433
    // This operation always uses the OPTIONS verb.
434 virtual HttpVerb GetVerb() const override {return HttpVerb::OPTIONS;}
435
436private:
    // The parent operation; held through a shared_ptr copied from the constructor argument.
437 std::shared_ptr<CurlOperation> m_parent;
    // The parent's curl handle, kept as a raw pointer.
    // NOTE(review): ownership of this handle is not visible in this header - confirm.
438 CURL *m_parent_curl{nullptr};
439};
440
441// An operation representing a `stat` operation.
442//
443// Queries the remote service and parses out the response to a `stat` buffer.
444// Depending on the remote service, this may be a HEAD or PROPFIND request.
445class CurlStatOp : public CurlOperation {
446public:
    // Forwards handler, url, timeout (an offset from now, per the base-class
    // constructor), log, callout and header_callout to CurlOperation, and
    // records whether the "response info" variant of the result was requested.
447 CurlStatOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
448 XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout) :
449 CurlOperation(handler, url, timeout, log, callout, header_callout),
450 m_response_info(response_info)
451 {
453 }
454
455 virtual ~CurlStatOp() {}
456
    // Overrides of the CurlOperation interface; the implementations are not
    // part of this header.
457 bool Setup(CURL *curl, CurlWorker &) override;
458 void Success() override;
459 RedirectAction Redirect(std::string &target) override;
460 void ReleaseHandle() override;
461
462 bool virtual RequiresOptions() const override;
463 void virtual OptionsDone() override;
464
    // Returns an (int64_t, bool) pair.
    // NOTE(review): presumably (object length, is-directory), mirroring
    // m_length and m_is_dir - confirm against the implementation.
465 std::pair<int64_t, bool> GetStatInfo();
466
    // PROPFIND when m_is_propfind is set, HEAD otherwise.
467 virtual HttpVerb GetVerb() const override {return m_is_propfind ? HttpVerb::PROPFIND : HttpVerb::HEAD;}
468
469protected:
470 // Mark the operation as a success and, as requested, return the stat info back
471 // to the object handler.
472 //
473 // Returning the info is optional as the CurlOpenOp derives from this class and
474 // if stat info is returned from an open without being requested then the
475 // object is leaked
476 void SuccessImpl(bool returnObj);
477
478private:
479 // Parse the properties element of a PROPFIND response.
480 std::pair<int64_t, bool> ParseProp(TiXmlElement *prop);
481 // Callback for writing the response body to the internal buffer.
482 static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
483
484 // Whether the response info variant of the info object should be sent
485 bool m_response_info{false};
486 // Whether the stat request is made using the PROPFIND verb.
487 bool m_is_propfind{false};
488 // Whether the stat response indicated that the object is a directory.
489 bool m_is_dir{false};
490 std::string m_response; // Body of the response (if using PROPFIND)
491 int64_t m_length{-1}; // Length of the object from the response
492};
493
// An open operation implemented on top of CurlStatOp.
//
// Keeps a pointer to the XrdClHttp::File passed to the constructor and
// overrides Fail() so that a failed request can still count as a successful
// open in some cases (see the comment on Fail() below).
494class CurlOpenOp final : public CurlStatOp {
495public:
    // Defined out of line; takes the same arguments as CurlStatOp plus the
    // File object (`file`).
496 CurlOpenOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
497 XrdCl::Log *logger, XrdClHttp::File *file, bool response_info, CreateConnCalloutType callout,
498 HeaderCallout *header_callout);
499
500 virtual ~CurlOpenOp() {}
501
502 void ReleaseHandle() override;
503 void Success() override;
504
505 // Invoked to handle a failure-to-open (HEAD returns non-200)
506 //
507 // If the open operation is invoked for a file with the `New` flag set, this
508 // may be a success if the remote server returned a 404.
509 void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override;
510
511private:
512 // Set various common properties after an open has completed.
513 //
514 // If `setSize` is set, then we'll set the file size as a file property.
515 // This is made optional because the open operation may succeed after a 404
516 // (if this was invoked by an open with O_CREAT set); in such a case, setting
517 // the size is nonsensical because the file doesn't exist.
518 void SetOpenProperties(bool setSize);
519
    // Raw pointer to the file object.
    // NOTE(review): presumably the file being opened, with lifetime managed by
    // the caller - confirm.
520 XrdClHttp::File *m_file{nullptr};
521};
522
523// Query the origin for a checksum via a HEAD request.
524//
525// Since the open op is a PROPFIND, we need a second operation for checksums.
526// We expect the checksum to be requested only after a successful transfer.
527class CurlChecksumOp final : public CurlStatOp {
528 public:
    // Defined out of line. `preferred` names the checksum type the caller
    // would like to receive; the remaining arguments match those of CurlStatOp.
529 CurlChecksumOp(XrdCl::ResponseHandler *handler, const std::string &url, XrdClHttp::ChecksumType preferred,
530 struct timespec timeout, XrdCl::Log *logger,
531 bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout);
532
533 virtual ~CurlChecksumOp() {}
534
    // Always HEAD, unlike CurlStatOp::GetVerb() which may select PROPFIND.
535 virtual HttpVerb GetVerb() const override {return HttpVerb::HEAD;}
536 virtual void OptionsDone() override;
537 bool Setup(CURL *curl, CurlWorker &) override;
538 void Success() override;
539 RedirectAction Redirect(std::string &target) override;
540 void ReleaseHandle() override;
541
542 private:
    // NOTE(review): the constructor takes no File argument, so it is not clear
    // from this header where (or whether) m_file is assigned - confirm.
544 XrdClHttp::File *m_file{nullptr};
545 };
546
547// Operation issuing a DELETE request to the remote server.
548//
549class CurlDeleteOp final : public CurlOperation {
550public:
    // Defined out of line. `timeout` is a struct timespec, as for the
    // offset-from-now CurlOperation constructor.
551 CurlDeleteOp(XrdCl::ResponseHandler *handler, const std::string &url,
552 struct timespec timeout, XrdCl::Log *logger,
553 bool response_info, CreateConnCalloutType callout,
554 HeaderCallout *header_callout);
555
    // Declared here and defined out of line.
556 virtual ~CurlDeleteOp();
557
558 bool Setup(CURL *curl, CurlWorker &) override;
559 void Success() override;
560 void ReleaseHandle() override;
561
    // This operation always uses the DELETE verb.
562 virtual HttpVerb GetVerb() const override {return HttpVerb::DELETE;}
563
564private:
565 bool m_response_info{false}; // Indicate whether to give extended information in the response.
566};
567
568// Operation issuing a MKCOL request to the remote server.
569//
570// Creates a "directory" on the remote side
571//
572class CurlMkcolOp final : public CurlOperation {
573public:
    // Defined out of line; takes the same argument list as CurlDeleteOp's constructor.
574CurlMkcolOp(XrdCl::ResponseHandler *handler, const std::string &url,
575 struct timespec timeout, XrdCl::Log *logger,
576 bool response_info, CreateConnCalloutType callout,
577 HeaderCallout *header_callout);
578
    // Declared here and defined out of line.
579 virtual ~CurlMkcolOp();
580
    // Overrides the base-class failure handling; the implementation is not
    // part of this header.
581 void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
582 void ReleaseHandle() override;
583 bool Setup(CURL *curl, CurlWorker &) override;
584 void Success() override;
585
    // This operation always uses the MKCOL verb.
586 virtual HttpVerb GetVerb() const override {return HttpVerb::MKCOL;}
587
588private:
589 bool m_response_info{false}; // Indicate whether to give extended information in the response.
590};
591
592// Cache control query
593//
594class CurlQueryOp final : public CurlStatOp {
595public:
    // Forwards everything except `queryCode` to CurlStatOp and stores
    // `queryCode` in m_queryCode.
    // NOTE(review): the declaration of m_queryCode is not visible in this
    // listing (a line appears to be missing just above m_queryVal).
596 CurlQueryOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
597 XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, int queryCode, HeaderCallout *header_callout) :
598 CurlStatOp(handler, url, timeout, log, response_info, callout, header_callout),
599 m_queryCode(queryCode)
600 {
601 }
602
603 virtual ~CurlQueryOp() {}
604
    // Only the success path is overridden; all other behaviour is inherited
    // from CurlStatOp.
605 void Success() override;
606
    // NOTE(review): presumably holds the value produced for the query; its use
    // is not visible in this header - confirm.
608 std::string m_queryVal;
609};
610
// A read operation issued as an HTTP GET (see GetVerb()).
//
// Response data is written into a caller-provided buffer that this object does
// not own. The operation can be paused and later continued with a new buffer
// (see Pause(), Continue() and ContinueHandle()).
611class CurlReadOp : public CurlOperation {
612public:
    // Defined out of line.
    // - default_handler: used when the HTTP operation times out while no read
    //   is in progress (see m_default_handler).
    // - op: a pair of uint64_t values stored in m_op (see the note there).
    // - buffer / sz: the caller-owned destination buffer and its size.
613 CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
614 const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
615 char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout);
616
617 virtual ~CurlReadOp() {}
618
619 // Start continuation of a previously-started operation with additional data.
620 bool Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size);
621
622 // Make state changes necessary to the curl handle for it to unpause.
623 bool ContinueHandle() override;
624
625 // Pause the GET operation; indicates the current buffer was sent successfully
626 // but the operation is not yet complete. Will invoke the current callback.
627 virtual void Pause();
628
629 bool Setup(CURL *curl, CurlWorker &) override;
630 void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
631 void Success() override;
632 void ReleaseHandle() override;
633
    // Remember the queue to use when the operation should be resumed (see
    // m_continue_queue).
634 virtual void SetContinueQueue(std::shared_ptr<XrdClHttp::HandlerQueue> queue) override {
635 m_continue_queue = queue;
636 }
637
638 virtual HttpVerb GetVerb() const override {return HttpVerb::GET;}
639
640
641private:
642 // Deliver the current buffer to the response handler and reset internal buffer state.
643 void DeliverResponse();
644
    // Write callback in the libcurl style, taking the object as `this_ptr`.
    // NOTE(review): presumably forwards to Write() - confirm in the implementation.
645 static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
646 size_t Write(char *buffer, size_t size);
647
648 // Extra response data from curl that overflowed the last buffer
649 //
650 // libcurl's callback is "all or nothing": you cannot accept part of a buffer
651 // then pause the operation until the user provides a new buffer. Hence, we keep
652 // this as the "overflow" buffer; next time Continue() is called, we will process
653 // this data first.
654 std::string m_prefetch_buffer;
655
656 // Offset into m_prefetch_buffer pointing at the first byte of unconsumed data.
657 size_t m_prefetch_buffer_offset{0};
658
659 // Offset into the object, for the current Continue() call, relative to m_op.first
660 off_t m_prefetch_object_offset{0};
661
662 // Default callback handler; used when the HTTP operation times out while there
663 // is no ongoing CurlFile read operation.
664 std::shared_ptr<XrdCl::ResponseHandler> m_default_handler;
665
666protected:
    // The requested range; m_op.first is the base that object offsets are
    // measured from (see m_prefetch_object_offset).
    // NOTE(review): m_op.second is presumably the length - confirm.
667 std::pair<uint64_t, uint64_t> m_op;
668 uint64_t m_written{0}; // Bytes written into the current client-provided buffer
669 char *m_buffer{nullptr}; // Buffer passed by XrdCl; we do not own it.
670 size_t m_buffer_size{0}; // Size of the provided buffer
671
672 // When the read fails, the body of the response will be copied
673 // here instead of invoking the callback.
674 std::string m_err_msg;
675
676 // Reference to the continue queue to use when the operation should be resumed.
677 std::shared_ptr<XrdClHttp::HandlerQueue> m_continue_queue;
678};
679
680// Open operation that is actually an entire-object GET
682public:
683 CurlPrefetchOpenOp(XrdClHttp::File &file, XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
684 const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
685 char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
686 : CurlReadOp(handler, default_handler, url, timeout, op, buffer, sz, logger, callout, header_callout), m_file(file)
687 {}
688
689 // Special handling of the first "Pause" operation after the read
690 // has started. Do the correct invocation of success or failure.
691 virtual void Pause() override;
692
693private:
694 bool m_first_pause{true};
695 XrdClHttp::File &m_file;
696};
697
699 public:
700
701 CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
702 const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout);
703
704 virtual ~CurlVectorReadOp() {}
705
706 bool Setup(CURL *curl, CurlWorker &) override;
707 void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
708 void Success() override;
709 void ReleaseHandle() override;
710
711 // Set the expected separator between parts of a response;
712 // not expected to be used externally except by unit tests.
713 void SetSeparator(const std::string &sep) {
714 m_headers.SetMultipartSeparator(sep);
715 }
716
717 // Set the status code for the operation
718 void SetStatusCode(int sc) {m_headers.SetStatusCode(sc);}
719
720 // Invoke the write callback for the vector read.
721 //
722 // Note: made public to help unit testing of the class; not intended for direct invocation.
723 size_t Write(char *buffer, size_t size);
724
725 virtual HttpVerb GetVerb() const override {return HttpVerb::GET;}
726
727 private:
728 static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
729
730 // Calculate the next request buffer the current response buffer will service.
731 // Sets the m_response_idx and m_skip_bytes
732 void CalculateNextBuffer();
733
734 protected:
735 size_t m_response_idx{0}; // The offset in the m_chunk_list which the current response chunk will write into.
736 off_t m_chunk_buffer_idx{0}; // Current offset in requested chunk where we are writing bytes.
737 off_t m_bytes_consumed{0}; // Total number of bytes used for results serving the request.
738 uint64_t m_skip_bytes{0}; // Count of bytes to skip in the next response (if response chunk contains unneeded bytes).
739 std::string m_response_headers; // Buffer of an incomplete response line from a prior curl write operation.
740 std::pair<off_t, off_t> m_current_op{-1, -1}; // The (offset, length) of the current response chunk.
741 std::unique_ptr<XrdCl::VectorReadInfo> m_vr; // The response buffers for the client.
742 XrdCl::ChunkList m_chunk_list; // The requested chunks from the client.
743};
744
// A CurlReadOp variant that overrides only Success() (and restates the GET
// verb); request setup and buffer handling are inherited from CurlReadOp.
// NOTE(review): presumably used for page-read ("pgread") requests - confirm.
745class CurlPgReadOp final : public CurlReadOp {
746public:
    // Forwards every argument unchanged to the CurlReadOp constructor.
747 CurlPgReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
748 const std::string &url, struct timespec timeout, const std::pair<uint64_t, uint64_t> &op,
749 char *buffer, size_t buffer_size, XrdCl::Log *logger, CreateConnCalloutType callout,
750 HeaderCallout *header_callout)
751 :
752 CurlReadOp(handler, default_handler, url, timeout, op, buffer, buffer_size, logger, callout, header_callout)
753 {}
754
755 virtual ~CurlPgReadOp() {}
756
757 void Success() override;
758
759 virtual HttpVerb GetVerb() const override {return HttpVerb::GET;}
760
761};
762
// A directory listing operation issued as a PROPFIND request (see GetVerb()).
//
// The response body is accumulated in m_response and parsed into DavEntry
// objects by ParseResponse() and ParseProp().
763class CurlListdirOp final : public CurlOperation {
764public:
    // Defined out of line. `host_addr` is the hostname:port of the data
    // federation (see m_host_addr).
765 CurlListdirOp(XrdCl::ResponseHandler *handler, const std::string &url, const std::string &host_addr, bool response_info,
766 struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout);
767
768 virtual ~CurlListdirOp() {}
769
770 bool Setup(CURL *curl, CurlWorker &) override;
771 void Success() override;
772 void ReleaseHandle() override;
773
774 virtual HttpVerb GetVerb() const override {return HttpVerb::PROPFIND;}
775
776private:
    // One entry parsed from the PROPFIND response. The size and the
    // last-modified time default to -1.
    // NOTE(review): -1 presumably means "not provided by the server" - confirm.
777 struct DavEntry {
778 std::string m_name;
779 bool m_isdir{false};
780 bool m_isexec{false};
781 int64_t m_size{-1};
782 time_t m_lastmodified{-1};
783 };
784 // Parses the properties element of a PROPFIND response into a DavEntry object
785 //
    // - entry: The DavEntry object to fill in
786 // - prop: The properties element to parse
787 // - Returns: A boolean (presumably indicating whether parsing succeeded - confirm)
788 bool ParseProp(DavEntry &entry, TiXmlElement *prop);
789
790 // Indicate whether the operation should use the extended "response info" object in response
791 const bool m_response_info{false};
792
793 // Parses the response element of a PROPFIND
    //
    // Returns a (DavEntry, bool) pair; the boolean presumably indicates
    // whether parsing succeeded - confirm.
794 std::pair<DavEntry, bool> ParseResponse(TiXmlElement *response);
795
796 // Callback for writing the response body to the internal buffer.
797 static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
798
799 // Whether the provided URL is an origin URL (and hence PROPFIND can be done directly).
800 bool m_is_origin{false};
801
802 // Response body from the PROPFIND request.
803 std::string m_response;
804
805 // Host address (hostname:port) of the data federation
806 std::string m_host_addr;
807};
808
809// A third-party-copy operation
810//
811// Invoke the COPY verb to move a file between two HTTP endpoints.
812class CurlCopyOp final : public CurlOperation {
813public:
814 using Headers = std::vector<std::pair<std::string, std::string>>;
815
816 CurlCopyOp(XrdCl::ResponseHandler *handler, const std::string &source_url, const Headers &source_hdrs, const std::string &dest_url, const Headers &dest_hdrs, struct timespec timeout,
817 XrdCl::Log *logger, CreateConnCalloutType callout);
818
819 virtual ~CurlCopyOp() {}
820
821 bool Setup(CURL *curl, CurlWorker &) override;
822 void Success() override;
823 void ReleaseHandle() override;
824
826 public:
828 virtual void Progress(off_t bytemark) = 0;
829 };
830
831 void SetCallback(std::unique_ptr<CurlProgressCallback> callback);
832
833 virtual HttpVerb GetVerb() const override {return HttpVerb::COPY;}
834
835private:
836 // Callback for writing the response body to the internal buffer.
837 static size_t WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr);
838
839 // Handle a line of information in the control channel.
840 void HandleLine(std::string_view line);
841
842 // Returns true if the control channel has not gotten data recently enough.
843 bool ControlChannelTimeoutExpired() const;
844
845 // Source of the TPC transfer
846 std::string m_source_url;
847
848 // Buffer of current response line
849 std::string m_line_buffer;
850
851 // A callback object for when a performance marker is received
852 std::unique_ptr<CurlProgressCallback> m_callback;
853
854 // The performance marker indication of bytes processed.
855 off_t m_bytemark{-1};
856
857 // Whether the COPY operation indicated a success status in the control channel:
858 bool m_sent_success{false};
859
860 // Failure string sent back in the control channel:
861 std::string m_failure;
862};
863
864// An upload operation
865//
866// Invoke a PUT on the remote HTTP server; assumes that Writes are done
867// in a single-stream
868class CurlPutOp final : public CurlOperation {
869public:
870 CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
871 const std::string &url, const char *buffer, size_t buffer_size,
872 struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout,
873 HeaderCallout *header_callout);
874 CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr<XrdCl::ResponseHandler> default_handler,
875 const std::string &url, XrdCl::Buffer &&buffer,
876 struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout,
877 HeaderCallout *header_callout);
878
879 virtual ~CurlPutOp() {}
880
881 void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override;
882 bool Setup(CURL *curl, CurlWorker &) override;
883 void Success() override;
884 void ReleaseHandle() override;
885 bool ContinueHandle() override;
886
887 virtual void SetContinueQueue(std::shared_ptr<XrdClHttp::HandlerQueue> queue) override {
888 m_continue_queue = queue;
889 }
890
891 // Start continuation of a previously-started operation with additional data.
892 //
893 // Since the CurlPutOp itself is kept as a reference-counted pointer by the
894 // XrdClHttp::File handle, we need to pass a shared pointer to the continue queue.
895 // Hence the awkward interface of needing to be provided a shared pointer to oneself.
896 bool Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, const char *buffer, size_t buffer_size);
897 bool Continue(std::shared_ptr<CurlOperation> op, XrdCl::ResponseHandler *handler, XrdCl::Buffer &&buffer);
898
899 // Pause the put operation; indicates the current buffer was sent successfully
900 // but the operation is not yet complete.
901 void Pause();
902
903 virtual HttpVerb GetVerb() const override {return HttpVerb::PUT;}
904
905private:
906
907 // Callback function for libcurl when it would like to read data from m_data
908 // (and write it to the remote socket).
909 static size_t ReadCallback(char *buffer, size_t size, size_t n, void *v);
910
911 // Handle that represents the current operation to libcurl
912 CURL *m_curl_handle{nullptr};
913
914 // Reference to the continue queue to use when the operation should be resumed.
915 std::shared_ptr<XrdClHttp::HandlerQueue> m_continue_queue;
916
917 // The buffer of data to upload (if the CurlPutOp owns the buffer).
918 XrdCl::Buffer m_owned_buffer;
919
920 // The non-owned view of the data to upload.
921 // This may reference m_owned_buffer or an externally-owned `const char *`.
922 std::string_view m_data;
923
924 // The default handler to invoke if an File::Write operation is not pending.
925 // Typically used for timeouts/errors on the PUT operation between client
926 // writes.
927 std::shared_ptr<XrdCl::ResponseHandler> m_default_handler;
928
929 // File pointer offset
930 off_t m_offset{0};
931
932 // The final size of the object to be uploaded; -1 if not known
933 off_t m_object_size{-1};
934
935 bool m_final{false};
936};
937
938} // namespace XrdClHttp
939
940#endif // XRDCLHTTP_CURLOPS_HH
XErrorCode
@ kXR_noErrorYet
void CURL
int emsg(int rc, char *msg)
virtual void OptionsDone() override
RedirectAction Redirect(std::string &target) override
CurlChecksumOp(XrdCl::ResponseHandler *handler, const std::string &url, XrdClHttp::ChecksumType preferred, struct timespec timeout, XrdCl::Log *logger, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual HttpVerb GetVerb() const override
bool Setup(CURL *curl, CurlWorker &) override
virtual void Progress(off_t bytemark)=0
void Success() override
virtual HttpVerb GetVerb() const override
std::vector< std::pair< std::string, std::string > > Headers
void ReleaseHandle() override
CurlCopyOp(XrdCl::ResponseHandler *handler, const std::string &source_url, const Headers &source_hdrs, const std::string &dest_url, const Headers &dest_hdrs, struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout)
bool Setup(CURL *curl, CurlWorker &) override
void SetCallback(std::unique_ptr< CurlProgressCallback > callback)
bool Setup(CURL *curl, CurlWorker &) override
CurlDeleteOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *logger, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual HttpVerb GetVerb() const override
bool Setup(CURL *curl, CurlWorker &) override
CurlListdirOp(XrdCl::ResponseHandler *handler, const std::string &url, const std::string &host_addr, bool response_info, struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual HttpVerb GetVerb() const override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
bool Setup(CURL *curl, CurlWorker &) override
virtual HttpVerb GetVerb() const override
void ReleaseHandle() override
CurlMkcolOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *logger, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
void ReleaseHandle() override
void Success() override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override
CurlOpenOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *logger, XrdClHttp::File *file, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual void Success()=0
void SetDone(bool has_failed)
static void SetStallTimeout(const std::chrono::steady_clock::duration &stall_interval)
int FailCallback(XErrorCode ecode, const std::string &emsg)
std::chrono::steady_clock::time_point GetHeaderExpiry() const
bool FinishSetup(CURL *curl)
std::unique_ptr< ResponseInfo > MoveResponseInfo()
const std::string m_url
std::chrono::steady_clock::time_point m_header_expiry
const std::string & GetUrl() const
CURL * GetCurlHandle() const
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
bool TransferStalled(uint64_t xfer_bytes, const std::chrono::steady_clock::time_point &now)
static const std::string GetVerbString(HttpVerb)
virtual HttpVerb GetVerb() const =0
std::string GetCurlErrorMessage() const
virtual void ReleaseHandle()
void UpdateBytes(uint64_t bytes)
virtual bool RequiresOptions() const
static void CleanupDnsCache()
std::chrono::steady_clock::time_point GetOperationExpiry()
std::tuple< uint64_t, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration, std::chrono::steady_clock::duration > StatisticsReset()
virtual bool ContinueHandle()
std::string GetStatusMessage() const
static constexpr int m_default_minimum_rate
static void SetSlowRateBytesSec(int rate)
static void SetStallTimeout(int stall_interval)
CreateConnCalloutType GetConnCalloutFunc() const
std::vector< std::pair< std::string, std::string > > HeaderList
std::vector< std::pair< std::string, std::string > > m_headers_list
HeaderCallout * m_header_callout
static int GetDefaultSlowRateBytesSec()
bool HeaderTimeoutExpired(const std::chrono::steady_clock::time_point &now)
virtual int WaitSocketCallback(std::string &err)
std::unique_ptr< ResponseInfo > GetResponseInfo()
std::chrono::steady_clock::time_point m_operation_expiry
virtual void Fail(uint16_t errCode, uint32_t errNum, const std::string &)
virtual RedirectAction Redirect(std::string &target)
XrdCl::ResponseHandler * m_handler
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
static int GetDefaultStallTimeout()
void SetPaused(bool paused)
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue)
bool StartConnectionCallout(std::string &err)
bool OperationTimeoutExpired(const std::chrono::steady_clock::time_point &now)
CurlOperation(const CurlOperation &)=delete
virtual bool Setup(CURL *curl, CurlWorker &)
std::pair< XErrorCode, std::string > GetCallbackError() const
CurlOptionsOp(CURL *curl, std::shared_ptr< CurlOperation > op, const std::string &url, XrdCl::Log *log, CreateConnCalloutType callout)
virtual HttpVerb GetVerb() const override
std::shared_ptr< CurlOperation > GetOperation() const
CURL * GetParentCurlHandle() const
bool Setup(CURL *curl, CurlWorker &) override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &) override
virtual HttpVerb GetVerb() const override
CurlPgReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t buffer_size, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
CurlPrefetchOpenOp(XrdClHttp::File &file, XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual void Pause() override
bool ContinueHandle() override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
bool Setup(CURL *curl, CurlWorker &) override
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, const char *buffer, size_t buffer_size)
virtual HttpVerb GetVerb() const override
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue) override
CurlPutOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, const char *buffer, size_t buffer_size, struct timespec timeout, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
void Success() override
void ReleaseHandle() override
CurlQueryOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, int queryCode, HeaderCallout *header_callout)
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
std::pair< uint64_t, uint64_t > m_op
bool Setup(CURL *curl, CurlWorker &) override
std::shared_ptr< XrdClHttp::HandlerQueue > m_continue_queue
bool ContinueHandle() override
CurlReadOp(XrdCl::ResponseHandler *handler, std::shared_ptr< XrdCl::ResponseHandler > default_handler, const std::string &url, struct timespec timeout, const std::pair< uint64_t, uint64_t > &op, char *buffer, size_t sz, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual void SetContinueQueue(std::shared_ptr< XrdClHttp::HandlerQueue > queue) override
virtual HttpVerb GetVerb() const override
bool Continue(std::shared_ptr< CurlOperation > op, XrdCl::ResponseHandler *handler, char *buffer, size_t buffer_size)
void ReleaseHandle() override
void ReleaseHandle() override
void SuccessImpl(bool returnObj)
virtual bool RequiresOptions() const override
bool Setup(CURL *curl, CurlWorker &) override
RedirectAction Redirect(std::string &target) override
std::pair< int64_t, bool > GetStatInfo()
CurlStatOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, bool response_info, CreateConnCalloutType callout, HeaderCallout *header_callout)
virtual HttpVerb GetVerb() const override
virtual void OptionsDone() override
size_t Write(char *buffer, size_t size)
CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
void SetSeparator(const std::string &sep)
XrdCl::ChunkList m_chunk_list
virtual HttpVerb GetVerb() const override
std::pair< off_t, off_t > m_current_op
std::unique_ptr< XrdCl::VectorReadInfo > m_vr
bool Setup(CURL *curl, CurlWorker &) override
void Fail(uint16_t errCode, uint32_t errNum, const std::string &msg) override
Binary blob representation.
Handle diagnostics.
Definition XrdClLog.hh:101
Handle an async response.
URL representation.
Definition XrdClURL.hh:31
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
std::vector< ChunkInfo > ChunkList
List of chunks.