1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #define LOG_TAG "RdbCloud"
17 #include "rdb_cloud.h"
18 
19 #include "cloud/schema_meta.h"
20 #include "log_print.h"
21 #include "rdb_query.h"
22 #include "relational_store_manager.h"
23 #include "value_proxy.h"
24 #include "utils/anonymous.h"
25 
26 namespace OHOS::DistributedRdb {
27 using namespace DistributedDB;
28 using namespace DistributedData;
RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB,BindAssets * bindAssets)29 RdbCloud::RdbCloud(std::shared_ptr<DistributedData::CloudDB> cloudDB, BindAssets* bindAssets)
30     : cloudDB_(std::move(cloudDB)), snapshots_(bindAssets)
31 {
32 }
33 
BatchInsert(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)34 DBStatus RdbCloud::BatchInsert(
35     const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
36 {
37     extend.resize(record.size());
38     VBuckets extends = ValueProxy::Convert(std::move(extend));
39     VBuckets records = ValueProxy::Convert(std::move(record));
40     std::set<std::string> skipAssets;
41     PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
42     VBuckets temp = records;
43     auto error = cloudDB_->BatchInsert(tableName, std::move(records), extends);
44     PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
45     ConvertErrorField(extends);
46     extend = ValueProxy::Convert(std::move(extends));
47     return ConvertStatus(static_cast<GeneralError>(error));
48 }
49 
BatchUpdate(const std::string & tableName,std::vector<DBVBucket> && record,std::vector<DBVBucket> & extend)50 DBStatus RdbCloud::BatchUpdate(
51     const std::string &tableName, std::vector<DBVBucket> &&record, std::vector<DBVBucket> &extend)
52 {
53     extend.resize(record.size());
54     VBuckets extends = ValueProxy::Convert(std::move(extend));
55     VBuckets records = ValueProxy::Convert(std::move(record));
56     std::set<std::string> skipAssets;
57     PostEvent(records, skipAssets, extends, DistributedData::AssetEvent::UPLOAD);
58     VBuckets temp = records;
59     auto error = cloudDB_->BatchUpdate(tableName, std::move(records), extends);
60     PostEvent(temp, skipAssets, extends, DistributedData::AssetEvent::UPLOAD_FINISHED);
61     ConvertErrorField(extends);
62     extend = ValueProxy::Convert(std::move(extends));
63     return ConvertStatus(static_cast<GeneralError>(error));
64 }
65 
BatchDelete(const std::string & tableName,std::vector<DBVBucket> & extend)66 DBStatus RdbCloud::BatchDelete(const std::string &tableName, std::vector<DBVBucket> &extend)
67 {
68     VBuckets extends = ValueProxy::Convert(std::move(extend));
69     auto error = cloudDB_->BatchDelete(tableName, extends);
70     ConvertErrorField(extends);
71     extend = ValueProxy::Convert(std::move(extends));
72     return ConvertStatus(static_cast<GeneralError>(error));
73 }
74 
Query(const std::string & tableName,DBVBucket & extend,std::vector<DBVBucket> & data)75 DBStatus RdbCloud::Query(const std::string &tableName, DBVBucket &extend, std::vector<DBVBucket> &data)
76 {
77     auto [nodes, status] = ConvertQuery(extend);
78     std::shared_ptr<Cursor> cursor = nullptr;
79     if (status == GeneralError::E_OK && !nodes.empty()) {
80         RdbQuery query;
81         query.SetQueryNodes(tableName, std::move(nodes));
82         cursor = cloudDB_->Query(query, ValueProxy::Convert(std::move(extend)));
83     } else {
84         cursor = cloudDB_->Query(tableName, ValueProxy::Convert(std::move(extend)));
85     }
86     if (cursor == nullptr) {
87         ZLOGE("cursor is null, table:%{public}s, extend:%{public}zu",
88             Anonymous::Change(tableName).c_str(), extend.size());
89         return ConvertStatus(static_cast<GeneralError>(E_ERROR));
90     }
91     int32_t count = cursor->GetCount();
92     data.reserve(count);
93     auto err = cursor->MoveToFirst();
94     while (err == E_OK && count > 0) {
95         DistributedData::VBucket entry;
96         err = cursor->GetEntry(entry);
97         if (err != E_OK) {
98             break;
99         }
100         data.emplace_back(ValueProxy::Convert(std::move(entry)));
101         err = cursor->MoveToNext();
102         count--;
103     }
104     DistributedData::Value cursorFlag;
105     cursor->Get(SchemaMeta::CURSOR_FIELD, cursorFlag);
106     extend[SchemaMeta::CURSOR_FIELD] = ValueProxy::Convert(std::move(cursorFlag));
107     if (cursor->IsEnd()) {
108         ZLOGD("query end, table:%{public}s", Anonymous::Change(tableName).c_str());
109         return DBStatus::QUERY_END;
110     }
111     return ConvertStatus(static_cast<GeneralError>(err));
112 }
113 
PreSharing(const std::string & tableName,VBuckets & extend)114 DistributedData::GeneralError RdbCloud::PreSharing(const std::string& tableName, VBuckets& extend)
115 {
116     return static_cast<GeneralError>(cloudDB_->PreSharing(tableName, extend));
117 }
118 
Lock()119 std::pair<DBStatus, uint32_t> RdbCloud::Lock()
120 {
121     auto result = InnerLock(FLAG::SYSTEM_ABILITY);
122     return { ConvertStatus(result.first), result.second };
123 }
124 
UnLock()125 DBStatus RdbCloud::UnLock()
126 {
127     return ConvertStatus(InnerUnLock(FLAG::SYSTEM_ABILITY));
128 }
129 
HeartBeat()130 DBStatus RdbCloud::HeartBeat()
131 {
132     auto error = cloudDB_->Heartbeat();
133     return ConvertStatus(static_cast<GeneralError>(error));
134 }
135 
Close()136 DBStatus RdbCloud::Close()
137 {
138     auto error = cloudDB_->Close();
139     return ConvertStatus(static_cast<GeneralError>(error));
140 }
141 
InnerLock(FLAG flag)142 std::pair<GeneralError, uint32_t> RdbCloud::InnerLock(FLAG flag)
143 {
144     std::lock_guard<decltype(mutex_)> lock(mutex_);
145     flag_ |= flag;
146     // int64_t <-> uint32_t, s <-> ms
147     return std::make_pair(static_cast<GeneralError>(cloudDB_->Lock()), cloudDB_->AliveTime() * TO_MS);
148 }
149 
InnerUnLock(FLAG flag)150 GeneralError RdbCloud::InnerUnLock(FLAG flag)
151 {
152     std::lock_guard<decltype(mutex_)> lock(mutex_);
153     flag_ &= ~flag;
154     if (flag_ == 0) {
155         return static_cast<GeneralError>(cloudDB_->Unlock());
156     }
157     return GeneralError::E_OK;
158 }
159 
LockCloudDB(FLAG flag)160 std::pair<GeneralError, uint32_t> RdbCloud::LockCloudDB(FLAG flag)
161 {
162     return InnerLock(flag);
163 }
164 
UnLockCloudDB(FLAG flag)165 GeneralError RdbCloud::UnLockCloudDB(FLAG flag)
166 {
167     return InnerUnLock(flag);
168 }
169 
GetEmptyCursor(const std::string & tableName)170 std::pair<DBStatus, std::string> RdbCloud::GetEmptyCursor(const std::string &tableName)
171 {
172     auto [error, cursor] = cloudDB_->GetEmptyCursor(tableName);
173     return { ConvertStatus(static_cast<GeneralError>(error)), cursor };
174 }
175 
ConvertStatus(DistributedData::GeneralError error)176 DBStatus RdbCloud::ConvertStatus(DistributedData::GeneralError error)
177 {
178     switch (error) {
179         case GeneralError::E_OK:
180             return DBStatus::OK;
181         case GeneralError::E_NETWORK_ERROR:
182             return DBStatus::CLOUD_NETWORK_ERROR;
183         case GeneralError::E_LOCKED_BY_OTHERS:
184             return DBStatus::CLOUD_LOCK_ERROR;
185         case GeneralError::E_RECODE_LIMIT_EXCEEDED:
186             return DBStatus::CLOUD_FULL_RECORDS;
187         case GeneralError::E_NO_SPACE_FOR_ASSET:
188             return DBStatus::CLOUD_ASSET_SPACE_INSUFFICIENT;
189         case GeneralError::E_VERSION_CONFLICT:
190             return DBStatus::CLOUD_VERSION_CONFLICT;
191         case GeneralError::E_RECORD_EXIST_CONFLICT:
192             return DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
193         case GeneralError::E_RECORD_NOT_FOUND:
194             return DBStatus::CLOUD_RECORD_NOT_FOUND;
195         case GeneralError::E_RECORD_ALREADY_EXISTED:
196             return DBStatus::CLOUD_RECORD_ALREADY_EXISTED;
197         case GeneralError::E_FILE_NOT_EXIST:
198             return DBStatus::LOCAL_ASSET_NOT_FOUND;
199         case GeneralError::E_TIME_OUT:
200             return DBStatus::TIME_OUT;
201         default:
202             ZLOGI("error:0x%{public}x", error);
203             break;
204     }
205     return DBStatus::CLOUD_ERROR;
206 }
207 
ConvertQuery(RdbCloud::DBVBucket & extend)208 std::pair<RdbCloud::QueryNodes, DistributedData::GeneralError> RdbCloud::ConvertQuery(RdbCloud::DBVBucket& extend)
209 {
210     auto it = extend.find(TYPE_FIELD);
211     if (it == extend.end() || std::get<int64_t>(it->second) != static_cast<int64_t>(CloudQueryType::QUERY_FIELD)) {
212         return { {}, GeneralError::E_ERROR };
213     }
214     it = extend.find(QUERY_FIELD);
215     if (it == extend.end()) {
216         ZLOGE("error, no QUERY_FIELD!");
217         return { {}, GeneralError::E_ERROR };
218     }
219 
220     auto bytes = std::get_if<DistributedDB::Bytes>(&it->second);
221     std::vector<DistributedDB::QueryNode> nodes;
222     DBStatus status = DB_ERROR;
223     if (bytes != nullptr) {
224         nodes = DistributedDB::RelationalStoreManager::ParserQueryNodes(*bytes, status);
225     }
226     if (status != OK) {
227         ZLOGE("error, ParserQueryNodes failed, status:%{public}d", status);
228         return { {}, GeneralError::E_ERROR };
229     }
230     return { ConvertQuery(std::move(nodes)), GeneralError::E_OK };
231 }
232 
ConvertQuery(RdbCloud::DBQueryNodes && nodes)233 RdbCloud::QueryNodes RdbCloud::ConvertQuery(RdbCloud::DBQueryNodes&& nodes)
234 {
235     QueryNodes queryNodes;
236     queryNodes.reserve(nodes.size());
237     for (auto& node : nodes) {
238         QueryNode queryNode;
239         queryNode.fieldName = std::move(node.fieldName);
240         queryNode.fieldValue = ValueProxy::Convert(std::move(node.fieldValue));
241         switch (node.type) {
242             case QueryNodeType::IN:
243                 queryNode.op = QueryOperation::IN;
244                 break;
245             case QueryNodeType::OR:
246                 queryNode.op = QueryOperation::OR;
247                 break;
248             case QueryNodeType::AND:
249                 queryNode.op = QueryOperation::AND;
250                 break;
251             case QueryNodeType::EQUAL_TO:
252                 queryNode.op = QueryOperation::EQUAL_TO;
253                 break;
254             case QueryNodeType::BEGIN_GROUP:
255                 queryNode.op = QueryOperation::BEGIN_GROUP;
256                 break;
257             case QueryNodeType::END_GROUP:
258                 queryNode.op = QueryOperation::END_GROUP;
259                 break;
260             default:
261                 ZLOGE("invalid operation:0x%{public}d", node.type);
262                 return {};
263         }
264         queryNodes.emplace_back(std::move(queryNode));
265     }
266     return queryNodes;
267 }
268 
PostEvent(VBuckets & records,std::set<std::string> & skipAssets,VBuckets & extend,DistributedData::AssetEvent eventId)269 void RdbCloud::PostEvent(VBuckets& records, std::set<std::string>& skipAssets, VBuckets& extend,
270     DistributedData::AssetEvent eventId)
271 {
272     int32_t index = 0;
273     for (auto& record : records) {
274         DataBucket& ext = extend[index++];
275         for (auto& [key, value] : record) {
276             PostEvent(value, ext, skipAssets, eventId);
277         }
278     }
279 }
280 
PostEvent(DistributedData::Value & value,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)281 void RdbCloud::PostEvent(DistributedData::Value& value, DataBucket& extend, std::set<std::string>& skipAssets,
282     DistributedData::AssetEvent eventId)
283 {
284     if (value.index() != TYPE_INDEX<DistributedData::Asset> && value.index() != TYPE_INDEX<DistributedData::Assets>) {
285         return;
286     }
287 
288     if (value.index() == TYPE_INDEX<DistributedData::Asset>) {
289         auto* asset = Traits::get_if<DistributedData::Asset>(&value);
290         PostEventAsset(*asset, extend, skipAssets, eventId);
291     }
292 
293     if (value.index() == TYPE_INDEX<DistributedData::Assets>) {
294         auto* assets = Traits::get_if<DistributedData::Assets>(&value);
295         for (auto& asset : *assets) {
296             PostEventAsset(asset, extend, skipAssets, eventId);
297         }
298     }
299 }
300 
PostEventAsset(DistributedData::Asset & asset,DataBucket & extend,std::set<std::string> & skipAssets,DistributedData::AssetEvent eventId)301 void RdbCloud::PostEventAsset(DistributedData::Asset& asset, DataBucket& extend, std::set<std::string>& skipAssets,
302     DistributedData::AssetEvent eventId)
303 {
304     if (snapshots_->bindAssets == nullptr) {
305         return;
306     }
307     auto it = snapshots_->bindAssets->find(asset.uri);
308     if (it == snapshots_->bindAssets->end() || it->second == nullptr) {
309         return;
310     }
311 
312     if (eventId == DistributedData::UPLOAD) {
313         it->second->Upload(asset);
314         if (it->second->GetAssetStatus(asset) == TransferStatus::STATUS_WAIT_UPLOAD) {
315             skipAssets.insert(asset.uri);
316             extend[SchemaMeta::ERROR_FIELD] = DBStatus::CLOUD_RECORD_EXIST_CONFLICT;
317         }
318     }
319 
320     if (eventId == DistributedData::UPLOAD_FINISHED) {
321         auto skip = skipAssets.find(asset.uri);
322         if (skip != skipAssets.end()) {
323             return;
324         }
325         it->second->Uploaded(asset);
326     }
327 }
328 
GetLockFlag() const329 uint8_t RdbCloud::GetLockFlag() const
330 {
331     return flag_;
332 }
333 
ConvertErrorField(DistributedData::VBuckets & extends)334 void RdbCloud::ConvertErrorField(DistributedData::VBuckets& extends)
335 {
336     for (auto& extend : extends) {
337         auto errorField = extend.find(SchemaMeta::ERROR_FIELD);
338         if (errorField == extend.end()) {
339             continue;
340         }
341         auto errCode = Traits::get_if<int64_t>(&(errorField->second));
342         if (errCode == nullptr) {
343             continue;
344         }
345         errorField->second = ConvertStatus(static_cast<GeneralError>(*errCode));
346     }
347 }
348 
SetPrepareTraceId(const std::string & traceId)349 void RdbCloud::SetPrepareTraceId(const std::string &traceId)
350 {
351     if (cloudDB_ == nullptr) {
352         return;
353     }
354     cloudDB_->SetPrepareTraceId(traceId);
355 }
356 } // namespace OHOS::DistributedRdb