1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SINGLE_VER_SYNC_TASK_CONTEXT_H 17 #define SINGLE_VER_SYNC_TASK_CONTEXT_H 18 19 #include <list> 20 #include <mutex> 21 #include <string> 22 #include <unordered_map> 23 24 #include "db_ability.h" 25 #include "query_sync_object.h" 26 #include "schema_negotiate.h" 27 #include "single_ver_kvdb_sync_interface.h" 28 #include "single_ver_sync_target.h" 29 #include "subscribe_manager.h" 30 #include "sync_target.h" 31 #include "sync_task_context.h" 32 #include "time_helper.h" 33 34 35 namespace DistributedDB { 36 class SingleVerSyncTaskContext : public SyncTaskContext { 37 public: 38 explicit SingleVerSyncTaskContext(); 39 40 DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncTaskContext); 41 42 // Init SingleVerSyncTaskContext 43 int Initialize(const std::string &deviceId, ISyncInterface *syncInterface, 44 const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator) override; 45 46 // Add a sync task target with the operation to the queue 47 int AddSyncOperation(SyncOperation *operation) override; 48 49 bool IsCurrentSyncTaskCanBeSkipped() const override; 50 51 // Set the end watermark of this task 52 void SetEndMark(WaterMark endMark); 53 54 // Get the end watermark of this task 55 WaterMark GetEndMark() const; 56 57 void GetContinueToken(ContinueToken &outToken) const; 58 59 void SetContinueToken(ContinueToken token); 60 61 void ReleaseContinueToken(); 62 63 int PopResponseTarget(SingleVerSyncTarget &target); 64 65 int GetRspTargetQueueSize() const; 66 67 // responseSessionId used for mark the pull response task 68 void SetResponseSessionId(uint32_t responseSessionId); 69 70 // responseSessionId used for mark the pull response task 71 uint32_t GetResponseSessionId() const; 72 73 void Clear() override; 74 75 void Abort(int status) override; 76 77 void ClearAllSyncTask() override; 78 79 // If set true, remote stale data will be clear when remote db rebuilt. 80 void EnableClearRemoteStaleData(bool enable); 81 82 // Check if need to clear remote device stale data in syncing, when the remote db rebuilt. 83 bool IsNeedClearRemoteStaleData() const; 84 85 // start a timer to ResetWatchDog when sync data one (key,value) size bigger than mtu 86 bool StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag); 87 88 // stop timer to ResetWatchDog when sync data one (key,value) size bigger than mtu 89 void StopFeedDogForSync(SyncDirectionFlag flag); 90 91 // is receive waterMark err 92 bool IsReceiveWaterMarkErr() const; 93 94 // set receive waterMark err 95 void SetReceiveWaterMarkErr(bool isErr); 96 97 void SetRemoteSeccurityOption(SecurityOption secOption); 98 99 SecurityOption GetRemoteSeccurityOption() const; 100 101 void SetReceivcPermitCheck(bool isChecked); 102 103 bool GetReceivcPermitCheck() const; 104 105 void SetSendPermitCheck(bool isChecked); 106 107 bool GetSendPermitCheck() const; 108 // pair<bool, bool>: first:SyncStrategy.permitSync, second: isSchemaSync_ 109 virtual std::pair<bool, bool> GetSchemaSyncStatus(QuerySyncObject &querySyncObject) const = 0; 110 111 bool IsSkipTimeoutError(int errCode) const; 112 113 bool FindResponseSyncTarget(uint32_t responseSessionId) const; 114 115 // For query sync 116 void SetQuery(const QuerySyncObject &query); 117 QuerySyncObject GetQuery() const; 118 void SetQuerySync(bool isQuerySync); 119 bool IsQuerySync() const; 120 std::set<CompressAlgorithm> GetRemoteCompressAlgo() const; 121 std::string GetRemoteCompressAlgoStr() const; 122 void SetDbAbility(DbAbility &remoteDbAbility) override; 123 CompressAlgorithm ChooseCompressAlgo() const; 124 bool IsNotSupportAbility(const AbilityItem &abilityItem) const; 125 126 void SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager); 127 std::shared_ptr<SubscribeManager> GetSubscribeManager() const; 128 129 void SaveLastPushTaskExecStatus(int finalStatus) override; 130 void ResetLastPushTaskStatus() override; 131 132 virtual std::string GetQuerySyncId() const = 0; 133 virtual std::string GetDeleteSyncId() const = 0; 134 135 void SetCommNormal(bool isCommNormal); 136 137 void StartFeedDogForGetData(uint32_t sessionId); 138 void StopFeedDogForGetData(); 139 140 int32_t GetResponseTaskCount() override; 141 142 protected: 143 ~SingleVerSyncTaskContext() override; 144 void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) override; 145 146 // For querySync 147 mutable std::mutex queryMutex_; 148 QuerySyncObject query_; 149 bool isQuerySync_ = false; 150 151 // for merge sync task 152 volatile int lastFullSyncTaskStatus_ = SyncOperation::Status::OP_WAITING; 153 private: 154 int GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation, uint64_t &waterMark) const; 155 156 bool IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const; 157 158 DECLARE_OBJECT_TAG(SingleVerSyncTaskContext); 159 160 ContinueToken token_; 161 WaterMark endMark_; 162 volatile uint32_t responseSessionId_ = 0; 163 164 bool needClearRemoteStaleData_; 165 mutable std::mutex securityOptionMutex_; 166 SecurityOption remoteSecOption_ = {0, 0}; // remote targe can handle secOption data or not. 167 volatile bool isReceivcPermitChecked_ = false; 168 volatile bool isSendPermitChecked_ = false; 169 170 // is receive waterMark err, peerWaterMark bigger than remote localWaterMark 171 volatile bool isReceiveWaterMarkErr_ = false; 172 173 // For db ability 174 mutable std::mutex remoteDbAbilityLock_; 175 DbAbility remoteDbAbility_; 176 177 // For subscribe manager 178 std::shared_ptr<SubscribeManager> subManager_; 179 180 mutable std::mutex queryTaskStatusMutex_; 181 // <queryId, lastExcuStatus> 182 std::unordered_map<std::string, int> lastQuerySyncTaskStatusMap_; 183 }; 184 } // namespace DistributedDB 185 186 #endif // SYNC_TASK_CONTEXT_H 187