XRootD
Loading...
Searching...
No Matches
XrdClHttpOpReadV.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* Copyright (C) 2025, Pelican Project, Morgridge Institute for Research */
3/* */
4/* This file is part of the XrdClHttp client plugin for XRootD. */
5/* */
6/* XRootD is free software: you can redistribute it and/or modify it under */
7/* the terms of the GNU Lesser General Public License as published by the */
8/* Free Software Foundation, either version 3 of the License, or (at your */
9/* option) any later version. */
10/* */
11/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
12/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
13/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
14/* License for more details. */
15/* */
16/* The copyright holder's institutional names and contributor's names may not */
17/* be used to endorse or promote products derived from this software without */
18/* specific prior written permission of the institution or contributor. */
19/******************************************************************************/
20
21#include "XrdClHttpOps.hh"
22
23#include <XrdCl/XrdClLog.hh>
25#include <XrdOuc/XrdOucCRC.hh>
27
28using namespace XrdClHttp;
29
30CurlVectorReadOp::CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout,
31 const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout,
32 HeaderCallout *header_callout) :
33 CurlOperation(handler, url, timeout, logger, callout, header_callout),
34 m_vr(new XrdCl::VectorReadInfo()),
35 m_chunk_list(op_list)
36 {}
37
// Prepare the curl handle for a vector read.
//
// Runs CurlOperation::Setup() first and returns false if that fails.
// Otherwise it routes response-body bytes to WriteCallback, with `this` as the
// callback's user pointer. When at least one chunk has a non-zero length, it
// also queues a single multi-range header in m_headers_list, for example
// `Range: bytes=0-99,200-299`. Returns true once the base setup succeeded.
38bool
40{
41 if (!CurlOperation::Setup(curl, worker)) return false;
42 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, CurlVectorReadOp::WriteCallback);
43 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, this);
44
 // Build a comma-separated list of inclusive byte ranges (hence the "- 1").
 // Zero-length chunks cannot be written as a valid range, so they are skipped.
45 std::stringstream ss;
46 auto multiple = false;
47 for (const auto &chunk : m_chunk_list) {
48 if (!chunk.GetLength()) continue;
49 if (multiple) {ss << ",";}
50 ss << chunk.GetOffset() << "-" << chunk.GetOffset() + chunk.GetLength() - 1;
51 multiple = true;
52 }
53 auto byte_range_val = ss.str();
 // NOTE(review): if the chunk list is empty or every chunk has length 0, no
 // Range header is sent and the server may return the whole object (Write()
 // accepts a 200 response). Confirm this is intended rather than completing
 // the request locally.
54 if (byte_range_val.size()) {
55 m_headers_list.emplace_back("Range", "bytes=" + byte_range_val);
56 }
57 return true;
58}
59
60void
61CurlVectorReadOp::Fail(uint16_t errCode, uint32_t errNum, const std::string &msg)
62{
63 std::string custom_msg = msg;
64 SetDone(true);
65 if (m_handler == nullptr) {return;}
66 std::string offset = "(unknown)";
67 std::string length = "(unknown)";
68 if (!m_chunk_list.empty()) {
69 offset = std::to_string(m_chunk_list[0].GetOffset());
70 length = std::to_string(m_chunk_list[0].GetLength());
71 }
72 if (!custom_msg.empty()) {
73 m_logger->Debug(kLogXrdClHttp, "curl operation with vector starting offset %s / length %s failed with message: %s", offset.c_str(), length.c_str(), custom_msg.c_str());
74 custom_msg += " (vector read operation starting at offset " + offset + " / length " + length + ")";
75 } else {
76 m_logger->Debug(kLogXrdClHttp, "curl vector operation starting at offset %s / length %s failed with status code %d", offset.c_str(), length.c_str(), errNum);
77 }
78 auto status = new XrdCl::XRootDStatus(XrdCl::stError, errCode, errNum, custom_msg);
79 auto handle = m_handler;
80 m_handler = nullptr;
81 handle->HandleResponse(status, nullptr);
82}
83
// Deliver a successful vector-read result to the response handler.
//
// Marks the operation done (not failed) and returns early if the handler has
// already been consumed. Otherwise it does the following:
//  * appends the partially filled last chunk, if there is one;
//  * records m_bytes_consumed as the total size;
//  * hands ownership of m_vr, wrapped in an AnyObject, to the handler together
//    with a default-constructed XRootDStatus.
// The handler pointer is cleared before the call, so a later Fail() on this
// object finds no handler and does not respond a second time.
84void
86{
87 SetDone(false);
88 if (m_handler == nullptr) {return;}
89
90 // If there's a partial last response, give it to the client.
92 auto &chunk = m_chunk_list[m_response_idx];
93 m_vr->GetChunks().emplace_back(chunk.GetOffset(), m_chunk_buffer_idx, chunk.GetBuffer());
95 }
96
97 auto status = new XrdCl::XRootDStatus();
 // The reported size is the byte count accumulated in m_bytes_consumed, not the
 // sum of the requested chunk lengths.
98 m_vr->SetSize(m_bytes_consumed);
99 auto obj = new XrdCl::AnyObject();
 // release() transfers ownership of the VectorReadInfo to the AnyObject; m_vr is
 // empty afterwards.
100 obj->Set(m_vr.release());
101 auto handle = m_handler;
102 m_handler = nullptr;
103 handle->HandleResponse(status, obj);
104}
105
// Detach this operation's callbacks from its curl handle.
//
// Does nothing when m_curl is null. Otherwise it resets the following options
// to nullptr:
//  * the write callback and its user data (Setup() pointed them at this object);
//  * the custom HTTP header list;
//  * the open-socket and socket-option callbacks and their user data.
// Presumably this lets the handle be reused or freed without referring back to
// this object.
106void
108{
109 if (m_curl == nullptr) return;
110 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEFUNCTION, nullptr);
111 curl_easy_setopt(m_curl.get(), CURLOPT_WRITEDATA, nullptr);
112 curl_easy_setopt(m_curl.get(), CURLOPT_HTTPHEADER, nullptr);
113 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETFUNCTION, nullptr);
114 curl_easy_setopt(m_curl.get(), CURLOPT_OPENSOCKETDATA, nullptr);
115 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTFUNCTION, nullptr);
116 curl_easy_setopt(m_curl.get(), CURLOPT_SOCKOPTDATA, nullptr);
118}
119
120size_t
121CurlVectorReadOp::WriteCallback(char *buffer, size_t size, size_t nitems, void *this_ptr)
122{
123 return static_cast<CurlVectorReadOp*>(this_ptr)->Write(buffer, size * nitems);
124}
125
126// Given a buffer of data from curl, parse it and write it to the response buffers.
//
// Three response shapes are handled:
//  * status 200: the body is treated as one range starting at offset 0 with
//    unbounded length;
//  * another non-error status that is not multipart/byteranges: one range
//    starting at m_headers.GetOffset(), again with unbounded length;
//  * multipart/byteranges: the boundary line and part headers are parsed
//    between parts. Each part's Content-Range sets m_current_op to
//    (offset, length), and CalculateNextBuffer() picks the destination chunk.
// Bodies of HTTP error responses are discarded.
//
// Parsing state is kept in members and carried across calls, because parts
// and header lines can be split between curl buffers:
//  * m_current_op;
//  * m_response_idx and m_chunk_buffer_idx;
//  * m_skip_bytes;
//  * m_response_headers.
// Returns orig_length when the block was consumed. On a framing or parse error
// it returns FailCallback()'s result, presumably a value libcurl treats as a
// write error.
127size_t
128CurlVectorReadOp::Write(char *orig_buffer, size_t orig_length)
129{
130 UpdateBytes(orig_length);
131 //m_logger->Debug(kLogXrdClHttp, "Received a write of size %ld with contents:\n%s", static_cast<long>(orig_length), std::string(orig_buffer, orig_length).c_str());
132
133 // Handle the (hopefully uncommon) cases where the server responds to a vector read op
134 // with a single response. We set the length of the response to the max as we
135 // don't care how many bytes the server actually sends.
 // NOTE(review): these branches run on every callback. For a 200 or single-range
 // response, m_current_op is therefore reset to its starting value each time.
 // That discards the advance made after a completed chunk (the
 // "m_current_op.first += chunk.GetLength()" branch below), so this is likely
 // wrong when the body spans several curl callbacks -- confirm.
136 if (GetStatusCode() == 200) {
137 m_current_op.first = 0;
138 m_current_op.second = std::numeric_limits<off_t>::max();
139 } else if (HTTPStatusIsError(GetStatusCode())) {
140 return orig_length;
141 } else if (!m_headers.IsMultipartByterange()) {
142 m_current_op.first = m_headers.GetOffset();
143 m_current_op.second = std::numeric_limits<off_t>::max();
144 }
145
 // `buffer`/`length` track the not-yet-consumed part of curl's block. Both are
 // advanced as bytes are skipped, copied, or parsed as header lines.
146 auto buffer = orig_buffer;
147 auto length = orig_length;
148
149 while (length) {
150 // If we're in the middle of a response chunk, copy as much data as possible.
 // A value of -1 in m_current_op means no response range is active, i.e. we
 // are between multipart parts and must parse a boundary and headers next.
151 if (m_current_op.first != -1 && m_current_op.second != -1) {
152 //m_logger->Debug(kLogXrdClHttp, "Processing response buffer of (%lld, %lld)", static_cast<long long>(m_current_op.first), static_cast<long long>(m_current_op.second));
 // Discard response bytes that precede the chosen destination chunk
 // (m_skip_bytes is set by CalculateNextBuffer()).
153 if (m_skip_bytes) {
154 //m_logger->Debug(kLogXrdClHttp, "Skipping %lld bytes", static_cast<long long>(m_skip_bytes));
155 auto to_skip = (m_skip_bytes < length) ? m_skip_bytes : length;
156 buffer += to_skip;
157 length -= to_skip;
158 m_skip_bytes -= to_skip;
159 continue;
160 } else {
 // Copy into the current destination chunk, up to its remaining capacity.
161 auto &chunk = m_chunk_list[m_response_idx];
162 auto remaining = static_cast<off_t>(chunk.GetLength()) - m_chunk_buffer_idx;
163 if (remaining < 0) {
164 return FailCallback(kXR_ServerError, "Invalid chunk framing");
165 }
166 auto to_copy = (static_cast<size_t>(remaining) < length) ? static_cast<size_t>(remaining) : length;
167 //m_logger->Debug(kLogXrdClHttp, "Copying %lld bytes to request buffer %ld at offset %lld", static_cast<long long>(to_copy), m_response_idx, static_cast<long long>(m_chunk_buffer_idx));
168 memcpy(static_cast<char *>(chunk.GetBuffer()) + m_chunk_buffer_idx, buffer, to_copy);
169 m_chunk_buffer_idx += to_copy;
170 buffer += to_copy;
171 length -= to_copy;
172 // Handle cases where the requested or response chunk is complete
173 if (chunk.GetLength() == m_chunk_buffer_idx) {
174 m_vr->GetChunks().emplace_back(chunk.GetOffset(), m_chunk_buffer_idx, chunk.GetBuffer());
178 if (m_current_op.second == chunk.GetLength()) {
179 m_current_op.first = m_current_op.second = -1;
180 } else {
181 // We may need to skip the remaining bytes or, potentially, the server
182 // coalesced two adjacent requests into one larger response.
 // NOTE(review): bytes discarded via m_skip_bytes are not subtracted from
 // m_current_op here; confirm first/second stay aligned with the stream
 // position after a skip.
183 m_current_op.first += chunk.GetLength();
184 m_current_op.second -= chunk.GetLength();
185 CalculateNextBuffer();
186 continue;
187 }
188 } else if (m_current_op.second == m_chunk_buffer_idx) {
189 // There are no more bytes in the response but the requested chunk hasn't finished.
190 // Add what we have to the results and create a new chunk on the request list from the remainder; perhaps
191 // the server will send it in the future.
192 m_chunk_list.emplace_back(chunk.GetOffset() + m_chunk_buffer_idx, chunk.GetLength() - m_chunk_buffer_idx, static_cast<char*>(chunk.GetBuffer()) + m_chunk_buffer_idx);
193 m_vr->GetChunks().emplace_back(chunk.GetOffset(), m_chunk_buffer_idx, chunk.GetBuffer());
196 m_current_op.first = m_current_op.second = -1;
198 }
199 }
200 }
201 if (m_skip_bytes) {
202 continue;
203 }
204
205 // We are at the boundary between chunks; we must parse header lines to understand the
206 // next thing to do.
207
208 // The following lambda function returns a string view to the next complete header line,
209 // potentially partially from the previous buffer from curl. If the second item in
210 // the returned pair is false, then we ran out of buffer from curl before finding a
211 // complete line.
 // NOTE(review): two issues to confirm.
 //  * Nothing here clears m_response_headers after a joined line is returned,
 //    so a later line would be prefixed with stale data unless it is reset
 //    elsewhere.
 //  * A CRLF split across two curl buffers ("\r" | "\n") is not detected,
 //    because only the new buffer is searched for "\r\n".
212 auto get_next_line = [&]() {
213 std::string_view chunk_header(buffer, length);
214 auto pos = chunk_header.find("\r\n");
215 if (pos == std::string_view::npos) {
216 m_response_headers += chunk_header;
217 length = 0;
218 return std::make_pair(std::string_view(), false);
219 } else {
220 auto line = chunk_header.substr(0, pos);
221 if (!m_response_headers.empty()) {
222 m_response_headers += line;
223 line = m_response_headers;
224 }
225 buffer += pos + 2;
226 length -= pos + 2;
227 return std::make_pair(line, true);
228 }
229 };
230
231 // Consume the boundary line.
232 bool last_segment = false;
233 while (true) {
234 auto [line, ok] = get_next_line();
235 if (!ok) {
236 return orig_length;
237 }
238 // Per RFC7233, Appendix A, Implementation note 1, multiple CRLF might precede the
239 // first boundary string in the body. However, the XRootD server appears to have an
240 // extra CRLF in front of every boundary string.
241 if (line.empty()) {continue;}
242 if (line == m_headers.MultipartSeparator()) {
243 break;
244 }
245 if (line == m_headers.MultipartSeparator() + "--") {
246 last_segment = true;
247 break;
248 }
249 std::stringstream ss;
250 ss << "Server has responded with an invalid boundary line: '" << line << "' (expected '" << m_headers.MultipartSeparator() << "')";
251 return FailCallback(kXR_ServerError, ss.str());
252 }
 // The closing boundary ("--<separator>--") ends the body; ignore anything after it.
253 if (last_segment) {
254 length = 0;
255 break;
256 }
257 // Consume the header lines
 // NOTE(review): if curl's buffer ends partway through these part headers, the
 // next call does not resume here. Before Content-Range is seen, it restarts at
 // the boundary loop above. After Content-Range is seen, it takes the copy path
 // while header bytes remain. Confirm that part headers can never straddle
 // callbacks, or persist this parser state.
258 while (true) {
259 auto [line, ok] = get_next_line();
260 if (!ok) {
261 return orig_length;
262 }
 // An empty line terminates the part headers; the part body follows.
263 if (line.empty()) {
264 break;
265 }
266 auto header_name_end = line.find(':');
267 if (header_name_end == std::string_view::npos) {
268 std::stringstream ss; ss << "Invalid header line in response from server: " << line;
269 return FailCallback(kXR_ServerError, ss.str());
270 }
271 auto header_name = line.substr(0, header_name_end);
272 // Cannot use strcasecmp here as a string_view's data is not necessarily nul-terminated.
273 // len("content-range") == 13
274 if (header_name.size() != 13 || strncasecmp(header_name.data(), "content-range", 13)) {
275 continue;
276 }
277 // We are parsing a Content-Range value.
278 // Example: Content-Range: bytes 7000-7999/8000
279 auto value = line.substr(header_name_end + 1);
280
281 // Advance whitespace
282 while (!value.empty() && value[0] == ' ') {
283 value = value.substr(1);
284 }
285
286 if (value.substr(0, 5) != "bytes") {
287 std::stringstream ss; ss << "Invalid Content-Range value (no 'bytes' unit): " << value;
288 return FailCallback(kXR_ServerError, ss.str());
289 }
290
291 value = value.substr(5);
292 while (!value.empty() && value[0] == ' ') {
293 value = value.substr(1);
294 }
295
296 // Example: 500-999/8000
 // NOTE(review): std::stoll also accepts a leading sign, so a negative range start
 // is not rejected here; -1 would collide with m_current_op's "no range" sentinel.
297 size_t count;
298 long long bytes_val;
299 try {
300 // NOTE(review): std::stoll only accepts a std::string, so value.data() is first
301 // converted via std::string(const char*), which scans for a NUL terminator; a CRLF
302 // does not stop that scan. libcurl's write buffer is not NUL-terminated, so when
303 // `line` views curl's buffer directly this can read past its end. std::stoll(std::string(value), &count) would be bounded.
304 bytes_val = std::stoll(value.data(), &count);
305 } catch (std::invalid_argument &) {
306 std::stringstream ss; ss << "Invalid Content-Range value (no integer in range start): " << value;
307 return FailCallback(kXR_ServerError, ss.str());
308 } catch (std::out_of_range &) {
309 std::stringstream ss; ss << "Invalid Content-Range value (out of range): " << value;
310 return FailCallback(kXR_ServerError, ss.str());
311 }
312 if (value.size() <= count || value[count] != '-') {
313 std::stringstream ss; ss << "Invalid Content-Range value (no dash in range): " << value;
314 return FailCallback(kXR_ServerError, ss.str());
315 }
316 m_current_op.first = bytes_val;
317 value = value.substr(count + 1);
318 try {
 // NOTE(review): same unbounded const char* -> std::string conversion as above.
319 bytes_val = std::stoll(value.data(), &count);
320 } catch (std::invalid_argument &) {
321 std::stringstream ss; ss << "Invalid Content-Range value (no integer in range end): " << value;
322 return FailCallback(kXR_ServerError, ss.str());
323 } catch (std::out_of_range &) {
324 std::stringstream ss; ss << "Invalid Content-Range value (out of range in range end): " << value;
325 return FailCallback(kXR_ServerError, ss.str());
326 }
327 if (value.size() <= count || value[count] != '/') {
328 std::stringstream ss; ss << "Invalid Content-Range value (no trailing /): " << value;
329 return FailCallback(kXR_ServerError, ss.str());
330 }
 // Range ends are inclusive, hence the "+ 1". This `length` shadows the
 // remaining-buffer `length` only within this header-parsing scope.
331 auto length = bytes_val + 1 - m_current_op.first;
332 if (length < 0) {
333 std::stringstream ss; ss << "Invalid Content-Range value (negative length): " << line;
334 return FailCallback(kXR_ServerError, ss.str());
335 }
336 if (length > std::numeric_limits<decltype(m_current_op.second)>::max()) {
337 std::stringstream ss; ss << "Invalid Content-Range value (length too long): " << line;
338 return FailCallback(kXR_ServerError, ss.str());
339 }
340 m_current_op.second = length;
341
342 // We now have a valid response range; locate a buffer where we will copy the bytes into.
343 CalculateNextBuffer();
344 }
345 // Check to see if the Content-Range was missing.
346 if (!last_segment && (m_current_op.first == -1 || m_current_op.second == -1)) {
347 return FailCallback(kXR_ServerError, "Response segment is missing a Content-Range header");
348 }
349 }
350 return orig_length;
351}
352
// Choose the request chunk that receives the response bytes starting at stream
// offset m_current_op.first.
//
// The scan starts at the current m_response_idx and wraps around m_chunk_list:
//  * A chunk whose offset equals m_current_op.first is selected immediately,
//    with nothing to skip.
//  * Otherwise, the chunk with the smallest positive gap
//    (chunk offset - m_current_op.first) is selected; the first one scanned
//    wins ties.
// The gap becomes m_skip_bytes, the number of response bytes Write() discards
// before copying. Chunks that start before m_current_op.first are never chosen.
// If no chunk qualifies, m_response_idx is left unchanged.
353void CurlVectorReadOp::CalculateNextBuffer() {
354 // Strategy is to select the index where we will throw away the fewest bytes.
355 off_t distance = std::numeric_limits<off_t>::max();
356 auto starting_idx = m_response_idx;
357 for (decltype(m_chunk_list)::size_type ctr=0; ctr<m_chunk_list.size(); ctr++) {
358 auto idx = (starting_idx + ctr) % m_chunk_list.size();
359 if (static_cast<uint64_t>(m_current_op.first) == m_chunk_list[idx].GetOffset()) {
360 m_response_idx = idx;
361 distance = 0;
362 break;
363 }
364 off_t bytes_to_skip = static_cast<off_t>(m_chunk_list[idx].GetOffset()) - m_current_op.first;
365 //m_logger->Debug(kLogXrdClHttp, "Using client request at index %lu would require us to skip %lld bytes", idx, static_cast<long long>(bytes_to_skip));
366 if (bytes_to_skip > 0 && bytes_to_skip < distance) {
367 distance = bytes_to_skip;
368 m_response_idx = idx;
369 // Note we don't break; some other request might be a better fit.
370 }
371 }
 // NOTE(review): if no chunk starts at or after m_current_op.first, distance is
 // still numeric_limits<off_t>::max() and is copied into m_skip_bytes, so the
 // rest of the response is effectively discarded. Confirm this is intended.
373 if (distance > 0) {
374 m_skip_bytes = distance;
375 } else {
376 m_skip_bytes = 0;
377 }
378}
@ kXR_ServerError
void CURL
void SetDone(bool has_failed)
int FailCallback(XErrorCode ecode, const std::string &emsg)
std::unique_ptr< CURL, void(*)(CURL *)> m_curl
virtual void ReleaseHandle()
void UpdateBytes(uint64_t bytes)
std::vector< std::pair< std::string, std::string > > m_headers_list
XrdCl::ResponseHandler * m_handler
CurlOperation(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, XrdCl::Log *log, CreateConnCalloutType, HeaderCallout *header_callout)
virtual bool Setup(CURL *curl, CurlWorker &)
size_t Write(char *buffer, size_t size)
CurlVectorReadOp(XrdCl::ResponseHandler *handler, const std::string &url, struct timespec timeout, const XrdCl::ChunkList &op_list, XrdCl::Log *logger, CreateConnCalloutType callout, HeaderCallout *header_callout)
XrdCl::ChunkList m_chunk_list
std::pair< off_t, off_t > m_current_op
std::unique_ptr< XrdCl::VectorReadInfo > m_vr
bool Setup(CURL *curl, CurlWorker &) override
Handle diagnostics.
Definition XrdClLog.hh:101
Handle an async response.
ConnectionCallout *(*)(const std::string &, const ResponseInfo &) CreateConnCalloutType
bool HTTPStatusIsError(unsigned status)
const uint64_t kLogXrdClHttp
const uint16_t stError
An error occurred that could potentially be retried.
std::vector< ChunkInfo > ChunkList
List of chunks.