1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "cloud/asset_operation_utils.h"
16
17 #include <mutex>
18 #include "cloud/cloud_db_types.h"
19 #include "runtime_context.h"
20 namespace DistributedDB {
21 using RecordAssetOpType = AssetOperationUtils::RecordAssetOpType;
22 using CloudSyncAction = AssetOperationUtils::CloudSyncAction;
23 namespace {
24 std::once_flag g_init;
25 using Reaction = std::function<AssetOperationUtils::AssetOpType (const Asset &, const Assets &)>;
26 std::map<CloudSyncAction, Reaction> reactions;
GetReaction(const CloudSyncAction & action)27 Reaction GetReaction(const CloudSyncAction &action)
28 {
29 if (reactions.find(action) != reactions.end()) {
30 return reactions[action];
31 } else {
32 return reactions[CloudSyncAction::DEFAULT_ACTION];
33 }
34 }
35 }
36
CalAssetOperation(const VBucket & cacheAssets,const VBucket & dbAssets,const CloudSyncAction & action)37 RecordAssetOpType AssetOperationUtils::CalAssetOperation(const VBucket &cacheAssets,
38 const VBucket &dbAssets, const CloudSyncAction &action)
39 {
40 std::call_once(g_init, Init);
41 // switch produce function by action
42 Reaction reaction = GetReaction(action);
43 RecordAssetOpType res;
44 // check each cache asset with db asset by same col name and asset name
45 for (const auto &[colName, colData] : cacheAssets) {
46 auto checkAssets = GetAssets(colName, dbAssets);
47 if (TYPE_INDEX<Asset> == colData.index()) {
48 auto asset = std::get<Asset>(colData);
49 res[colName][asset.name] = reaction(asset, checkAssets);
50 } else if (TYPE_INDEX<Assets> == colData.index()) {
51 auto assets = std::get<Assets>(colData);
52 for (const auto &asset : assets) {
53 res[colName][asset.name] = reaction(asset, checkAssets);
54 }
55 }
56 }
57 return res;
58 }
59
CalAssetOperation(const std::string & colName,const Asset & cacheAsset,const VBucket & dbAssets,const AssetOperationUtils::CloudSyncAction & action)60 AssetOperationUtils::AssetOpType AssetOperationUtils::CalAssetOperation(const std::string &colName,
61 const Asset &cacheAsset, const VBucket &dbAssets, const AssetOperationUtils::CloudSyncAction &action)
62 {
63 std::call_once(g_init, Init);
64 // switch produce function by action
65 Reaction reaction = GetReaction(action);
66 return reaction(cacheAsset, GetAssets(colName, dbAssets));
67 }
68
EraseBitMask(uint32_t status)69 uint32_t AssetOperationUtils::EraseBitMask(uint32_t status)
70 {
71 return ((status << BIT_MASK_COUNT) >> BIT_MASK_COUNT);
72 }
73
UpdateAssetsFlag(std::vector<VBucket> & from,std::vector<VBucket> & target)74 void AssetOperationUtils::UpdateAssetsFlag(std::vector<VBucket> &from, std::vector<VBucket> &target)
75 {
76 if (from.size() != target.size()) {
77 LOGW("the num of VBucket are not equal when update assets flag.");
78 return;
79 }
80 for (size_t i = 0; i < from.size(); ++i) {
81 VBucket &fromRecord = from[i];
82 VBucket &targetRecord = target[i];
83 if (targetRecord.empty()) {
84 continue;
85 }
86 for (auto &[colName, colData] : targetRecord) {
87 auto fromAssets = GetAssets(colName, fromRecord);
88 MergeAssetsFlag(fromAssets, colData);
89 }
90 }
91 }
92
FilterDeleteAsset(VBucket & record)93 void AssetOperationUtils::FilterDeleteAsset(VBucket &record)
94 {
95 int filterCount = 0;
96 for (auto &item : record) {
97 if (item.second.index() == TYPE_INDEX<Asset>) {
98 auto &asset = std::get<Asset>(item.second);
99 if (EraseBitMask(asset.status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
100 item.second = Nil();
101 filterCount++;
102 }
103 continue;
104 }
105 if (item.second.index() != TYPE_INDEX<Assets>) {
106 continue;
107 }
108 auto &assets = std::get<Assets>(item.second);
109 auto it = assets.begin();
110 while (it != assets.end()) {
111 if (EraseBitMask(it->status) == static_cast<uint32_t>(AssetStatus::DELETE)) {
112 it = assets.erase(it);
113 filterCount++;
114 }
115 it++;
116 }
117 }
118 if (filterCount > 0) {
119 LOGW("[AssetOperationUtils] Filter %d asset", filterCount);
120 }
121 }
122
Init()123 void AssetOperationUtils::Init()
124 {
125 reactions[CloudSyncAction::DEFAULT_ACTION] = DefaultOperation;
126 reactions[CloudSyncAction::START_DOWNLOAD] = CheckBeforeDownload;
127 reactions[CloudSyncAction::START_UPLOAD] = HandleIfExistAndSameStatus;
128 reactions[CloudSyncAction::END_DOWNLOAD] = CheckAfterDownload;
129 reactions[CloudSyncAction::END_UPLOAD] = CheckAfterUpload;
130 }
131
DefaultOperation(const Asset &,const Assets &)132 AssetOperationUtils::AssetOpType AssetOperationUtils::DefaultOperation(const Asset &, const Assets &)
133 {
134 return AssetOpType::HANDLE;
135 }
136
CheckBeforeDownload(const Asset & cacheAsset,const Assets & dbAssets)137 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckBeforeDownload(const Asset &cacheAsset,
138 const Assets &dbAssets)
139 {
140 return CheckWithDownload(true, cacheAsset, dbAssets);
141 }
142
CheckAfterDownload(const Asset & cacheAsset,const Assets & dbAssets)143 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterDownload(const Asset &cacheAsset,
144 const Assets &dbAssets)
145 {
146 return CheckWithDownload(false, cacheAsset, dbAssets);
147 }
148
CheckWithDownload(bool before,const Asset & cacheAsset,const Assets & dbAssets)149 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckWithDownload(bool before, const Asset &cacheAsset,
150 const Assets &dbAssets)
151 {
152 for (const auto &dbAsset : dbAssets) {
153 if (dbAsset.name != cacheAsset.name) {
154 continue;
155 }
156 if (EraseBitMask(dbAsset.status) == AssetStatus::DOWNLOADING ||
157 EraseBitMask(dbAsset.status) == AssetStatus::ABNORMAL) {
158 return AssetOpType::HANDLE;
159 }
160 return AssetOpType::NOT_HANDLE;
161 }
162 if (before) {
163 if (cacheAsset.status == (AssetStatus::DOWNLOADING | AssetStatus::DOWNLOAD_WITH_NULL) ||
164 EraseBitMask(cacheAsset.status) == AssetStatus::ABNORMAL) {
165 return AssetOpType::NOT_HANDLE;
166 }
167 return (cacheAsset.flag == static_cast<uint32_t>(DistributedDB::AssetOpType::DELETE) &&
168 EraseBitMask(cacheAsset.status) != AssetStatus::DELETE) ?
169 AssetOpType::HANDLE : AssetOpType::NOT_HANDLE;
170 }
171 return AssetOpType::NOT_HANDLE;
172 }
173
CheckAfterUpload(const Asset & cacheAsset,const Assets & dbAssets)174 AssetOperationUtils::AssetOpType AssetOperationUtils::CheckAfterUpload(const Asset &cacheAsset, const Assets &dbAssets)
175 {
176 for (const auto &dbAsset : dbAssets) {
177 if (dbAsset.name != cacheAsset.name) {
178 continue;
179 }
180 if ((dbAsset.status & static_cast<uint32_t>(AssetStatus::UPLOADING)) ==
181 static_cast<uint32_t>(AssetStatus::UPLOADING)) {
182 return AssetOpType::HANDLE;
183 }
184 return AssetOpType::NOT_HANDLE;
185 }
186 return AssetOpType::NOT_HANDLE;
187 }
188
GetAssets(const std::string & colName,const VBucket & rowData)189 Assets AssetOperationUtils::GetAssets(const std::string &colName, const VBucket &rowData)
190 {
191 if (rowData.find(colName) == rowData.end()) {
192 return {};
193 }
194 Assets res;
195 auto value = rowData.at(colName);
196 if (TYPE_INDEX<Asset> == value.index()) {
197 res.push_back(std::get<Asset>(value));
198 } else if (TYPE_INDEX<Assets> == value.index()) {
199 for (const auto &asset : std::get<Assets>(value)) {
200 res.push_back(asset);
201 }
202 }
203 return res;
204 }
205
HandleIfExistAndSameStatus(const Asset & cacheAsset,const Assets & dbAssets)206 AssetOperationUtils::AssetOpType AssetOperationUtils::HandleIfExistAndSameStatus(const Asset &cacheAsset,
207 const Assets &dbAssets)
208 {
209 for (const auto &dbAsset : dbAssets) {
210 if (dbAsset.name != cacheAsset.name) {
211 continue;
212 }
213 if (dbAsset.status == cacheAsset.status) {
214 return AssetOpType::HANDLE;
215 }
216 return AssetOpType::NOT_HANDLE;
217 }
218 return AssetOpType::NOT_HANDLE;
219 }
220
MergeAssetFlag(const Assets & from,Asset & target)221 void AssetOperationUtils::MergeAssetFlag(const Assets &from, Asset &target)
222 {
223 for (const auto &fromAsset : from) {
224 if (fromAsset.name == target.name) {
225 target.flag = fromAsset.flag;
226 }
227 }
228 }
229
MergeAssetsFlag(const Assets & from,Type & target)230 void AssetOperationUtils::MergeAssetsFlag(const Assets &from, Type &target)
231 {
232 if (TYPE_INDEX<Asset> == target.index()) {
233 MergeAssetFlag(from, std::get<Asset>(target));
234 } else if (TYPE_INDEX<Assets> == target.index()) {
235 for (auto &targetAsset : std::get<Assets>(target)) {
236 MergeAssetFlag(from, targetAsset);
237 }
238 }
239 }
240 }