1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef CLOUD_SYNCER_H
17 #define CLOUD_SYNCER_H
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <utility>
22 
23 #include "cloud/cloud_db_proxy.h"
24 #include "cloud/cloud_store_types.h"
25 #include "cloud/cloud_sync_state_machine.h"
26 #include "cloud/cloud_sync_strategy.h"
27 #include "cloud/icloud_db.h"
28 #include "cloud/icloud_syncer.h"
29 #include "cloud/process_notifier.h"
30 #include "cloud/process_recorder.h"
31 #include "cloud_locker.h"
32 #include "data_transformer.h"
33 #include "db_common.h"
34 #include "ref_object.h"
35 #include "runtime_context.h"
36 #include "storage_proxy.h"
37 #include "store_observer.h"
38 
39 namespace DistributedDB {
// List of download-commit entries; taken by reference by CloudSyncer::HandleDownloadResult,
// CloudSyncer::CommitDownloadResult and CloudSyncer::CommitDownloadAssets.
// Each element is a tuple of <std::string, std::map<std::string, Assets>, bool>.
// NOTE(review): the string is presumably the record gid, the map the record's assets keyed by field
// name, and the bool a per-record result flag — confirm against the implementation file.
using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>;
41 class CloudSyncer : public ICloudSyncer {
42 public:
43     explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, bool isKvScene = false,
44         SingleVerConflictResolvePolicy policy = SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN);
45     void InitCloudSyncStateMachine();
46     ~CloudSyncer() override = default;
47     DISABLE_COPY_ASSIGN_MOVE(CloudSyncer);
48 
49     int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables,
50         const SyncProcessCallback &callback, int64_t waitTime);
51 
52     int Sync(const CloudTaskInfo &taskInfo);
53 
54     void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB);
55 
56     void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader);
57 
58     int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList,
59         const RelationalSchemaObject &localSchema);
60 
61     int CleanWaterMarkInMemory(const std::set<std::string> &tableNameList);
62 
63     int32_t GetCloudSyncTaskCount();
64 
65     void Close();
66 
67     void StopAllTasks();
68 
69     std::string GetIdentify() const override;
70 
71     bool IsClosed() const override;
72 
73     void GenerateCompensatedSync(CloudTaskInfo &taskInfo);
74 
75     int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs);
76 
77     const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const;
78 
79     void CleanAllWaterMark();
80 
81     CloudSyncEvent SyncMachineDoDownload();
82 
83     CloudSyncEvent SyncMachineDoUpload();
84 
85     CloudSyncEvent SyncMachineDoFinished();
86 
87     void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback);
88 
89     SyncProcess GetCloudTaskStatus(uint64_t taskId) const;
90 protected:
91     struct TaskContext {
92         TaskId currentTaskId = 0u;
93         std::string tableName;
94         std::shared_ptr<ProcessNotifier> notifier;
95         std::shared_ptr<CloudSyncStrategy> strategy;
96         std::shared_ptr<ProcessRecorder> processRecorder;
97         std::map<TableName, std::vector<Field>> assetFields;
98         // should be cleared after each Download
99         DownloadList assetDownloadList;
100         // store GID and assets, using in upload procedure
101         std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo;
102         // struct: <currentUserIndex, <tableName, waterMark>>
103         std::map<int, std::map<TableName, std::string>> cloudWaterMarks;
104         std::shared_ptr<CloudLocker> locker;
105         bool isNeedUpload = false;  // whether the current task need to do upload
106         bool isRealNeedUpload = false;
107         bool isFirstDownload = false;
108         int currentUserIndex = 0;
109         int repeatCount = 0;
110     };
111     struct UploadParam {
112         int64_t count = 0;
113         TaskId taskId = 0u;
114         Timestamp localMark = 0u;
115         bool lastTable = false;
116         CloudWaterType mode = CloudWaterType::DELETE;
117         LockAction lockAction = LockAction::INSERT;
118     };
119     struct DownloadItem {
120         std::string gid;
121         Type prefix;
122         OpType strategy;
123         std::map<std::string, Assets> assets;
124         Key hashKey;
125         std::vector<Type> primaryKeyValList;
126         Timestamp timestamp;
127         bool recordConflict = false;
128     };
129     struct ResumeTaskInfo {
130         TaskContext context;
131         SyncParam syncParam;
132         bool upload = false; // task pause when upload
133         bool skipQuery = false; // task should skip query now
134         size_t lastDownloadIndex = 0u;
135         Timestamp lastLocalWatermark = 0u;
136         int downloadStatus = E_OK;
137     };
138 
139     int TriggerSync();
140 
141     void DoSyncIfNeed();
142 
143     int DoSync(TaskId taskId);
144 
145     int PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index);
146 
147     int DoSyncInner(const CloudTaskInfo &taskInfo);
148 
149     int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload);
150 
151     void DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables,
152         const bool isFirstDownload);
153 
154     int GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count);
155 
156     void UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName, bool needNotify);
157 
158     virtual int DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload);
159 
160     void DoFinished(TaskId taskId, int errCode);
161 
162     virtual int DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload);
163 
164     int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload);
165 
166     void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info);
167 
168     int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark);
169 
170     int PreCheck(TaskId &taskId, const TableName &tableName);
171 
172     int SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData,
173         InnerProcessInfo &innerProcessInfo);
174 
175     int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo);
176 
177     int PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo,
178         CloudSyncData &uploadData);
179 
180     int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam);
181 
182     virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction);
183 
184     void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData);
185 
186     bool IsModeForcePush(TaskId taskId);
187 
188     bool IsModeForcePull(const TaskId taskId);
189 
190     bool IsPriorityTask(TaskId taskId);
191 
192     bool IsCompensatedTask(TaskId taskId);
193 
194     int DoUploadInner(const std::string &tableName, UploadParam &uploadParam);
195 
196     int DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info);
197 
198     int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames);
199 
200     int QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark,
201         DownloadData &downloadData);
202 
203     int CheckTaskIdValid(TaskId taskId);
204 
205     int GetCurrentTableName(std::string &tableName);
206 
207     int TryToAddSyncTask(CloudTaskInfo &&taskInfo);
208 
209     int CheckQueueSizeWithNoLock(bool priorityTask);
210 
211     int PrepareSync(TaskId taskId);
212 
213     int LockCloud(TaskId taskId);
214 
215     int UnlockCloud();
216 
217     int StartHeartBeatTimer(int period, TaskId taskId);
218 
219     void FinishHeartBeatTimer();
220 
221     void HeartBeat(TimerId timerId, TaskId taskId);
222 
223     void HeartBeatFailed(TaskId taskId, int errCode);
224 
225     void SetTaskFailed(TaskId taskId, int errCode);
226 
227     int SaveDatum(SyncParam &param, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList,
228         std::map<std::string, LogInfo> &localLogInfoCache);
229 
230     int SaveData(CloudSyncer::TaskId taskId, SyncParam &param);
231 
232     void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam &param, bool isFirstDownload);
233 
234     int SaveDataInTransaction(CloudSyncer::TaskId taskId,  SyncParam &param);
235 
236     int SaveChangedData(SyncParam &param, size_t dataIndex, const DataInfo &dataInfo,
237         std::vector<std::pair<Key, size_t>> &deletedList);
238 
239     int DoDownloadAssets(bool skipSave, SyncParam &param);
240 
241     int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam &param);
242 
243     void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch);
244 
245     bool NeedNotifyChangedData(const ChangedData &changedData);
246 
247     int NotifyChangedData(ChangedData &&changedData);
248 
249     std::map<std::string, Assets> GetAssetsFromVBucket(VBucket &data);
250 
251     std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData,
252         bool setNormalStatus, int &errCode);
253 
254     int TagStatus(bool isExist, SyncParam &param, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo);
255 
256     int HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam &param,
257         VBucket &localAssetInfo);
258 
259     int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam &param, const DataInfo &dataInfo,
260         VBucket &localAssetInfo);
261 
262     void TagUploadAssets(CloudSyncData &uploadData);
263 
264     int FillCloudAssets(const std::string &tableName, VBucket &normalAssets,
265         VBucket &failedAssets);
266 
267     int HandleDownloadResult(const DownloadItem &downloadItem, const std::string &tableName,
268         DownloadCommitList &commitList, uint32_t &successCount);
269 
270     int FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark,
271         VBucket &extend);
272 
273     int GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj);
274 
275     int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames,
276         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
277 
278     int CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList,
279         const std::set<Key> &dupHashKeySet, ChangedData &changedAssets);
280 
281     void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem);
282 
283     bool IsDataContainAssets();
284 
285     int SaveCloudWaterMark(const TableName &tableName, const TaskId taskId);
286 
287     bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data);
288 
289     int UpdateChangedData(SyncParam &param, DownloadList &assetsDownloadList);
290 
291     void UpdateCloudWaterMark(TaskId taskId, const SyncParam &param);
292 
293     int TagStatusByStrategy(bool isExist, SyncParam &param, DataInfo &dataInfo, OpType &strategyOpResult);
294 
295     int CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info,
296         DownloadCommitList &commitList, int errCode);
297 
298     int GetLocalInfo(size_t index, SyncParam &param, DataInfoWithLog &logInfo,
299         std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo);
300 
301     TaskId GetNextTaskId();
302 
303     void MarkCurrentTaskPausedIfNeed();
304 
305     void SetCurrentTaskFailedWithoutLock(int errCode);
306 
307     int LockCloudIfNeed(TaskId taskId);
308 
309     void UnlockIfNeed();
310 
311     void ClearCurrentContextWithoutLock();
312 
313     void ClearContextAndNotify(TaskId taskId, int errCode);
314 
315     int DownloadOneBatch(TaskId taskId, SyncParam &param, bool isFirstDownload);
316 
317     int DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList,
318         DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets);
319 
320     int GetSyncParamForDownload(TaskId taskId, SyncParam &param);
321 
322     bool IsCurrentTaskResume(TaskId taskId);
323 
324     bool IsCurrentTableResume(TaskId taskId, bool upload);
325 
326     int DownloadDataFromCloud(TaskId taskId, SyncParam &param, bool &abort, bool isFirstDownload);
327 
328     size_t GetDownloadAssetIndex(TaskId taskId);
329 
330     uint32_t GetCurrentTableUploadBatchIndex();
331 
332     void ResetCurrentTableUploadBatchIndex();
333 
334     void RecordWaterMark(TaskId taskId, Timestamp waterMark);
335 
336     Timestamp GetResumeWaterMark(TaskId taskId);
337 
338     void ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark);
339 
340     void ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark);
341 
342     void ReloadUploadInfoIfNeed(TaskId taskId, const UploadParam &param, InnerProcessInfo &info);
343 
344     uint32_t GetLastUploadSuccessCount(const std::string &tableName);
345 
346     QuerySyncObject GetQuerySyncObject(const std::string &tableName);
347 
348     InnerProcessInfo GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam);
349 
350     void NotifyUploadFailed(int errCode, InnerProcessInfo &info);
351 
352     void UpdateProccessWhenUploadFailed(InnerProcessInfo &info);
353 
354     int BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
355 
356     int BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
357 
358     int BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo);
359 
360     int DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem,
361         std::map<std::string, Assets> &downloadAssets);
362 
363     std::pair<int, uint32_t> GetDBAssets(bool isSharedTable, const InnerProcessInfo &info,
364         const DownloadItem &downloadItem, VBucket &dbAssets);
365 
366     std::map<std::string, Assets>& BackFillAssetsAfterDownload(int downloadCode, int deleteCode,
367         std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload,
368         std::map<std::string, Assets> &tmpAssetsToDelete);
369 
370     int IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info,
371         const DownloadItem &downloadItem, VBucket &dbAssets);
372 
373     bool CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem);
374 
375     int DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info, DownloadItem &downloadItem,
376         std::map<std::string, Assets> &downloadAssets);
377 
378     int CommitDownloadAssets(const DownloadItem &downloadItem, const std::string &tableName,
379         DownloadCommitList &commitList, uint32_t &successCount);
380 
381     void ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam);
382 
383     int SaveCursorIfNeed(const std::string &tableName);
384 
385     int PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload);
386 
387     int UpdateFlagForSavedRecord(const SyncParam &param);
388 
389     bool IsNeedGetLocalWater(TaskId taskId);
390 
391     void SetProxyUser(const std::string &user);
392 
393     bool MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId);
394 
395     std::pair<bool, TaskId> TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId);
396 
397     bool IsTaskCanMerge(const CloudTaskInfo &taskInfo);
398 
399     bool IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId);
400 
401     bool MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId);
402 
403     void AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema, CloudTaskInfo &taskInfo);
404 
405     std::pair<TaskId, TaskId> SwapTwoTaskAndCopyTable(TaskId source, TaskId target);
406 
407     bool IsQueryListEmpty(TaskId taskId);
408 
409     std::pair<int, Timestamp> GetLocalWater(const std::string &tableName, UploadParam &uploadParam);
410 
411     int HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info, CloudSyncData &uploadData,
412         ContinueToken &continueStmtToken);
413 
414     bool IsNeedLock(const UploadParam &param);
415 
416     int UploadVersionRecordIfNeed(const UploadParam &uploadParam);
417 
418     std::vector<CloudTaskInfo> CopyAndClearTaskInfos();
419 
420     void WaitCurTaskFinished();
421 
422     bool IsLockInDownload();
423 
424     CloudSyncEvent SetCurrentTaskFailedInMachine(int errCode);
425 
426     CloudSyncEvent SyncMachineDoRepeatCheck();
427 
428     void MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish = true);
429 
430     bool IsTableFinishInUpload(const std::string &table);
431 
432     void MarkUploadFinishIfNeed(const std::string &table);
433 
434     bool IsNeedUpdateAsset(const VBucket &data);
435 
436     int GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo);
437 
438     mutable std::mutex dataLock_;
439     TaskId lastTaskId_;
440     std::list<TaskId> taskQueue_;
441     std::list<TaskId> priorityTaskQueue_;
442     std::map<TaskId, CloudTaskInfo> cloudTaskInfos_;
443     std::map<TaskId, ResumeTaskInfo> resumeTaskInfos_;
444 
445     TaskContext currentContext_;
446     std::condition_variable contextCv_;
447     std::mutex syncMutex_;  // Clean Cloud Data and Sync are mutually exclusive
448 
449     CloudDBProxy cloudDB_;
450 
451     std::shared_ptr<StorageProxy> storageProxy_;
452     std::atomic<int32_t> queuedManualSyncLimit_;
453 
454     std::atomic<bool> closed_;
455 
456     std::atomic<TimerId> timerId_;
457     std::mutex heartbeatMutex_;
458     std::condition_variable heartbeatCv_;
459     std::map<TaskId, int32_t> heartbeatCount_;
460     std::map<TaskId, int32_t> failedHeartbeatCount_;
461 
462     std::string id_;
463 
464     // isKvScene_ is used to distinguish between the KV and RDB in the following scenarios:
465     // 1. Whether upload to the cloud after delete local data that does not have a gid.
466     // 2. Whether the local data need update for different flag when the local time is larger.
467     bool isKvScene_;
468     std::atomic<SingleVerConflictResolvePolicy> policy_;
469 
470     static constexpr const TaskId INVALID_TASK_ID = 0u;
471     static constexpr const int MAX_HEARTBEAT_FAILED_LIMIT = 2;
472     static constexpr const int HEARTBEAT_PERIOD = 3;
473 
474     CloudSyncStateMachine cloudSyncStateMachine_;
475 };
476 }
477 #endif // CLOUD_SYNCER_H
478