/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd_core.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    misc_name_ = misc_name;
    snapuserd_ = snapuserd;
}

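// Record the blocks touched by this ordered op and flag an overlap if the op
// reads from or writes to a block already seen earlier in this read-ahead
// window. The overlap_ flag is later passed to ReadAheadIOCompleted() so that
// the scratch space is flushed before the merge of this window resumes.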
void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
    uint64_t source_block = cow_op->source;
    uint64_t source_offset = 0;
    if (cow_op->type == kCowXorOp) {
        source_block /= BLOCK_SZ;
        source_offset = cow_op->source % BLOCK_SZ;
    }
    if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(source_block) ||
        (source_offset > 0 && source_blocks_.count(source_block + 1))) {
        overlap_ = true;
    }

    dest_blocks_.insert(source_block);
    if (source_offset > 0) {
        dest_blocks_.insert(source_block + 1);
    }
    source_blocks_.insert(cow_op->new_block);
}

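// Starting from the current iterator position, batch together up to
// *pending_ops ordered ops whose source offsets are physically consecutive on
// the backing device so they can be read with a single I/O. Destination
// blocks are appended to "blocks", XOR ops to "xor_op_vec", and the starting
// byte offset of the run is returned through *source_offset. Returns the
// number of consecutive blocks batched; 0 means there are no ordered ops left.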
int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
                                    std::vector<uint64_t>& blocks,
                                    std::vector<const CowOperation*>& xor_op_vec) {
    int num_ops = *pending_ops;
    int nr_consecutive = 0;

    bool is_ops_present = (!RAIterDone() && num_ops);

    if (!is_ops_present) {
        return nr_consecutive;
    }

    // Get the first block with offset
    const CowOperation* cow_op = GetRAOpIter();
    *source_offset = cow_op->source;

    if (cow_op->type == kCowCopyOp) {
        *source_offset *= BLOCK_SZ;
    } else if (cow_op->type == kCowXorOp) {
        xor_op_vec.push_back(cow_op);
    }

    RAIterNext();
    num_ops -= 1;
    nr_consecutive = 1;
    blocks.push_back(cow_op->new_block);

    if (!overlap_) {
        CheckOverlap(cow_op);
    }

    /*
     * Find number of consecutive blocks
     */
    while (!RAIterDone() && num_ops) {
        const CowOperation* op = GetRAOpIter();
        uint64_t next_offset = op->source;

        if (cow_op->type == kCowCopyOp) {
            next_offset *= BLOCK_SZ;
        }

        // Check for consecutive blocks
        if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
            break;
        }

        if (op->type == kCowXorOp) {
            xor_op_vec.push_back(op);
        }

        nr_consecutive += 1;
        num_ops -= 1;
        blocks.push_back(op->new_block);
        RAIterNext();

        if (!overlap_) {
            CheckOverlap(op);
        }
    }

    return nr_consecutive;
}

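// Called after a crash during merge: the scratch space already holds the
// read-ahead data and metadata of the interrupted window, so rebuild the
// in-memory block -> buffer mapping from that metadata instead of re-reading
// the backing device.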
bool ReadAhead::ReconstructDataFromCow() {
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    loff_t metadata_offset = 0;
    loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
    int num_ops = 0;
    int total_blocks_merged = 0;

    // This memcpy is important as metadata_buffer_ will be an unaligned address and will fault
    // on 32-bit systems
    std::unique_ptr<uint8_t[]> metadata_buffer =
            std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
    memcpy(metadata_buffer.get(), metadata_buffer_, snapuserd_->GetBufferMetadataSize());

    while (true) {
        struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
                (char*)metadata_buffer.get() + metadata_offset);

        // Done reading metadata
        if (bm->new_block == 0 && bm->file_offset == 0) {
            break;
        }

        loff_t buffer_offset = bm->file_offset - start_data_offset;
        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
        read_ahead_buffer_map[bm->new_block] = bufptr;
        num_ops += 1;
        total_blocks_merged += 1;

        metadata_offset += sizeof(struct ScratchMetadata);
    }

    // We are done re-constructing the mapping; however, we need to make sure
    // all the COW operations to-be-merged are present in the re-constructed
    // mapping.
    while (!RAIterDone()) {
        const CowOperation* op = GetRAOpIter();
        if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
            num_ops -= 1;
            RAIterNext();
            continue;
        }

        // Verify that we have covered all the ops which were re-constructed
        // from the COW device - these are the ops which are being
        // re-constructed after a crash.
        if (num_ops != 0) {
            SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recovered."
                            << " Pending ops: " << num_ops;
            snapuserd_->ReadAheadIOFailed();
            return false;
        }

        break;
    }

    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);

    snapuserd_->FinishReconstructDataFromCow();

    if (!snapuserd_->ReadAheadIOCompleted(true)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        snapuserd_->ReadAheadIOFailed();
        return false;
    }

    SNAP_LOG(INFO) << "ReconstructDataFromCow success";
    return true;
}

/*
 * With io_uring, the data flow is slightly different.
 *
 * The data flow is as follows:
 *
 * 1: Queue the I/O requests to be read from the backing source device.
 * This is done by retrieving an SQE entry from the ring and populating
 * the SQE entry. Note that the I/O is not submitted yet.
 *
 * 2: Once the ring is full (aka queue_depth), we will submit all
 * the queued I/O requests with a single system call. This essentially
 * cuts down "queue_depth" number of system calls to a single system call.
 *
 * 3: Once the I/O is submitted, the user-space thread will now work
 * on processing the XOR operations. This happens in parallel while the
 * I/O requests are processed by the kernel. This is ok because, for XOR
 * operations, we first need to retrieve the compressed data from the COW
 * block device. Thus, we have offloaded the backing source I/O to the kernel
 * and user-space works in parallel on fetching the data for XOR operations.
 *
 * 4: After the XOR data is read from the COW device, poll the completion
 * queue for all the I/O submitted. If the I/Os have already completed,
 * the user-space thread will just read the CQE requests from the ring
 * without doing any system call. If none of the I/Os have completed yet,
 * the user-space thread will do a system call and wait for I/O completions.
 *
 * Flow diagram:
 *                                                    SQ-RING
 *  SQE1 <----------- Fetch SQE1 Entry ----------> |SQE1||SQE2|SQE3|
 *
 *  SQE1 ------------ Populate SQE1 Entry -------> |SQE1-X||SQE2|SQE3|
 *
 *  SQE2 <----------- Fetch SQE2 Entry ----------> |SQE1-X||SQE2|SQE3|
 *
 *  SQE2 ------------ Populate SQE2 Entry -------> |SQE1-X||SQE2-X|SQE3|
 *
 *  SQE3 <----------- Fetch SQE3 Entry ----------> |SQE1-X||SQE2-X|SQE3|
 *
 *  SQE3 ------------ Populate SQE3 Entry -------> |SQE1-X||SQE2-X|SQE3-X|
 *
 *  Submit-IO ----------------------------------> |SQE1-X||SQE2-X|SQE3-X|
 *     |                                              |
 *     |                                 Process I/O entries in kernel
 *     |                                              |
 *  Retrieve XOR                                      |
 *  data from COW                                     |
 *     |                                              |
 *     |                                              |
 *  Fetch CQ completions
 *     |                                           CQ-RING
 *                                               |CQE1-X||CQE2-X|CQE3-X|
 *     |
 *   CQE1 <------------ Fetch CQE1 Entry         |CQE1||CQE2-X|CQE3-X|
 *   CQE2 <------------ Fetch CQE2 Entry         |CQE1||CQE2|CQE3-X|
 *   CQE3 <------------ Fetch CQE3 Entry         |CQE1||CQE2|CQE3|
 *     |
 *     |
 *  Continue with the next set of operations in the RING
 */

bool ReadAhead::ReadAheadAsyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    int pending_sqe = queue_depth_;
    int pending_ios_to_submit = 0;

    size_t xor_op_index = 0;
    size_t block_index = 0;

    loff_t offset = 0;

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;
        struct io_uring_sqe* sqe;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);

        if (linear_blocks != 0) {
            size_t io_size = (linear_blocks * BLOCK_SZ);

            // Get an SQE entry from the ring and populate the I/O variables
            sqe = io_uring_get_sqe(ring_.get());
            if (!sqe) {
                SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
                return false;
            }

            io_uring_prep_read(sqe, backing_store_fd_.get(),
                               (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                               source_offset);

            buffer_offset += io_size;
            num_ops -= linear_blocks;
            total_blocks_merged_ += linear_blocks;

            pending_sqe -= 1;
            pending_ios_to_submit += 1;
            sqe->flags |= IOSQE_ASYNC;
        }

        // pending_sqe == 0 : Ring is full
        //
        // num_ops == 0 : All the COW ops in this batch are processed - Submit
        // pending I/O requests in the ring
        //
        // linear_blocks == 0 : All the COW ops processing is done. Submit
        // pending I/O requests in the ring
        if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
            // Submit the IO for all the COW ops in a single syscall
            int ret = io_uring_submit(ring_.get());
            if (ret != pending_ios_to_submit) {
                SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead: "
                                 << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                return false;
            }

            int pending_ios_to_complete = pending_ios_to_submit;
            pending_ios_to_submit = 0;

            bool xor_processing_required = (xor_op_vec.size() > 0);

            // Read XOR data from COW file in parallel when I/O's are in-flight
            if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
                SNAP_LOG(ERROR) << "ReadXorData failed";
                return false;
            }

            // Fetch I/O completions
            if (!ReapIoCompletions(pending_ios_to_complete)) {
                SNAP_LOG(ERROR) << "ReapIoCompletions failed";
                return false;
            }

            // Retrieve XOR'ed data
            if (xor_processing_required) {
                ProcessXorData(block_index, xor_op_index, xor_op_vec, ra_temp_buffer_.get(),
                               offset);
            }

            // All the I/O in the ring is processed.
            pending_sqe = queue_depth_;
        }

        if (linear_blocks == 0) {
            break;
        }
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        return true;
    }

    CHECK(blocks_.size() == total_blocks_merged_);

    UpdateScratchMetadata();

    return true;
}

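// Populate the temporary scratch metadata: one ScratchMetadata entry per block
// read in this window, mapping the destination block to its offset in the
// scratch data area. A zero-filled entry terminates the table; it is what
// ReconstructDataFromCow() uses to detect the end of the metadata after a crash.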
void ReadAhead::UpdateScratchMetadata() {
    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        uint64_t new_block = blocks_[block_index];
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after crash. This indicates end of
    // reading metadata contents when re-constructing the data
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;
}

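// Wait for and retire "pending_ios_to_complete" completions from the CQ ring.
// Any failure is reported to the caller, which ultimately falls back to the
// synchronous read-ahead path.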
bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
    bool status = true;

    // Reap I/O completions
    while (pending_ios_to_complete) {
        struct io_uring_cqe* cqe;

        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
        // these error codes are not truly I/O errors; we could retry them
        // by re-populating the SQE entries and submitting the I/O
        // requests again. However, we don't do that now; instead we
        // will fall back to synchronous I/O.
        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
        if (ret) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
            status = false;
            break;
        }

        if (cqe->res < 0) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed with res: " << cqe->res;
            status = false;
            break;
        }

        io_uring_cqe_seen(ring_.get(), cqe);
        pending_ios_to_complete -= 1;
    }

    return status;
}

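// Walk the blocks of this window starting at block_xor_index; for every block
// that belongs to an XOR op, XOR the data read from the backing device (in
// "buffer") with the COW payload previously staged in bufsink_ by
// ReadXorData(), recovering the original data in place.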
void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                               std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                               loff_t& buffer_offset) {
    loff_t xor_buf_offset = 0;

    while (block_xor_index < blocks_.size()) {
        void* bufptr = static_cast<void*>((char*)buffer + buffer_offset);
        uint64_t new_block = blocks_[block_xor_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Pointer to the data read from base device
                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
                // Get the xor'ed data read from COW device
                uint8_t* xor_data = reinterpret_cast<uint8_t*>((char*)bufsink_.GetPayloadBufPtr() +
                                                               xor_buf_offset);

                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
                    buffer[byte_offset] ^= xor_data[byte_offset];
                }

                // Move to next XOR op
                xor_index += 1;
                xor_buf_offset += BLOCK_SZ;
            }
        }

        buffer_offset += BLOCK_SZ;
        block_xor_index += 1;
    }

    bufsink_.ResetBufferOffset();
}

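// Read the COW payload of each XOR op in this window into bufsink_ so that
// ProcessXorData() can apply it later. This runs while the backing-device
// reads submitted through io_uring are still in flight in the kernel.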
bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
                            std::vector<const CowOperation*>& xor_op_vec) {
    // Process the XOR ops in parallel - We will be reading data
    // from COW file for XOR ops processing.
    while (block_index < blocks_.size()) {
        uint64_t new_block = blocks_[block_index];

        if (xor_op_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_op_index];
            if (xor_op->new_block == new_block) {
                if (!reader_->ReadData(*xor_op, &bufsink_)) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
                    return false;
                }

                xor_op_index += 1;
                bufsink_.UpdateBufferOffset(BLOCK_SZ);
            }
        }
        block_index += 1;
    }
    return true;
}

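// Synchronous read-ahead path (also the fallback when io_uring is unavailable
// or fails): read each run of consecutive source blocks with a blocking read,
// apply XOR payloads inline, and populate the temporary scratch metadata.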
bool ReadAhead::ReadAheadSyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
        if (linear_blocks == 0) {
            // No more blocks to read
            SNAP_LOG(DEBUG) << "Read-ahead completed";
            break;
        }

        size_t io_size = (linear_blocks * BLOCK_SZ);

        // Read a consecutive set of blocks from the base device in one shot
        if (!android::base::ReadFullyAtOffset(backing_store_fd_,
                                              (char*)ra_temp_buffer_.get() + buffer_offset,
                                              io_size, source_offset)) {
            SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
                             << backing_store_device_ << " at block: " << source_offset / BLOCK_SZ
                             << " offset: " << source_offset % BLOCK_SZ
                             << " buffer_offset: " << buffer_offset << " io_size: " << io_size
                             << " buf-addr: " << read_ahead_buffer_;

            snapuserd_->ReadAheadIOFailed();
            return false;
        }

        buffer_offset += io_size;
        total_blocks_merged_ += linear_blocks;
        num_ops -= linear_blocks;
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        return true;
    }

    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    loff_t offset = 0;
    CHECK(blocks_.size() == total_blocks_merged_);

    size_t xor_index = 0;
    BufferSink bufsink;
    bufsink.Initialize(BLOCK_SZ * 2);

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        void* bufptr = static_cast<void*>((char*)ra_temp_buffer_.get() + offset);
        uint64_t new_block = blocks_[block_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Read the xor'ed data from COW
                if (!reader_->ReadData(*xor_op, &bufsink)) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
                    snapuserd_->ReadAheadIOFailed();
                    return false;
                }
                // Pointer to the data read from base device
                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
                // Get the xor'ed data read from COW device
                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());

                // Retrieve the original data
                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
                    buffer[byte_offset] ^= xor_data[byte_offset];
                }

                // Move to next XOR op
                xor_index += 1;
            }
        }

        offset += BLOCK_SZ;
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // Verify that all the XOR blocks were scanned to retrieve the original data
    CHECK(xor_index == xor_op_vec.size());

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after crash. This indicates end of
    // reading metadata contents when re-constructing the data
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;

    return true;
}

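// Drive one read-ahead window end to end: reconstruct from the scratch space
// after a crash if required, otherwise run the async (io_uring) or synchronous
// read path, then publish the data to the shared scratch space and the
// block -> buffer map, and notify the merge thread that this window is ready.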
bool ReadAhead::ReadAheadIOStart() {
    // Check if the data has to be re-constructed from the COW file.
    // This will be true only once during boot up after a crash
    // during merge.
    if (snapuserd_->ShouldReconstructDataFromCow()) {
        return ReconstructDataFromCow();
    }

    bool retry = false;
    bool ra_status;

    // Start async read-ahead
    if (read_ahead_async_) {
        ra_status = ReadAheadAsyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - falling back to synchronous I/O";
            FinalizeIouring();
            RAResetIter(total_blocks_merged_);
            retry = true;
            read_ahead_async_ = false;
        }
    }

    // Check if we need to fall back and retry the read-ahead
    //
    // If the device doesn't support async operations, we
    // will directly enter here (aka devices with 4.x kernels)
    const bool ra_sync_required = (retry || !read_ahead_async_);

    if (ra_sync_required) {
        ra_status = ReadAheadSyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;

    // Wait for the merge of the previous RA window to finish. We shouldn't
    // be touching the scratch space until the merge of the previous RA
    // window is complete. If there is a crash during this time frame, merge
    // should resume based on the contents of the scratch space.
    if (!snapuserd_->WaitForMergeReady()) {
        return false;
    }

    // Copy the data to scratch space
    memcpy(metadata_buffer_, ra_temp_meta_buffer_.get(), snapuserd_->GetBufferMetadataSize());
    memcpy(read_ahead_buffer_, ra_temp_buffer_.get(), total_blocks_merged_ * BLOCK_SZ);

    loff_t offset = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    read_ahead_buffer_map.clear();

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
        uint64_t new_block = blocks_[block_index];

        read_ahead_buffer_map[new_block] = bufptr;
        offset += BLOCK_SZ;
    }

    total_ra_blocks_completed_ += total_blocks_merged_;
    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);

    // Flush the data only if we have overlapping blocks in the region.
    // Notify the merge thread to resume merging this window.
    if (!snapuserd_->ReadAheadIOCompleted(overlap_)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        snapuserd_->ReadAheadIOFailed();
        return false;
    }

    return true;
}

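// Set up the io_uring instance used for async read-ahead. Returning false
// simply leaves read_ahead_async_ unset, so RunThread() continues with the
// synchronous read-ahead path.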
bool ReadAhead::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        SNAP_LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    // For xor ops processing
    bufsink_.Initialize(PAYLOAD_BUFFER_SZ * 2);
    read_ahead_async_ = true;

    SNAP_LOG(INFO) << "Read-ahead: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void ReadAhead::FinalizeIouring() {
    if (read_ahead_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

bool ReadAhead::RunThread() {
    if (!InitializeFds()) {
        return false;
    }

    InitializeBuffer();

    if (!InitReader()) {
        return false;
    }

    InitializeRAIter();

    InitializeIouring();

    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
        SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
    }

    while (!RAIterDone()) {
        if (!ReadAheadIOStart()) {
            break;
        }
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << " ReadAhead thread terminating....";
    return true;
}

// Initialization
bool ReadAhead::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    return true;
}

bool ReadAhead::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }
    return true;
}

void ReadAhead::InitializeRAIter() {
    cowop_iter_ = reader_->GetOpIter(true);
}

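// The read-ahead iterator is considered done once the underlying op iterator
// is exhausted or the next op is not an ordered op (copy/XOR); read-ahead only
// operates on ordered ops.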
bool ReadAhead::RAIterDone() {
    if (cowop_iter_->Done()) {
        return true;
    }

    const CowOperation* cow_op = GetRAOpIter();

    if (!IsOrderedOp(*cow_op)) {
        return true;
    }

    return false;
}

void ReadAhead::RAIterNext() {
    cowop_iter_->Next();
}

void ReadAhead::RAResetIter(uint64_t num_blocks) {
    while (num_blocks && !cowop_iter_->RDone()) {
        cowop_iter_->Prev();
        num_blocks -= 1;
    }
}

const CowOperation* ReadAhead::GetRAOpIter() {
    const CowOperation* cow_op = &cowop_iter_->Get();
    return cow_op;
}

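// Map the scratch-space regions (metadata and data) out of the shared mapped
// area and allocate the temporary staging buffers; the staged data is copied
// into the scratch space only after the whole window has been read.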
void ReadAhead::InitializeBuffer() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    // Map the scratch space region into memory
    metadata_buffer_ =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
    read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    ra_temp_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
    ra_temp_meta_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
}

}  // namespace snapshot
}  // namespace android