1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include <linux/types.h>
18 #include <stdint.h>
19 #include <stdlib.h>
20 #include <sys/mman.h>
21 #include <sys/resource.h>
22 #include <sys/time.h>
23 #include <unistd.h>
24 
25 #include <condition_variable>
26 #include <cstring>
27 #include <future>
28 #include <iostream>
29 #include <limits>
30 #include <mutex>
31 #include <string>
32 #include <thread>
33 #include <unordered_map>
34 #include <unordered_set>
35 #include <vector>
36 
37 #include <android-base/file.h>
38 #include <android-base/logging.h>
39 #include <android-base/stringprintf.h>
40 #include <android-base/unique_fd.h>
41 #include <ext4_utils/ext4_utils.h>
42 #include <libdm/dm.h>
43 #include <libsnapshot/cow_reader.h>
44 #include <libsnapshot/cow_writer.h>
45 #include <liburing.h>
46 #include <snapuserd/snapuserd_buffer.h>
47 #include <snapuserd/snapuserd_kernel.h>
48 #include <storage_literals/storage_literals.h>
49 
50 namespace android {
51 namespace snapshot {
52 
53 using android::base::unique_fd;
54 using namespace std::chrono_literals;
55 using namespace android::storage_literals;
56 
// Size of the dm-user payload staging buffer; must hold at least one block.
static constexpr size_t PAYLOAD_BUFFER_SZ = (1UL << 20);
static_assert(PAYLOAD_BUFFER_SZ >= BLOCK_SZ);

// Default number of I/O worker threads per snapshot handler.
static constexpr int kNumWorkerThreads = 4;

// Merge threads run at a slightly elevated priority (negative nice value).
static constexpr int kNiceValueForMergeThreads = -5;

// Logging helpers; expand inside classes that have a misc_name_ member so
// every message is prefixed with the device's misc name.
#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
66 
// Transitions of the merge/IO state machine shared between the merge
// thread, the read-ahead thread and the dm-user I/O path (stored in
// SnapshotHandler::io_state_).
enum class MERGE_IO_TRANSITION {
    MERGE_READY,
    MERGE_BEGIN,
    MERGE_FAILED,
    MERGE_COMPLETE,
    IO_TERMINATED,
    READ_AHEAD_FAILURE,
};
75 
class SnapshotHandler;

// Merge state of one read-ahead block group (tracked per group in
// SnapshotHandler::merge_blk_state_; see MergeGroupState below).
enum class MERGE_GROUP_STATE {
    GROUP_MERGE_PENDING,
    GROUP_MERGE_RA_READY,
    GROUP_MERGE_IN_PROGRESS,
    GROUP_MERGE_COMPLETED,
    GROUP_MERGE_FAILED,
    GROUP_INVALID,
};
86 
// Per-block-group merge bookkeeping: the group's merge state plus the
// number of in-flight I/Os that reference it, guarded by m_lock/m_cv so
// state transitions can wait for I/O to drain.
struct MergeGroupState {
    MERGE_GROUP_STATE merge_state_;
    // Ref count I/O when group state
    // is in "GROUP_MERGE_PENDING"
    size_t num_ios_in_progress;
    std::mutex m_lock;
    std::condition_variable m_cv;

    MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
        : merge_state_(state), num_ios_in_progress(n_ios) {}
};
98 
// Read-ahead engine for ordered (copy/XOR) COW operations: prefetches the
// required source data from the backing store (and XOR payloads from the
// COW device) into scratch buffers ahead of the merge. I/O is issued
// either synchronously or asynchronously via io_uring.
class ReadAhead {
  public:
    ReadAhead(const std::string& cow_device, const std::string& backing_device,
              const std::string& misc_name, std::shared_ptr<SnapshotHandler> snapuserd);
    // Thread entry point for the read-ahead loop.
    bool RunThread();

  private:
    // Helpers wrapping cowop_iter_ (iteration over COW operations).
    void InitializeRAIter();
    bool RAIterDone();
    void RAIterNext();
    void RAResetIter(uint64_t num_blocks);
    const CowOperation* GetRAOpIter();

    void InitializeBuffer();
    bool InitReader();
    bool InitializeFds();

    // Drops only the backing-store fd; cow_fd_ is kept open here.
    void CloseFds() { backing_store_fd_ = {}; }

    bool ReadAheadIOStart();
    // Plans the next contiguous batch of read-ahead I/O; presumably fills
    // blocks/xor_op_vec for the batch — exact contract lives in the .cpp.
    int PrepareNextReadAhead(uint64_t* source_offset, int* pending_ops,
                             std::vector<uint64_t>& blocks,
                             std::vector<const CowOperation*>& xor_op_vec);
    bool ReconstructDataFromCow();
    void CheckOverlap(const CowOperation* cow_op);

    // Async (io_uring) path and its completion reaping.
    bool ReadAheadAsyncIO();
    bool ReapIoCompletions(int pending_ios_to_complete);
    bool ReadXorData(size_t block_index, size_t xor_op_index,
                     std::vector<const CowOperation*>& xor_op_vec);
    void ProcessXorData(size_t& block_xor_index, size_t& xor_index,
                        std::vector<const CowOperation*>& xor_op_vec, void* buffer,
                        loff_t& buffer_offset);
    void UpdateScratchMetadata();

    // Fallback synchronous path.
    bool ReadAheadSyncIO();
    bool InitializeIouring();
    void FinalizeIouring();

    void* read_ahead_buffer_;  // scratch data region (see InitializeBuffer)
    void* metadata_buffer_;    // scratch metadata region

    std::unique_ptr<ICowOpIter> cowop_iter_;

    std::string cow_device_;
    std::string backing_store_device_;
    std::string misc_name_;

    unique_fd cow_fd_;
    unique_fd backing_store_fd_;

    std::shared_ptr<SnapshotHandler> snapuserd_;
    std::unique_ptr<CowReader> reader_;

    // Source/destination block sets — presumably consulted by
    // CheckOverlap() to detect colliding copy ranges; confirm in .cpp.
    std::unordered_set<uint64_t> dest_blocks_;
    std::unordered_set<uint64_t> source_blocks_;
    bool overlap_;
    std::vector<uint64_t> blocks_;
    int total_blocks_merged_ = 0;
    std::unique_ptr<uint8_t[]> ra_temp_buffer_;
    std::unique_ptr<uint8_t[]> ra_temp_meta_buffer_;
    BufferSink bufsink_;

    uint64_t total_ra_blocks_completed_ = 0;
    bool read_ahead_async_ = false;
    // Queue depth of 8 seems optimal. We don't want
    // to have a huge depth as it may put more memory pressure
    // on the kernel worker threads given that we use
    // IOSQE_ASYNC flag - ASYNC flags can potentially
    // result in EINTR; Since we don't restart
    // syscalls and fallback to synchronous I/O, we
    // don't want huge queue depth
    int queue_depth_ = 8;
    std::unique_ptr<struct io_uring> ring_;
};
174 
175 class UpdateVerify {
176   public:
177     UpdateVerify(const std::string& misc_name);
178     void VerifyUpdatePartition();
179     bool CheckPartitionVerification();
180 
181   private:
182     enum class UpdateVerifyState {
183         VERIFY_UNKNOWN,
184         VERIFY_FAILED,
185         VERIFY_SUCCESS,
186     };
187 
188     std::string misc_name_;
189     UpdateVerifyState state_;
190     std::mutex m_lock_;
191     std::condition_variable m_cv_;
192 
193     int kMinThreadsToVerify = 1;
194     int kMaxThreadsToVerify = 4;
195     uint64_t kThresholdSize = 512_MiB;
196     uint64_t kBlockSizeVerify = 1_MiB;
197 
IsBlockAligned(uint64_t read_size)198     bool IsBlockAligned(uint64_t read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
199     void UpdatePartitionVerificationState(UpdateVerifyState state);
200     bool VerifyPartition(const std::string& partition_name, const std::string& dm_block_device);
201     bool VerifyBlocks(const std::string& partition_name, const std::string& dm_block_device,
202                       off_t offset, int skip_blocks, uint64_t dev_sz);
203 };
204 
// Worker services dm-user requests for one snapshot device; one dedicated
// instance also runs the merge (RunMergeThread). Each worker owns its own
// fds and its own CowReader, and coordinates with the shared
// SnapshotHandler.
class Worker {
  public:
    Worker(const std::string& cow_device, const std::string& backing_device,
           const std::string& control_device, const std::string& misc_name,
           const std::string& base_path_merge, std::shared_ptr<SnapshotHandler> snapuserd);
    // Entry point of an I/O worker thread.
    bool RunThread();
    // Entry point of the merge thread.
    bool RunMergeThread();
    bool Init();

  private:
    // Initialization
    void InitializeBufsink();
    bool InitializeFds();
    bool InitReader();
    // Releases all fds held by this worker (unique_fd closes on reset).
    void CloseFds() {
        ctrl_fd_ = {};
        backing_store_fd_ = {};
        base_path_merge_fd_ = {};
    }

    // Functions interacting with dm-user
    bool ReadDmUserHeader();
    bool WriteDmUserPayload(size_t size, bool header_response);
    bool DmuserReadRequest();

    // IO Path
    bool ProcessIORequest();
    // True iff size is a multiple of BLOCK_SZ (power-of-two mask test).
    bool IsBlockAligned(size_t size) { return ((size & (BLOCK_SZ - 1)) == 0); }

    bool ReadDataFromBaseDevice(sector_t sector, size_t read_size);
    bool ReadFromSourceDevice(const CowOperation* cow_op);

    bool ReadAlignedSector(sector_t sector, size_t sz, bool header_response);
    bool ReadUnalignedSector(sector_t sector, size_t size);
    int ReadUnalignedSector(sector_t sector, size_t size,
                            std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
    bool RespondIOError(bool header_response);

    // Processing COW operations
    bool ProcessCowOp(const CowOperation* cow_op);
    bool ProcessReplaceOp(const CowOperation* cow_op);
    bool ProcessZeroOp();

    // Handles Copy and Xor
    bool ProcessCopyOp(const CowOperation* cow_op);
    bool ProcessXorOp(const CowOperation* cow_op);
    bool ProcessOrderedOp(const CowOperation* cow_op);

    // Merge related ops
    bool Merge();
    bool AsyncMerge();
    bool SyncMerge();
    bool MergeOrderedOps();
    bool MergeOrderedOpsAsync();
    bool MergeReplaceZeroOps();
    int PrepareMerge(uint64_t* source_offset, int* pending_ops,
                     std::vector<const CowOperation*>* replace_zero_vec = nullptr);

    // Chunk <-> sector conversions (CHUNK_SHIFT comes from
    // snapuserd_kernel.h, outside this file).
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }

    bool InitializeIouring();
    void FinalizeIouring();

    std::unique_ptr<CowReader> reader_;
    BufferSink bufsink_;
    XorSink xorsink_;

    std::string cow_device_;
    std::string backing_store_device_;
    std::string control_device_;
    std::string misc_name_;
    std::string base_path_merge_;

    unique_fd cow_fd_;
    unique_fd backing_store_fd_;
    unique_fd base_path_merge_fd_;
    unique_fd ctrl_fd_;

    std::unique_ptr<ICowOpIter> cowop_iter_;
    size_t ra_block_index_ = 0;
    uint64_t blocks_merged_in_group_ = 0;
    bool merge_async_ = false;
    // Queue depth of 8 seems optimal. We don't want
    // to have a huge depth as it may put more memory pressure
    // on the kernel worker threads given that we use
    // IOSQE_ASYNC flag - ASYNC flags can potentially
    // result in EINTR; Since we don't restart
    // syscalls and fallback to synchronous I/O, we
    // don't want huge queue depth
    int queue_depth_ = 8;
    std::unique_ptr<struct io_uring> ring_;

    std::shared_ptr<SnapshotHandler> snapuserd_;
};
300 
// Central per-device state: owns the COW reader, the mmap'd scratch region,
// the worker/merge/read-ahead threads, and the state machinery that
// coordinates merging with in-flight dm-user I/O. Shared across threads via
// shared_ptr (enable_shared_from_this).
class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
  public:
    SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
                    std::string base_path_merge, int num_workers, bool use_iouring,
                    bool perform_verification);
    bool InitCowDevice();
    bool Start();

    const std::string& GetControlDevicePath() { return control_device_; }
    const std::string& GetMiscName() { return misc_name_; }
    // NOTE(review): const-reference returns of scalar members are unusual;
    // by-value returns would be idiomatic, but unseen callers may bind
    // references — confirm before changing.
    const uint64_t& GetNumSectors() { return num_sectors_; }
    const bool& IsAttached() const { return attached_; }
    void AttachControlDevice() { attached_ = true; }

    bool CheckMergeCompletionStatus();
    bool CommitMerge(int num_merge_ops);

    void CloseFds() { cow_fd_ = {}; }
    // Drops all thread objects once processing has stopped.
    void FreeResources() {
        worker_threads_.clear();
        read_ahead_thread_ = nullptr;
        merge_thread_ = nullptr;
    }

    bool InitializeWorkers();
    std::unique_ptr<CowReader> CloneReaderForWorker();
    std::shared_ptr<SnapshotHandler> GetSharedPtr() { return shared_from_this(); }

    std::vector<std::pair<sector_t, const CowOperation*>>& GetChunkVec() { return chunk_vec_; }

    // Orders (sector, op) pairs by sector number.
    static bool compare(std::pair<sector_t, const CowOperation*> p1,
                        std::pair<sector_t, const CowOperation*> p2) {
        return p1.first < p2.first;
    }

    void UnmapBufferRegion();
    bool MmapMetadata();

    // Read-ahead related functions
    void* GetMappedAddr() { return mapped_addr_; }
    void PrepareReadAhead();
    std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }

    // State transitions for merge
    void InitiateMerge();
    void MonitorMerge();
    void WakeupMonitorMergeThread();
    void WaitForMergeComplete();
    bool WaitForMergeBegin();
    void NotifyRAForMergeReady();
    bool WaitForMergeReady();
    void MergeFailed();
    bool IsIOTerminated();
    void MergeCompleted();
    void NotifyIOTerminated();
    bool ReadAheadIOCompleted(bool sync);
    void ReadAheadIOFailed();

    bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
    void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
    // Return the snapshot status
    std::string GetMergeStatus();

    // RA related functions
    uint64_t GetBufferMetadataOffset();
    size_t GetBufferMetadataSize();
    size_t GetBufferDataOffset();
    size_t GetBufferDataSize();

    // Total number of blocks to be merged in a given read-ahead buffer region
    void SetMergedBlockCountForNextCommit(int x) { total_ra_blocks_merged_ = x; }
    int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
    bool MergeInitiated() { return merge_initiated_; }
    bool MergeMonitored() { return merge_monitored_; }
    double GetMergePercentage() { return merge_completion_percentage_; }

    // Merge Block State Transitions
    void SetMergeCompleted(size_t block_index);
    void SetMergeInProgress(size_t block_index);
    void SetMergeFailed(size_t block_index);
    void NotifyIOCompletion(uint64_t new_block);
    bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
    MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);

    bool IsIouringSupported();
    bool CheckPartitionVerification() { return update_verify_->CheckPartitionVerification(); }

  private:
    bool ReadMetadata();
    // Chunk <-> sector conversions (CHUNK_SHIFT from snapuserd_kernel.h).
    sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
    chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
    // True iff read_size is a multiple of BLOCK_SZ (power-of-two mask test).
    bool IsBlockAligned(uint64_t read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
    struct BufferState* GetBufferState();
    void UpdateMergeCompletionPercentage();

    // COW device
    std::string cow_device_;
    // Source device
    std::string backing_store_device_;
    // dm-user control device
    std::string control_device_;
    std::string misc_name_;
    // Base device for merging
    std::string base_path_merge_;

    unique_fd cow_fd_;

    uint64_t num_sectors_;

    std::unique_ptr<CowReader> reader_;

    // chunk_vec stores the pseudo mapping of sector
    // to COW operations.
    std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;

    std::mutex lock_;
    std::condition_variable cv;

    // mmap'd scratch region (see MmapMetadata / UnmapBufferRegion).
    void* mapped_addr_;
    size_t total_mapped_addr_length_;

    std::vector<std::unique_ptr<Worker>> worker_threads_;
    // Read-ahead related
    bool populate_data_from_cow_ = false;
    bool ra_thread_ = false;
    int total_ra_blocks_merged_ = 0;
    MERGE_IO_TRANSITION io_state_;
    std::unique_ptr<ReadAhead> read_ahead_thread_;
    std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;

    // user-space-merging
    std::unordered_map<uint64_t, int> block_to_ra_index_;

    // Merge Block state
    std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;

    std::unique_ptr<Worker> merge_thread_;
    double merge_completion_percentage_;

    bool merge_initiated_ = false;
    bool merge_monitored_ = false;
    bool attached_ = false;
    bool is_io_uring_enabled_ = false;
    bool scratch_space_ = false;
    int num_worker_threads_ = kNumWorkerThreads;
    bool perform_verification_ = true;

    std::unique_ptr<struct io_uring> ring_;
    std::unique_ptr<UpdateVerify> update_verify_;
};
451 
452 }  // namespace snapshot
453 }  // namespace android
454