1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef SINGLE_VER_SYNC_TASK_CONTEXT_H
17 #define SINGLE_VER_SYNC_TASK_CONTEXT_H
18 
19 #include <list>
20 #include <mutex>
21 #include <string>
22 #include <unordered_map>
23 
24 #include "db_ability.h"
25 #include "query_sync_object.h"
26 #include "schema_negotiate.h"
27 #include "single_ver_kvdb_sync_interface.h"
28 #include "single_ver_sync_target.h"
29 #include "subscribe_manager.h"
30 #include "sync_target.h"
31 #include "sync_task_context.h"
32 #include "time_helper.h"
33 
34 
35 namespace DistributedDB {
36 class SingleVerSyncTaskContext : public SyncTaskContext {
37 public:
38     explicit SingleVerSyncTaskContext();
39 
40     DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncTaskContext);
41 
42     // Init SingleVerSyncTaskContext
43     int Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
44         const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator) override;
45 
46     // Add a sync task target with the operation to the queue
47     int AddSyncOperation(SyncOperation *operation) override;
48 
49     bool IsCurrentSyncTaskCanBeSkipped() const override;
50 
51     // Set the end watermark of this task
52     void SetEndMark(WaterMark endMark);
53 
54     // Get the end watermark of this task
55     WaterMark GetEndMark() const;
56 
57     void GetContinueToken(ContinueToken &outToken) const;
58 
59     void SetContinueToken(ContinueToken token);
60 
61     void ReleaseContinueToken();
62 
63     int PopResponseTarget(SingleVerSyncTarget &target);
64 
65     int GetRspTargetQueueSize() const;
66 
67     // responseSessionId used for mark the pull response task
68     void SetResponseSessionId(uint32_t responseSessionId);
69 
70     // responseSessionId used for mark the pull response task
71     uint32_t GetResponseSessionId() const;
72 
73     void Clear() override;
74 
75     void Abort(int status) override;
76 
77     void ClearAllSyncTask() override;
78 
79     // If set true, remote stale data will be clear when remote db rebuilt.
80     void EnableClearRemoteStaleData(bool enable);
81 
82     // Check if need to clear remote device stale data in syncing, when the remote db rebuilt.
83     bool IsNeedClearRemoteStaleData() const;
84 
85     // start a timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
86     bool StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag);
87 
88     // stop timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
89     void StopFeedDogForSync(SyncDirectionFlag flag);
90 
91     // is receive waterMark err
92     bool IsReceiveWaterMarkErr() const;
93 
94     // set receive waterMark err
95     void SetReceiveWaterMarkErr(bool isErr);
96 
97     void SetRemoteSeccurityOption(SecurityOption secOption);
98 
99     SecurityOption GetRemoteSeccurityOption() const;
100 
101     void SetReceivcPermitCheck(bool isChecked);
102 
103     bool GetReceivcPermitCheck() const;
104 
105     void SetSendPermitCheck(bool isChecked);
106 
107     bool GetSendPermitCheck() const;
108     // pair<bool, bool>: first:SyncStrategy.permitSync, second: isSchemaSync_
109     virtual std::pair<bool, bool> GetSchemaSyncStatus(QuerySyncObject &querySyncObject) const = 0;
110 
111     bool IsSkipTimeoutError(int errCode) const;
112 
113     bool FindResponseSyncTarget(uint32_t responseSessionId) const;
114 
115     // For query sync
116     void SetQuery(const QuerySyncObject &query);
117     QuerySyncObject GetQuery() const;
118     void SetQuerySync(bool isQuerySync);
119     bool IsQuerySync() const;
120     std::set<CompressAlgorithm> GetRemoteCompressAlgo() const;
121     std::string GetRemoteCompressAlgoStr() const;
122     void SetDbAbility(DbAbility &remoteDbAbility) override;
123     CompressAlgorithm ChooseCompressAlgo() const;
124     bool IsNotSupportAbility(const AbilityItem &abilityItem) const;
125 
126     void SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager);
127     std::shared_ptr<SubscribeManager> GetSubscribeManager() const;
128 
129     void SaveLastPushTaskExecStatus(int finalStatus) override;
130     void ResetLastPushTaskStatus() override;
131 
132     virtual std::string GetQuerySyncId() const = 0;
133     virtual std::string GetDeleteSyncId() const = 0;
134 
135     void SetCommNormal(bool isCommNormal);
136 
137     void StartFeedDogForGetData(uint32_t sessionId);
138     void StopFeedDogForGetData();
139 
140     int32_t GetResponseTaskCount() override;
141 
142 protected:
143     ~SingleVerSyncTaskContext() override;
144     void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) override;
145 
146     // For querySync
147     mutable std::mutex queryMutex_;
148     QuerySyncObject query_;
149     bool isQuerySync_ = false;
150 
151     // for merge sync task
152     volatile int lastFullSyncTaskStatus_ = SyncOperation::Status::OP_WAITING;
153 private:
154     int GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation, uint64_t &waterMark) const;
155 
156     bool IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const;
157 
158     DECLARE_OBJECT_TAG(SingleVerSyncTaskContext);
159 
160     ContinueToken token_;
161     WaterMark endMark_;
162     volatile uint32_t responseSessionId_ = 0;
163 
164     bool needClearRemoteStaleData_;
165     mutable std::mutex securityOptionMutex_;
166     SecurityOption remoteSecOption_ = {0, 0}; // remote targe can handle secOption data or not.
167     volatile bool isReceivcPermitChecked_ = false;
168     volatile bool isSendPermitChecked_ = false;
169 
170     // is receive waterMark err, peerWaterMark bigger than remote localWaterMark
171     volatile bool isReceiveWaterMarkErr_ = false;
172 
173     // For db ability
174     mutable std::mutex remoteDbAbilityLock_;
175     DbAbility remoteDbAbility_;
176 
177     // For subscribe manager
178     std::shared_ptr<SubscribeManager> subManager_;
179 
180     mutable std::mutex queryTaskStatusMutex_;
181     // <queryId, lastExcuStatus>
182     std::unordered_map<std::string, int> lastQuerySyncTaskStatusMap_;
183 };
184 } // namespace DistributedDB
185 
#endif // SINGLE_VER_SYNC_TASK_CONTEXT_H
187