1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "snapuserd_core.h"
18
19 #include <sys/utsname.h>
20
21 #include <android-base/chrono_utils.h>
22 #include <android-base/properties.h>
23 #include <android-base/scopeguard.h>
24 #include <android-base/strings.h>
25
26 namespace android {
27 namespace snapshot {
28
29 using namespace android;
30 using namespace android::dm;
31 using android::base::unique_fd;
32
SnapshotHandler(std::string misc_name,std::string cow_device,std::string backing_device,std::string base_path_merge,int num_worker_threads,bool use_iouring,bool perform_verification)33 SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
34 std::string backing_device, std::string base_path_merge,
35 int num_worker_threads, bool use_iouring,
36 bool perform_verification) {
37 misc_name_ = std::move(misc_name);
38 cow_device_ = std::move(cow_device);
39 backing_store_device_ = std::move(backing_device);
40 control_device_ = "/dev/dm-user/" + misc_name_;
41 base_path_merge_ = std::move(base_path_merge);
42 num_worker_threads_ = num_worker_threads;
43 is_io_uring_enabled_ = use_iouring;
44 perform_verification_ = perform_verification;
45 }
46
InitializeWorkers()47 bool SnapshotHandler::InitializeWorkers() {
48 for (int i = 0; i < num_worker_threads_; i++) {
49 std::unique_ptr<Worker> wt =
50 std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
51 misc_name_, base_path_merge_, GetSharedPtr());
52 if (!wt->Init()) {
53 SNAP_LOG(ERROR) << "Thread initialization failed";
54 return false;
55 }
56
57 worker_threads_.push_back(std::move(wt));
58 }
59
60 merge_thread_ = std::make_unique<Worker>(cow_device_, backing_store_device_, control_device_,
61 misc_name_, base_path_merge_, GetSharedPtr());
62
63 read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
64 GetSharedPtr());
65
66 update_verify_ = std::make_unique<UpdateVerify>(misc_name_);
67
68 return true;
69 }
70
CloneReaderForWorker()71 std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
72 return reader_->CloneCowReader();
73 }
74
UpdateMergeCompletionPercentage()75 void SnapshotHandler::UpdateMergeCompletionPercentage() {
76 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
77 merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
78
79 SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
80 << " num_merge_ops: " << ch->num_merge_ops
81 << " total-ops: " << reader_->get_num_total_data_ops();
82 }
83
CommitMerge(int num_merge_ops)84 bool SnapshotHandler::CommitMerge(int num_merge_ops) {
85 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
86 ch->num_merge_ops += num_merge_ops;
87
88 if (scratch_space_) {
89 if (ra_thread_) {
90 struct BufferState* ra_state = GetBufferState();
91 ra_state->read_ahead_state = kCowReadAheadInProgress;
92 }
93
94 int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
95 if (ret < 0) {
96 SNAP_PLOG(ERROR) << "msync header failed: " << ret;
97 return false;
98 }
99 } else {
100 reader_->UpdateMergeOpsCompleted(num_merge_ops);
101 CowHeader header;
102 reader_->GetHeader(&header);
103
104 if (lseek(cow_fd_.get(), 0, SEEK_SET) < 0) {
105 SNAP_PLOG(ERROR) << "lseek failed";
106 return false;
107 }
108
109 if (!android::base::WriteFully(cow_fd_, &header, sizeof(CowHeader))) {
110 SNAP_PLOG(ERROR) << "Write to header failed";
111 return false;
112 }
113
114 if (fsync(cow_fd_.get()) < 0) {
115 SNAP_PLOG(ERROR) << "fsync failed";
116 return false;
117 }
118 }
119
120 // Update the merge completion - this is used by update engine
121 // to track the completion. No need to take a lock. It is ok
122 // even if there is a miss on reading a latest updated value.
123 // Subsequent polling will eventually converge to completion.
124 UpdateMergeCompletionPercentage();
125
126 return true;
127 }
128
PrepareReadAhead()129 void SnapshotHandler::PrepareReadAhead() {
130 struct BufferState* ra_state = GetBufferState();
131 // Check if the data has to be re-constructed from COW device
132 if (ra_state->read_ahead_state == kCowReadAheadDone) {
133 populate_data_from_cow_ = true;
134 } else {
135 populate_data_from_cow_ = false;
136 }
137
138 NotifyRAForMergeReady();
139 }
140
CheckMergeCompletionStatus()141 bool SnapshotHandler::CheckMergeCompletionStatus() {
142 if (!merge_initiated_) {
143 SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
144 << reader_->get_num_total_data_ops();
145 return false;
146 }
147
148 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
149
150 SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
151 << " Total-data-ops: " << reader_->get_num_total_data_ops();
152 return true;
153 }
154
ReadMetadata()155 bool SnapshotHandler::ReadMetadata() {
156 reader_ = std::make_unique<CowReader>(CowReader::ReaderFlags::USERSPACE_MERGE, true);
157 CowHeader header;
158 CowOptions options;
159
160 SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
161
162 if (!reader_->Parse(cow_fd_)) {
163 SNAP_LOG(ERROR) << "Failed to parse";
164 return false;
165 }
166
167 if (!reader_->GetHeader(&header)) {
168 SNAP_LOG(ERROR) << "Failed to get header";
169 return false;
170 }
171
172 if (!(header.block_size == BLOCK_SZ)) {
173 SNAP_LOG(ERROR) << "Invalid header block size found: " << header.block_size;
174 return false;
175 }
176
177 SNAP_LOG(INFO) << "Merge-ops: " << header.num_merge_ops;
178
179 if (!MmapMetadata()) {
180 SNAP_LOG(ERROR) << "mmap failed";
181 return false;
182 }
183
184 UpdateMergeCompletionPercentage();
185
186 // Initialize the iterator for reading metadata
187 std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetOpIter(true);
188
189 int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
190 int ra_index = 0;
191
192 size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;
193
194 while (!cowop_iter->Done()) {
195 const CowOperation* cow_op = &cowop_iter->Get();
196
197 if (cow_op->type == kCowCopyOp) {
198 copy_ops += 1;
199 } else if (cow_op->type == kCowReplaceOp) {
200 replace_ops += 1;
201 } else if (cow_op->type == kCowZeroOp) {
202 zero_ops += 1;
203 } else if (cow_op->type == kCowXorOp) {
204 xor_ops += 1;
205 }
206
207 chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
208
209 if (IsOrderedOp(*cow_op)) {
210 ra_thread_ = true;
211 block_to_ra_index_[cow_op->new_block] = ra_index;
212 num_ra_ops_per_iter -= 1;
213
214 if ((ra_index + 1) - merge_blk_state_.size() == 1) {
215 std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
216 MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);
217
218 merge_blk_state_.push_back(std::move(blk_state));
219 }
220
221 // Move to next RA block
222 if (num_ra_ops_per_iter == 0) {
223 num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
224 ra_index += 1;
225 }
226 }
227 cowop_iter->Next();
228 }
229
230 chunk_vec_.shrink_to_fit();
231
232 // Sort the vector based on sectors as we need this during un-aligned access
233 std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
234
235 PrepareReadAhead();
236
237 SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
238 << " Total-data-ops: " << reader_->get_num_total_data_ops()
239 << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
240 << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
241 << " Xor-ops: " << xor_ops;
242
243 return true;
244 }
245
MmapMetadata()246 bool SnapshotHandler::MmapMetadata() {
247 CowHeader header;
248 reader_->GetHeader(&header);
249
250 total_mapped_addr_length_ = header.header_size + BUFFER_REGION_DEFAULT_SIZE;
251
252 if (header.major_version >= 2 && header.buffer_size > 0) {
253 scratch_space_ = true;
254 }
255
256 if (scratch_space_) {
257 mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
258 cow_fd_.get(), 0);
259 } else {
260 mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE,
261 MAP_SHARED | MAP_ANONYMOUS, -1, 0);
262 struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
263 ch->num_merge_ops = header.num_merge_ops;
264 }
265
266 if (mapped_addr_ == MAP_FAILED) {
267 SNAP_LOG(ERROR) << "mmap metadata failed";
268 return false;
269 }
270
271 return true;
272 }
273
UnmapBufferRegion()274 void SnapshotHandler::UnmapBufferRegion() {
275 int ret = munmap(mapped_addr_, total_mapped_addr_length_);
276 if (ret < 0) {
277 SNAP_PLOG(ERROR) << "munmap failed";
278 }
279 }
280
InitCowDevice()281 bool SnapshotHandler::InitCowDevice() {
282 cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
283 if (cow_fd_ < 0) {
284 SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
285 return false;
286 }
287
288 unique_fd fd(TEMP_FAILURE_RETRY(open(base_path_merge_.c_str(), O_RDONLY | O_CLOEXEC)));
289 if (fd < 0) {
290 SNAP_LOG(ERROR) << "Cannot open block device";
291 return false;
292 }
293
294 uint64_t dev_sz = get_block_device_size(fd.get());
295 if (!dev_sz) {
296 SNAP_LOG(ERROR) << "Failed to find block device size: " << base_path_merge_;
297 return false;
298 }
299
300 num_sectors_ = dev_sz >> SECTOR_SHIFT;
301
302 return ReadMetadata();
303 }
304
305 /*
306 * Entry point to launch threads
307 */
Start()308 bool SnapshotHandler::Start() {
309 std::vector<std::future<bool>> threads;
310 std::future<bool> ra_thread_status;
311
312 if (ra_thread_) {
313 ra_thread_status =
314 std::async(std::launch::async, &ReadAhead::RunThread, read_ahead_thread_.get());
315
316 SNAP_LOG(INFO) << "Read-ahead thread started...";
317 }
318
319 // Launch worker threads
320 for (int i = 0; i < worker_threads_.size(); i++) {
321 threads.emplace_back(
322 std::async(std::launch::async, &Worker::RunThread, worker_threads_[i].get()));
323 }
324
325 std::future<bool> merge_thread =
326 std::async(std::launch::async, &Worker::RunMergeThread, merge_thread_.get());
327
328 // Now that the worker threads are up, scan the partitions.
329 if (perform_verification_) {
330 update_verify_->VerifyUpdatePartition();
331 }
332
333 bool ret = true;
334 for (auto& t : threads) {
335 ret = t.get() && ret;
336 }
337
338 // Worker threads are terminated by this point - this can only happen:
339 //
340 // 1: If dm-user device is destroyed
341 // 2: We had an I/O failure when reading root partitions
342 //
343 // In case (1), this would be a graceful shutdown. In this case, merge
344 // thread and RA thread should have already terminated by this point. We will be
345 // destroying the dm-user device only _after_ merge is completed.
346 //
347 // In case (2), if merge thread had started, then it will be
348 // continuing to merge; however, since we had an I/O failure and the
349 // I/O on root partitions are no longer served, we will terminate the
350 // merge
351
352 NotifyIOTerminated();
353
354 bool read_ahead_retval = false;
355
356 SNAP_LOG(INFO) << "Snapshot I/O terminated. Waiting for merge thread....";
357 bool merge_thread_status = merge_thread.get();
358
359 if (ra_thread_) {
360 read_ahead_retval = ra_thread_status.get();
361 }
362
363 SNAP_LOG(INFO) << "Worker threads terminated with ret: " << ret
364 << " Merge-thread with ret: " << merge_thread_status
365 << " RA-thread with ret: " << read_ahead_retval;
366 return ret;
367 }
368
GetBufferMetadataOffset()369 uint64_t SnapshotHandler::GetBufferMetadataOffset() {
370 CowHeader header;
371 reader_->GetHeader(&header);
372
373 return (header.header_size + sizeof(BufferState));
374 }
375
376 /*
377 * Metadata for read-ahead is 16 bytes. For a 2 MB region, we will
378 * end up with 8k (2 PAGE) worth of metadata. Thus, a 2MB buffer
379 * region is split into:
380 *
381 * 1: 8k metadata
382 * 2: Scratch space
383 *
384 */
GetBufferMetadataSize()385 size_t SnapshotHandler::GetBufferMetadataSize() {
386 CowHeader header;
387 reader_->GetHeader(&header);
388 size_t buffer_size = header.buffer_size;
389
390 // If there is no scratch space, then just use the
391 // anonymous memory
392 if (buffer_size == 0) {
393 buffer_size = BUFFER_REGION_DEFAULT_SIZE;
394 }
395
396 return ((buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ);
397 }
398
GetBufferDataOffset()399 size_t SnapshotHandler::GetBufferDataOffset() {
400 CowHeader header;
401 reader_->GetHeader(&header);
402
403 return (header.header_size + GetBufferMetadataSize());
404 }
405
406 /*
407 * (2MB - 8K = 2088960 bytes) will be the buffer region to hold the data.
408 */
GetBufferDataSize()409 size_t SnapshotHandler::GetBufferDataSize() {
410 CowHeader header;
411 reader_->GetHeader(&header);
412 size_t buffer_size = header.buffer_size;
413
414 // If there is no scratch space, then just use the
415 // anonymous memory
416 if (buffer_size == 0) {
417 buffer_size = BUFFER_REGION_DEFAULT_SIZE;
418 }
419
420 return (buffer_size - GetBufferMetadataSize());
421 }
422
GetBufferState()423 struct BufferState* SnapshotHandler::GetBufferState() {
424 CowHeader header;
425 reader_->GetHeader(&header);
426
427 struct BufferState* ra_state =
428 reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.header_size);
429 return ra_state;
430 }
431
IsIouringSupported()432 bool SnapshotHandler::IsIouringSupported() {
433 struct utsname uts;
434 unsigned int major, minor;
435
436 if (android::base::GetBoolProperty("snapuserd.test.io_uring.force_disable", false)) {
437 SNAP_LOG(INFO) << "io_uring disabled for testing";
438 return false;
439 }
440
441 if ((uname(&uts) != 0) || (sscanf(uts.release, "%u.%u", &major, &minor) != 2)) {
442 SNAP_LOG(ERROR) << "Could not parse the kernel version from uname. "
443 << " io_uring not supported";
444 return false;
445 }
446
447 // We will only support kernels from 5.6 onwards as IOSQE_ASYNC flag and
448 // IO_URING_OP_READ/WRITE opcodes were introduced only on 5.6 kernel
449 if (major >= 5) {
450 if (major == 5 && minor < 6) {
451 return false;
452 }
453 } else {
454 return false;
455 }
456
457 // During selinux init transition, libsnapshot will propagate the
458 // status of io_uring enablement. As properties are not initialized,
459 // we cannot query system property.
460 if (is_io_uring_enabled_) {
461 return true;
462 }
463
464 // Finally check the system property
465 return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
466 }
467
468 } // namespace snapshot
469 } // namespace android
470