1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_ICOMMUNICATORAGGREGATOR_H
17 #define VIRTUAL_ICOMMUNICATORAGGREGATOR_H
18 
19 #include <cstdint>
20 #include <set>
21 
22 #include "icommunicator_aggregator.h"
23 #include "virtual_communicator.h"
24 
25 namespace DistributedDB {
26 class ICommunicator;  // Forward Declaration
27 
28 class VirtualCommunicatorAggregator : public ICommunicatorAggregator {
29 public:
30     // Return 0 as success. Return negative as error
31     int Initialize(IAdapter *inAdapter, const std::shared_ptr<DBStatusAdapter> &statusAdapter) override;
32 
33     void Finalize() override;
34 
35     // If not success, return nullptr and set outErrorNo
36     ICommunicator *AllocCommunicator(uint64_t commLabel, int &outErrorNo) override;
37     ICommunicator *AllocCommunicator(const LabelType &commLabel, int &outErrorNo) override;
38 
39     void ReleaseCommunicator(ICommunicator *inCommunicator) override;
40 
41     int RegCommunicatorLackCallback(const CommunicatorLackCallback &onCommLack, const Finalizer &inOper) override;
42     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
43     void RunCommunicatorLackCallback(const LabelType &commLabel);
44     void RunOnConnectCallback(const std::string &target, bool isConnect);
45 
46     int GetLocalIdentity(std::string &outTarget) const override;
47 
48     // online a virtual device to the VirtualCommunicator, should call in main thread
49     void OnlineDevice(const std::string &deviceId) const;
50 
51     // offline a virtual device to the VirtualCommunicator, should call in main thread
52     void OfflineDevice(const std::string &deviceId) const;
53 
54     void DispatchMessage(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
55         const OnSendEnd &onEnd);
56 
57     // If not success, return nullptr and set outErrorNo
58     ICommunicator *AllocCommunicator(const std::string &deviceId, int &outErrorNo);
59 
60     ICommunicator *GetCommunicator(const std::string &deviceId) const;
61 
62     void Disable();
63 
64     void Enable();
65 
66     void SetBlockValue(bool value);
67 
68     bool GetBlockValue() const;
69 
70     void RegOnDispatch(const std::function<void(const std::string &target, Message *inMsg)> &onDispatch);
71 
72     void SetCurrentUserId(const std::string &userId);
73 
74     void SetTimeout(const std::string &deviceId, uint32_t timeout);
75 
76     void SetDropMessageTypeByDevice(const std::string &deviceId, MessageId msgid, uint32_t dropTimes = 1);
77 
78     void SetDeviceMtuSize(const std::string &deviceId, uint32_t mtuSize);
79 
80     void SetSendDelayInfo(uint32_t sendDelayTime, uint32_t delayMessageId, uint32_t delayTimes, uint32_t skipTimes,
81         std::set<std::string> &delayDevices);
82     void ResetSendDelayInfo();
83 
84     std::set<std::string> GetOnlineDevices();
85 
86     void DisableCommunicator();
87 
88     void EnableCommunicator();
89 
90     void RegBeforeDispatch(const std::function<void(const std::string &, const Message *)> &beforeDispatch);
91 
92     void SetLocalDeviceId(const std::string &deviceId);
93 
94     void MockGetLocalDeviceRes(int mockRes);
95 
96     void MockCommErrCode(int mockErrCode);
97 
98     void MockDirectEndFlag(bool isDirectEnd);
99 
100     ~VirtualCommunicatorAggregator() override = default;
101     VirtualCommunicatorAggregator() = default;
102 
103 private:
104     void CallSendEnd(int errCode, const OnSendEnd &onEnd);
105     void DelayTimeHandle(uint32_t messageId, const std::string &dstTarget);
106     void DispatchMessageInner(const std::string &srcTarget, const std::string &dstTarget, const Message *inMsg,
107         const OnSendEnd &onEnd);
108 
109     mutable std::mutex communicatorsLock_;
110     std::map<std::string, VirtualCommunicator *> communicators_;
111     std::string remoteDeviceId_ = "real_device";
112     std::mutex blockLock_;
113     std::condition_variable conditionVar_;
114     bool isEnable_ = true;
115     bool isBlock_ = false;
116     CommunicatorLackCallback onCommLack_;
117     OnConnectCallback onConnect_;
118     std::function<void(const std::string &target, Message *inMsg)> onDispatch_;
119     std::function<void(const std::string &target, const Message *inMsg)> beforeDispatch_;
120     std::string userId_;
121 
122     uint32_t sendDelayTime_ = 0;
123     uint32_t delayMessageId_ = INVALID_MESSAGE_ID;
124     uint32_t delayTimes_ = 0; // ms
125     uint32_t skipTimes_ = 0;
126     std::set<std::string> delayDevices_;
127 
128     mutable std::mutex localDeviceIdMutex_;
129     std::string localDeviceId_;
130     int getLocalDeviceRet_ = E_OK;
131     int commErrCodeMock_ = E_OK;
132     bool isDirectEnd_ = true;
133 };
134 } // namespace DistributedDB
135 
136 #endif // VIRTUAL_ICOMMUNICATORAGGREGATOR_H