1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef VIRTUAL_COMMUNICATOR_H
17 #define VIRTUAL_COMMUNICATOR_H
18 
19 #include <chrono>
20 #include <condition_variable>
21 #include <cstdint>
22 #include <functional>
23 #include <map>
24 #include <mutex>
25 #include <string>
26 
27 #include "icommunicator.h"
28 #include "ref_object.h"
29 #include "serial_buffer.h"
30 #include "sync_types.h"
31 
32 namespace DistributedDB {
33 class VirtualCommunicatorAggregator;
34 
35 class VirtualCommunicator : public ICommunicator {
36 public:
37     VirtualCommunicator(const std::string &deviceId, VirtualCommunicatorAggregator *communicatorAggregator);
38     ~VirtualCommunicator() override;
39 
40     DISABLE_COPY_ASSIGN_MOVE(VirtualCommunicator);
41 
42     int RegOnMessageCallback(const OnMessageCallback &onMessage, const Finalizer &inOper) override;
43     int RegOnConnectCallback(const OnConnectCallback &onConnect, const Finalizer &inOper) override;
44     int RegOnSendableCallback(const std::function<void(void)> &onSendable, const Finalizer &inOper) override;
45 
46     void Activate() override;
47 
48     uint32_t GetCommunicatorMtuSize() const override;
49     uint32_t GetCommunicatorMtuSize(const std::string &target) const override;
50     void SetCommunicatorMtuSize(uint32_t mtuSize);
51 
52     uint32_t GetTimeout() const override;
53     uint32_t GetTimeout(const std::string &target) const override;
54     void SetTimeout(uint32_t timeout);
55 
56     int GetLocalIdentity(std::string &outTarget) const override;
57 
58     int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config) override;
59     int SendMessage(const std::string &dstTarget, const Message *inMsg, const SendConfig &config,
60         const OnSendEnd &onEnd) override;
61 
62     int GetRemoteCommunicatorVersion(const std::string &deviceId, uint16_t &version) const override;
63 
64     void CallbackOnMessage(const std::string &srcTarget, Message *inMsg);
65 
66     void CallbackOnConnect(const std::string &target, bool isConnect) const;
67 
68     int GeneralVirtualSyncId();
69 
70     void Disable();
71 
72     void Enable();
73 
74     void SetDeviceId(const std::string &deviceId);
75 
76     std::string GetDeviceId() const;
77 
78     bool IsEnabled() const;
79 
80     bool IsDeviceOnline(const std::string &device) const override;
81 
82     void SetDropMessageTypeByDevice(MessageId msgid, uint32_t dropTimes = 1);
83 
84     void SetRemoteVersion(uint16_t remoteVersion);
85 
86 private:
87     int TimeSync();
88     int DataSync();
89     int WaterMarkSync();
90     static int TranslateMsg(const Message *inMsg, Message *&outMsg);
91 
92     mutable std::mutex onMessageLock_;
93     OnMessageCallback onMessage_;
94 
95     mutable std::mutex onConnectLock_;
96     OnConnectCallback onConnect_;
97     mutable std::mutex devicesMapLock_;
98     mutable std::map<std::string, bool> onlineDevicesMap_;
99 
100     std::string remoteDeviceId_ = "real_device";
101     std::mutex syncIdLock_;
102     int currentSyncId_ = 1000;
103     bool isEnable_ = true;
104     std::string deviceId_;
105 
106     std::mutex onAggregatorLock_;
107     VirtualCommunicatorAggregator *communicatorAggregator_;
108 
109     uint32_t timeout_ = 5 * 1000; // 5 * 1000ms
110     MessageId dropMsgId_ = MessageId::UNKNOW_MESSAGE;
111     uint32_t dropMsgTimes_ = 0;
112     uint32_t mtuSize_ = 5 * 1024 * 1024; // 5 * 1024 * 1024B
113 
114     uint16_t remoteVersion_ = UINT16_MAX;
115 };
116 } // namespace DistributedDB
117 
118 #endif // VIRTUAL_COMMUNICATOR_H
119