1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef COMMUNICATORAGGREGATOR_H
17 #define COMMUNICATORAGGREGATOR_H
18 
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
#include "frame_combiner.h"
#include "frame_retainer.h"
#include "iadapter.h"
#include "icommunicator.h"
#include "icommunicator_aggregator.h"
#include "parse_result.h"
#include "send_task_scheduler.h"
33 
34 namespace DistributedDB {
35 // Forward Declarations
36 class Communicator;
37 class SerialBuffer;
38 class CommunicatorLinker;
39 
40 struct TaskConfig {
41     bool nonBlock = true;
42     uint32_t timeout = 0u;
43     Priority prio = Priority::NORMAL;
44 };
45 
46 /*
47  * Upper layer Module should comply with calling convention, Inner Module interface will not do excessive check
48  */
49 class CommunicatorAggregator : public ICommunicatorAggregator {
50 public:
51     CommunicatorAggregator();
52     ~CommunicatorAggregator() override;
53 
54     DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator);
55 
56     // See ICommunicatorAggregator for detail
57     int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override;
58 
59     // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called.
60     void Finalize() override;
61 
62     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override;
63     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override;
64 
65     void ReleaseCommunicator(ICommunicator *inCommunicator) override;
66 
67     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
68     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
69 
70     // return optimal allowed data size(Some header is taken into account and subtract)
71     uint32_t GetCommunicatorAggregatorMtuSize() const;
72     uint32_t GetCommunicatorAggregatorMtuSize(const std::string &target) const;
73 
74     // return timeout in range [5s, 60s]
75     uint32_t GetCommunicatorAggregatorTimeout() const;
76     uint32_t GetCommunicatorAggregatorTimeout(const std::string &target) const;
77     bool IsDeviceOnline(const std::string &device) const;
78     int GetLocalIdentity(std::string &outTarget) const override;
79 
80     // Get the protocol version of remote target. Return -E_NOT_FOUND if no record.
81     int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const;
82 
83     // Called by communicator to make itself really in work
84     void ActivateCommunicator(const LabelType &commLabel);
85 
86     // SerialBuffer surely is heap memory, ScheduleSendTask responsible for lifecycle
87     int ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff, FrameType inType,
88         const TaskConfig &inConfig, const OnSendEnd &onEnd = nullptr);
89 
90     static void EnableCommunicatorNotFoundFeedback(bool isEnable);
91 
92     std::shared_ptr<ExtendHeaderHandle> GetExtendHeaderHandle(const ExtendInfo &paramInfo);
93 
94 private:
95     // Working in a dedicated thread
96     void SendDataRoutine();
97     void SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu,
98         const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength);
99 
100     int RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio);
101     void TaskFinalizer(const SendTask &inTask, int result);
102     void NotifySendableToAllCommunicator();
103 
104     // Call from Adapter by register these function
105     void OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
106         const std::string &userId);
107     void OnTargetChange(const std::string &target, bool isConnect);
108     void OnSendable(const std::string &target);
109 
110     void OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length,
111         const ParseResult &inResult, const std::string &userId);
112 
113     int OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult);
114     int OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes,
115         uint32_t length, const ParseResult &inResult, const std::string &userId);
116     int OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
117         const ParseResult &inResult, const std::string &userId);
118 
119     // Function with suffix NoMutex should be called with mutex in the caller
120     int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer,
121         const LabelType &toLabel);
122 
123     // Auxiliary function for cutting short primary function
124     int RegCallbackToAdapter();
125     void UnRegCallbackFromAdapter();
126     void GenerateLocalSourceId();
127     bool ReGenerateLocalSourceIdIfNeed();
128 
129     // Feedback related functions
130     void TriggerVersionNegotiation(const std::string &dstTarget);
131     void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel,
132         const SerialBuffer *inOriFrame);
133     void TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg);
134 
135     // Record the protocol version of remote target.
136     void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version);
137 
138     void OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos);
139 
140     void NotifyConnectChange(const std::string &srcTarget, const std::map<LabelType, bool> &changedLabels);
141 
142     void RegDBChangeCallback();
143 
144     void InitSendThread();
145 
146     void SendOnceData();
147 
148     void TriggerSendData();
149 
150     void ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu);
151 
152     void RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId);
153 
154     void RetrySendTask(const std::string &target, uint64_t sendSequenceId);
155 
156     bool IsRetryOutOfLimit(const std::string &target);
157 
158     int32_t GetNextRetryInterval(const std::string &target, int32_t currentRetryCount);
159 
160     uint64_t GetSendSequenceId(const std::string &target);
161 
162     uint64_t IncreaseSendSequenceId(const std::string &target);
163 
164     DECLARE_OBJECT_TAG(CommunicatorAggregator);
165 
166     static std::atomic<bool> isCommunicatorNotFoundFeedbackEnable_;
167 
168     std::atomic<bool> shutdown_;
169     std::atomic<uint32_t> incFrameId_;
170     std::atomic<uint64_t> localSourceId_;
171 
172     // Handle related
173     mutable std::mutex commMapMutex_;
174     std::map<LabelType, std::pair<Communicator *, bool>> commMap_; // bool true indicate communicator activated
175     FrameCombiner combiner_;
176     FrameRetainer retainer_;
177     SendTaskScheduler scheduler_;
178     IAdapter *adapterHandle_ = nullptr;
179     CommunicatorLinker *commLinker_ = nullptr;
180 
181     // Thread related
182     std::thread exclusiveThread_;
183     bool wakingSignal_ = false;
184     mutable std::mutex wakingMutex_;
185     std::condition_variable wakingCv_;
186 
187     // RetryCreateTask related
188     mutable std::mutex retryMutex_;
189     std::condition_variable retryCv_;
190 
191     // Remote target version related
192     mutable std::mutex versionMapMutex_;
193     std::map<std::string, uint16_t> versionMap_;
194 
195     // CommLack Callback related
196     CommunicatorLackCallback onCommLackHandle_;
197     Finalizer onCommLackFinalizer_;
198     mutable std::mutex onCommLackMutex_;
199 
200     // Connect Callback related
201     OnConnectCallback onConnectHandle_;
202     Finalizer onConnectFinalizer_;
203     mutable std::mutex onConnectMutex_;
204 
205     std::shared_ptr<DBStatusAdapter> dbStatusAdapter_;
206 
207     std::atomic<bool> useExclusiveThread_ = false;
208     bool sendTaskStart_ = false;
209     mutable std::mutex scheduleSendTaskMutex_;
210     std::condition_variable finalizeCv_;
211 
212     struct FrameSendRecord {
213         uint32_t splitMtu = 0u;
214         uint32_t sendIndex = 0u;
215     };
216     std::mutex sendRecordMutex_;
217     std::map<uint32_t, FrameSendRecord> sendRecord_;
218 
219     std::mutex retryCountMutex_;
220     std::map<std::string, int32_t> retryCount_;
221 
222     std::mutex sendSequenceMutex_;
223     std::map<std::string, uint64_t> sendSequence_;
224 };
225 } // namespace DistributedDB
226 
227 #endif // COMMUNICATORAGGREGATOR_H
228