1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SYNC_OPERATION_H
17 #define SYNC_OPERATION_H
18 
19 #include <functional>
20 #include <map>
21 #include <mutex>
22 #include <string>
23 #include <vector>
24 
25 #include "ikvdb_sync_interface.h"
26 #include "notification_chain.h"
27 #include "query_sync_object.h"
28 #include "ref_object.h"
29 #include "runtime_context.h"
30 #include "semaphore_utils.h"
31 #include "sync_types.h"
32 
33 namespace DistributedDB {
34 class SyncOperation : public RefObject {
35 public:
36     enum Status {
37         OP_WAITING = 0,
38         OP_SYNCING,
39         OP_SEND_FINISHED,
40         OP_RECV_FINISHED,
41         OP_FINISHED_ALL, // status >= OP_FINISHED_ALL is final status.
42         OP_FAILED,
43         OP_TIMEOUT,
44         OP_PERMISSION_CHECK_FAILED,
45         OP_COMM_ABNORMAL,
46         OP_SECURITY_OPTION_CHECK_FAILURE, // remote device's SecurityOption not equal to local
47         OP_EKEYREVOKED_FAILURE, // EKEYREVOKED error
48         OP_BUSY_FAILURE,
49         OP_SCHEMA_INCOMPATIBLE,
50         OP_QUERY_FORMAT_FAILURE,
51         OP_QUERY_FIELD_FAILURE,
52         OP_NOT_SUPPORT,
53         OP_INTERCEPT_DATA_FAIL,
54         OP_MAX_LIMITS,
55         OP_SCHEMA_CHANGED,
56         OP_INVALID_ARGS,
57         OP_USER_CHANGED,
58         OP_DENIED_SQL,
59         OP_NOTADB_OR_CORRUPTED,
60     };
61 
62     using UserCallback = std::function<void(std::map<std::string, int>)>;
63     using OnSyncFinished = std::function<void(int)>;
64     using OnSyncFinalize = std::function<void(void)>;
65 
66     SyncOperation(uint32_t syncId, const std::vector<std::string> &devices, int mode,
67         const UserCallback &userCallback, bool isBlockSync);
68 
69     DISABLE_COPY_ASSIGN_MOVE(SyncOperation);
70 
71     // Init the status for callback
72     int Initialize();
73 
74     // Set the OnSyncFinalize callback
75     void SetOnSyncFinalize(const OnSyncFinalize &callback);
76 
77     // Set the OnSyncFinished callback, it will be called either success or failed.
78     void SetOnSyncFinished(const OnSyncFinished &callback);
79 
80     // Set the sync status, running or finished
81     void SetStatus(const std::string &deviceId, int status, int commErrCode = E_OK);
82 
83     // Set the unfinished devices sync status, running or finished
84     void SetUnfinishedDevStatus(int status);
85 
86     // Set the identifier, used in SyncOperation::Finished
87     void SetIdentifier(const std::vector<uint8_t> &identifier);
88 
89     // Get the sync status, running or finished
90     int GetStatus(const std::string &deviceId) const;
91 
92     // Get the sync id.
93     uint32_t GetSyncId() const;
94 
95     // Get the sync mode
96     int GetMode() const;
97 
98     // Used to call the onFinished and caller's on complete
99     void Finished();
100 
101     // Get the deviceId of this sync status
102     const std::vector<std::string> &GetDevices() const;
103 
104     // Wait if it's a block sync
105     void WaitIfNeed();
106 
107     // Notify if it's a block sync
108     void NotifyIfNeed();
109 
110     // Return if this sync is auto sync
111     bool IsAutoSync() const;
112 
113     // Return if this sync is block sync
114     bool IsBlockSync() const;
115 
116     // Return if this sync is AUTO_SUBSCRIBE_QUERY
117     bool IsAutoControlCmd() const;
118 
119     // Check if All devices sync finished.
120     bool CheckIsAllFinished() const;
121 
122     // For query sync
123     void SetQuery(const QuerySyncObject &query);
124     void GetQuery(QuerySyncObject &targetObject) const;
125     bool IsQuerySync() const;
126     std::string GetQueryId() const;
127     static SyncType GetSyncType(int mode);
128     static int TransferSyncMode(int mode);
129 
130     static DBStatus DBStatusTrans(int operationStatus);
131 
132     void SetSyncContext(RefObject *context);
133 
134 protected:
135     virtual ~SyncOperation();
136 
137     RefObject *context_ = nullptr;
138 private:
139     DECLARE_OBJECT_TAG(SyncOperation);
140 
141     // called by destruction
142     void Finalize();
143 
144     static std::string GetFinishDetailMsg(const std::map<std::string, int> &finishStatus);
145 
146     void ReplaceCommErrCode(std::map<std::string, int> &finishStatus);
147 
148     // The device list
149     const std::vector<std::string> devices_;
150 
151     // The Syncid
152     uint32_t syncId_;
153 
154     // The sync mode_ see SyncMode
155     int mode_;
156 
157     // The callback caller registered
158     UserCallback userCallback_;
159 
160     // The callback caller registered, when sync timeout, call
161     OnSyncFinished onFinished_;
162 
163     // The callback caller registered, will be called when destruction.
164     OnSyncFinalize onFinalize_;
165 
166     // The device id we sync with
167     std::map<std::string, int> statuses_;
168 
169     // passthrough errCode
170     std::map<std::string, int> commErrCodeMap_;
171 
172     // Is this operation is a block sync
173     volatile bool isBlockSync_;
174 
175     // Is this operation is an auto sync
176     volatile bool isAutoSync_;
177 
178     // Is this operation has finished
179     volatile bool isFinished_;
180 
181     // Used for block sync
182     std::unique_ptr<SemaphoreUtils> semaphore_;
183 
184     mutable std::mutex queryMutex_;
185     QuerySyncObject query_;
186     volatile bool isQuerySync_;
187 
188     volatile bool isAutoSubscribe_;
189 
190     // record identifier used to call ScheduleQueuedTask in SyncOperation::Finished
191     std::string identifier_;
192 };
193 } // namespace DistributedDB
194 
195 #endif  // SYNC_OPERATION_H
196