1 /* 2 * Copyright (c) 2021 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef COMMUNICATOR_LINKER_H 17 #define COMMUNICATOR_LINKER_H 18 19 #include <atomic> 20 #include <cstdlib> 21 #include <functional> 22 #include <map> 23 #include <mutex> 24 #include <set> 25 #include <string> 26 #include <vector> 27 #include "communicator_type_define.h" 28 #include "db_status_adapter.h" 29 #include "ref_object.h" 30 #include "serial_buffer.h" 31 32 namespace DistributedDB { 33 class CommunicatorAggregator; // Forward Declaration 34 35 class CommunicatorLinker : public virtual RefObject { 36 public: 37 explicit CommunicatorLinker(CommunicatorAggregator *inAggregator, 38 std::shared_ptr<DBStatusAdapter> statusAdapter); 39 ~CommunicatorLinker(); 40 41 DISABLE_COPY_ASSIGN_MOVE(CommunicatorLinker); 42 43 void Initialize(); 44 45 // Create async task to send out label_exchange and waiting for label_exchange_ack. 46 // If waiting timeout, pass the send&wait task to overrall timing retry task. 47 int TargetOnline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels); 48 49 // Clear all labels related to this target. Let no longer waiting for ack of this target. 50 // The caller should notify all related communicator about this target offline. 51 void TargetOffline(const std::string &inTarget, std::set<LabelType> &outRelatedLabels); 52 53 // Add local label. Create async task to send out label_exchange and waiting for label_exchange_ack. 54 // If waiting timeout, pass the send&wait task to overrall timing retry task. 55 // Find out targets for this label that is already online. 56 // The caller should notify communicator of this label about already online target. 57 int IncreaseLocalLabel(const LabelType &inLabel, std::set<std::string> &outOnlineTarget); 58 59 // Del local label. Create async task to send out label_exchange and waiting for label_exchange_ack. 60 // If waiting timeout, pass the send&wait task to overrall timing retry task. 61 int DecreaseLocalLabel(const LabelType &inLabel); 62 63 // Compare the latest labels with previous Label, find out label changes. 64 // The caller should notify the target changes according to label changes. 65 // Update the online labels of this target. Send out label_exchange_ack. 66 int ReceiveLabelExchange(const std::string &inTarget, const std::set<LabelType> &inLatestLabels, 67 uint64_t inDistinctValue, uint64_t inSequenceId, std::map<LabelType, bool> &outChangeLabels); 68 69 // Waiting finish if the ack is what linker wait by check inSequenceId 70 // Similarly, stop the retry task of this Target. 71 int ReceiveLabelExchangeAck(const std::string &inTarget, uint64_t inDistinctValue, uint64_t inSequenceId); 72 73 std::set<std::string> GetOnlineRemoteTarget() const; 74 75 bool IsRemoteTargetOnline(const std::string &inTarget) const; 76 77 void UpdateOnlineLabels(const std::string &device, const std::map<LabelType, bool> &labels); 78 79 bool TriggerLabelExchangeEvent(bool checkAdapter = true); 80 private: 81 DECLARE_OBJECT_TAG(CommunicatorLinker); 82 83 // inCountDown is in millisecond 84 void SuspendByOnceTimer(const std::function<void(void)> &inAction, uint32_t inCountDown); 85 86 // This function should be called under protection of entireInfoMutex_ 87 void DetectDistinctValueChange(const std::string &inTarget, uint64_t inDistinctValue); 88 89 int TriggerLabelExchangeEvent(const std::string &toTarget); 90 int TriggerLabelExchangeAckEvent(const std::string &toTarget, uint64_t inSequenceId); 91 92 void SendLabelExchange(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId, 93 uint32_t inRetransmitCount); 94 void SendLabelExchangeAck(const std::string &toTarget, SerialBuffer *inBuff, uint64_t inSequenceId, 95 uint64_t inAckTriggerId); 96 97 uint64_t localDistinctValue_ = 0; 98 std::atomic<uint64_t> incSequenceId_; 99 std::atomic<uint64_t> incAckTriggerId_; 100 CommunicatorAggregator *aggregator_ = nullptr; 101 102 mutable std::mutex entireInfoMutex_; 103 104 // Point out the distinctValue for each target in order to detect malfunctioning "target offline" 105 std::map<std::string, uint64_t> targetDistinctValue_; 106 107 // Point out the largest sequenceId of LabelExchange that ever received for each target 108 std::map<std::string, uint64_t> topRecvLabelSeq_; 109 110 // Point out currently which sequenceId of ack is being waited for each target 111 std::map<std::string, uint64_t> waitAckSeq_; 112 113 // Point out the largest sequenceId of LabelExchangeAck that ever received for each target 114 std::map<std::string, uint64_t> recvAckSeq_; 115 116 // Point out the latest ackTriggerId for each target in order to abort outdated triggered event 117 std::map<std::string, uint64_t> ackTriggerId_; 118 119 // Core Info : Online Labels 120 std::set<LabelType> localOnlineLabels_; 121 std::set<std::string> remoteOnlineTarget_; 122 123 // remember the opened labels no matter target now online or offline 124 std::map<std::string, std::set<LabelType>> targetMapOnlineLabels_; 125 126 std::shared_ptr<DBStatusAdapter> statusAdapter_; 127 }; 128 } 129 130 #endif