1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "cloud/cloud_meta_data.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "parcel.h"
21
22 namespace DistributedDB {
23
CloudMetaData(ICloudSyncStorageInterface * store)24 CloudMetaData::CloudMetaData(ICloudSyncStorageInterface *store)
25 : store_(store)
26 {
27 }
28
GetLocalWaterMark(const TableName & tableName,Timestamp & localMark)29 int CloudMetaData::GetLocalWaterMark(const TableName &tableName, Timestamp &localMark)
30 {
31 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
32 if (cloudMetaVals_.count(tableName) == 0) {
33 int ret = ReadMarkFromMeta(tableName);
34 if (ret != E_OK) {
35 LOGE("[Meta][GetLocalWaterMark] read mark from meta failed.");
36 return ret;
37 }
38 }
39 localMark = cloudMetaVals_[tableName].localMark;
40 return E_OK;
41 }
42
GetLocalWaterMarkByType(const TableName & tableName,CloudWaterType type,Timestamp & localMark)43 int CloudMetaData::GetLocalWaterMarkByType(const TableName &tableName, CloudWaterType type, Timestamp &localMark)
44 {
45 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
46 if (cloudMetaVals_.count(tableName) == 0) {
47 int ret = ReadMarkFromMeta(tableName);
48 if (ret != E_OK) {
49 LOGE("[Meta][GetLocalWaterMarkByType] read mark from meta failed.");
50 return ret;
51 }
52 }
53 if (type == CloudWaterType::INSERT) {
54 localMark = cloudMetaVals_[tableName].insertLocalMark;
55 } else if (type == CloudWaterType::UPDATE) {
56 localMark = cloudMetaVals_[tableName].updateLocalMark;
57 } else if (type == CloudWaterType::DELETE) {
58 localMark = cloudMetaVals_[tableName].deleteLocalMark;
59 }
60 cloudMetaVals_[tableName].localMark = std::max(localMark, cloudMetaVals_[tableName].localMark);
61 return E_OK;
62 }
63
GetCloudWaterMark(const TableName & tableName,std::string & cloudMark)64 int CloudMetaData::GetCloudWaterMark(const TableName &tableName, std::string &cloudMark)
65 {
66 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
67 if (cloudMetaVals_.count(tableName) == 0) {
68 int ret = ReadMarkFromMeta(tableName);
69 if (ret != E_OK) {
70 return ret;
71 }
72 }
73 cloudMark = cloudMetaVals_[tableName].cloudMark;
74 LOGD("[Meta] get cloud water mark=%s", cloudMark.c_str());
75 return E_OK;
76 }
77
SetLocalWaterMark(const TableName & tableName,Timestamp localMark)78 int CloudMetaData::SetLocalWaterMark(const TableName &tableName, Timestamp localMark)
79 {
80 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
81 std::string cloudMark;
82 auto iter = cloudMetaVals_.find(tableName);
83 if (iter != cloudMetaVals_.end()) {
84 cloudMark = iter->second.cloudMark;
85 }
86 int ret = WriteMarkToMeta(tableName, localMark, cloudMark);
87 if (ret != E_OK) {
88 return ret;
89 }
90 if (iter == cloudMetaVals_.end()) {
91 CloudMetaValue cloudMetaVal;
92 cloudMetaVal.localMark = localMark;
93 cloudMetaVal.cloudMark = cloudMark;
94 cloudMetaVals_[tableName] = cloudMetaVal;
95 } else {
96 iter->second.localMark = localMark;
97 }
98 return E_OK;
99 }
100
SetLocalWaterMarkByType(const TableName & tableName,CloudWaterType type,Timestamp localMark)101 int CloudMetaData::SetLocalWaterMarkByType(const TableName &tableName, CloudWaterType type, Timestamp localMark)
102 {
103 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
104 CloudMetaValue cloudMetaVal;
105 auto iter = cloudMetaVals_.find(tableName);
106 if (iter != cloudMetaVals_.end()) {
107 cloudMetaVal = iter->second;
108 }
109 cloudMetaVal.localMark = std::max(localMark, cloudMetaVal.localMark);
110 if (type == CloudWaterType::DELETE) {
111 cloudMetaVal.deleteLocalMark = localMark;
112 } else if (type == CloudWaterType::UPDATE) {
113 cloudMetaVal.updateLocalMark = localMark;
114 cloudMetaVal.deleteLocalMark = std::max(cloudMetaVal.deleteLocalMark, localMark);
115 } else if (type == CloudWaterType::INSERT) {
116 cloudMetaVal.insertLocalMark = localMark;
117 cloudMetaVal.deleteLocalMark = std::max(cloudMetaVal.deleteLocalMark, localMark);
118 cloudMetaVal.updateLocalMark = std::max(cloudMetaVal.updateLocalMark, localMark);
119 }
120 int ret = WriteTypeMarkToMeta(tableName, cloudMetaVal);
121 if (ret != E_OK) {
122 return ret;
123 }
124 if (iter == cloudMetaVals_.end()) {
125 cloudMetaVals_[tableName] = cloudMetaVal;
126 } else {
127 iter->second = cloudMetaVal;
128 }
129 return E_OK;
130 }
131
SetCloudWaterMark(const TableName & tableName,std::string & cloudMark)132 int CloudMetaData::SetCloudWaterMark(const TableName &tableName, std::string &cloudMark)
133 {
134 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
135 Timestamp localMark = 0;
136 auto iter = cloudMetaVals_.find(tableName);
137 if (iter != cloudMetaVals_.end()) {
138 localMark = iter->second.localMark;
139 }
140 int ret = WriteMarkToMeta(tableName, localMark, cloudMark);
141 if (ret != E_OK) {
142 return ret;
143 }
144 if (iter == cloudMetaVals_.end()) {
145 CloudMetaValue cloudMetaVal;
146 cloudMetaVal.localMark = localMark;
147 cloudMetaVal.cloudMark = cloudMark;
148 cloudMetaVals_[tableName] = cloudMetaVal;
149 } else {
150 iter->second.cloudMark = cloudMark;
151 }
152 LOGD("[Meta] set cloud water mark=%s", cloudMark.c_str());
153 return E_OK;
154 }
155
ReadMarkFromMeta(const TableName & tableName)156 int CloudMetaData::ReadMarkFromMeta(const TableName &tableName)
157 {
158 if (store_ == nullptr) {
159 return -E_INVALID_DB;
160 }
161 Value blobMetaVal;
162 int ret = store_->GetMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal);
163 if (ret != -E_NOT_FOUND && ret != E_OK) {
164 return ret;
165 }
166 CloudMetaValue cloudMetaValue;
167 ret = DeserializeMark(blobMetaVal, cloudMetaValue);
168 if (ret != E_OK) {
169 return ret;
170 }
171 cloudMetaVals_[tableName] = cloudMetaValue;
172 return E_OK;
173 }
174
WriteMarkToMeta(const TableName & tableName,Timestamp localmark,std::string & cloudMark)175 int CloudMetaData::WriteMarkToMeta(const TableName &tableName, Timestamp localmark, std::string &cloudMark)
176 {
177 Value blobMetaVal;
178 int ret = DBCommon::SerializeWaterMark(localmark, cloudMark, blobMetaVal);
179 if (ret != E_OK) {
180 return ret;
181 }
182 if (store_ == nullptr) {
183 return -E_INVALID_DB;
184 }
185 return store_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal);
186 }
187
WriteTypeMarkToMeta(const TableName & tableName,CloudMetaValue & cloudMetaValue)188 int CloudMetaData::WriteTypeMarkToMeta(const TableName &tableName, CloudMetaValue &cloudMetaValue)
189 {
190 Value blobMetaVal;
191 int ret = SerializeWaterMark(cloudMetaValue, blobMetaVal);
192 if (ret != E_OK) {
193 return ret;
194 }
195 if (store_ == nullptr) {
196 return -E_INVALID_DB;
197 }
198 return store_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal);
199 }
200
GetParcelCurrentLength(CloudMetaValue & cloudMetaValue)201 uint64_t CloudMetaData::GetParcelCurrentLength(CloudMetaValue &cloudMetaValue)
202 {
203 return Parcel::GetUInt64Len() + Parcel::GetStringLen(cloudMetaValue.cloudMark) + Parcel::GetUInt64Len() +
204 Parcel::GetUInt64Len() + Parcel::GetUInt64Len();
205 }
206
SerializeWaterMark(CloudMetaValue & cloudMetaValue,Value & blobMetaVal)207 int CloudMetaData::SerializeWaterMark(CloudMetaValue &cloudMetaValue, Value &blobMetaVal)
208 {
209 uint64_t length = GetParcelCurrentLength(cloudMetaValue);
210 blobMetaVal.resize(length);
211 Parcel parcel(blobMetaVal.data(), blobMetaVal.size());
212 parcel.WriteUInt64(cloudMetaValue.localMark);
213 parcel.WriteString(cloudMetaValue.cloudMark);
214 parcel.WriteUInt64(cloudMetaValue.insertLocalMark);
215 parcel.WriteUInt64(cloudMetaValue.updateLocalMark);
216 parcel.WriteUInt64(cloudMetaValue.deleteLocalMark);
217 if (parcel.IsError()) {
218 LOGE("[Meta] Parcel error while deserializing cloud meta data.");
219 return -E_PARSE_FAIL;
220 }
221 return E_OK;
222 }
223
DeserializeMark(Value & blobMark,CloudMetaValue & cloudMetaValue)224 int CloudMetaData::DeserializeMark(Value &blobMark, CloudMetaValue &cloudMetaValue)
225 {
226 if (blobMark.empty()) {
227 cloudMetaValue.localMark = 0;
228 cloudMetaValue.insertLocalMark = 0;
229 cloudMetaValue.updateLocalMark = 0;
230 cloudMetaValue.deleteLocalMark = 0;
231 cloudMetaValue.cloudMark = "";
232 return E_OK;
233 }
234 Parcel parcel(blobMark.data(), blobMark.size());
235 parcel.ReadUInt64(cloudMetaValue.localMark);
236 parcel.ReadString(cloudMetaValue.cloudMark);
237 if (parcel.IsContinueRead()) {
238 parcel.ReadUInt64(cloudMetaValue.insertLocalMark);
239 parcel.ReadUInt64(cloudMetaValue.updateLocalMark);
240 parcel.ReadUInt64(cloudMetaValue.deleteLocalMark);
241 }
242 if (parcel.IsError()) {
243 LOGE("[Meta] Parcel error while deserializing cloud meta data.");
244 return -E_PARSE_FAIL;
245 }
246 if (blobMark.size() < GetParcelCurrentLength(cloudMetaValue)) {
247 cloudMetaValue.insertLocalMark = cloudMetaValue.localMark;
248 cloudMetaValue.updateLocalMark = cloudMetaValue.localMark;
249 cloudMetaValue.deleteLocalMark = cloudMetaValue.localMark;
250 }
251 return E_OK;
252 }
253
CleanWaterMark(const TableName & tableName)254 int CloudMetaData::CleanWaterMark(const TableName &tableName)
255 {
256 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
257 std::string cloudWaterMark;
258 int ret = WriteMarkToMeta(tableName, 0, cloudWaterMark);
259 if (ret != E_OK) {
260 return ret;
261 }
262 cloudMetaVals_[tableName] = {};
263 LOGD("[Meta] clean cloud water mark");
264 return E_OK;
265 }
266
CleanAllWaterMark()267 void CloudMetaData::CleanAllWaterMark()
268 {
269 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
270 cloudMetaVals_.clear();
271 LOGD("[Meta] clean cloud water mark");
272 }
273
CleanWaterMarkInMemory(const TableName & tableName)274 void CloudMetaData::CleanWaterMarkInMemory(const TableName &tableName)
275 {
276 std::lock_guard<std::mutex> lock(cloudMetaMutex_);
277 cloudMetaVals_[tableName] = {};
278 LOGD("[Meta] clean cloud water mark in memory");
279 }
280 } // namespace DistributedDB