1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #define LOG_TAG "RdbCloud"
17 #include "rdb_cloud.h"
18
19 #include "cloud/schema_meta.h"
20 #include "log_print.h"
21 #include "rdb_query.h"
22 #include "relational_store_manager.h"
23 #include "value_proxy.h"
24 #include "utils/anonymous.h"
25
26 namespace OHOS::DistributedRdb {
27 using namespace DistributedDB;
28 using namespace DistributedData;
RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB,BindAssets * bindAssets)29 RdbCloud::RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB, BindAssets* bindAssets)
30 : cloudDB_(std::move(cloudDB)), snapshots_(bindAssets)
31 {
32 }
33
BatchInsert(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)34 DBStatus RdbCloud::BatchInsert(
35 const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
36 {
37 extend.resize(record.size());
38 VBuckets extends = ValueProxy::Convert(std::move(extend));
39 VBuckets records = ValueProxy::Convert(std::move(record));
40 std::set<std::string> skipAssets;
41 PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
42 VBuckets temp = records;
43 auto error = cloudDB_->BatchInsert(tableName, std::move(records), extends);
44 PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
45 ConvertErrorField(extends);
46 extend = ValueProxy::Convert(std::move(extends));
47 return ConvertStatus(static_cast<GeneralError>(error));
48 }
49
BatchUpdate(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)50 DBStatus RdbCloud::BatchUpdate(
51 const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
52 {
53 extend.resize(record.size());
54 VBuckets extends = ValueProxy::Convert(std::move(extend));
55 VBuckets records = ValueProxy::Convert(std::move(record));
56 std::set<std::string> skipAssets;
57 PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
58 VBuckets temp = records;
59 auto error = cloudDB_->BatchUpdate(tableName, std::move(records), extends);
60 PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
61 ConvertErrorField(extends);
62 extend = ValueProxy::Convert(std::move(extends));
63 return ConvertStatus(static_cast<GeneralError>(error));
64 }
65
BatchDelete(const std::string & tableName,std::vector<DBVBucket> & extend)66 DBStatus RdbCloud::BatchDelete(const std::string &tableName, std::vector<DBVBucket> &extend)
67 {
68 VBuckets extends = ValueProxy::Convert(std::move(extend));
69 auto error = cloudDB_->BatchDelete(tableName, extends);
70 ConvertErrorField(extends);
71 extend = ValueProxy::Convert(std::move(extends));
72 return ConvertStatus(static_cast<GeneralError>(error));
73 }
74
Query(const std::string & tableName,DBVBucket & extend,std::vector<DBVBucket> & data)75 DBStatus RdbCloud::Query(const std::string &tableName, DBVBucket &extend, std::vector<DBVBucket> &data)
76 {
77 auto [nodes, status] = ConvertQuery(extend);
78 std::shared_ptr<Cursor> cursor = nullptr;
79 if (status == GeneralError::E_OK && !nodes.empty()) {
80 RdbQuery query;
81 query.SetQueryNodes(tableName, std::move(nodes));
82 cursor = cloudDB_->Query(query, ValueProxy::Convert(std::move(extend)));
83 } else {
84 cursor = cloudDB_->Query(tableName, ValueProxy::Convert(std::move(extend)));
85 }
86 if (cursor == nullptr) {
87 ZLOGE("cursor is null, table:%{public}s, extend:%{public}zu",
88 Anonymous::Change(tableName).c_str(), extend.size());
89 return ConvertStatus(static_cast<GeneralError>(E_ERROR));
90 }
91 int32_t count = cursor->GetCount();
92 data.reserve(count);
93 auto err = cursor->MoveToFirst();
94 while (err == E_OK && count > 0) {
95 DistributedData::VBucket entry;
96 err = cursor->GetEntry(entry);
97 if (err != E_OK) {
98 break;
99 }
100 data.emplace_back(ValueProxy::Convert(std::move(entry)));
101 err = cursor->MoveToNext();
102 count--;
103 }
104 DistributedData::Value cursorFlag;
105 cursor->Get(SchemaMeta::CURSOR_FIELD, cursorFlag);
106 extend[SchemaMeta::CURSOR_FIELD] = ValueProxy::Convert(std::move(cursorFlag));
107 if (cursor->IsEnd()) {
108 ZLOGD("query end, table:%{public}s", Anonymous::Change(tableName).c_str());
109 return DBStatus::QUERY_END;
110 }
111 return ConvertStatus(static_cast<GeneralError>(err));
112 }
113
PreSharing(const std::string & tableName,VBuckets & extend)114 DistributedData::GeneralError RdbCloud::PreSharing(const std::string& tableName, VBuckets& extend)
115 {
116 return static_cast<GeneralError>(cloudDB_->PreSharing(tableName, extend));
117 }
118
Lock()119 std::pair<DBStatus, uint32_t> RdbCloud::Lock()
120 {
121 auto result = InnerLock(FLAG::SYSTEM_ABILITY);
122 return { ConvertStatus(result.first), result.second };
123 }
124
UnLock()125 DBStatus RdbCloud::UnLock()
126 {
127 return ConvertStatus(InnerUnLock(FLAG::SYSTEM_ABILITY));
128 }
129
HeartBeat()130 DBStatus RdbCloud::HeartBeat()
131 {
132 auto error = cloudDB_->Heartbeat();
133 return ConvertStatus(static_cast<GeneralError>(error));
134 }
135
Close()136 DBStatus RdbCloud::Close()
137 {
138 auto error = cloudDB_->Close();
139 return ConvertStatus(static_cast<GeneralError>(error));
140 }
141
InnerLock(FLAG flag)142 std::pair<GeneralError, uint32_t> RdbCloud::InnerLock(FLAG flag)
143 {
144 std::lock_guard<decltype(mutex_)> lock(mutex_);
145 flag_ |= flag;
146 // int64_t <-> uint32_t, s <-> ms
147 return std::make_pair(static_cast<GeneralError>(cloudDB_->Lock()), cloudDB_->AliveTime() * TO_MS);
148 }
149
InnerUnLock(FLAG flag)150 GeneralError RdbCloud::InnerUnLock(FLAG flag)
151 {
152 std::lock_guard<decltype(mutex_)> lock(mutex_);
153 flag_ &= ~flag;
154 if (flag_ == 0) {
155 return static_cast<GeneralError>(cloudDB_->Unlock());
156 }
157 return GeneralError::E_OK;
158 }
159
LockCloudDB(FLAG flag)160 std::pair<GeneralError, uint32_t> RdbCloud::LockCloudDB(FLAG flag)
161 {
162 return InnerLock(flag);
163 }
164
UnLockCloudDB(FLAG flag)165 GeneralError RdbCloud::UnLockCloudDB(FLAG flag)
166 {
167 return InnerUnLock(flag);
168 }
169
GetEmptyCursor(const std::string & tableName)170 std::pair<DBStatus, std::string> RdbCloud::GetEmptyCursor(const std::string &tableName)
171 {
172 auto [error, cursor] = cloudDB_->GetEmptyCursor(tableName);
173 return { ConvertStatus(static_cast<GeneralError>(error)), cursor };
174 }
175
ConvertStatus(DistributedData::GeneralError error)176 DBStatus RdbCloud::ConvertStatus(DistributedData::GeneralError error)
177 {
178 switch (error) {
179 case GeneralError::E_OK:
180 return DBStatus::OK;
181 case GeneralError::E_NETWORK_ERROR:
182 return DBStatus::CLOUD_NETWORK_ERROR;
183 case GeneralError::E_LOCKED_BY_OTHERS:
184 return DBStatus::CLOUD_LOCK_ERROR;
185 case GeneralError::E_RECODE_LIMIT_EXCEEDED:
186 return DBStatus::CLOUD_FULL_RECORDS;
187 case GeneralError::E_NO_SPACE_FOR_ASSET:
188 return DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT;
189 case GeneralError::E_VERSION_CONFLICT:
190 return DBStatus::CLOUD_VERSION_CONFLICT;
191 case GeneralError::E_RECORD_EXIST_CONFLICT:
192 return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
193 case GeneralError::E_RECORD_NOT_FOUND:
194 return DBStatus::CLOUD_RECORD_NOT_FOUND;
195 case GeneralError::E_RECORD_ALREADY_EXISTED:
196 return DBStatus::CLOUD_RECORD_ALREADY_EXISTED;
197 case GeneralError::E_FILE_NOT_EXIST:
198 return DBStatus::LOCAL_ASSET_NOT_FOUND;
199 case GeneralError::E_TIME_OUT:
200 return DBStatus::TIME_OUT;
201 default:
202 ZLOGI("error:0x%{public}x", error);
203 break;
204 }
205 return DBStatus::CLOUD_ERROR;
206 }
207
ConvertQuery(RdbCloud::DBVBucket & extend)208 std::pair<RdbCloud::QueryNodes, DistributedData::GeneralError> RdbCloud::ConvertQuery(RdbCloud::DBVBucket& extend)
209 {
210 auto it = extend.find(TYPE_FIELD);
211 if (it == extend.end() || std::get<int64_t>(it->second) != static_cast<int64_t>(CloudQueryType::QUERY_FIELD)) {
212 return { {}, GeneralError::E_ERROR };
213 }
214 it = extend.find(QUERY_FIELD);
215 if (it == extend.end()) {
216 ZLOGE("error, no QUERY_FIELD!");
217 return { {}, GeneralError::E_ERROR };
218 }
219
220 auto bytes = std::get_if<DistributedDB::Bytes>(&it->second);
221 std::vector<DistributedDB::QueryNode> nodes;
222 DBStatus status = DB_ERROR;
223 if (bytes != nullptr) {
224 nodes = DistributedDB::RelationalStoreManager::ParserQueryNodes(*bytes, status);
225 }
226 if (status != OK) {
227 ZLOGE("error, ParserQueryNodes failed, status:%{public}d", status);
228 return { {}, GeneralError::E_ERROR };
229 }
230 return { ConvertQuery(std::move(nodes)), GeneralError::E_OK };
231 }
232
ConvertQuery(RdbCloud::DBQueryNodes && nodes)233 RdbCloud::QueryNodes RdbCloud::ConvertQuery(RdbCloud::DBQueryNodes&& nodes)
234 {
235 QueryNodes queryNodes;
236 queryNodes.reserve(nodes.size());
237 for (auto& node : nodes) {
238 QueryNode queryNode;
239 queryNode.fieldName = std::move(node.fieldName);
240 queryNode.fieldValue = ValueProxy::Convert(std::move(node.fieldValue));
241 switch (node.type) {
242 case QueryNodeType::IN:
243 queryNode.op = QueryOperation::IN;
244 break;
245 case QueryNodeType::OR:
246 queryNode.op = QueryOperation::OR;
247 break;
248 case QueryNodeType::AND:
249 queryNode.op = QueryOperation::AND;
250 break;
251 case QueryNodeType::EQUAL_TO:
252 queryNode.op = QueryOperation::EQUAL_TO;
253 break;
254 case QueryNodeType::BEGIN_GROUP:
255 queryNode.op = QueryOperation::BEGIN_GROUP;
256 break;
257 case QueryNodeType::END_GROUP:
258 queryNode.op = QueryOperation::END_GROUP;
259 break;
260 default:
261 ZLOGE("invalid operation:0x%{public}d", node.type);
262 return {};
263 }
264 queryNodes.emplace_back(std::move(queryNode));
265 }
266 return queryNodes;
267 }
268
PostEvent(VBuckets & records,std::set<std::string> & skipAssets,VBuckets & extend,DistributedData::AssetEvent eventId)269 void RdbCloud::PostEvent(VBuckets& records, std::set<std::string>& skipAssets, VBuckets& extend,
270 DistributedData::AssetEvent eventId)
271 {
272 int32_t index = 0;
273 for (auto& record : records) {
274 DataBucket& ext = extend[index++];
275 for (auto& [key, value] : record) {
276 PostEvent(value, ext, skipAssets, eventId);
277 }
278 }
279 }
280
PostEvent(DistributedData::Value & value,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)281 void RdbCloud::PostEvent(DistributedData::Value& value, DataBucket& extend, std::set<std::string>& skipAssets,
282 DistributedData::AssetEvent eventId)
283 {
284 if (value.index() != TYPE_INDEX<DistributedData::Asset> && value.index() != TYPE_INDEX<DistributedData::Assets>) {
285 return;
286 }
287
288 if (value.index() == TYPE_INDEX<DistributedData::Asset>) {
289 auto* asset = Traits::get_if<DistributedData::Asset>(&value);
290 PostEventAsset(*asset, extend, skipAssets, eventId);
291 }
292
293 if (value.index() == TYPE_INDEX<DistributedData::Assets>) {
294 auto* assets = Traits::get_if<DistributedData::Assets>(&value);
295 for (auto& asset : *assets) {
296 PostEventAsset(asset, extend, skipAssets, eventId);
297 }
298 }
299 }
300
PostEventAsset(DistributedData::Asset & asset,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)301 void RdbCloud::PostEventAsset(DistributedData::Asset& asset, DataBucket& extend, std::set<std::string>& skipAssets,
302 DistributedData::AssetEvent eventId)
303 {
304 if (snapshots_->bindAssets == nullptr) {
305 return;
306 }
307 auto it = snapshots_->bindAssets->find(asset.uri);
308 if (it == snapshots_->bindAssets->end() || it->second == nullptr) {
309 return;
310 }
311
312 if (eventId == DistributedData::UPLOAD) {
313 it->second->Upload(asset);
314 if (it->second->GetAssetStatus(asset) == TransferStatus::STATUS_WAIT_UPLOAD) {
315 skipAssets.insert(asset.uri);
316 extend[SchemaMeta::ERROR_FIELD] = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
317 }
318 }
319
320 if (eventId == DistributedData::UPLOAD_FINISHED) {
321 auto skip = skipAssets.find(asset.uri);
322 if (skip != skipAssets.end()) {
323 return;
324 }
325 it->second->Uploaded(asset);
326 }
327 }
328
GetLockFlag() const329 uint8_t RdbCloud::GetLockFlag() const
330 {
331 return flag_;
332 }
333
ConvertErrorField(DistributedData::VBuckets & extends)334 void RdbCloud::ConvertErrorField(DistributedData::VBuckets& extends)
335 {
336 for (auto& extend : extends) {
337 auto errorField = extend.find(SchemaMeta::ERROR_FIELD);
338 if (errorField == extend.end()) {
339 continue;
340 }
341 auto errCode = Traits::get_if<int64_t>(&(errorField->second));
342 if (errCode == nullptr) {
343 continue;
344 }
345 errorField->second = ConvertStatus(static_cast<GeneralError>(*errCode));
346 }
347 }
348
SetPrepareTraceId(const std::string & traceId)349 void RdbCloud::SetPrepareTraceId(const std::string &traceId)
350 {
351 if (cloudDB_ == nullptr) {
352 return;
353 }
354 cloudDB_->SetPrepareTraceId(traceId);
355 }
356 } // namespace OHOS::DistributedRdb