/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "snapuserd_core.h"

namespace android {
namespace snapshot {

using namespace android;
using namespace android::dm;
using android::base::unique_fd;

ReadAhead::ReadAhead(const std::string& cow_device, const std::string& backing_device,
                     const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd) {
    cow_device_ = cow_device;
    backing_store_device_ = backing_device;
    misc_name_ = misc_name;
    snapuserd_ = snapuserd;
}

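// Record the source and destination blocks touched by an ordered op and flag
// the window as overlapping when a block appears on both sides. An overlap
// means the merge could clobber data that another op in this window still
// reads, so the read-ahead data must be flushed to the scratch space first
// (see the overlap_ flag passed to ReadAheadIOCompleted()).
//
// Illustrative example (made-up block numbers): if op A copies block 10 into
// block 12 and op B later copies block 12 into block 14, block 12 is both a
// destination and a source, so overlap_ becomes true.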
void ReadAhead::CheckOverlap(const CowOperation* cow_op) {
    uint64_t source_block = cow_op->source;
    uint64_t source_offset = 0;
    if (cow_op->type == kCowXorOp) {
        source_block /= BLOCK_SZ;
        source_offset = cow_op->source % BLOCK_SZ;
    }
    if (dest_blocks_.count(cow_op->new_block) || source_blocks_.count(source_block) ||
        (source_offset > 0 && source_blocks_.count(source_block + 1))) {
        overlap_ = true;
    }

    dest_blocks_.insert(source_block);
    if (source_offset > 0) {
        dest_blocks_.insert(source_block + 1);
    }
    source_blocks_.insert(cow_op->new_block);
}

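// Starting at the current iterator position, batch up to *pending_ops ordered
// ops whose source offsets are physically consecutive, so the backing store
// can be read in a single I/O. Returns the number of blocks batched (0 when
// no ordered ops remain), stores the starting byte offset in *source_offset,
// and appends the destination blocks to |blocks| and any XOR ops to
// |xor_op_vec|.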
int ReadAhead::PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
                                    std::vector<uint64_t>& blocks,
                                    std::vector<const CowOperation*>& xor_op_vec) {
    int num_ops = *pending_ops;
    int nr_consecutive = 0;

    bool is_ops_present = (!RAIterDone() && num_ops);

    if (!is_ops_present) {
        return nr_consecutive;
    }

    // Get the first block with offset
    const CowOperation* cow_op = GetRAOpIter();
    *source_offset = cow_op->source;

    if (cow_op->type == kCowCopyOp) {
        *source_offset *= BLOCK_SZ;
    } else if (cow_op->type == kCowXorOp) {
        xor_op_vec.push_back(cow_op);
    }

    RAIterNext();
    num_ops -= 1;
    nr_consecutive = 1;
    blocks.push_back(cow_op->new_block);

    if (!overlap_) {
        CheckOverlap(cow_op);
    }

    /*
     * Find number of consecutive blocks
     */
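    // Consecutiveness check, with hypothetical numbers: if the first op reads
    // from byte offset 40960 (block 10 with BLOCK_SZ == 4096), the next op
    // extends the run only if it reads from 40960 + 1 * BLOCK_SZ = 45056
    // (block 11), the one after that from 49152 (block 12), and so on.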
    while (!RAIterDone() && num_ops) {
        const CowOperation* op = GetRAOpIter();
        uint64_t next_offset = op->source;

        // Scale copy-op sources (block units) to a byte offset before comparing
        if (op->type == kCowCopyOp) {
            next_offset *= BLOCK_SZ;
        }

        // Check for consecutive blocks
        if (next_offset != (*source_offset + nr_consecutive * BLOCK_SZ)) {
            break;
        }

        if (op->type == kCowXorOp) {
            xor_op_vec.push_back(op);
        }

        nr_consecutive += 1;
        num_ops -= 1;
        blocks.push_back(op->new_block);
        RAIterNext();

        if (!overlap_) {
            CheckOverlap(op);
        }
    }

    return nr_consecutive;
}

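// After a crash during merge, the scratch space still holds the data and
// metadata of the last read-ahead window. Rebuild the in-memory
// block -> scratch-buffer mapping from that persisted metadata instead of
// re-reading the backing store, then verify that every to-be-merged op in
// the current window is covered by the reconstructed mapping.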
bool ReadAhead::ReconstructDataFromCow() {
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    loff_t metadata_offset = 0;
    loff_t start_data_offset = snapuserd_->GetBufferDataOffset();
    int num_ops = 0;
    int total_blocks_merged = 0;

    // This memcpy is important as metadata_buffer_ will be an unaligned address and will fault
    // on 32-bit systems
    std::unique_ptr<uint8_t[]> metadata_buffer =
            std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
    memcpy(metadata_buffer.get(), metadata_buffer_, snapuserd_->GetBufferMetadataSize());

    while (true) {
        struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
                (char*)metadata_buffer.get() + metadata_offset);

        // Done reading metadata
        if (bm->new_block == 0 && bm->file_offset == 0) {
            break;
        }

        loff_t buffer_offset = bm->file_offset - start_data_offset;
        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + buffer_offset);
        read_ahead_buffer_map[bm->new_block] = bufptr;
        num_ops += 1;
        total_blocks_merged += 1;

        metadata_offset += sizeof(struct ScratchMetadata);
    }

    // We are done re-constructing the mapping; however, we need to make sure
    // all the COW operations to-be merged are present in the re-constructed
    // mapping.
    while (!RAIterDone()) {
        const CowOperation* op = GetRAOpIter();
        if (read_ahead_buffer_map.find(op->new_block) != read_ahead_buffer_map.end()) {
            num_ops -= 1;
            RAIterNext();
            continue;
        }

        // Verify that we have covered all the ops which were re-constructed
        // from the COW device - these are the ops which are being
        // re-constructed after a crash.
        if (num_ops != 0) {
            SNAP_LOG(ERROR) << "ReconstructDataFromCow failed. Not all ops recovered."
                            << " Pending ops: " << num_ops;
            snapuserd_->ReadAheadIOFailed();
            return false;
        }

        break;
    }

    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged);

    snapuserd_->FinishReconstructDataFromCow();

    if (!snapuserd_->ReadAheadIOCompleted(true)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        snapuserd_->ReadAheadIOFailed();
        return false;
    }

    SNAP_LOG(INFO) << "ReconstructDataFromCow success";
    return true;
}

/*
 * With io_uring, the data flow is slightly different.
 *
 * The data flow is as follows:
 *
 * 1: Queue the I/O requests to be read from the backing source device.
 * This is done by retrieving an SQE entry from the ring and populating
 * the SQE entry. Note that the I/O is not submitted yet.
 *
 * 2: Once the ring is full (aka queue_depth), we will submit all
 * the queued I/O requests with a single system call. This essentially
 * cuts down "queue_depth" number of system calls to a single system call.
 *
 * 3: Once the I/O is submitted, the user-space thread will now work
 * on processing the XOR operations. This happens in parallel while the
 * I/O requests are serviced by the kernel. This is ok because, for XOR
 * operations, we first need to retrieve the compressed data from the COW
 * block device. Thus, we have offloaded the backing source I/O to the kernel,
 * and user-space is working in parallel on fetching the data for XOR operations.
 *
 * 4: After the XOR operations are read from the COW device, poll the completion
 * queue for all the I/O submitted. If the I/O's were already completed,
 * then the user-space thread will just read the CQE requests from the ring
 * without doing any system call. If none of the I/O were completed yet,
 * the user-space thread will do a system call and wait for I/O completions.
 *
 * Flow diagram:
 *                                                    SQ-RING
 *  SQE1 <----------- Fetch SQE1 Entry ---------- |SQE1||SQE2||SQE3|
 *
 *  SQE1  ------------ Populate SQE1 Entry ------> |SQE1-X||SQE2||SQE3|
 *
 *  SQE2 <----------- Fetch SQE2 Entry ---------- |SQE1-X||SQE2||SQE3|
 *
 *  SQE2  ------------ Populate SQE2 Entry ------> |SQE1-X||SQE2-X||SQE3|
 *
 *  SQE3 <----------- Fetch SQE3 Entry ---------- |SQE1-X||SQE2-X||SQE3|
 *
 *  SQE3  ------------ Populate SQE3 Entry ------> |SQE1-X||SQE2-X||SQE3-X|
 *
 *  Submit-IO ---------------------------------> |SQE1-X||SQE2-X||SQE3-X|
 *     |                                                  |
 *     |                                        Process I/O entries in kernel
 *     |                                                  |
 *  Retrieve XOR                                          |
 *  data from COW                                         |
 *     |                                                  |
 *     |                                                  |
 *  Fetch CQ completions
 *     |                                              CQ-RING
 *                                               |CQE1-X||CQE2-X||CQE3-X|
 *                                                        |
 *   CQE1 <------------Fetch CQE1 Entry          |CQE1||CQE2-X||CQE3-X|
 *   CQE2 <------------Fetch CQE2 Entry          |CQE1||CQE2||CQE3-X|
 *   CQE3 <------------Fetch CQE3 Entry          |CQE1||CQE2||CQE3|
 *    |
 *    |
 *  Continue Next set of operations in the RING
 */
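
// A minimal sketch of the liburing pattern described above (the real loop in
// ReadAheadAsyncIO() below also interleaves XOR processing between submit and
// reap; fd/buf/len/off here are placeholders, not the actual variables):
//
//   struct io_uring_sqe* sqe = io_uring_get_sqe(ring);   // 1: fetch + populate
//   io_uring_prep_read(sqe, fd, buf, len, off);
//   ... repeat until the ring is full ...
//   int ret = io_uring_submit(ring);                     // 2: one syscall
//   ... process XOR data in user-space ...               // 3
//   struct io_uring_cqe* cqe;
//   io_uring_wait_cqe(ring, &cqe);                       // 4: reap a completion
//   io_uring_cqe_seen(ring, cqe);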

bool ReadAhead::ReadAheadAsyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    int pending_sqe = queue_depth_;
    int pending_ios_to_submit = 0;

    size_t xor_op_index = 0;
    size_t block_index = 0;

    loff_t offset = 0;

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;
        struct io_uring_sqe* sqe;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);

        if (linear_blocks != 0) {
            size_t io_size = (linear_blocks * BLOCK_SZ);

            // Get an SQE entry from the ring and populate the I/O variables
            sqe = io_uring_get_sqe(ring_.get());
            if (!sqe) {
                SNAP_PLOG(ERROR) << "io_uring_get_sqe failed during read-ahead";
                return false;
            }

            io_uring_prep_read(sqe, backing_store_fd_.get(),
                               (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                               source_offset);

            buffer_offset += io_size;
            num_ops -= linear_blocks;
            total_blocks_merged_ += linear_blocks;

            pending_sqe -= 1;
            pending_ios_to_submit += 1;
            sqe->flags |= IOSQE_ASYNC;
        }

        // pending_sqe == 0 : Ring is full
        //
        // num_ops == 0 : All the COW ops in this batch are processed - Submit
        // pending I/O requests in the ring
        //
        // linear_blocks == 0 : All the COW ops processing is done. Submit
        // pending I/O requests in the ring
        if (pending_sqe == 0 || num_ops == 0 || (linear_blocks == 0 && pending_ios_to_submit)) {
            // Submit the I/O for all the COW ops in a single syscall
            int ret = io_uring_submit(ring_.get());
            if (ret != pending_ios_to_submit) {
                SNAP_PLOG(ERROR) << "io_uring_submit failed for read-ahead:"
                                 << " io submit: " << ret << " expected: " << pending_ios_to_submit;
                return false;
            }

            int pending_ios_to_complete = pending_ios_to_submit;
            pending_ios_to_submit = 0;

            bool xor_processing_required = (xor_op_vec.size() > 0);

            // Read XOR data from the COW file in parallel while I/O's are in-flight
            if (xor_processing_required && !ReadXorData(block_index, xor_op_index, xor_op_vec)) {
                SNAP_LOG(ERROR) << "ReadXorData failed";
                return false;
            }

            // Fetch I/O completions
            if (!ReapIoCompletions(pending_ios_to_complete)) {
                SNAP_LOG(ERROR) << "ReapIoCompletions failed";
                return false;
            }

            // Retrieve XOR'ed data
            if (xor_processing_required) {
                ProcessXorData(block_index, xor_op_index, xor_op_vec, ra_temp_buffer_.get(),
                               offset);
            }

            // All the I/O in the ring is processed.
            pending_sqe = queue_depth_;
        }

        if (linear_blocks == 0) {
            break;
        }
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        return true;
    }

    CHECK(blocks_.size() == total_blocks_merged_);

    UpdateScratchMetadata();

    return true;
}

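// Write one ScratchMetadata entry per read-ahead block into the temporary
// metadata buffer, mapping each destination block to the scratch-space file
// offset where its data will live. Layout after this function (fields as
// used below):
//
//   { new_block[0], data_offset + 0 * BLOCK_SZ }
//   { new_block[1], data_offset + 1 * BLOCK_SZ }
//   ...
//   { 0, 0 }   <- sentinel; ReconstructDataFromCow() stops here after a crash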
void ReadAhead::UpdateScratchMetadata() {
    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        uint64_t new_block = blocks_[block_index];
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after a crash. It indicates the end of
    // the metadata contents when re-constructing the data.
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;
}

bool ReadAhead::ReapIoCompletions(int pending_ios_to_complete) {
    bool status = true;

    // Reap I/O completions
    while (pending_ios_to_complete) {
        struct io_uring_cqe* cqe;

        // io_uring_wait_cqe can potentially return -EAGAIN or -EINTR;
        // these error codes are not truly I/O errors; we can retry them
        // by re-populating the SQE entries and submitting the I/O
        // request back. However, we don't do that now; instead we
        // will fall back to synchronous I/O.
        int ret = io_uring_wait_cqe(ring_.get(), &cqe);
        if (ret) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed: " << ret;
            status = false;
            break;
        }

        if (cqe->res < 0) {
            SNAP_LOG(ERROR) << "Read-ahead - io_uring_wait_cqe failed with res: " << cqe->res;
            status = false;
            break;
        }

        io_uring_cqe_seen(ring_.get(), cqe);
        pending_ios_to_complete -= 1;
    }

    return status;
}

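// For an XOR op, the COW payload is the new data XOR'ed with the source data.
// XOR-ing that payload into the block just read from the base device therefore
// reconstructs the new data in place:
//
//   base[i] ^ (new[i] ^ base[i]) == new[i]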
void ReadAhead::ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                               std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                               loff_t& buffer_offset) {
    loff_t xor_buf_offset = 0;

    while (block_xor_index < blocks_.size()) {
        void* bufptr = static_cast<void*>((char*)buffer + buffer_offset);
        uint64_t new_block = blocks_[block_xor_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Pointer to the data read from the base device
                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
                // Get the xor'ed data read from the COW device
                uint8_t* xor_data = reinterpret_cast<uint8_t*>((char*)bufsink_.GetPayloadBufPtr() +
                                                               xor_buf_offset);

                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
                    buffer[byte_offset] ^= xor_data[byte_offset];
                }

                // Move to the next XOR op
                xor_index += 1;
                xor_buf_offset += BLOCK_SZ;
            }
        }

        buffer_offset += BLOCK_SZ;
        block_xor_index += 1;
    }

    bufsink_.ResetBufferOffset();
}

bool ReadAhead::ReadXorData(size_t block_index, size_t xor_op_index,
                            std::vector<const CowOperation*>& xor_op_vec) {
    // Process the XOR ops in parallel - We will be reading data
    // from the COW file for XOR ops processing.
    while (block_index < blocks_.size()) {
        uint64_t new_block = blocks_[block_index];

        if (xor_op_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_op_index];
            if (xor_op->new_block == new_block) {
                if (!reader_->ReadData(*xor_op, &bufsink_)) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
                    return false;
                }

                xor_op_index += 1;
                bufsink_.UpdateBufferOffset(BLOCK_SZ);
            }
        }
        block_index += 1;
    }
    return true;
}

bool ReadAhead::ReadAheadSyncIO() {
    int num_ops = (snapuserd_->GetBufferDataSize()) / BLOCK_SZ;
    loff_t buffer_offset = 0;
    total_blocks_merged_ = 0;
    overlap_ = false;
    dest_blocks_.clear();
    source_blocks_.clear();
    blocks_.clear();
    std::vector<const CowOperation*> xor_op_vec;

    bufsink_.ResetBufferOffset();

    // Number of ops to be merged in this window. This is a fixed size
    // except for the last window wherein the number of ops can be less
    // than the size of the RA window.
    while (num_ops) {
        uint64_t source_offset;

        int linear_blocks = PrepareNextReadAhead(&source_offset, &num_ops, blocks_, xor_op_vec);
        if (linear_blocks == 0) {
            // No more blocks to read
            SNAP_LOG(DEBUG) << " Read-ahead completed....";
            break;
        }

        size_t io_size = (linear_blocks * BLOCK_SZ);

        // Read a consecutive set of blocks from the base device in one shot
        if (!android::base::ReadFullyAtOffset(backing_store_fd_,
                                              (char*)ra_temp_buffer_.get() + buffer_offset, io_size,
                                              source_offset)) {
            SNAP_PLOG(ERROR) << "Ordered-op failed. Read from backing store: "
                             << backing_store_device_ << " at block: " << source_offset / BLOCK_SZ
                             << " offset: " << source_offset % BLOCK_SZ
                             << " buffer_offset: " << buffer_offset << " io_size: " << io_size
                             << " buf-addr: " << read_ahead_buffer_;

            snapuserd_->ReadAheadIOFailed();
            return false;
        }

        buffer_offset += io_size;
        total_blocks_merged_ += linear_blocks;
        num_ops -= linear_blocks;
    }

    // Done with merging ordered ops
    if (RAIterDone() && total_blocks_merged_ == 0) {
        return true;
    }

    loff_t metadata_offset = 0;

    struct ScratchMetadata* bm = reinterpret_cast<struct ScratchMetadata*>(
            (char*)ra_temp_meta_buffer_.get() + metadata_offset);

    bm->new_block = 0;
    bm->file_offset = 0;

    loff_t file_offset = snapuserd_->GetBufferDataOffset();

    loff_t offset = 0;
    CHECK(blocks_.size() == total_blocks_merged_);

    size_t xor_index = 0;
    BufferSink bufsink;
    bufsink.Initialize(BLOCK_SZ * 2);

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        void* bufptr = static_cast<void*>((char*)ra_temp_buffer_.get() + offset);
        uint64_t new_block = blocks_[block_index];

        if (xor_index < xor_op_vec.size()) {
            const CowOperation* xor_op = xor_op_vec[xor_index];

            // Check if this block is an XOR op
            if (xor_op->new_block == new_block) {
                // Read the xor'ed data from the COW
                if (!reader_->ReadData(*xor_op, &bufsink)) {
                    SNAP_LOG(ERROR)
                            << " ReadAhead - XorOp Read failed for block: " << xor_op->new_block;
                    snapuserd_->ReadAheadIOFailed();
                    return false;
                }
                // Pointer to the data read from the base device
                uint8_t* buffer = reinterpret_cast<uint8_t*>(bufptr);
                // Get the xor'ed data read from the COW device
                uint8_t* xor_data = reinterpret_cast<uint8_t*>(bufsink.GetPayloadBufPtr());

                // Retrieve the original data
                for (size_t byte_offset = 0; byte_offset < BLOCK_SZ; byte_offset++) {
                    buffer[byte_offset] ^= xor_data[byte_offset];
                }

                // Move to the next XOR op
                xor_index += 1;
            }
        }

        offset += BLOCK_SZ;
        // Track the metadata blocks which are stored in scratch space
        bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                       metadata_offset);

        bm->new_block = new_block;
        bm->file_offset = file_offset;

        metadata_offset += sizeof(struct ScratchMetadata);
        file_offset += BLOCK_SZ;
    }

    // Verify that all the xor blocks were scanned to retrieve the original data
    CHECK(xor_index == xor_op_vec.size());

    // This is important - explicitly set the contents to zero. This is used
    // when re-constructing the data after a crash. It indicates the end of
    // the metadata contents when re-constructing the data.
    bm = reinterpret_cast<struct ScratchMetadata*>((char*)ra_temp_meta_buffer_.get() +
                                                   metadata_offset);
    bm->new_block = 0;
    bm->file_offset = 0;

    return true;
}

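// Top-level driver for one read-ahead window: reconstruct from the COW after
// a crash if needed; otherwise read the window (async via io_uring when
// available, falling back to synchronous I/O on failure), wait for the
// previous window's merge to complete, publish the data and metadata to the
// scratch space, and notify the merge thread.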
bool ReadAhead::ReadAheadIOStart() {
    // Check if the data has to be constructed from the COW file.
    // This will be true only once during boot up after a crash
    // during merge.
    if (snapuserd_->ShouldReconstructDataFromCow()) {
        return ReconstructDataFromCow();
    }

    bool retry = false;
    bool ra_status;

    // Start async read-ahead
    if (read_ahead_async_) {
        ra_status = ReadAheadAsyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadAsyncIO failed - Falling back to synchronous I/O";
            FinalizeIouring();
            RAResetIter(total_blocks_merged_);
            retry = true;
            read_ahead_async_ = false;
        }
    }

    // Check if we need to fall back and retry the read-ahead
    //
    // If the device doesn't support async operations, we
    // will directly enter here (aka devices with 4.x kernels)

    const bool ra_sync_required = (retry || !read_ahead_async_);

    if (ra_sync_required) {
        ra_status = ReadAheadSyncIO();
        if (!ra_status) {
            SNAP_LOG(ERROR) << "ReadAheadSyncIO failed";
            return false;
        }
    }

    SNAP_LOG(DEBUG) << "Read-ahead: total_ra_blocks_merged: " << total_ra_blocks_completed_;

    // Wait for the merge to finish for the previous RA window. We shouldn't
    // be touching the scratch space until the merge of the previous RA
    // window is complete. If there is a crash during this time frame, the
    // merge should resume based on the contents of the scratch space.
    if (!snapuserd_->WaitForMergeReady()) {
        return false;
    }

    // Copy the data to scratch space
    memcpy(metadata_buffer_, ra_temp_meta_buffer_.get(), snapuserd_->GetBufferMetadataSize());
    memcpy(read_ahead_buffer_, ra_temp_buffer_.get(), total_blocks_merged_ * BLOCK_SZ);

    loff_t offset = 0;
    std::unordered_map<uint64_t, void*>& read_ahead_buffer_map = snapuserd_->GetReadAheadMap();
    read_ahead_buffer_map.clear();

    for (size_t block_index = 0; block_index < blocks_.size(); block_index++) {
        void* bufptr = static_cast<void*>((char*)read_ahead_buffer_ + offset);
        uint64_t new_block = blocks_[block_index];

        read_ahead_buffer_map[new_block] = bufptr;
        offset += BLOCK_SZ;
    }

    total_ra_blocks_completed_ += total_blocks_merged_;
    snapuserd_->SetMergedBlockCountForNextCommit(total_blocks_merged_);

    // Flush the data only if we have overlapping blocks in the region.
    // Notify the merge thread to resume merging this window.
    if (!snapuserd_->ReadAheadIOCompleted(overlap_)) {
        SNAP_LOG(ERROR) << "ReadAheadIOCompleted failed...";
        snapuserd_->ReadAheadIOFailed();
        return false;
    }

    return true;
}

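// Set up the io_uring instance for async read-ahead. On failure (or when the
// kernel lacks io_uring support), read_ahead_async_ stays false and
// ReadAheadIOStart() uses the synchronous path instead; a false return is
// therefore not fatal to the caller (RunThread() ignores it).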
bool ReadAhead::InitializeIouring() {
    if (!snapuserd_->IsIouringSupported()) {
        return false;
    }

    ring_ = std::make_unique<struct io_uring>();

    int ret = io_uring_queue_init(queue_depth_, ring_.get(), 0);
    if (ret) {
        SNAP_LOG(ERROR) << "io_uring_queue_init failed with ret: " << ret;
        return false;
    }

    // For xor ops processing
    bufsink_.Initialize(PAYLOAD_BUFFER_SZ * 2);
    read_ahead_async_ = true;

    SNAP_LOG(INFO) << "Read-ahead: io_uring initialized with queue depth: " << queue_depth_;
    return true;
}

void ReadAhead::FinalizeIouring() {
    if (read_ahead_async_) {
        io_uring_queue_exit(ring_.get());
    }
}

bool ReadAhead::RunThread() {
    if (!InitializeFds()) {
        return false;
    }

    InitializeBuffer();

    if (!InitReader()) {
        return false;
    }

    InitializeRAIter();

    InitializeIouring();

    if (setpriority(PRIO_PROCESS, gettid(), kNiceValueForMergeThreads)) {
        SNAP_PLOG(ERROR) << "Failed to set priority for TID: " << gettid();
    }

    while (!RAIterDone()) {
        if (!ReadAheadIOStart()) {
            break;
        }
    }

    FinalizeIouring();
    CloseFds();
    reader_->CloseCowFd();

    SNAP_LOG(INFO) << " ReadAhead thread terminating....";
    return true;
}

// Initialization
bool ReadAhead::InitializeFds() {
    backing_store_fd_.reset(open(backing_store_device_.c_str(), O_RDONLY));
    if (backing_store_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << backing_store_device_;
        return false;
    }

    cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
    if (cow_fd_ < 0) {
        SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
        return false;
    }

    return true;
}

bool ReadAhead::InitReader() {
    reader_ = snapuserd_->CloneReaderForWorker();

    if (!reader_->InitForMerge(std::move(cow_fd_))) {
        return false;
    }
    return true;
}

void ReadAhead::InitializeRAIter() {
    cowop_iter_ = reader_->GetOpIter(true);
}

bool ReadAhead::RAIterDone() {
    if (cowop_iter_->Done()) {
        return true;
    }

    const CowOperation* cow_op = GetRAOpIter();

    if (!IsOrderedOp(*cow_op)) {
        return true;
    }

    return false;
}

void ReadAhead::RAIterNext() {
    cowop_iter_->Next();
}

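// Rewind the op iterator by |num_blocks| ops. Used when the async path fails
// mid-window so that the synchronous retry re-processes the same ordered ops.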
void ReadAhead::RAResetIter(uint64_t num_blocks) {
    while (num_blocks && !cowop_iter_->RDone()) {
        cowop_iter_->Prev();
        num_blocks -= 1;
    }
}

const CowOperation* ReadAhead::GetRAOpIter() {
    const CowOperation* cow_op = &cowop_iter_->Get();
    return cow_op;
}

void ReadAhead::InitializeBuffer() {
    void* mapped_addr = snapuserd_->GetMappedAddr();
    // Map the scratch space region into memory
    metadata_buffer_ =
            static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferMetadataOffset());
    read_ahead_buffer_ = static_cast<void*>((char*)mapped_addr + snapuserd_->GetBufferDataOffset());

    ra_temp_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferDataSize());
    ra_temp_meta_buffer_ = std::make_unique<uint8_t[]>(snapuserd_->GetBufferMetadataSize());
}

}  // namespace snapshot
}  // namespace android