1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMUNICATORAGGREGATOR_H 17 #define COMMUNICATORAGGREGATOR_H 18 19 #include <atomic> 20 #include <condition_variable> 21 #include <cstdint> 22 #include <map> 23 #include <mutex> 24 #include <string> 25 #include <thread> 26 #include "frame_combiner.h" 27 #include "frame_retainer.h" 28 #include "iadapter.h" 29 #include "icommunicator.h" 30 #include "icommunicator_aggregator.h" 31 #include "parse_result.h" 32 #include "send_task_scheduler.h" 33 34 namespace DistributedDB { 35 // Forward Declarations 36 class Communicator; 37 class SerialBuffer; 38 class CommunicatorLinker; 39 40 struct TaskConfig { 41 bool nonBlock = true; 42 uint32_t timeout = 0u; 43 Priority prio = Priority::NORMAL; 44 }; 45 46 /* 47 * Upper layer Module should comply with calling convention, Inner Module interface will not do excessive check 48 */ 49 class CommunicatorAggregator : public ICommunicatorAggregator { 50 public: 51 CommunicatorAggregator(); 52 ~CommunicatorAggregator() override; 53 54 DISABLE_COPY_ASSIGN_MOVE(CommunicatorAggregator); 55 56 // See ICommunicatorAggregator for detail 57 int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override; 58 59 // Must not call any other functions if Finalize had been called. In fact, Finalize has no chance to be called. 60 void Finalize() override; 61 62 ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override; 63 ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override; 64 65 void ReleaseCommunicator(ICommunicator *inCommunicator) override; 66 67 int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override; 68 int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override; 69 70 // return optimal allowed data size(Some header is taken into account and subtract) 71 uint32_t GetCommunicatorAggregatorMtuSize() const; 72 uint32_t GetCommunicatorAggregatorMtuSize(const std::string &target) const; 73 74 // return timeout in range [5s, 60s] 75 uint32_t GetCommunicatorAggregatorTimeout() const; 76 uint32_t GetCommunicatorAggregatorTimeout(const std::string &target) const; 77 bool IsDeviceOnline(const std::string &device) const; 78 int GetLocalIdentity(std::string &outTarget) const override; 79 80 // Get the protocol version of remote target. Return -E_NOT_FOUND if no record. 81 int GetRemoteCommunicatorVersion(const std::string &target, uint16_t &outVersion) const; 82 83 // Called by communicator to make itself really in work 84 void ActivateCommunicator(const LabelType &commLabel); 85 86 // SerialBuffer surely is heap memory, ScheduleSendTask responsible for lifecycle 87 int ScheduleSendTask(const std::string &dstTarget, SerialBuffer *inBuff, FrameType inType, 88 const TaskConfig &inConfig, const OnSendEnd &onEnd = nullptr); 89 90 static void EnableCommunicatorNotFoundFeedback(bool isEnable); 91 92 std::shared_ptr<ExtendHeaderHandle> GetExtendHeaderHandle(const ExtendInfo ¶mInfo); 93 94 private: 95 // Working in a dedicated thread 96 void SendDataRoutine(); 97 void SendPacketsAndDisposeTask(const SendTask &inTask, uint32_t mtu, 98 const std::vector<std::pair<const uint8_t *, std::pair<uint32_t, uint32_t>>> &eachPacket, uint32_t totalLength); 99 100 int RetryUntilTimeout(SendTask &inTask, uint32_t timeout, Priority inPrio); 101 void TaskFinalizer(const SendTask &inTask, int result); 102 void NotifySendableToAllCommunicator(); 103 104 // Call from Adapter by register these function 105 void OnBytesReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length, 106 const std::string &userId); 107 void OnTargetChange(const std::string &target, bool isConnect); 108 void OnSendable(const std::string &target); 109 110 void OnFragmentReceive(const std::string &srcTarget, const uint8_t *bytes, uint32_t length, 111 const ParseResult &inResult, const std::string &userId); 112 113 int OnCommLayerFrameReceive(const std::string &srcTarget, const ParseResult &inResult); 114 int OnAppLayerFrameReceive(const std::string &srcTarget, const uint8_t *bytes, 115 uint32_t length, const ParseResult &inResult, const std::string &userId); 116 int OnAppLayerFrameReceive(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, 117 const ParseResult &inResult, const std::string &userId); 118 119 // Function with suffix NoMutex should be called with mutex in the caller 120 int TryDeliverAppLayerFrameToCommunicatorNoMutex(const std::string &srcTarget, SerialBuffer *&inFrameBuffer, 121 const LabelType &toLabel); 122 123 // Auxiliary function for cutting short primary function 124 int RegCallbackToAdapter(); 125 void UnRegCallbackFromAdapter(); 126 void GenerateLocalSourceId(); 127 bool ReGenerateLocalSourceIdIfNeed(); 128 129 // Feedback related functions 130 void TriggerVersionNegotiation(const std::string &dstTarget); 131 void TryToFeedbackWhenCommunicatorNotFound(const std::string &dstTarget, const LabelType &dstLabel, 132 const SerialBuffer *inOriFrame); 133 void TriggerCommunicatorNotFoundFeedback(const std::string &dstTarget, const LabelType &dstLabel, Message* &oriMsg); 134 135 // Record the protocol version of remote target. 136 void SetRemoteCommunicatorVersion(const std::string &target, uint16_t version); 137 138 void OnRemoteDBStatusChange(const std::string &devInfo, const std::vector<DBInfo> &dbInfos); 139 140 void NotifyConnectChange(const std::string &srcTarget, const std::map<LabelType, bool> &changedLabels); 141 142 void RegDBChangeCallback(); 143 144 void InitSendThread(); 145 146 void SendOnceData(); 147 148 void TriggerSendData(); 149 150 void ResetFrameRecordIfNeed(const uint32_t frameId, const uint32_t mtu); 151 152 void RetrySendTaskIfNeed(const std::string &target, uint64_t sendSequenceId); 153 154 void RetrySendTask(const std::string &target, uint64_t sendSequenceId); 155 156 bool IsRetryOutOfLimit(const std::string &target); 157 158 int32_t GetNextRetryInterval(const std::string &target, int32_t currentRetryCount); 159 160 uint64_t GetSendSequenceId(const std::string &target); 161 162 uint64_t IncreaseSendSequenceId(const std::string &target); 163 164 DECLARE_OBJECT_TAG(CommunicatorAggregator); 165 166 static std::atomic<bool> isCommunicatorNotFoundFeedbackEnable_; 167 168 std::atomic<bool> shutdown_; 169 std::atomic<uint32_t> incFrameId_; 170 std::atomic<uint64_t> localSourceId_; 171 172 // Handle related 173 mutable std::mutex commMapMutex_; 174 std::map<LabelType, std::pair<Communicator *, bool>> commMap_; // bool true indicate communicator activated 175 FrameCombiner combiner_; 176 FrameRetainer retainer_; 177 SendTaskScheduler scheduler_; 178 IAdapter *adapterHandle_ = nullptr; 179 CommunicatorLinker *commLinker_ = nullptr; 180 181 // Thread related 182 std::thread exclusiveThread_; 183 bool wakingSignal_ = false; 184 mutable std::mutex wakingMutex_; 185 std::condition_variable wakingCv_; 186 187 // RetryCreateTask related 188 mutable std::mutex retryMutex_; 189 std::condition_variable retryCv_; 190 191 // Remote target version related 192 mutable std::mutex versionMapMutex_; 193 std::map<std::string, uint16_t> versionMap_; 194 195 // CommLack Callback related 196 CommunicatorLackCallback onCommLackHandle_; 197 Finalizer onCommLackFinalizer_; 198 mutable std::mutex onCommLackMutex_; 199 200 // Connect Callback related 201 OnConnectCallback onConnectHandle_; 202 Finalizer onConnectFinalizer_; 203 mutable std::mutex onConnectMutex_; 204 205 std::shared_ptr<DBStatusAdapter> dbStatusAdapter_; 206 207 std::atomic<bool> useExclusiveThread_ = false; 208 bool sendTaskStart_ = false; 209 mutable std::mutex scheduleSendTaskMutex_; 210 std::condition_variable finalizeCv_; 211 212 struct FrameSendRecord { 213 uint32_t splitMtu = 0u; 214 uint32_t sendIndex = 0u; 215 }; 216 std::mutex sendRecordMutex_; 217 std::map<uint32_t, FrameSendRecord> sendRecord_; 218 219 std::mutex retryCountMutex_; 220 std::map<std::string, int32_t> retryCount_; 221 222 std::mutex sendSequenceMutex_; 223 std::map<std::string, uint64_t> sendSequence_; 224 }; 225 } // namespace DistributedDB 226 227 #endif // COMMUNICATORAGGREGATOR_H 228