// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <linux/types.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>

#include <condition_variable>
#include <cstring>
#include <future>
#include <iostream>
#include <limits>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/stringprintf.h>
#include <android-base/unique_fd.h>
#include <ext4_utils/ext4_utils.h>
#include <libdm/dm.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <liburing.h>
#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_kernel.h>
#include <storage_literals/storage_literals.h>

namespace android {
namespace snapshot {

using android::base::unique_fd;
using namespace std::chrono_literals;
using namespace android::storage_literals;

static constexpr size_t PAYLOAD_BUFFER_SZ = (1UL << 20);
static_assert(PAYLOAD_BUFFER_SZ >= BLOCK_SZ);

static constexpr int kNumWorkerThreads = 4;

static constexpr int kNiceValueForMergeThreads = -5;

#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "

enum class MERGE_IO_TRANSITION {
    MERGE_READY,
    MERGE_BEGIN,
    MERGE_FAILED,
    MERGE_COMPLETE,
    IO_TERMINATED,
    READ_AHEAD_FAILURE,
};

class SnapshotHandler;

enum class MERGE_GROUP_STATE {
    GROUP_MERGE_PENDING,
    GROUP_MERGE_RA_READY,
    GROUP_MERGE_IN_PROGRESS,
    GROUP_MERGE_COMPLETED,
    GROUP_MERGE_FAILED,
    GROUP_INVALID,
};

struct MergeGroupState {
    MERGE_GROUP_STATE merge_state_;
    // Ref count of in-flight I/O requests while the group state
    // is GROUP_MERGE_PENDING
    size_t num_ios_in_progress;
    std::mutex m_lock;
    std::condition_variable m_cv;

    MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
        : merge_state_(state), num_ios_in_progress(n_ios) {}
};

class ReadAhead {
  public:
    ReadAhead(const std::string& cow_device, const std::string& backing_device,
              const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd);
    bool RunThread();

  private:
    void InitializeRAIter();
    bool RAIterDone();
    void RAIterNext();
    void RAResetIter(uint64_t num_blocks);
    const CowOperation* GetRAOpIter();

    void InitializeBuffer();
    bool InitReader();
    bool InitializeFds();

    void CloseFds() { backing_store_fd_ = {}; }

    bool ReadAheadIOStart();
    int PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
                             std::vector<uint64_t>& blocks,
                             std::vector<const CowOperation*>& xor_op_vec);
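    // Overlap handling: CheckOverlap() records each op's source and dest
    // blocks; once a later op touches a block an earlier op already wrote,
    // overlap_ is set and the read-ahead data must be persisted to the COW
    // scratch space so that, after a crash mid-merge,
    // ReconstructDataFromCow() can rebuild the buffer from scratch space
    // instead of the partially overwritten backing device. A minimal
    // sketch of the membership check, assuming the sets below are the
    // actual criterion used (illustrative, not the verbatim code):
    //
    //   if (dest_blocks_.count(cow_op->new_block) ||
    //       source_blocks_.count(cow_op->new_block)) {
    //       overlap_ = true;
    //   }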
    bool ReconstructDataFromCow();
    void CheckOverlap(const CowOperation* cow_op);

    bool ReadAheadAsyncIO();
    bool ReapIoCompletions(int pending_ios_to_complete);
    bool ReadXorData(size_t block_index, size_t xor_op_index,
                     std::vector<const CowOperation*>& xor_op_vec);
    void ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                        std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                        loff_t& buffer_offset);
    void UpdateScratchMetadata();

    bool ReadAheadSyncIO();
    bool InitializeIouring();
    void FinalizeIouring();

    void* read_ahead_buffer_;
    void* metadata_buffer_;

    std::unique_ptr<ICowOpIter> cowop_iter_;

    std::string cow_device_;
    std::string backing_store_device_;
    std::string misc_name_;

    unique_fd cow_fd_;
    unique_fd backing_store_fd_;

    std::shared_ptr<SnapshotHandler> snapuserd_;
    std::unique_ptr<CowReader> reader_;

    std::unordered_set<uint64_t> dest_blocks_;
    std::unordered_set<uint64_t> source_blocks_;
    bool overlap_;
    std::vector<uint64_t> blocks_;
    int total_blocks_merged_ = 0;
    std::unique_ptr<uint8_t[]> ra_temp_buffer_;
    std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
    BufferSink bufsink_;

    uint64_t total_ra_blocks_completed_ = 0;
    bool read_ahead_async_ = false;
    // A queue depth of 8 seems optimal. We don't want a huge depth, as it
    // may put more memory pressure on the kernel worker threads given that
    // we use the IOSQE_ASYNC flag - async submissions can potentially
    // result in EINTR, and since we don't restart syscalls but instead
    // fall back to synchronous I/O, a huge queue depth buys us nothing.
    int queue_depth_ = 8;
    std::unique_ptr<struct io_uring> ring_;
};

class UpdateVerify {
  public:
    UpdateVerify(const std::string& misc_name);
    void VerifyUpdatePartition();
    bool CheckPartitionVerification();

  private:
    enum class UpdateVerifyState {
        VERIFY_UNKNOWN,
        VERIFY_FAILED,
        VERIFY_SUCCESS,
    };

    std::string misc_name_;
    UpdateVerifyState state_;
    std::mutex m_lock_;
    std::condition_variable m_cv_;

    int kMinThreadsToVerify = 1;
    int kMaxThreadsToVerify = 4;
    uint64_t kThresholdSize = 512_MiB;
    uint64_t kBlockSizeVerify = 1_MiB;

    bool IsBlockAligned(uint64_t read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
    void UpdatePartitionVerificationState(UpdateVerifyState state);
    bool VerifyPartition(const std::string& partition_name, const std::string& dm_block_device);
    bool VerifyBlocks(const std::string& partition_name, const std::string& dm_block_device,
                      off_t offset, int skip_blocks, uint64_t dev_sz);
};

class Worker {
  public:
    Worker(const std::string& cow_device, const std::string& backing_device,
           const std::string& control_device, const std::string& misc_name,
           const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
    bool RunThread();
    bool RunMergeThread();
    bool Init();

  private:
    // Initialization
    void InitializeBufsink();
    bool InitializeFds();
    bool InitReader();
    void CloseFds() {
        ctrl_fd_ = {};
        backing_store_fd_ = {};
        base_path_merge_fd_ = {};
    }

    // Functions interacting with dm-user
    bool ReadDmUserHeader();
    bool WriteDmUserPayload(size_t size, bool header_response);
    bool DmuserReadRequest();

    // IO Path
    bool ProcessIORequest();
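    // A rough sketch of the per-request flow (the function names are from
    // this header; the loop structure itself is an illustrative assumption,
    // not the verbatim implementation):
    //
    //   while (ProcessIORequest()) {
    //       // ReadDmUserHeader() pulls the next dm-user request off
    //       // ctrl_fd_; reads are served either from the base device or
    //       // by replaying the COW op mapped at that sector, and the
    //       // result is returned via WriteDmUserPayload().
    //   }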
    bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }

    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
    bool ReadFromSourceDevice(const CowOperation* cow_op);

    bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response);
    bool ReadUnalignedSector(sector_t sector, size_t size);
    int ReadUnalignedSector(sector_t sector, size_t size,
                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
    bool RespondIOError(bool header_response);

    // Processing COW operations
    bool ProcessCowOp(const CowOperation* cow_op);
    bool ProcessReplaceOp(const CowOperation* cow_op);
    bool ProcessZeroOp();

    // Handles Copy and Xor
    bool ProcessCopyOp(const CowOperation* cow_op);
    bool ProcessXorOp(const CowOperation* cow_op);
    bool ProcessOrderedOp(const CowOperation* cow_op);

    // Merge related ops
    bool Merge();
    bool AsyncMerge();
    bool SyncMerge();
    bool MergeOrderedOps();
    bool MergeOrderedOpsAsync();
    bool MergeReplaceZeroOps();
    int PrepareMerge(uint64_t* source_offset, int* pending_ops,
                     std::vector<const CowOperation*>* replace_zero_vec = nullptr);

    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }

    bool InitializeIouring();
    void FinalizeIouring();

    std::unique_ptr<CowReader> reader_;
    BufferSink bufsink_;
    XorSink xorsink_;

    std::string cow_device_;
    std::string backing_store_device_;
    std::string control_device_;
    std::string misc_name_;
    std::string base_path_merge_;

    unique_fd cow_fd_;
    unique_fd backing_store_fd_;
    unique_fd base_path_merge_fd_;
    unique_fd ctrl_fd_;

    std::unique_ptr<ICowOpIter> cowop_iter_;
    size_t ra_block_index_ = 0;
    uint64_t blocks_merged_in_group_ = 0;
    bool merge_async_ = false;
    // A queue depth of 8 seems optimal. We don't want a huge depth, as it
    // may put more memory pressure on the kernel worker threads given that
    // we use the IOSQE_ASYNC flag - async submissions can potentially
    // result in EINTR, and since we don't restart syscalls but instead
    // fall back to synchronous I/O, a huge queue depth buys us nothing.
    int queue_depth_ = 8;
    std::unique_ptr<struct io_uring> ring_;
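    // A minimal sketch of how a ring with this depth could be set up and
    // used via liburing (illustrative assumption; InitializeIouring() holds
    // the real setup, and `buf`/`offset` here are hypothetical):
    //
    //   ring_ = std::make_unique<struct io_uring>();
    //   if (io_uring_queue_init(queue_depth_, ring_.get(), 0) < 0) {
    //       merge_async_ = false;  // fall back to synchronous merge I/O
    //   }
    //   struct io_uring_sqe* sqe = io_uring_get_sqe(ring_.get());
    //   io_uring_prep_read(sqe, backing_store_fd_.get(), buf, BLOCK_SZ, offset);
    //   sqe->flags |= IOSQE_ASYNC;  // punt work to kernel worker threads
    //   io_uring_submit(ring_.get());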

    std::shared_ptr<SnapshotHandler> snapuserd_;
};

class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
  public:
    SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
                    std::string base_path_merge, int num_workers, bool use_iouring,
                    bool perform_verification);
    bool InitCowDevice();
    bool Start();

    const std::string& GetControlDevicePath() { return control_device_; }
    const std::string& GetMiscName() { return misc_name_; }
    const uint64_t& GetNumSectors() { return num_sectors_; }
    const bool& IsAttached() const { return attached_; }
    void AttachControlDevice() { attached_ = true; }

    bool CheckMergeCompletionStatus();
    bool CommitMerge(int num_merge_ops);

    void CloseFds() { cow_fd_ = {}; }
    void FreeResources() {
        worker_threads_.clear();
        read_ahead_thread_ = nullptr;
        merge_thread_ = nullptr;
    }

    bool InitializeWorkers();
    std::unique_ptr<CowReader> CloneReaderForWorker();
    std::shared_ptr<SnapshotHandler> GetSharedPtr() { return shared_from_this(); }

    std::vector<std::pair<sector_t, const CowOperation*>>& GetChunkVec() { return chunk_vec_; }

    static bool compare(std::pair<sector_t, const CowOperation*> p1,
                        std::pair<sector_t, const CowOperation*> p2) {
        return p1.first < p2.first;
    }

    void UnmapBufferRegion();
    bool MmapMetadata();

    // Read-ahead related functions
    void* GetMappedAddr() { return mapped_addr_; }
    void PrepareReadAhead();
    std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }

    // State transitions for merge (see the state-machine sketch below)
    void InitiateMerge();
    void MonitorMerge();
    void WakeupMonitorMergeThread();
    void WaitForMergeComplete();
    bool WaitForMergeBegin();
    void NotifyRAForMergeReady();
    bool WaitForMergeReady();
    void MergeFailed();
    bool IsIOTerminated();
    void MergeCompleted();
    void NotifyIOTerminated();
    bool ReadAheadIOCompleted(bool sync);
    void ReadAheadIOFailed();
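    // Taken together, the functions above drive the MERGE_IO_TRANSITION
    // state machine shared between the read-ahead thread, the merge thread,
    // and the I/O path. Roughly, inferred from the names (the .cpp files
    // are authoritative):
    //
    //   InitiateMerge()        -> MERGE_BEGIN
    //   ReadAheadIOCompleted() -> MERGE_READY (read-ahead buffer usable)
    //   MergeCompleted()       -> MERGE_COMPLETE
    //   MergeFailed()          -> MERGE_FAILED
    //   ReadAheadIOFailed()    -> READ_AHEAD_FAILURE
    //   NotifyIOTerminated()   -> IO_TERMINATED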

    bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
    void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
    // Return the snapshot status
    std::string GetMergeStatus();

    // RA related functions
    uint64_t GetBufferMetadataOffset();
    size_t GetBufferMetadataSize();
    size_t GetBufferDataOffset();
    size_t GetBufferDataSize();

    // Total number of blocks to be merged in a given read-ahead buffer region
    void SetMergedBlockCountForNextCommit(int x) { total_ra_blocks_merged_ = x; }
    int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
    bool MergeInitiated() { return merge_initiated_; }
    bool MergeMonitored() { return merge_monitored_; }
    double GetMergePercentage() { return merge_completion_percentage_; }

    // Merge Block State Transitions
    void SetMergeCompleted(size_t block_index);
    void SetMergeInProgress(size_t block_index);
    void SetMergeFailed(size_t block_index);
    void NotifyIOCompletion(uint64_t new_block);
    bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
    MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);

    bool IsIouringSupported();
    bool CheckPartitionVerification() { return update_verify_->CheckPartitionVerification(); }

  private:
    bool ReadMetadata();
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
    bool IsBlockAligned(uint64_t read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
    struct BufferState* GetBufferState();
    void UpdateMergeCompletionPercentage();

    // COW device
    std::string cow_device_;
    // Source device
    std::string backing_store_device_;
    // dm-user control device
    std::string control_device_;
    std::string misc_name_;
    // Base device for merging
    std::string base_path_merge_;

    unique_fd cow_fd_;

    uint64_t num_sectors_;

    std::unique_ptr<CowReader> reader_;

    // chunk_vec stores the pseudo mapping of sector
    // to COW operations.
    std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;

    std::mutex lock_;
    std::condition_variable cv;

    void* mapped_addr_;
    size_t total_mapped_addr_length_;

    std::vector<std::unique_ptr<Worker>> worker_threads_;
    // Read-ahead related
    bool populate_data_from_cow_ = false;
    bool ra_thread_ = false;
    int total_ra_blocks_merged_ = 0;
    MERGE_IO_TRANSITION io_state_;
    std::unique_ptr<ReadAhead> read_ahead_thread_;
    std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;

    // user-space-merging
    std::unordered_map<uint64_t, int> block_to_ra_index_;

    // Merge Block state
    std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;

    std::unique_ptr<Worker> merge_thread_;
    double merge_completion_percentage_;

    bool merge_initiated_ = false;
    bool merge_monitored_ = false;
    bool attached_ = false;
    bool is_io_uring_enabled_ = false;
    bool scratch_space_ = false;
    int num_worker_threads_ = kNumWorkerThreads;
    bool perform_verification_ = true;

    std::unique_ptr<struct io_uring> ring_;
    std::unique_ptr<UpdateVerify> update_verify_;
};

}  // namespace snapshot
}  // namespace android
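// A minimal usage sketch (illustrative assumption; in practice the snapuserd
// daemon constructs handlers through its dm-user server, and the misc name
// and *_path variables below are hypothetical):
//
//   auto handler = std::make_shared<android::snapshot::SnapshotHandler>(
//           "system_a-user-cow", cow_path, backing_path, base_merge_path,
//           android::snapshot::kNumWorkerThreads,
//           /*use_iouring=*/true, /*perform_verification=*/true);
//   if (handler->InitCowDevice() && handler->InitializeWorkers() &&
//       handler->Start()) {
//       handler->InitiateMerge();
//       handler->WaitForMergeComplete();
//   }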