1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "cloud/cloud_meta_data.h"
17 #include "cloud/cloud_db_constant.h"
18 #include "db_common.h"
19 #include "db_errno.h"
20 #include "parcel.h"
21 
22 namespace DistributedDB {
23 
CloudMetaData(ICloudSyncStorageInterface * store)24 CloudMetaData::CloudMetaData(ICloudSyncStorageInterface *store)
25     : store_(store)
26 {
27 }
28 
GetLocalWaterMark(const TableName & tableName,Timestamp & localMark)29 int CloudMetaData::GetLocalWaterMark(const TableName &tableName, Timestamp &localMark)
30 {
31     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
32     if (cloudMetaVals_.count(tableName) == 0) {
33         int ret = ReadMarkFromMeta(tableName);
34         if (ret != E_OK) {
35             LOGE("[Meta][GetLocalWaterMark] read mark from meta failed.");
36             return ret;
37         }
38     }
39     localMark = cloudMetaVals_[tableName].localMark;
40     return E_OK;
41 }
42 
GetLocalWaterMarkByType(const TableName & tableName,CloudWaterType type,Timestamp & localMark)43 int CloudMetaData::GetLocalWaterMarkByType(const TableName &tableName, CloudWaterType type, Timestamp &localMark)
44 {
45     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
46     if (cloudMetaVals_.count(tableName) == 0) {
47         int ret = ReadMarkFromMeta(tableName);
48         if (ret != E_OK) {
49             LOGE("[Meta][GetLocalWaterMarkByType] read mark from meta failed.");
50             return ret;
51         }
52     }
53     if (type == CloudWaterType::INSERT) {
54         localMark = cloudMetaVals_[tableName].insertLocalMark;
55     } else if (type == CloudWaterType::UPDATE) {
56         localMark = cloudMetaVals_[tableName].updateLocalMark;
57     } else if (type == CloudWaterType::DELETE) {
58         localMark = cloudMetaVals_[tableName].deleteLocalMark;
59     }
60     cloudMetaVals_[tableName].localMark = std::max(localMark, cloudMetaVals_[tableName].localMark);
61     return E_OK;
62 }
63 
GetCloudWaterMark(const TableName & tableName,std::string & cloudMark)64 int CloudMetaData::GetCloudWaterMark(const TableName &tableName, std::string &cloudMark)
65 {
66     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
67     if (cloudMetaVals_.count(tableName) == 0) {
68         int ret = ReadMarkFromMeta(tableName);
69         if (ret != E_OK) {
70             return ret;
71         }
72     }
73     cloudMark = cloudMetaVals_[tableName].cloudMark;
74     LOGD("[Meta] get cloud water mark=%s", cloudMark.c_str());
75     return E_OK;
76 }
77 
SetLocalWaterMark(const TableName & tableName,Timestamp localMark)78 int CloudMetaData::SetLocalWaterMark(const TableName &tableName, Timestamp localMark)
79 {
80     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
81     std::string cloudMark;
82     auto iter = cloudMetaVals_.find(tableName);
83     if (iter != cloudMetaVals_.end()) {
84         cloudMark = iter->second.cloudMark;
85     }
86     int ret = WriteMarkToMeta(tableName, localMark, cloudMark);
87     if (ret != E_OK) {
88         return ret;
89     }
90     if (iter == cloudMetaVals_.end()) {
91         CloudMetaValue cloudMetaVal;
92         cloudMetaVal.localMark = localMark;
93         cloudMetaVal.cloudMark = cloudMark;
94         cloudMetaVals_[tableName] = cloudMetaVal;
95     } else {
96         iter->second.localMark = localMark;
97     }
98     return E_OK;
99 }
100 
SetLocalWaterMarkByType(const TableName & tableName,CloudWaterType type,Timestamp localMark)101 int CloudMetaData::SetLocalWaterMarkByType(const TableName &tableName, CloudWaterType type, Timestamp localMark)
102 {
103     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
104     CloudMetaValue cloudMetaVal;
105     auto iter = cloudMetaVals_.find(tableName);
106     if (iter != cloudMetaVals_.end()) {
107         cloudMetaVal = iter->second;
108     }
109     cloudMetaVal.localMark = std::max(localMark, cloudMetaVal.localMark);
110     if (type == CloudWaterType::DELETE) {
111         cloudMetaVal.deleteLocalMark = localMark;
112     } else if (type == CloudWaterType::UPDATE) {
113         cloudMetaVal.updateLocalMark = localMark;
114         cloudMetaVal.deleteLocalMark = std::max(cloudMetaVal.deleteLocalMark, localMark);
115     } else if (type == CloudWaterType::INSERT) {
116         cloudMetaVal.insertLocalMark = localMark;
117         cloudMetaVal.deleteLocalMark = std::max(cloudMetaVal.deleteLocalMark, localMark);
118         cloudMetaVal.updateLocalMark = std::max(cloudMetaVal.updateLocalMark, localMark);
119     }
120     int ret = WriteTypeMarkToMeta(tableName, cloudMetaVal);
121     if (ret != E_OK) {
122         return ret;
123     }
124     if (iter == cloudMetaVals_.end()) {
125         cloudMetaVals_[tableName] = cloudMetaVal;
126     } else {
127         iter->second = cloudMetaVal;
128     }
129     return E_OK;
130 }
131 
SetCloudWaterMark(const TableName & tableName,std::string & cloudMark)132 int CloudMetaData::SetCloudWaterMark(const TableName &tableName, std::string &cloudMark)
133 {
134     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
135     Timestamp localMark = 0;
136     auto iter = cloudMetaVals_.find(tableName);
137     if (iter != cloudMetaVals_.end()) {
138         localMark = iter->second.localMark;
139     }
140     int ret = WriteMarkToMeta(tableName, localMark, cloudMark);
141     if (ret != E_OK) {
142         return ret;
143     }
144     if (iter == cloudMetaVals_.end()) {
145         CloudMetaValue cloudMetaVal;
146         cloudMetaVal.localMark = localMark;
147         cloudMetaVal.cloudMark = cloudMark;
148         cloudMetaVals_[tableName] = cloudMetaVal;
149     } else {
150         iter->second.cloudMark = cloudMark;
151     }
152     LOGD("[Meta] set cloud water mark=%s", cloudMark.c_str());
153     return E_OK;
154 }
155 
ReadMarkFromMeta(const TableName & tableName)156 int CloudMetaData::ReadMarkFromMeta(const TableName &tableName)
157 {
158     if (store_ == nullptr) {
159         return -E_INVALID_DB;
160     }
161     Value blobMetaVal;
162     int ret = store_->GetMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal);
163     if (ret != -E_NOT_FOUND && ret != E_OK) {
164         return ret;
165     }
166     CloudMetaValue cloudMetaValue;
167     ret = DeserializeMark(blobMetaVal, cloudMetaValue);
168     if (ret != E_OK) {
169         return ret;
170     }
171     cloudMetaVals_[tableName] = cloudMetaValue;
172     return E_OK;
173 }
174 
WriteMarkToMeta(const TableName & tableName,Timestamp localmark,std::string & cloudMark)175 int CloudMetaData::WriteMarkToMeta(const TableName &tableName, Timestamp localmark, std::string &cloudMark)
176 {
177     Value blobMetaVal;
178     int ret = DBCommon::SerializeWaterMark(localmark, cloudMark, blobMetaVal);
179     if (ret != E_OK) {
180         return ret;
181     }
182     if (store_ == nullptr) {
183         return -E_INVALID_DB;
184     }
185     return store_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal);
186 }
187 
WriteTypeMarkToMeta(const TableName & tableName,CloudMetaValue & cloudMetaValue)188 int CloudMetaData::WriteTypeMarkToMeta(const TableName &tableName, CloudMetaValue &cloudMetaValue)
189 {
190     Value blobMetaVal;
191     int ret = SerializeWaterMark(cloudMetaValue, blobMetaVal);
192     if (ret != E_OK) {
193         return ret;
194     }
195     if (store_ == nullptr) {
196         return -E_INVALID_DB;
197     }
198     return store_->PutMetaData(DBCommon::GetPrefixTableName(tableName), blobMetaVal);
199 }
200 
GetParcelCurrentLength(CloudMetaValue & cloudMetaValue)201 uint64_t CloudMetaData::GetParcelCurrentLength(CloudMetaValue &cloudMetaValue)
202 {
203     return Parcel::GetUInt64Len() + Parcel::GetStringLen(cloudMetaValue.cloudMark) + Parcel::GetUInt64Len() +
204            Parcel::GetUInt64Len() + Parcel::GetUInt64Len();
205 }
206 
SerializeWaterMark(CloudMetaValue & cloudMetaValue,Value & blobMetaVal)207 int CloudMetaData::SerializeWaterMark(CloudMetaValue &cloudMetaValue, Value &blobMetaVal)
208 {
209     uint64_t length = GetParcelCurrentLength(cloudMetaValue);
210     blobMetaVal.resize(length);
211     Parcel parcel(blobMetaVal.data(), blobMetaVal.size());
212     parcel.WriteUInt64(cloudMetaValue.localMark);
213     parcel.WriteString(cloudMetaValue.cloudMark);
214     parcel.WriteUInt64(cloudMetaValue.insertLocalMark);
215     parcel.WriteUInt64(cloudMetaValue.updateLocalMark);
216     parcel.WriteUInt64(cloudMetaValue.deleteLocalMark);
217     if (parcel.IsError()) {
218         LOGE("[Meta] Parcel error while deserializing cloud meta data.");
219         return -E_PARSE_FAIL;
220     }
221     return E_OK;
222 }
223 
DeserializeMark(Value & blobMark,CloudMetaValue & cloudMetaValue)224 int CloudMetaData::DeserializeMark(Value &blobMark, CloudMetaValue &cloudMetaValue)
225 {
226     if (blobMark.empty()) {
227         cloudMetaValue.localMark = 0;
228         cloudMetaValue.insertLocalMark = 0;
229         cloudMetaValue.updateLocalMark = 0;
230         cloudMetaValue.deleteLocalMark = 0;
231         cloudMetaValue.cloudMark = "";
232         return E_OK;
233     }
234     Parcel parcel(blobMark.data(), blobMark.size());
235     parcel.ReadUInt64(cloudMetaValue.localMark);
236     parcel.ReadString(cloudMetaValue.cloudMark);
237     if (parcel.IsContinueRead()) {
238         parcel.ReadUInt64(cloudMetaValue.insertLocalMark);
239         parcel.ReadUInt64(cloudMetaValue.updateLocalMark);
240         parcel.ReadUInt64(cloudMetaValue.deleteLocalMark);
241     }
242     if (parcel.IsError()) {
243         LOGE("[Meta] Parcel error while deserializing cloud meta data.");
244         return -E_PARSE_FAIL;
245     }
246     if (blobMark.size() < GetParcelCurrentLength(cloudMetaValue)) {
247         cloudMetaValue.insertLocalMark = cloudMetaValue.localMark;
248         cloudMetaValue.updateLocalMark = cloudMetaValue.localMark;
249         cloudMetaValue.deleteLocalMark = cloudMetaValue.localMark;
250     }
251     return E_OK;
252 }
253 
CleanWaterMark(const TableName & tableName)254 int CloudMetaData::CleanWaterMark(const TableName &tableName)
255 {
256     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
257     std::string cloudWaterMark;
258     int ret = WriteMarkToMeta(tableName, 0, cloudWaterMark);
259     if (ret != E_OK) {
260         return ret;
261     }
262     cloudMetaVals_[tableName] = {};
263     LOGD("[Meta] clean cloud water mark");
264     return E_OK;
265 }
266 
CleanAllWaterMark()267 void CloudMetaData::CleanAllWaterMark()
268 {
269     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
270     cloudMetaVals_.clear();
271     LOGD("[Meta] clean cloud water mark");
272 }
273 
CleanWaterMarkInMemory(const TableName & tableName)274 void CloudMetaData::CleanWaterMarkInMemory(const TableName &tableName)
275 {
276     std::lock_guard<std::mutex> lock(cloudMetaMutex_);
277     cloudMetaVals_[tableName] = {};
278     LOGD("[Meta] clean cloud water mark in memory");
279 }
280 } // namespace DistributedDB