1 /* 2 * Copyright (c) 2022 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef REMOTE_EXECUTOR_H 17 #define REMOTE_EXECUTOR_H 18 19 #include <deque> 20 #include <queue> 21 22 #include "db_types.h" 23 #include "distributeddb/result_set.h" 24 #include "icommunicator.h" 25 #include "isync_interface.h" 26 #include "message.h" 27 #include "relational_db_sync_interface.h" 28 #include "relational_result_set_impl.h" 29 #include "remote_executor_packet.h" 30 #include "runtime_context.h" 31 32 namespace DistributedDB { 33 class RemoteExecutor : public RefObject { 34 public: 35 enum class Status { 36 WAITING = 0, 37 WORKING 38 }; 39 40 using OnFinished = std::function<void(int32_t, std::shared_ptr<ResultSet>)>; 41 42 struct Task { 43 Status status = Status::WAITING; 44 uint32_t taskId = 0u; 45 uint64_t timeout = 0u; 46 uint32_t targetCount = 0; 47 uint32_t currentCount = 0; 48 uint64_t connectionId = 0u; 49 std::string target; 50 RemoteCondition condition; 51 OnFinished onFinished = nullptr; 52 std::shared_ptr<RelationalResultSetImpl> result; 53 }; 54 55 RemoteExecutor(); 56 57 ~RemoteExecutor() = default; 58 59 int Initialize(ISyncInterface *syncInterface, ICommunicator *communicator); 60 61 int RemoteQuery(const std::string &device, const RemoteCondition &condition, 62 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result); 63 64 // receive request and ack, and process in another thread 65 int ReceiveMessage(const std::string &targetDev, Message *inMsg); 66 67 void NotifyDeviceOffline(const std::string &device); 68 69 void NotifyUserChange(); 70 71 void Close(); 72 73 void NotifyConnectionClosed(uint64_t connectionId); 74 75 protected: 76 virtual void ParseOneRequestMessage(const std::string &device, Message *inMsg); 77 78 virtual bool IsPacketValid(uint32_t sessionId); 79 80 int ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId, const std::string &device); 81 82 private: 83 struct SendMessage { 84 uint32_t sessionId = 0u; 85 uint32_t sequenceId = 0u; 86 bool isLast = true; 87 SecurityOption option; 88 }; 89 90 void ReceiveMessageInner(const std::string &targetDev, Message *inMsg); 91 92 int ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg); 93 94 int ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg); 95 96 int CheckPermissions(const std::string &device, Message *inMsg); 97 98 int SendRemoteExecutorData(const std::string &device, const Message *inMsg); 99 100 bool CheckParamValid(const std::string &device, uint64_t timeout) const; 101 102 bool CheckTaskExeStatus(const std::string &device); 103 104 uint32_t GenerateSessionId(); 105 uint32_t GenerateTaskId(); 106 107 int RemoteQueryInner(const Task &task); 108 void TryExecuteTaskInLock(const std::string &device); 109 void DoRollBack(uint32_t sessionId); 110 111 int RequestStart(uint32_t sessionId); 112 int SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId); 113 114 int ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage, const std::string &device); 115 int ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId, 116 const std::string &device); 117 118 void StartTimer(uint64_t timeout, uint32_t sessionId); 119 void RemoveTimer(uint32_t sessionId); 120 int TimeoutCallBack(TimerId timerId); 121 void DoTimeout(TimerId timerId); 122 123 void DoSendFailed(uint32_t sessionId, int errCode); 124 void DoFinished(uint32_t sessionId, int errCode); 125 126 int ClearTaskInfo(uint32_t sessionId, Task &task); 127 void ClearInnerSource(); 128 129 int FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target); 130 131 void ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId, 132 const RemoteExecutorAckPacket *packet); 133 134 void RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList); 135 void RemoveAllTask(int errCode); 136 void RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList); 137 138 int GetPacketSize(const std::string &device, size_t &packetSize) const; 139 int CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator, const SecurityOption &remoteOption); 140 bool CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption, 141 const SecurityOption &localOption); 142 int ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt, 143 const std::string &device, uint32_t sessionId); 144 145 ICommunicator *GetAndIncCommunicator() const; 146 ISyncInterface *GetAndIncSyncInterface() const; 147 static int CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage, int32_t remoteSecLabel, 148 uint32_t remoteVersion); 149 150 std::mutex taskLock_; 151 std::map<std::string, std::deque<uint32_t>> searchTaskQueue_; // key is device, value is sessionId queue 152 std::map<std::string, std::set<uint32_t>> deviceWorkingSet_; // key is device, value is sessionId set 153 std::map<uint32_t, Task> taskMap_; // key is sessionId 154 155 std::mutex timeoutLock_; 156 std::map<TimerId, uint32_t> timeoutMap_; // use to abort task when timeout 157 std::map<uint32_t, TimerId> taskFinishMap_; // use to wake up timer when task finished 158 159 std::mutex msgQueueLock_; 160 std::queue<std::pair<std::string, Message *>> searchMessageQueue_; 161 std::atomic<uint32_t> workingThreadsCount_; 162 163 mutable std::mutex innerSourceLock_; 164 ISyncInterface *syncInterface_; 165 ICommunicator *communicator_; 166 167 uint32_t lastSessionId_; 168 uint32_t lastTaskId_; 169 std::atomic<bool> closed_; 170 171 std::condition_variable clearCV_; // msgQueueLock_ 172 }; 173 } 174 #endif