1 /* 2 * Copyright (c) 2023 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef CLOUD_SYNCER_H 17 #define CLOUD_SYNCER_H 18 #include <atomic> 19 #include <condition_variable> 20 #include <mutex> 21 #include <utility> 22 23 #include "cloud/cloud_db_proxy.h" 24 #include "cloud/cloud_store_types.h" 25 #include "cloud/cloud_sync_state_machine.h" 26 #include "cloud/cloud_sync_strategy.h" 27 #include "cloud/icloud_db.h" 28 #include "cloud/icloud_syncer.h" 29 #include "cloud/process_notifier.h" 30 #include "cloud/process_recorder.h" 31 #include "cloud_locker.h" 32 #include "data_transformer.h" 33 #include "db_common.h" 34 #include "ref_object.h" 35 #include "runtime_context.h" 36 #include "storage_proxy.h" 37 #include "store_observer.h" 38 39 namespace DistributedDB { 40 using DownloadCommitList = std::vector<std::tuple<std::string, std::map<std::string, Assets>, bool>>; 41 class CloudSyncer : public ICloudSyncer { 42 public: 43 explicit CloudSyncer(std::shared_ptr<StorageProxy> storageProxy, bool isKvScene = false, 44 SingleVerConflictResolvePolicy policy = SingleVerConflictResolvePolicy::DEFAULT_LAST_WIN); 45 void InitCloudSyncStateMachine(); 46 ~CloudSyncer() override = default; 47 DISABLE_COPY_ASSIGN_MOVE(CloudSyncer); 48 49 int Sync(const std::vector<DeviceID> &devices, SyncMode mode, const std::vector<std::string> &tables, 50 const SyncProcessCallback &callback, int64_t waitTime); 51 52 int Sync(const CloudTaskInfo &taskInfo); 53 54 void SetCloudDB(const std::shared_ptr<ICloudDb> &cloudDB); 55 56 void SetIAssetLoader(const std::shared_ptr<IAssetLoader> &loader); 57 58 int CleanCloudData(ClearMode mode, const std::vector<std::string> &tableNameList, 59 const RelationalSchemaObject &localSchema); 60 61 int CleanWaterMarkInMemory(const std::set<std::string> &tableNameList); 62 63 int32_t GetCloudSyncTaskCount(); 64 65 void Close(); 66 67 void StopAllTasks(); 68 69 std::string GetIdentify() const override; 70 71 bool IsClosed() const override; 72 73 void GenerateCompensatedSync(CloudTaskInfo &taskInfo); 74 75 int SetCloudDB(const std::map<std::string, std::shared_ptr<ICloudDb>> &cloudDBs); 76 77 const std::map<std::string, std::shared_ptr<ICloudDb>> GetCloudDB() const; 78 79 void CleanAllWaterMark(); 80 81 CloudSyncEvent SyncMachineDoDownload(); 82 83 CloudSyncEvent SyncMachineDoUpload(); 84 85 CloudSyncEvent SyncMachineDoFinished(); 86 87 void SetGenCloudVersionCallback(const GenerateCloudVersionCallback &callback); 88 89 SyncProcess GetCloudTaskStatus(uint64_t taskId) const; 90 protected: 91 struct TaskContext { 92 TaskId currentTaskId = 0u; 93 std::string tableName; 94 std::shared_ptr<ProcessNotifier> notifier; 95 std::shared_ptr<CloudSyncStrategy> strategy; 96 std::shared_ptr<ProcessRecorder> processRecorder; 97 std::map<TableName, std::vector<Field>> assetFields; 98 // should be cleared after each Download 99 DownloadList assetDownloadList; 100 // store GID and assets, using in upload procedure 101 std::map<TableName, std::map<std::string, std::map<std::string, Assets>>> assetsInfo; 102 // struct: <currentUserIndex, <tableName, waterMark>> 103 std::map<int, std::map<TableName, std::string>> cloudWaterMarks; 104 std::shared_ptr<CloudLocker> locker; 105 bool isNeedUpload = false; // whether the current task need to do upload 106 bool isRealNeedUpload = false; 107 bool isFirstDownload = false; 108 int currentUserIndex = 0; 109 int repeatCount = 0; 110 }; 111 struct UploadParam { 112 int64_t count = 0; 113 TaskId taskId = 0u; 114 Timestamp localMark = 0u; 115 bool lastTable = false; 116 CloudWaterType mode = CloudWaterType::DELETE; 117 LockAction lockAction = LockAction::INSERT; 118 }; 119 struct DownloadItem { 120 std::string gid; 121 Type prefix; 122 OpType strategy; 123 std::map<std::string, Assets> assets; 124 Key hashKey; 125 std::vector<Type> primaryKeyValList; 126 Timestamp timestamp; 127 bool recordConflict = false; 128 }; 129 struct ResumeTaskInfo { 130 TaskContext context; 131 SyncParam syncParam; 132 bool upload = false; // task pause when upload 133 bool skipQuery = false; // task should skip query now 134 size_t lastDownloadIndex = 0u; 135 Timestamp lastLocalWatermark = 0u; 136 int downloadStatus = E_OK; 137 }; 138 139 int TriggerSync(); 140 141 void DoSyncIfNeed(); 142 143 int DoSync(TaskId taskId); 144 145 int PrepareAndUpload(const CloudTaskInfo &taskInfo, size_t index); 146 147 int DoSyncInner(const CloudTaskInfo &taskInfo); 148 149 int DoUploadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload); 150 151 void DoNotifyInNeed(const CloudSyncer::TaskId &taskId, const std::vector<std::string> &needNotifyTables, 152 const bool isFirstDownload); 153 154 int GetUploadCountByTable(const CloudSyncer::TaskId &taskId, int64_t &count); 155 156 void UpdateProcessInfoWithoutUpload(CloudSyncer::TaskId taskId, const std::string &tableName, bool needNotify); 157 158 virtual int DoDownloadInNeed(const CloudTaskInfo &taskInfo, const bool needUpload, bool isFirstDownload); 159 160 void DoFinished(TaskId taskId, int errCode); 161 162 virtual int DoDownload(CloudSyncer::TaskId taskId, bool isFirstDownload); 163 164 int DoDownloadInner(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload); 165 166 void NotifyInEmptyDownload(CloudSyncer::TaskId taskId, InnerProcessInfo &info); 167 168 int PreCheckUpload(TaskId &taskId, const TableName &tableName, Timestamp &localMark); 169 170 int PreCheck(TaskId &taskId, const TableName &tableName); 171 172 int SaveUploadData(Info &insertInfo, Info &updateInfo, Info &deleteInfo, CloudSyncData &uploadData, 173 InnerProcessInfo &innerProcessInfo); 174 175 int DoBatchUpload(CloudSyncData &uploadData, UploadParam &uploadParam, InnerProcessInfo &innerProcessInfo); 176 177 int PreProcessBatchUpload(UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, 178 CloudSyncData &uploadData); 179 180 int PutWaterMarkAfterBatchUpload(const std::string &tableName, UploadParam &uploadParam); 181 182 virtual int DoUpload(CloudSyncer::TaskId taskId, bool lastTable, LockAction lockAction); 183 184 void SetUploadDataFlag(const TaskId taskId, CloudSyncData& uploadData); 185 186 bool IsModeForcePush(TaskId taskId); 187 188 bool IsModeForcePull(const TaskId taskId); 189 190 bool IsPriorityTask(TaskId taskId); 191 192 bool IsCompensatedTask(TaskId taskId); 193 194 int DoUploadInner(const std::string &tableName, UploadParam &uploadParam); 195 196 int DoUploadByMode(const std::string &tableName, UploadParam &uploadParam, InnerProcessInfo &info); 197 198 int PreHandleData(VBucket &datum, const std::vector<std::string> &pkColNames); 199 200 int QueryCloudData(TaskId taskId, const std::string &tableName, std::string &cloudWaterMark, 201 DownloadData &downloadData); 202 203 int CheckTaskIdValid(TaskId taskId); 204 205 int GetCurrentTableName(std::string &tableName); 206 207 int TryToAddSyncTask(CloudTaskInfo &&taskInfo); 208 209 int CheckQueueSizeWithNoLock(bool priorityTask); 210 211 int PrepareSync(TaskId taskId); 212 213 int LockCloud(TaskId taskId); 214 215 int UnlockCloud(); 216 217 int StartHeartBeatTimer(int period, TaskId taskId); 218 219 void FinishHeartBeatTimer(); 220 221 void HeartBeat(TimerId timerId, TaskId taskId); 222 223 void HeartBeatFailed(TaskId taskId, int errCode); 224 225 void SetTaskFailed(TaskId taskId, int errCode); 226 227 int SaveDatum(SyncParam ¶m, size_t idx, std::vector<std::pair<Key, size_t>> &deletedList, 228 std::map<std::string, LogInfo> &localLogInfoCache); 229 230 int SaveData(CloudSyncer::TaskId taskId, SyncParam ¶m); 231 232 void NotifyInDownload(CloudSyncer::TaskId taskId, SyncParam ¶m, bool isFirstDownload); 233 234 int SaveDataInTransaction(CloudSyncer::TaskId taskId, SyncParam ¶m); 235 236 int SaveChangedData(SyncParam ¶m, size_t dataIndex, const DataInfo &dataInfo, 237 std::vector<std::pair<Key, size_t>> &deletedList); 238 239 int DoDownloadAssets(bool skipSave, SyncParam ¶m); 240 241 int SaveDataNotifyProcess(CloudSyncer::TaskId taskId, SyncParam ¶m); 242 243 void NotifyInBatchUpload(const UploadParam &uploadParam, const InnerProcessInfo &innerProcessInfo, bool lastBatch); 244 245 bool NeedNotifyChangedData(const ChangedData &changedData); 246 247 int NotifyChangedData(ChangedData &&changedData); 248 249 std::map<std::string, Assets> GetAssetsFromVBucket(VBucket &data); 250 251 std::map<std::string, Assets> TagAssetsInSingleRecord(VBucket &coveredData, VBucket &beCoveredData, 252 bool setNormalStatus, int &errCode); 253 254 int TagStatus(bool isExist, SyncParam ¶m, size_t idx, DataInfo &dataInfo, VBucket &localAssetInfo); 255 256 int HandleTagAssets(const Key &hashKey, const DataInfo &dataInfo, size_t idx, SyncParam ¶m, 257 VBucket &localAssetInfo); 258 259 int TagDownloadAssets(const Key &hashKey, size_t idx, SyncParam ¶m, const DataInfo &dataInfo, 260 VBucket &localAssetInfo); 261 262 void TagUploadAssets(CloudSyncData &uploadData); 263 264 int FillCloudAssets(const std::string &tableName, VBucket &normalAssets, 265 VBucket &failedAssets); 266 267 int HandleDownloadResult(const DownloadItem &downloadItem, const std::string &tableName, 268 DownloadCommitList &commitList, uint32_t &successCount); 269 270 int FillDownloadExtend(TaskId taskId, const std::string &tableName, const std::string &cloudWaterMark, 271 VBucket &extend); 272 273 int GetCloudGid(TaskId taskId, const std::string &tableName, QuerySyncObject &obj); 274 275 int DownloadAssets(InnerProcessInfo &info, const std::vector<std::string> &pKColNames, 276 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 277 278 int CloudDbDownloadAssets(TaskId taskId, InnerProcessInfo &info, const DownloadList &downloadList, 279 const std::set<Key> &dupHashKeySet, ChangedData &changedAssets); 280 281 void GetDownloadItem(const DownloadList &downloadList, size_t i, DownloadItem &downloadItem); 282 283 bool IsDataContainAssets(); 284 285 int SaveCloudWaterMark(const TableName &tableName, const TaskId taskId); 286 287 bool IsDataContainDuplicateAsset(const std::vector<Field> &assetFields, VBucket &data); 288 289 int UpdateChangedData(SyncParam ¶m, DownloadList &assetsDownloadList); 290 291 void UpdateCloudWaterMark(TaskId taskId, const SyncParam ¶m); 292 293 int TagStatusByStrategy(bool isExist, SyncParam ¶m, DataInfo &dataInfo, OpType &strategyOpResult); 294 295 int CommitDownloadResult(const DownloadItem &downloadItem, InnerProcessInfo &info, 296 DownloadCommitList &commitList, int errCode); 297 298 int GetLocalInfo(size_t index, SyncParam ¶m, DataInfoWithLog &logInfo, 299 std::map<std::string, LogInfo> &localLogInfoCache, VBucket &localAssetInfo); 300 301 TaskId GetNextTaskId(); 302 303 void MarkCurrentTaskPausedIfNeed(); 304 305 void SetCurrentTaskFailedWithoutLock(int errCode); 306 307 int LockCloudIfNeed(TaskId taskId); 308 309 void UnlockIfNeed(); 310 311 void ClearCurrentContextWithoutLock(); 312 313 void ClearContextAndNotify(TaskId taskId, int errCode); 314 315 int DownloadOneBatch(TaskId taskId, SyncParam ¶m, bool isFirstDownload); 316 317 int DownloadOneAssetRecord(const std::set<Key> &dupHashKeySet, const DownloadList &downloadList, 318 DownloadItem &downloadItem, InnerProcessInfo &info, ChangedData &changedAssets); 319 320 int GetSyncParamForDownload(TaskId taskId, SyncParam ¶m); 321 322 bool IsCurrentTaskResume(TaskId taskId); 323 324 bool IsCurrentTableResume(TaskId taskId, bool upload); 325 326 int DownloadDataFromCloud(TaskId taskId, SyncParam ¶m, bool &abort, bool isFirstDownload); 327 328 size_t GetDownloadAssetIndex(TaskId taskId); 329 330 uint32_t GetCurrentTableUploadBatchIndex(); 331 332 void ResetCurrentTableUploadBatchIndex(); 333 334 void RecordWaterMark(TaskId taskId, Timestamp waterMark); 335 336 Timestamp GetResumeWaterMark(TaskId taskId); 337 338 void ReloadWaterMarkIfNeed(TaskId taskId, WaterMark &waterMark); 339 340 void ReloadCloudWaterMarkIfNeed(const std::string &tableName, std::string &cloudWaterMark); 341 342 void ReloadUploadInfoIfNeed(TaskId taskId, const UploadParam ¶m, InnerProcessInfo &info); 343 344 uint32_t GetLastUploadSuccessCount(const std::string &tableName); 345 346 QuerySyncObject GetQuerySyncObject(const std::string &tableName); 347 348 InnerProcessInfo GetInnerProcessInfo(const std::string &tableName, UploadParam &uploadParam); 349 350 void NotifyUploadFailed(int errCode, InnerProcessInfo &info); 351 352 void UpdateProccessWhenUploadFailed(InnerProcessInfo &info); 353 354 int BatchInsert(Info &insertInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 355 356 int BatchUpdate(Info &updateInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 357 358 int BatchDelete(Info &deleteInfo, CloudSyncData &uploadData, InnerProcessInfo &innerProcessInfo); 359 360 int DownloadAssetsOneByOne(const InnerProcessInfo &info, DownloadItem &downloadItem, 361 std::map<std::string, Assets> &downloadAssets); 362 363 std::pair<int, uint32_t> GetDBAssets(bool isSharedTable, const InnerProcessInfo &info, 364 const DownloadItem &downloadItem, VBucket &dbAssets); 365 366 std::map<std::string, Assets>& BackFillAssetsAfterDownload(int downloadCode, int deleteCode, 367 std::map<std::string, std::vector<uint32_t>> &tmpFlags, std::map<std::string, Assets> &tmpAssetsToDownload, 368 std::map<std::string, Assets> &tmpAssetsToDelete); 369 370 int IsNeedSkipDownload(bool isSharedTable, int &errCode, const InnerProcessInfo &info, 371 const DownloadItem &downloadItem, VBucket &dbAssets); 372 373 bool CheckDownloadOrDeleteCode(int &errCode, int downloadCode, int deleteCode, DownloadItem &downloadItem); 374 375 int DownloadAssetsOneByOneInner(bool isSharedTable, const InnerProcessInfo &info, DownloadItem &downloadItem, 376 std::map<std::string, Assets> &downloadAssets); 377 378 int CommitDownloadAssets(const DownloadItem &downloadItem, const std::string &tableName, 379 DownloadCommitList &commitList, uint32_t &successCount); 380 381 void ChkIgnoredProcess(InnerProcessInfo &info, const CloudSyncData &uploadData, UploadParam &uploadParam); 382 383 int SaveCursorIfNeed(const std::string &tableName); 384 385 int PrepareAndDownload(const std::string &table, const CloudTaskInfo &taskInfo, bool isFirstDownload); 386 387 int UpdateFlagForSavedRecord(const SyncParam ¶m); 388 389 bool IsNeedGetLocalWater(TaskId taskId); 390 391 void SetProxyUser(const std::string &user); 392 393 bool MergeTaskInfo(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId taskId); 394 395 std::pair<bool, TaskId> TryMergeTask(const std::shared_ptr<DataBaseSchema> &cloudSchema, TaskId tryTaskId); 396 397 bool IsTaskCanMerge(const CloudTaskInfo &taskInfo); 398 399 bool IsTasksCanMerge(TaskId taskId, TaskId tryMergeTaskId); 400 401 bool MergeTaskTablesIfConsistent(TaskId sourceId, TaskId targetId); 402 403 void AdjustTableBasedOnSchema(const std::shared_ptr<DataBaseSchema> &cloudSchema, CloudTaskInfo &taskInfo); 404 405 std::pair<TaskId, TaskId> SwapTwoTaskAndCopyTable(TaskId source, TaskId target); 406 407 bool IsQueryListEmpty(TaskId taskId); 408 409 std::pair<int, Timestamp> GetLocalWater(const std::string &tableName, UploadParam &uploadParam); 410 411 int HandleBatchUpload(UploadParam &uploadParam, InnerProcessInfo &info, CloudSyncData &uploadData, 412 ContinueToken &continueStmtToken); 413 414 bool IsNeedLock(const UploadParam ¶m); 415 416 int UploadVersionRecordIfNeed(const UploadParam &uploadParam); 417 418 std::vector<CloudTaskInfo> CopyAndClearTaskInfos(); 419 420 void WaitCurTaskFinished(); 421 422 bool IsLockInDownload(); 423 424 CloudSyncEvent SetCurrentTaskFailedInMachine(int errCode); 425 426 CloudSyncEvent SyncMachineDoRepeatCheck(); 427 428 void MarkDownloadFinishIfNeed(const std::string &downloadTable, bool isFinish = true); 429 430 bool IsTableFinishInUpload(const std::string &table); 431 432 void MarkUploadFinishIfNeed(const std::string &table); 433 434 bool IsNeedUpdateAsset(const VBucket &data); 435 436 int GenerateTaskIdIfNeed(CloudTaskInfo &taskInfo); 437 438 mutable std::mutex dataLock_; 439 TaskId lastTaskId_; 440 std::list<TaskId> taskQueue_; 441 std::list<TaskId> priorityTaskQueue_; 442 std::map<TaskId, CloudTaskInfo> cloudTaskInfos_; 443 std::map<TaskId, ResumeTaskInfo> resumeTaskInfos_; 444 445 TaskContext currentContext_; 446 std::condition_variable contextCv_; 447 std::mutex syncMutex_; // Clean Cloud Data and Sync are mutually exclusive 448 449 CloudDBProxy cloudDB_; 450 451 std::shared_ptr<StorageProxy> storageProxy_; 452 std::atomic<int32_t> queuedManualSyncLimit_; 453 454 std::atomic<bool> closed_; 455 456 std::atomic<TimerId> timerId_; 457 std::mutex heartbeatMutex_; 458 std::condition_variable heartbeatCv_; 459 std::map<TaskId, int32_t> heartbeatCount_; 460 std::map<TaskId, int32_t> failedHeartbeatCount_; 461 462 std::string id_; 463 464 // isKvScene_ is used to distinguish between the KV and RDB in the following scenarios: 465 // 1. Whether upload to the cloud after delete local data that does not have a gid. 466 // 2. Whether the local data need update for different flag when the local time is larger. 467 bool isKvScene_; 468 std::atomic<SingleVerConflictResolvePolicy> policy_; 469 470 static constexpr const TaskId INVALID_TASK_ID = 0u; 471 static constexpr const int MAX_HEARTBEAT_FAILED_LIMIT = 2; 472 static constexpr const int HEARTBEAT_PERIOD = 3; 473 474 CloudSyncStateMachine cloudSyncStateMachine_; 475 }; 476 } 477 #endif // CLOUD_SYNCER_H 478