1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef SYNC_OPERATION_H 17 #define SYNC_OPERATION_H 18 19 #include <functional> 20 #include <map> 21 #include <mutex> 22 #include <string> 23 #include <vector> 24 25 #include "ikvdb_sync_interface.h" 26 #include "notification_chain.h" 27 #include "query_sync_object.h" 28 #include "ref_object.h" 29 #include "runtime_context.h" 30 #include "semaphore_utils.h" 31 #include "sync_types.h" 32 33 namespace DistributedDB { 34 class SyncOperation : public RefObject { 35 public: 36 enum Status { 37 OP_WAITING = 0, 38 OP_SYNCING, 39 OP_SEND_FINISHED, 40 OP_RECV_FINISHED, 41 OP_FINISHED_ALL, // status >= OP_FINISHED_ALL is final status. 42 OP_FAILED, 43 OP_TIMEOUT, 44 OP_PERMISSION_CHECK_FAILED, 45 OP_COMM_ABNORMAL, 46 OP_SECURITY_OPTION_CHECK_FAILURE, // remote device's SecurityOption not equal to local 47 OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error 48 OP_BUSY_FAILURE, 49 OP_SCHEMA_INCOMPATIBLE, 50 OP_QUERY_FORMAT_FAILURE, 51 OP_QUERY_FIELD_FAILURE, 52 OP_NOT_SUPPORT, 53 OP_INTERCEPT_DATA_FAIL, 54 OP_MAX_LIMITS, 55 OP_SCHEMA_CHANGED, 56 OP_INVALID_ARGS, 57 OP_USER_CHANGED, 58 OP_DENIED_SQL, 59 OP_NOTADB_OR_CORRUPTED, 60 }; 61 62 using UserCallback = std::function<void(std::map<std::string, int>)>; 63 using OnSyncFinished = std::function<void(int)>; 64 using OnSyncFinalize = std::function<void(void)>; 65 66 SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode, 67 const UserCallback &userCallback, bool isBlockSync); 68 69 DISABLE_COPY_ASSIGN_MOVE(SyncOperation); 70 71 // Init the status for callback 72 int Initialize(); 73 74 // Set the OnSyncFinalize callback 75 void SetOnSyncFinalize(const OnSyncFinalize &callback); 76 77 // Set the OnSyncFinished callback, it will be called either success or failed. 78 void SetOnSyncFinished(const OnSyncFinished &callback); 79 80 // Set the sync status, running or finished 81 void SetStatus(const std::string &deviceId, int status, int commErrCode = E_OK); 82 83 // Set the unfinished devices sync status, running or finished 84 void SetUnfinishedDevStatus(int status); 85 86 // Set the identifier, used in SyncOperation::Finished 87 void SetIdentifier(const std::vector<uint8_t> &identifier); 88 89 // Get the sync status, running or finished 90 int GetStatus(const std::string &deviceId) const; 91 92 // Get the sync id. 93 uint32_t GetSyncId() const; 94 95 // Get the sync mode 96 int GetMode() const; 97 98 // Used to call the onFinished and caller's on complete 99 void Finished(); 100 101 // Get the deviceId of this sync status 102 const std::vector<std::string> &GetDevices() const; 103 104 // Wait if it's a block sync 105 void WaitIfNeed(); 106 107 // Notify if it's a block sync 108 void NotifyIfNeed(); 109 110 // Return if this sync is auto sync 111 bool IsAutoSync() const; 112 113 // Return if this sync is block sync 114 bool IsBlockSync() const; 115 116 // Return if this sync is AUTO_SUBSCRIBE_QUERY 117 bool IsAutoControlCmd() const; 118 119 // Check if All devices sync finished. 120 bool CheckIsAllFinished() const; 121 122 // For query sync 123 void SetQuery(const QuerySyncObject &query); 124 void GetQuery(QuerySyncObject &targetObject) const; 125 bool IsQuerySync() const; 126 std::string GetQueryId() const; 127 static SyncType GetSyncType(int mode); 128 static int TransferSyncMode(int mode); 129 130 static DBStatus DBStatusTrans(int operationStatus); 131 132 void SetSyncContext(RefObject *context); 133 134 protected: 135 virtual ~SyncOperation(); 136 137 RefObject *context_ = nullptr; 138 private: 139 DECLARE_OBJECT_TAG(SyncOperation); 140 141 // called by destruction 142 void Finalize(); 143 144 static std::string GetFinishDetailMsg(const std::map<std::string, int> &finishStatus); 145 146 void ReplaceCommErrCode(std::map<std::string, int> &finishStatus); 147 148 // The device list 149 const std::vector<std::string> devices_; 150 151 // The Syncid 152 uint32_t syncId_; 153 154 // The sync mode_ see SyncMode 155 int mode_; 156 157 // The callback caller registered 158 UserCallback userCallback_; 159 160 // The callback caller registered, when sync timeout, call 161 OnSyncFinished onFinished_; 162 163 // The callback caller registered, will be called when destruction. 164 OnSyncFinalize onFinalize_; 165 166 // The device id we sync with 167 std::map<std::string, int> statuses_; 168 169 // passthrough errCode 170 std::map<std::string, int> commErrCodeMap_; 171 172 // Is this operation is a block sync 173 volatile bool isBlockSync_; 174 175 // Is this operation is an auto sync 176 volatile bool isAutoSync_; 177 178 // Is this operation has finished 179 volatile bool isFinished_; 180 181 // Used for block sync 182 std::unique_ptr<SemaphoreUtils> semaphore_; 183 184 mutable std::mutex queryMutex_; 185 QuerySyncObject query_; 186 volatile bool isQuerySync_; 187 188 volatile bool isAutoSubscribe_; 189 190 // record identifier used to call ScheduleQueuedTask in SyncOperation::Finished 191 std::string identifier_; 192 }; 193 } // namespace DistributedDB 194 195 #endif // SYNC_OPERATION_H 196