1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "remote_executor.h"
17 
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27 
28 namespace DistributedDB {
29 namespace {
30     constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31     constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32     constexpr uint32_t MAX_QUEUE_COUNT = 10;
33     constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34 
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35     void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36     {
37         delete message;
38         message = nullptr;
39         delete packet;
40         packet = nullptr;
41     }
42 }
43 
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45     : workingThreadsCount_(0),
46       syncInterface_(nullptr),
47       communicator_(nullptr),
48       lastSessionId_(0),
49       lastTaskId_(0),
50       closed_(false)
51 {
52 }
53 
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56     if (syncInterface == nullptr || communicator == nullptr) {
57         return -E_INVALID_ARGS;
58     }
59     closed_ = false;
60     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61     syncInterface_ = syncInterface;
62     communicator_ = communicator;
63     return E_OK;
64 }
65 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
67     uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69     if (closed_) {
70         return -E_BUSY;
71     }
72     if (!CheckParamValid(device, timeout)) {
73         return -E_INVALID_ARGS;
74     }
75     int errCode = E_OK;
76     int taskErrCode = E_OK;
77     SemaphoreUtils semaphore(0);
78     Task task;
79     task.result = std::make_shared<RelationalResultSetImpl>();
80     task.target = device;
81     task.timeout = timeout;
82     task.condition = condition;
83     task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
84         taskErrCode = retCode;
85         result = taskResult;
86         semaphore.SendSemaphore();
87     };
88     task.connectionId = connectionId;
89     errCode = RemoteQueryInner(task);
90     if (errCode != E_OK) {
91         return errCode;
92     }
93     semaphore.WaitSemaphore();
94     return taskErrCode;
95 }
96 
ReceiveMessage(const std::string & targetDev,Message * inMsg)97 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
98 {
99     if (inMsg == nullptr) {
100         return -E_INVALID_ARGS;
101     }
102     if (closed_) {
103         LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
104         delete inMsg;
105         inMsg = nullptr;
106         return -E_BUSY;
107     }
108     RefObject::IncObjRef(this);
109     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
110         ReceiveMessageInner(targetDev, inMsg);
111         RefObject::DecObjRef(this);
112     });
113     if (errCode != E_OK) {
114         RefObject::DecObjRef(this);
115     }
116     return errCode;
117 }
118 
NotifyDeviceOffline(const std::string & device)119 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
120 {
121     if (closed_) {
122         return;
123     }
124     LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
125     std::vector<uint32_t> removeList;
126     RemoveTaskByDevice(device, removeList);
127     for (const auto &sessionId : removeList) {
128         DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
129     }
130 }
131 
NotifyUserChange()132 void RemoteExecutor::NotifyUserChange()
133 {
134     if (closed_) {
135         return;
136     }
137     LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
138     RemoveAllTask(-E_USER_CHANGE);
139     LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
140 }
141 
Close()142 void RemoteExecutor::Close()
143 {
144     closed_ = true;
145     LOGD("[RemoteExecutor][Close] close enter");
146     RemoveAllTask(-E_BUSY);
147     ClearInnerSource();
148     {
149         std::unique_lock<std::mutex> lock(msgQueueLock_);
150         clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
151     }
152     LOGD("[RemoteExecutor][Close] close exist");
153 }
154 
NotifyConnectionClosed(uint64_t connectionId)155 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
156 {
157     if (closed_) {
158         return;
159     }
160     std::vector<uint32_t> removeList;
161     RemoveTaskByConnection(connectionId, removeList);
162     for (const auto &sessionId : removeList) {
163         DoFinished(sessionId, -E_BUSY);
164     }
165 }
166 
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)167 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
168 {
169     LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
170     {
171         std::lock_guard<std::mutex> autoLock(msgQueueLock_);
172         searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
173         if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
174             // message deal in work thread, exist here
175             return -E_NOT_NEED_DELETE_MSG;
176         }
177         workingThreadsCount_++;
178     }
179     bool empty = true;
180     do {
181         std::pair<std::string, Message *> entry;
182         {
183             std::lock_guard<std::mutex> autoLock(msgQueueLock_);
184             empty = searchMessageQueue_.empty();
185             if (empty) {
186                 workingThreadsCount_--;
187                 continue;
188             }
189             entry = searchMessageQueue_.front();
190             searchMessageQueue_.pop();
191         }
192         ParseOneRequestMessage(entry.first, entry.second);
193         delete entry.second;
194         entry.second = nullptr;
195     } while (!empty);
196     clearCV_.notify_one();
197     return -E_NOT_NEED_DELETE_MSG;
198 }
199 
ParseOneRequestMessage(const std::string & device,Message * inMsg)200 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
201 {
202     if (closed_) {
203         LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
204         return;
205     }
206     int errCode = CheckPermissions(device, inMsg);
207     if (errCode != E_OK) {
208         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
209         return;
210     }
211     errCode = SendRemoteExecutorData(device, inMsg);
212     if (errCode != E_OK) {
213         (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
214     }
215 }
216 
CheckPermissions(const std::string & device,Message * inMsg)217 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
218 {
219     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
220     if (storage == nullptr) {
221         LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
222         return -E_BUSY;
223     }
224     // permission check
225     std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
226     std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
227     std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
228     int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
229     int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
230         { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
231     if (errCode != E_OK) {
232         LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
233         storage->DecRefCount();
234         return errCode;
235     }
236     const auto *packet = inMsg->GetObject<ISyncPacket>();
237     if (packet == nullptr) {
238         LOGE("[RemoteExecutor] get packet object failed");
239         storage->DecRefCount();
240         return -E_INVALID_ARGS;
241     }
242     const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
243     errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
244     storage->DecRefCount();
245     return errCode;
246 }
247 
SendRemoteExecutorData(const std::string & device,const Message * inMsg)248 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
249 {
250     auto *syncInterface = GetAndIncSyncInterface();
251     if (syncInterface == nullptr) {
252         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
253         return -E_INVALID_ARGS;
254     }
255     if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
256         LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
257         syncInterface->DecRefCount();
258         return -E_NOT_SUPPORT;
259     }
260     RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
261 
262     const auto *packet = inMsg->GetObject<ISyncPacket>();
263     if (packet == nullptr) {
264         LOGE("[RemoteExecutor] get packet object failed");
265         storage->DecRefCount();
266         return -E_INVALID_ARGS;
267     }
268     const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
269     int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
270     storage->DecRefCount();
271     return errCode;
272 }
273 
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)274 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
275 {
276     const auto *ack = inMsg->GetObject<ISyncPacket>();
277     if (ack == nullptr) {
278         return -E_INVALID_ARGS;
279     }
280     const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
281     int errCode = packet->GetAckCode();
282     uint32_t sessionId = inMsg->GetSessionId();
283     uint32_t sequenceId = inMsg->GetSequenceId();
284     if (!IsPacketValid(sessionId)) {
285         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
286         return -E_INVALID_ARGS;
287     }
288     if (errCode == E_OK) {
289         auto storage = GetAndIncSyncInterface();
290         auto communicator = GetAndIncCommunicator();
291         errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
292         if (storage != nullptr) {
293             storage->DecRefCount();
294         }
295         RefObject::DecObjRef(communicator);
296     }
297     if (errCode != E_OK) {
298         DoFinished(sessionId, errCode);
299     } else {
300         ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
301     }
302     return E_OK;
303 }
304 
CheckParamValid(const std::string & device,uint64_t timeout) const305 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
306 {
307     if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
308         LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
309         return false;
310     }
311     if (device.empty()) {
312         LOGD("[RemoteExecutor][CheckParamValid] device is empty");
313         return false;
314     }
315     if (device.length() > DBConstant::MAX_DEV_LENGTH) {
316         LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
317         return false;
318     }
319     ICommunicator *communicator = GetAndIncCommunicator();
320     if (communicator == nullptr) {
321         return false;
322     }
323     std::string localId;
324     int errCode = communicator->GetLocalIdentity(localId);
325     RefObject::DecObjRef(communicator);
326     if (errCode != E_OK) {
327         return false;
328     }
329     if (localId == device) {
330         LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
331         return false;
332     }
333     return true;
334 }
335 
CheckTaskExeStatus(const std::string & device)336 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
337 {
338     uint32_t queueCount = 0u;
339     uint32_t exeTaskCount = 0u;
340     uint32_t totalCount = 0u; // waiting task count in all queue
341     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
342         queueCount = searchTaskQueue_.at(device).size();
343     }
344     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
345         exeTaskCount = deviceWorkingSet_.at(device).size();
346     }
347     for (auto &entry : searchTaskQueue_) {
348         int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
349         int currentQueueCount = static_cast<int>(entry.second.size());
350         if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
351             // all task in this queue can execute, no need calculate as waiting task count
352             continue;
353         }
354         totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
355             static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
356     }
357     return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
358         (totalCount + 1 <= MAX_QUEUE_COUNT);
359 }
360 
GenerateSessionId()361 uint32_t RemoteExecutor::GenerateSessionId()
362 {
363     uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
364     while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
365         sessionId++;
366         if (sessionId == 0) { // if over flow start with 1
367             sessionId++;
368         }
369     }
370     lastSessionId_ = sessionId;
371     return sessionId;
372 }
373 
GenerateTaskId()374 uint32_t RemoteExecutor::GenerateTaskId()
375 {
376     lastTaskId_++;
377     if (lastTaskId_ == 0) { // if over flow start with 1
378         lastTaskId_++;
379     }
380     return lastTaskId_;
381 }
382 
RemoteQueryInner(const Task & task)383 int RemoteExecutor::RemoteQueryInner(const Task &task)
384 {
385     uint32_t sessionId = 0u;
386     {
387         // check task count and push into queue in lock
388         std::lock_guard<std::mutex> autoLock(taskLock_);
389         if (!CheckTaskExeStatus(task.target)) {
390             LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
391             return -E_MAX_LIMITS;
392         }
393         sessionId = GenerateSessionId();
394         searchTaskQueue_[task.target].push_back(sessionId);
395         if (taskMap_.find(sessionId) != taskMap_.end()) {
396             LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
397             return -E_INTERNAL_ERROR; // should not happen
398         }
399         taskMap_[sessionId] = task;
400         taskMap_[sessionId].taskId = GenerateTaskId();
401         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
402             taskMap_[sessionId].taskId, task.target.c_str());
403     }
404     std::string device = task.target;
405     RefObject::IncObjRef(this);
406     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
407         TryExecuteTaskInLock(device);
408         RefObject::DecObjRef(this);
409     });
410     if (errCode != E_OK) {
411         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
412         DoRollBack(sessionId);
413         RefObject::DecObjRef(this);
414     }
415     return errCode;
416 }
417 
TryExecuteTaskInLock(const std::string & device)418 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
419 {
420     uint32_t sessionId = 0u;
421     {
422         std::lock_guard<std::mutex> autoLock(taskLock_);
423         if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
424             return;
425         }
426         if (searchTaskQueue_[device].empty()) {
427             LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
428             return;
429         }
430         sessionId = searchTaskQueue_[device].front();
431         if (taskMap_.find(sessionId) == taskMap_.end()) {
432             searchTaskQueue_[device].pop_front();
433             LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
434             return;
435         }
436         taskMap_[sessionId].status = Status::WORKING;
437         searchTaskQueue_[device].pop_front();
438         deviceWorkingSet_[device].insert(sessionId);
439         LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
440         StartTimer(taskMap_[sessionId].timeout, sessionId);
441     }
442     int errCode = RequestStart(sessionId);
443     if (errCode != E_OK) {
444         DoFinished(sessionId, errCode);
445     }
446 }
447 
DoRollBack(uint32_t sessionId)448 void RemoteExecutor::DoRollBack(uint32_t sessionId)
449 {
450     Task task;
451     std::lock_guard<std::mutex> autoLock(taskLock_);
452     if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
453         return;
454     }
455     task = taskMap_[sessionId];
456     if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
457         // task is execute, abort roll back
458         return;
459     }
460     taskMap_.erase(sessionId);
461 
462     auto iter = searchTaskQueue_[task.target].begin();
463     while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
464         if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
465             break;
466         }
467         iter++;
468     }
469     if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
470         searchTaskQueue_[task.target].erase(iter);
471     }
472     // this task should not in workingSet
473     deviceWorkingSet_[task.target].erase(sessionId);
474 }
475 
RequestStart(uint32_t sessionId)476 int RemoteExecutor::RequestStart(uint32_t sessionId)
477 {
478     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
479     if (message == nullptr) {
480         LOGE("[RemoteExecutor][RequestStart] new message error");
481         return -E_OUT_OF_MEMORY;
482     }
483     message->SetSessionId(sessionId);
484     message->SetMessageType(TYPE_REQUEST);
485     auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
486     if (packet == nullptr) {
487         LOGE("[RemoteExecutor][RequestStart] new packet error");
488         ReleaseMessageAndPacket(message, nullptr);
489         return -E_OUT_OF_MEMORY;
490     }
491     std::string target;
492     int errCode = FillRequestPacket(packet, sessionId, target);
493     if (errCode != E_OK) {
494         ReleaseMessageAndPacket(message, packet);
495         return errCode;
496     }
497     auto exObj = static_cast<ISyncPacket *>(packet);
498     errCode = message->SetExternalObject(exObj);
499     if (errCode != E_OK) {
500         ReleaseMessageAndPacket(message, packet);
501         LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
502         return errCode;
503     }
504     return SendRequestMessage(target, message, sessionId);
505 }
506 
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)507 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
508 {
509     auto communicator = GetAndIncCommunicator();
510     auto syncInterface = GetAndIncSyncInterface();
511     if (communicator == nullptr || syncInterface == nullptr) {
512         ReleaseMessageAndPacket(message, nullptr);
513         if (syncInterface != nullptr) {
514             syncInterface->DecRefCount();
515         }
516         RefObject::DecObjRef(communicator);
517         return -E_BUSY;
518     }
519     SendConfig sendConfig;
520     SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
521     RefObject::IncObjRef(this);
522     int errCode = communicator->SendMessage(target, message, sendConfig,
523         [this, sessionId](int errCode, bool isDirectEnd) {
524         if (errCode != E_OK) {
525             DoSendFailed(sessionId, errCode);
526         }
527         RefObject::DecObjRef(this);
528     });
529     if (errCode != E_OK) {
530         ReleaseMessageAndPacket(message, nullptr);
531         RefObject::DecObjRef(this);
532     }
533     RefObject::DecObjRef(communicator);
534     syncInterface->DecRefCount();
535     return errCode;
536 }
537 
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)538 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
539     const std::string &device)
540 {
541     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
542     if (packet == nullptr) {
543         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
544         return -E_OUT_OF_MEMORY;
545     }
546     packet->SetAckCode(errCode);
547     packet->SetLastAck();
548     return ResponseStart(packet, sessionId, sequenceId, device);
549 }
550 
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)551 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
552     const std::string &device)
553 {
554     RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
555     if (packet == nullptr) {
556         LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
557         return -E_OUT_OF_MEMORY;
558     }
559     packet->SetAckCode(E_OK);
560     if (sendMessage.isLast) {
561         packet->SetLastAck();
562     }
563     packet->SetSecurityOption(sendMessage.option);
564     packet->MoveInRowDataSet(std::move(dataSet));
565     return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
566 }
567 
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)568 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
569     const std::string &device)
570 {
571     SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
572     if (storage == nullptr) {
573         ReleaseMessageAndPacket(nullptr, packet);
574         LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
575         return -E_BUSY;
576     }
577     Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
578     if (message == nullptr) {
579         LOGE("[RemoteExecutor][ResponseStart] new message error");
580         storage->DecRefCount();
581         ReleaseMessageAndPacket(nullptr, packet);
582         return -E_OUT_OF_MEMORY;
583     }
584     packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
585     auto exObj = static_cast<ISyncPacket *>(packet);
586     int errCode = message->SetExternalObject(exObj);
587     if (errCode != E_OK) {
588         ReleaseMessageAndPacket(message, packet);
589         storage->DecRefCount();
590         LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
591         return errCode;
592     }
593     auto *communicator = GetAndIncCommunicator();
594     if (communicator == nullptr) {
595         ReleaseMessageAndPacket(message, nullptr);
596         storage->DecRefCount();
597         LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
598         return -E_BUSY;
599     }
600 
601     message->SetTarget(device);
602     message->SetSessionId(sessionId);
603     message->SetSequenceId(sequenceId);
604     message->SetMessageType(TYPE_RESPONSE);
605     SendConfig sendConfig;
606     SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
607     errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
608     RefObject::DecObjRef(communicator);
609     if (errCode != E_OK) {
610         ReleaseMessageAndPacket(message, nullptr);
611         LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
612     }
613     storage->DecRefCount();
614     return errCode;
615 }
616 
StartTimer(uint64_t timeout,uint32_t sessionId)617 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
618 {
619     TimerId timerId = 0u;
620     RefObject::IncObjRef(this);
621     TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
622     int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
623         RefObject::DecObjRef(this);
624     }, timerId);
625     if (errCode != E_OK) {
626         RefObject::DecObjRef(this);
627         LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
628     }
629     LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
630     std::lock_guard<std::mutex> autoLock(timeoutLock_);
631     timeoutMap_[timerId] = sessionId;
632     taskFinishMap_[sessionId] = timerId;
633 }
634 
RemoveTimer(uint32_t sessionId)635 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
636 {
637     TimerId timerId = 0u;
638     {
639         std::lock_guard<std::mutex> autoLock(timeoutLock_);
640         if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
641             return;
642         }
643         timerId = taskFinishMap_[sessionId];
644         LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
645         timeoutMap_.erase(timerId);
646         taskFinishMap_.erase(sessionId);
647     }
648     RuntimeContext::GetInstance()->RemoveTimer(timerId);
649 }
650 
TimeoutCallBack(TimerId timerId)651 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
652 {
653     RefObject::IncObjRef(this);
654     int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
655         DoTimeout(timerId);
656         RefObject::DecObjRef(this);
657     });
658     if (errCode != E_OK) {
659         LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
660         RefObject::DecObjRef(this);
661     }
662     return -E_NO_NEED_TIMER;
663 }
664 
DoTimeout(TimerId timerId)665 void RemoteExecutor::DoTimeout(TimerId timerId)
666 {
667     LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
668     uint32_t sessionId = 0u;
669     {
670         std::lock_guard<std::mutex> autoLock(timeoutLock_);
671         if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
672             return;
673         }
674         sessionId = timeoutMap_[timerId];
675     }
676     DoFinished(sessionId, -E_TIMEOUT);
677 }
678 
DoSendFailed(uint32_t sessionId,int errCode)679 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
680 {
681     LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
682     DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
683 }
684 
DoFinished(uint32_t sessionId,int errCode)685 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
686 {
687     Task task;
688     if (ClearTaskInfo(sessionId, task) == E_OK) {
689         LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
690     } else {
691         return;
692     }
693     RefObject::IncObjRef(this);
694     if (task.onFinished != nullptr) {
695         task.onFinished(errCode, task.result);
696         LOGD("[RemoteExecutor][DoFinished] onFinished");
697     }
698     std::string device = task.target;
699     int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
700         TryExecuteTaskInLock(device);
701         RefObject::DecObjRef(this);
702     });
703     if (retCode != E_OK) {
704         LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
705         RefObject::DecObjRef(this);
706     }
707 }
708 
ClearTaskInfo(uint32_t sessionId,Task & task)709 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
710 {
711     {
712         std::lock_guard<std::mutex> autoLock(taskLock_);
713         if (taskMap_.find(sessionId) == taskMap_.end()) {
714             return -E_NOT_FOUND;
715         }
716         task = taskMap_[sessionId];
717         taskMap_.erase(sessionId);
718         deviceWorkingSet_[task.target].erase(sessionId);
719     }
720     RemoveTimer(sessionId);
721     return E_OK;
722 }
723 
ClearInnerSource()724 void RemoteExecutor::ClearInnerSource()
725 {
726     {
727         std::lock_guard<std::mutex> autoLock(innerSourceLock_);
728         syncInterface_ = nullptr;
729         communicator_ = nullptr;
730     }
731     std::lock_guard<std::mutex> autoLock(msgQueueLock_);
732     LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
733     while (!searchMessageQueue_.empty()) {
734         auto entry = searchMessageQueue_.front();
735         searchMessageQueue_.pop();
736         delete entry.second;
737         entry.second = nullptr;
738     }
739 }
740 
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)741 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
742 {
743     ISyncInterface *storage = GetAndIncSyncInterface();
744     if (storage == nullptr) {
745         return -E_BUSY;
746     }
747     SecurityOption localOption;
748     int errCode = storage->GetSecurityOption(localOption);
749     storage->DecRefCount();
750     storage = nullptr;
751     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
752         return -E_SECURITY_OPTION_CHECK_ERROR;
753     }
754     if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
755         LOGE("[AbilitySync] Local not set security option");
756         return -E_SECURITY_OPTION_CHECK_ERROR;
757     }
758     Task task;
759     {
760         std::lock_guard<std::mutex> autoLock(taskLock_);
761         if (taskMap_.find(sessionId) == taskMap_.end()) {
762             LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
763             return -E_FINISHED;
764         }
765         task = taskMap_[sessionId];
766     }
767     packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
768     packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
769     packet->SetSql(task.condition.sql);
770     packet->SetBindArgs(task.condition.bindArgs);
771     packet->SetNeedResponse();
772     packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
773     target = task.target;
774     return E_OK;
775 }
776 
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)777 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
778 {
779     int errCode = E_OK;
780     if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
781         DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
782         delete inMsg;
783         inMsg = nullptr;
784         return;
785     }
786     switch (inMsg->GetMessageType()) {
787         case TYPE_REQUEST:
788             errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
789             break;
790         case TYPE_RESPONSE:
791             errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
792             break;
793         default:
794             LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
795             break;
796     }
797     if (errCode != -E_NOT_NEED_DELETE_MSG) {
798         delete inMsg;
799         inMsg = nullptr;
800     }
801 }
802 
IsPacketValid(uint32_t sessionId)803 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
804 {
805     std::lock_guard<std::mutex> autoLock(taskLock_);
806     return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
807 }
808 
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)809 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
810     const RemoteExecutorAckPacket *packet)
811 {
812     bool isReceiveFinished = false;
813     {
814         std::lock_guard<std::mutex> autoLock(taskLock_);
815         if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
816             LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
817             return;
818         }
819         LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
820             taskMap_[sessionId].taskId, sequenceId);
821         taskMap_[sessionId].currentCount++;
822         if (packet->IsLastAck()) {
823             taskMap_[sessionId].targetCount = sequenceId;
824         }
825         taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
826         if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
827             isReceiveFinished = true;
828         }
829     }
830     if (isReceiveFinished) {
831         DoFinished(sessionId, E_OK);
832     }
833 }
834 
GetAndIncCommunicator() const835 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
836 {
837     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
838     ICommunicator *communicator = communicator_;
839     RefObject::IncObjRef(communicator);
840     return communicator;
841 }
842 
GetAndIncSyncInterface() const843 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
844 {
845     std::lock_guard<std::mutex> autoLock(innerSourceLock_);
846     ISyncInterface *syncInterface = syncInterface_;
847     if (syncInterface != nullptr) {
848         syncInterface->IncRefCount();
849     }
850     return syncInterface;
851 }
852 
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)853 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
854 {
855     std::lock_guard<std::mutex> autoLock(taskLock_);
856     if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
857         for (auto &sessionId : deviceWorkingSet_[device]) {
858             removeList.push_back(sessionId);
859         }
860     }
861     if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
862         for (auto &sessionId : searchTaskQueue_[device]) {
863             removeList.push_back(sessionId);
864         }
865     }
866 }
867 
RemoveAllTask(int errCode)868 void RemoteExecutor::RemoveAllTask(int errCode)
869 {
870     std::vector<OnFinished> waitToNotify;
871     std::vector<uint32_t> removeTimerList;
872     {
873         std::lock_guard<std::mutex> autoLock(taskLock_);
874         for (auto &taskEntry : taskMap_) {
875             if (taskEntry.second.onFinished != nullptr) {
876                 waitToNotify.push_back(taskEntry.second.onFinished);
877                 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
878                     taskEntry.second.taskId, errCode);
879             }
880             if (taskEntry.second.status == Status::WORKING) {
881                 removeTimerList.push_back(taskEntry.first);
882             }
883         }
884         taskMap_.clear();
885         deviceWorkingSet_.clear();
886         searchTaskQueue_.clear();
887     }
888     for (const auto &callBack : waitToNotify) {
889         callBack(errCode, nullptr);
890     }
891     for (const auto &sessionId : removeTimerList) {
892         RemoveTimer(sessionId);
893     }
894     std::lock_guard<std::mutex> autoLock(timeoutLock_);
895     timeoutMap_.clear();
896     taskFinishMap_.clear();
897 }
898 
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)899 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
900 {
901     std::lock_guard<std::mutex> autoLock(taskLock_);
902     for (auto &entry : taskMap_) {
903         if (entry.second.connectionId == connectionId) {
904             removeList.push_back(entry.first);
905         }
906     }
907 }
908 
GetPacketSize(const std::string & device,size_t & packetSize) const909 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
910 {
911     auto *communicator = GetAndIncCommunicator();
912     if (communicator == nullptr) {
913         LOGD("communicator is nullptr");
914         return -E_BUSY;
915     }
916 
917     packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
918     RefObject::DecObjRef(communicator);
919     return E_OK;
920 }
921 
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)922 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
923     const SecurityOption &localOption)
924 {
925     bool res = false;
926     if (remoteOption.securityLabel == localOption.securityLabel ||
927         (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
928         localOption.securityLabel == SecurityLabel::NOT_SET)) {
929         res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
930     }
931     if (!res) {
932         LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
933             remoteOption.securityLabel, remoteOption.securityFlag,
934             localOption.securityLabel, localOption.securityFlag);
935     }
936     return res;
937 }
938 
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)939 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
940     const std::string &device, uint32_t sessionId)
941 {
942     size_t packetSize = 0u;
943     int errCode = GetPacketSize(device, packetSize);
944     if (errCode != E_OK) {
945         return errCode;
946     }
947     SecurityOption option;
948     errCode = storage->GetSecurityOption(option);
949     if (errCode == -E_NOT_SUPPORT) {
950         option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
951         errCode = E_OK;
952     }
953     if (errCode != E_OK) {
954         LOGD("GetSecurityOption errCode:%d", errCode);
955         return -E_SECURITY_OPTION_CHECK_ERROR;
956     }
957     ContinueToken token = nullptr;
958     uint32_t sequenceId = 1u;
959     do {
960         RelationalRowDataSet dataSet;
961         errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
962         if (errCode != E_OK) {
963             LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
964             break;
965         }
966         SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
967         errCode = ResponseData(std::move(dataSet), sendMessage, device);
968         if (errCode != E_OK) {
969             break;
970         }
971         sequenceId++;
972     } while (token != nullptr);
973     if (token != nullptr) {
974         storage->ReleaseRemoteQueryContinueToken(token);
975     }
976     return errCode;
977 }
978 
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)979 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
980     const SecurityOption &remoteOption)
981 {
982     if (storage == nullptr || communicator == nullptr) {
983         return -E_BUSY;
984     }
985     if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
986         return -E_NOT_SUPPORT;
987     }
988     std::string device;
989     communicator->GetLocalIdentity(device);
990     SecurityOption localOption;
991     int errCode = storage->GetSecurityOption(localOption);
992     if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
993         return -E_SECURITY_OPTION_CHECK_ERROR;
994     }
995     if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
996         return E_OK;
997     }
998     if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
999         errCode = -E_SECURITY_OPTION_CHECK_ERROR;
1000     } else {
1001         errCode = E_OK;
1002     }
1003     return errCode;
1004 }
1005 
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1006 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1007     int32_t remoteSecLabel, uint32_t remoteVersion)
1008 {
1009     SecurityOption localOption;
1010     int errCode = storage->GetSecurityOption(localOption);
1011     LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1012          localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1013     if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1014         return E_OK;
1015     }
1016     if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1017         LOGE("[RemoteExecutor] local security label not set!");
1018         return -E_SECURITY_OPTION_CHECK_ERROR;
1019     }
1020     if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1021         LOGE("[RemoteExecutor] remote security label not set!");
1022         return -E_SECURITY_OPTION_CHECK_ERROR;
1023     }
1024     if (errCode == -E_NOT_SUPPORT) {
1025         return E_OK;
1026     }
1027     if (errCode != E_OK) {
1028         return -E_SECURITY_OPTION_CHECK_ERROR;
1029     }
1030     if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1031         return E_OK;
1032     }
1033     if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1034         return E_OK;
1035     }
1036     return -E_SECURITY_OPTION_CHECK_ERROR;
1037 }
1038 }