1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "cloud/asset_operation_utils.h"
16 
17 #include <mutex>
18 #include "cloud/cloud_db_types.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
21 using RecordAssetOpType = AssetOperationUtils::RecordAssetOpType;
22 using CloudSyncAction = AssetOperationUtils::CloudSyncAction;
23 namespace {
24 std::once_flag g_init;
25 using Reaction = std::function<AssetOperationUtils::AssetOpType (const Asset &, const Assets &)>;
26 std::map<CloudSyncAction, Reaction> reactions;
GetReaction(const CloudSyncAction & action)27 Reaction GetReaction(const CloudSyncAction &action)
28 {
29     if (reactions.find(action) != reactions.end()) {
30         return reactions[action];
31     } else {
32         return reactions[CloudSyncAction::DEFAULT_ACTION];
33     }
34 }
35 }
36 
CalAssetOperation(const VBucket & cacheAssets,const VBucket & dbAssets,const CloudSyncAction & action)37 RecordAssetOpType AssetOperationUtils::CalAssetOperation(const VBucket &cacheAssets,
38     const VBucket &dbAssets, const CloudSyncAction &action)
39 {
40     std::call_once(g_init, Init);
41     // switch produce function by action
42     Reaction reaction = GetReaction(action);
43     RecordAssetOpType res;
44     // check each cache asset with db asset by same col name and asset name
45     for (const auto &[colName, colData] : cacheAssets) {
46         auto checkAssets = GetAssets(colName, dbAssets);
47         if (TYPE_INDEX<Asset> == colData.index()) {
48             auto asset = std::get<Asset>(colData);
49             res[colName][asset.name] = reaction(asset, checkAssets);
50         } else if (TYPE_INDEX<Assets> == colData.index()) {
51             auto assets = std::get<Assets>(colData);
52             for (const auto &asset : assets) {
53                 res[colName][asset.name] = reaction(asset, checkAssets);
54             }
55         }
56     }
57     return res;
58 }
59 
// Compute the operation type of a single cache asset.
// The reaction selected by `action` is called with `cacheAsset` and the assets stored under `colName`
// in `dbAssets`; its result is returned unchanged.
AssetOperationUtils::AssetOpType AssetOperationUtils::CalAssetOperation(const std::string &colName,
    const Asset &cacheAsset, const VBucket &dbAssets, const AssetOperationUtils::CloudSyncAction &action)
{
    // make sure the reaction table has been filled before it is queried
    std::call_once(g_init, Init);
    Reaction handler = GetReaction(action);
    const Assets columnAssets = GetAssets(colName, dbAssets);
    return handler(cacheAsset, columnAssets);
}
68 
EraseBitMask(uint32_t status)69 uint32_t AssetOperationUtils::EraseBitMask(uint32_t status)
70 {
71     return ((status << BIT_MASK_COUNT) >> BIT_MASK_COUNT);
72 }
73 
UpdateAssetsFlag(std::vector<VBucket> & from,std::vector<VBucket> & target)74 void AssetOperationUtils::UpdateAssetsFlag(std::vector<VBucket> &from, std::vector<VBucket> &target)
75 {
76     if (from.size() != target.size()) {
77         LOGW("the num of VBucket are not equal when update assets flag.");
78         return;
79     }
80     for (size_t i = 0; i < from.size(); ++i) {
81         VBucket &fromRecord = from[i];
82         VBucket &targetRecord = target[i];
83         if (targetRecord.empty()) {
84             continue;
85         }
86         for (auto &[colName, colData] : targetRecord) {
87             auto fromAssets = GetAssets(colName, fromRecord);
88             MergeAssetsFlag(fromAssets, colData);
89         }
90     }
91 }
92 
FilterDeleteAsset(VBucket & record)93 void AssetOperationUtils::FilterDeleteAsset(VBucket &record)
94 {
95     int filterCount = 0;
96     for (auto &item : record) {
97         if (item.second.index() == TYPE_INDEX<Asset>) {
98             auto &asset = std::get<Asset>(item.second);
99             if (EraseBitMask(asset.status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
100                 item.second = Nil();
101                 filterCount++;
102             }
103             continue;
104         }
105         if (item.second.index() != TYPE_INDEX<Assets>) {
106             continue;
107         }
108         auto &assets = std::get<Assets>(item.second);
109         auto it = assets.begin();
110         while (it != assets.end()) {
111             if (EraseBitMask(it->status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
112                 it = assets.erase(it);
113                 filterCount++;
114             }
115             it++;
116         }
117     }
118     if (filterCount > 0) {
119         LOGW("[AssetOperationUtils] Filter %d asset", filterCount);
120     }
121 }
122 
Init()123 void AssetOperationUtils::Init()
124 {
125     reactions[CloudSyncAction::DEFAULT_ACTION] = DefaultOperation;
126     reactions[CloudSyncAction::START_DOWNLOAD] = CheckBeforeDownload;
127     reactions[CloudSyncAction::START_UPLOAD] = HandleIfExistAndSameStatus;
128     reactions[CloudSyncAction::END_DOWNLOAD] = CheckAfterDownload;
129     reactions[CloudSyncAction::END_UPLOAD] = CheckAfterUpload;
130 }
131 
DefaultOperation(const Asset &,const Assets &)132 AssetOperationUtils::AssetOpType AssetOperationUtils::DefaultOperation(const Asset &, const Assets &)
133 {
134     return AssetOpType::HANDLE;
135 }
136 
CheckBeforeDownload(const Asset & cacheAsset,const Assets & dbAssets)137 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckBeforeDownload(const Asset &cacheAsset,
138     const Assets &dbAssets)
139 {
140     return CheckWithDownload(true, cacheAsset, dbAssets);
141 }
142 
CheckAfterDownload(const Asset & cacheAsset,const Assets & dbAssets)143 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterDownload(const Asset &cacheAsset,
144     const Assets &dbAssets)
145 {
146     return CheckWithDownload(false, cacheAsset, dbAssets);
147 }
148 
CheckWithDownload(bool before,const Asset & cacheAsset,const Assets & dbAssets)149 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckWithDownload(bool before, const Asset &cacheAsset,
150     const Assets &dbAssets)
151 {
152     for (const auto &dbAsset : dbAssets) {
153         if (dbAsset.name != cacheAsset.name) {
154             continue;
155         }
156         if (EraseBitMask(dbAsset.status) == AssetStatus::DOWNLOADING ||
157             EraseBitMask(dbAsset.status) == AssetStatus::ABNORMAL) {
158             return AssetOpType::HANDLE;
159         }
160         return AssetOpType::NOT_HANDLE;
161     }
162     if (before) {
163         if (cacheAsset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL) ||
164             EraseBitMask(cacheAsset.status) == AssetStatus::ABNORMAL) {
165             return AssetOpType::NOT_HANDLE;
166         }
167         return (cacheAsset.flag == static_cast<uint32_t>(DistributedDB::AssetOpType::DELETE) &&
168             EraseBitMask(cacheAsset.status) != AssetStatus::DELETE) ?
169             AssetOpType::HANDLE : AssetOpType::NOT_HANDLE;
170     }
171     return AssetOpType::NOT_HANDLE;
172 }
173 
CheckAfterUpload(const Asset & cacheAsset,const Assets & dbAssets)174 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterUpload(const Asset &cacheAsset, const Assets &dbAssets)
175 {
176     for (const auto &dbAsset : dbAssets) {
177         if (dbAsset.name != cacheAsset.name) {
178             continue;
179         }
180         if ((dbAsset.status & static_cast<uint32_t>(AssetStatus::UPLOADING)) ==
181             static_cast<uint32_t>(AssetStatus::UPLOADING)) {
182             return AssetOpType::HANDLE;
183         }
184         return AssetOpType::NOT_HANDLE;
185     }
186     return AssetOpType::NOT_HANDLE;
187 }
188 
GetAssets(const std::string & colName,const VBucket & rowData)189 Assets AssetOperationUtils::GetAssets(const std::string &colName, const VBucket &rowData)
190 {
191     if (rowData.find(colName) == rowData.end()) {
192         return {};
193     }
194     Assets res;
195     auto value = rowData.at(colName);
196     if (TYPE_INDEX<Asset> == value.index()) {
197         res.push_back(std::get<Asset>(value));
198     } else if (TYPE_INDEX<Assets> == value.index()) {
199         for (const auto &asset : std::get<Assets>(value)) {
200             res.push_back(asset);
201         }
202     }
203     return res;
204 }
205 
HandleIfExistAndSameStatus(const Asset & cacheAsset,const Assets & dbAssets)206 AssetOperationUtils::AssetOpType AssetOperationUtils::HandleIfExistAndSameStatus(const Asset &cacheAsset,
207     const Assets &dbAssets)
208 {
209     for (const auto &dbAsset : dbAssets) {
210         if (dbAsset.name != cacheAsset.name) {
211             continue;
212         }
213         if (dbAsset.status == cacheAsset.status) {
214             return AssetOpType::HANDLE;
215         }
216         return AssetOpType::NOT_HANDLE;
217     }
218     return AssetOpType::NOT_HANDLE;
219 }
220 
MergeAssetFlag(const Assets & from,Asset & target)221 void AssetOperationUtils::MergeAssetFlag(const Assets &from, Asset &target)
222 {
223     for (const auto &fromAsset : from) {
224         if (fromAsset.name == target.name) {
225             target.flag = fromAsset.flag;
226         }
227     }
228 }
229 
MergeAssetsFlag(const Assets & from,Type & target)230 void AssetOperationUtils::MergeAssetsFlag(const Assets &from, Type &target)
231 {
232     if (TYPE_INDEX<Asset> == target.index()) {
233         MergeAssetFlag(from, std::get<Asset>(target));
234     } else if (TYPE_INDEX<Assets> == target.index()) {
235         for (auto &targetAsset : std::get<Assets>(target)) {
236             MergeAssetFlag(from, targetAsset);
237         }
238     }
239 }
240 }