1  /*
2   * Copyright (c) 2021 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #ifndef SINGLE_VER_SYNC_TASK_CONTEXT_H
17  #define SINGLE_VER_SYNC_TASK_CONTEXT_H
18  
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
23  
24  #include "db_ability.h"
25  #include "query_sync_object.h"
26  #include "schema_negotiate.h"
27  #include "single_ver_kvdb_sync_interface.h"
28  #include "single_ver_sync_target.h"
29  #include "subscribe_manager.h"
30  #include "sync_target.h"
31  #include "sync_task_context.h"
32  #include "time_helper.h"
33  
34  
35  namespace DistributedDB {
36  class SingleVerSyncTaskContext : public SyncTaskContext {
37  public:
38      explicit SingleVerSyncTaskContext();
39  
40      DISABLE_COPY_ASSIGN_MOVE(SingleVerSyncTaskContext);
41  
42      // Init SingleVerSyncTaskContext
43      int Initialize(const std::string &deviceId, ISyncInterface *syncInterface,
44          const std::shared_ptr<Metadata> &metadata, ICommunicator *communicator) override;
45  
46      // Add a sync task target with the operation to the queue
47      int AddSyncOperation(SyncOperation *operation) override;
48  
49      bool IsCurrentSyncTaskCanBeSkipped() const override;
50  
51      // Set the end watermark of this task
52      void SetEndMark(WaterMark endMark);
53  
54      // Get the end watermark of this task
55      WaterMark GetEndMark() const;
56  
57      void GetContinueToken(ContinueToken &outToken) const;
58  
59      void SetContinueToken(ContinueToken token);
60  
61      void ReleaseContinueToken();
62  
63      int PopResponseTarget(SingleVerSyncTarget &target);
64  
65      int GetRspTargetQueueSize() const;
66  
67      // responseSessionId used for mark the pull response task
68      void SetResponseSessionId(uint32_t responseSessionId);
69  
70      // responseSessionId used for mark the pull response task
71      uint32_t GetResponseSessionId() const;
72  
73      void Clear() override;
74  
75      void Abort(int status) override;
76  
77      void ClearAllSyncTask() override;
78  
79      // If set true, remote stale data will be clear when remote db rebuilt.
80      void EnableClearRemoteStaleData(bool enable);
81  
82      // Check if need to clear remote device stale data in syncing, when the remote db rebuilt.
83      bool IsNeedClearRemoteStaleData() const;
84  
85      // start a timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
86      bool StartFeedDogForSync(uint32_t time, SyncDirectionFlag flag);
87  
88      // stop timer to ResetWatchDog when sync data one (key,value) size bigger than mtu
89      void StopFeedDogForSync(SyncDirectionFlag flag);
90  
91      // is receive waterMark err
92      bool IsReceiveWaterMarkErr() const;
93  
94      // set receive waterMark err
95      void SetReceiveWaterMarkErr(bool isErr);
96  
97      void SetRemoteSeccurityOption(SecurityOption secOption);
98  
99      SecurityOption GetRemoteSeccurityOption() const;
100  
101      void SetReceivcPermitCheck(bool isChecked);
102  
103      bool GetReceivcPermitCheck() const;
104  
105      void SetSendPermitCheck(bool isChecked);
106  
107      bool GetSendPermitCheck() const;
108      // pair<bool, bool>: first:SyncStrategy.permitSync, second: isSchemaSync_
109      virtual std::pair<bool, bool> GetSchemaSyncStatus(QuerySyncObject &querySyncObject) const = 0;
110  
111      bool IsSkipTimeoutError(int errCode) const;
112  
113      bool FindResponseSyncTarget(uint32_t responseSessionId) const;
114  
115      // For query sync
116      void SetQuery(const QuerySyncObject &query);
117      QuerySyncObject GetQuery() const;
118      void SetQuerySync(bool isQuerySync);
119      bool IsQuerySync() const;
120      std::set<CompressAlgorithm> GetRemoteCompressAlgo() const;
121      std::string GetRemoteCompressAlgoStr() const;
122      void SetDbAbility(DbAbility &remoteDbAbility) override;
123      CompressAlgorithm ChooseCompressAlgo() const;
124      bool IsNotSupportAbility(const AbilityItem &abilityItem) const;
125  
126      void SetSubscribeManager(std::shared_ptr<SubscribeManager> &subManager);
127      std::shared_ptr<SubscribeManager> GetSubscribeManager() const;
128  
129      void SaveLastPushTaskExecStatus(int finalStatus) override;
130      void ResetLastPushTaskStatus() override;
131  
132      virtual std::string GetQuerySyncId() const = 0;
133      virtual std::string GetDeleteSyncId() const = 0;
134  
135      void SetCommNormal(bool isCommNormal);
136  
137      void StartFeedDogForGetData(uint32_t sessionId);
138      void StopFeedDogForGetData();
139  
140      int32_t GetResponseTaskCount() override;
141  
142  protected:
143      ~SingleVerSyncTaskContext() override;
144      void CopyTargetData(const ISyncTarget *target, const TaskParam &taskParam) override;
145  
146      // For querySync
147      mutable std::mutex queryMutex_;
148      QuerySyncObject query_;
149      bool isQuerySync_ = false;
150  
151      // for merge sync task
152      volatile int lastFullSyncTaskStatus_ = SyncOperation::Status::OP_WAITING;
153  private:
154      int GetCorrectedSendWaterMarkForCurrentTask(const SyncOperation *operation, uint64_t &waterMark) const;
155  
156      bool IsCurrentSyncTaskCanBeSkippedInner(const SyncOperation *operation) const;
157  
158      DECLARE_OBJECT_TAG(SingleVerSyncTaskContext);
159  
160      ContinueToken token_;
161      WaterMark endMark_;
162      volatile uint32_t responseSessionId_ = 0;
163  
164      bool needClearRemoteStaleData_;
165      mutable std::mutex securityOptionMutex_;
166      SecurityOption remoteSecOption_ = {0, 0}; // remote targe can handle secOption data or not.
167      volatile bool isReceivcPermitChecked_ = false;
168      volatile bool isSendPermitChecked_ = false;
169  
170      // is receive waterMark err, peerWaterMark bigger than remote localWaterMark
171      volatile bool isReceiveWaterMarkErr_ = false;
172  
173      // For db ability
174      mutable std::mutex remoteDbAbilityLock_;
175      DbAbility remoteDbAbility_;
176  
177      // For subscribe manager
178      std::shared_ptr<SubscribeManager> subManager_;
179  
180      mutable std::mutex queryTaskStatusMutex_;
181      // <queryId, lastExcuStatus>
182      std::unordered_map<std::string, int> lastQuerySyncTaskStatusMap_;
183  };
184  } // namespace DistributedDB
185  
#endif // SINGLE_VER_SYNC_TASK_CONTEXT_H
187