1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "remote_executor.h"
17
18 #include "db_constant.h"
19 #include "db_common.h"
20 #include "hash.h"
21 #include "prepared_stmt.h"
22 #include "semaphore_utils.h"
23 #include "sync_generic_interface.h"
24 #include "sync_types.h"
25 #include "time_helper.h"
26 #include "version.h"
27
28 namespace DistributedDB {
29 namespace {
30 constexpr uint32_t MAX_SEARCH_TASK_EXECUTE = 2;
31 constexpr uint32_t MAX_SEARCH_TASK_PER_DEVICE = 5;
32 constexpr uint32_t MAX_QUEUE_COUNT = 10;
33 constexpr uint32_t REMOTE_EXECUTOR_SEND_TIME_OUT = 3000; // 3S
34
ReleaseMessageAndPacket(Message * message,ISyncPacket * packet)35 void ReleaseMessageAndPacket(Message *message, ISyncPacket *packet)
36 {
37 delete message;
38 message = nullptr;
39 delete packet;
40 packet = nullptr;
41 }
42 }
43
RemoteExecutor()44 RemoteExecutor::RemoteExecutor()
45 : workingThreadsCount_(0),
46 syncInterface_(nullptr),
47 communicator_(nullptr),
48 lastSessionId_(0),
49 lastTaskId_(0),
50 closed_(false)
51 {
52 }
53
Initialize(ISyncInterface * syncInterface,ICommunicator * communicator)54 int RemoteExecutor::Initialize(ISyncInterface *syncInterface, ICommunicator *communicator)
55 {
56 if (syncInterface == nullptr || communicator == nullptr) {
57 return -E_INVALID_ARGS;
58 }
59 closed_ = false;
60 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
61 syncInterface_ = syncInterface;
62 communicator_ = communicator;
63 return E_OK;
64 }
65
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,uint64_t connectionId,std::shared_ptr<ResultSet> & result)66 int RemoteExecutor::RemoteQuery(const std::string &device, const RemoteCondition &condition,
67 uint64_t timeout, uint64_t connectionId, std::shared_ptr<ResultSet> &result)
68 {
69 if (closed_) {
70 return -E_BUSY;
71 }
72 if (!CheckParamValid(device, timeout)) {
73 return -E_INVALID_ARGS;
74 }
75 int errCode = E_OK;
76 int taskErrCode = E_OK;
77 SemaphoreUtils semaphore(0);
78 Task task;
79 task.result = std::make_shared<RelationalResultSetImpl>();
80 task.target = device;
81 task.timeout = timeout;
82 task.condition = condition;
83 task.onFinished = [&semaphore, &taskErrCode, &result](int32_t retCode, std::shared_ptr<ResultSet> taskResult) {
84 taskErrCode = retCode;
85 result = taskResult;
86 semaphore.SendSemaphore();
87 };
88 task.connectionId = connectionId;
89 errCode = RemoteQueryInner(task);
90 if (errCode != E_OK) {
91 return errCode;
92 }
93 semaphore.WaitSemaphore();
94 return taskErrCode;
95 }
96
ReceiveMessage(const std::string & targetDev,Message * inMsg)97 int RemoteExecutor::ReceiveMessage(const std::string &targetDev, Message *inMsg)
98 {
99 if (inMsg == nullptr) {
100 return -E_INVALID_ARGS;
101 }
102 if (closed_) {
103 LOGD("[RemoteExecutor][ReceiveMessageInner] db is closing ignore msg");
104 delete inMsg;
105 inMsg = nullptr;
106 return -E_BUSY;
107 }
108 RefObject::IncObjRef(this);
109 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, targetDev, inMsg]() {
110 ReceiveMessageInner(targetDev, inMsg);
111 RefObject::DecObjRef(this);
112 });
113 if (errCode != E_OK) {
114 RefObject::DecObjRef(this);
115 }
116 return errCode;
117 }
118
NotifyDeviceOffline(const std::string & device)119 void RemoteExecutor::NotifyDeviceOffline(const std::string &device)
120 {
121 if (closed_) {
122 return;
123 }
124 LOGD("[RemoteExecutor][NotifyDeviceOffline] device=%s{private} offline", device.c_str());
125 std::vector<uint32_t> removeList;
126 RemoveTaskByDevice(device, removeList);
127 for (const auto &sessionId : removeList) {
128 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
129 }
130 }
131
NotifyUserChange()132 void RemoteExecutor::NotifyUserChange()
133 {
134 if (closed_) {
135 return;
136 }
137 LOGD("[RemoteExecutor][NotifyUserChange] userchange enter");
138 RemoveAllTask(-E_USER_CHANGE);
139 LOGD("[RemoteExecutor][NotifyUserChange] userchange exist");
140 }
141
Close()142 void RemoteExecutor::Close()
143 {
144 closed_ = true;
145 LOGD("[RemoteExecutor][Close] close enter");
146 RemoveAllTask(-E_BUSY);
147 ClearInnerSource();
148 {
149 std::unique_lock<std::mutex> lock(msgQueueLock_);
150 clearCV_.wait(lock, [this] { return workingThreadsCount_ == 0; });
151 }
152 LOGD("[RemoteExecutor][Close] close exist");
153 }
154
NotifyConnectionClosed(uint64_t connectionId)155 void RemoteExecutor::NotifyConnectionClosed(uint64_t connectionId)
156 {
157 if (closed_) {
158 return;
159 }
160 std::vector<uint32_t> removeList;
161 RemoveTaskByConnection(connectionId, removeList);
162 for (const auto &sessionId : removeList) {
163 DoFinished(sessionId, -E_BUSY);
164 }
165 }
166
ReceiveRemoteExecutorRequest(const std::string & targetDev,Message * inMsg)167 int RemoteExecutor::ReceiveRemoteExecutorRequest(const std::string &targetDev, Message *inMsg)
168 {
169 LOGD("[RemoteExecutor][ReceiveRemoteExecutorRequest] receive request");
170 {
171 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
172 searchMessageQueue_.push(std::make_pair(targetDev, inMsg));
173 if (workingThreadsCount_ + 1 > MAX_SEARCH_TASK_EXECUTE) {
174 // message deal in work thread, exist here
175 return -E_NOT_NEED_DELETE_MSG;
176 }
177 workingThreadsCount_++;
178 }
179 bool empty = true;
180 do {
181 std::pair<std::string, Message *> entry;
182 {
183 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
184 empty = searchMessageQueue_.empty();
185 if (empty) {
186 workingThreadsCount_--;
187 continue;
188 }
189 entry = searchMessageQueue_.front();
190 searchMessageQueue_.pop();
191 }
192 ParseOneRequestMessage(entry.first, entry.second);
193 delete entry.second;
194 entry.second = nullptr;
195 } while (!empty);
196 clearCV_.notify_one();
197 return -E_NOT_NEED_DELETE_MSG;
198 }
199
ParseOneRequestMessage(const std::string & device,Message * inMsg)200 void RemoteExecutor::ParseOneRequestMessage(const std::string &device, Message *inMsg)
201 {
202 if (closed_) {
203 LOGW("[RemoteExecutor][ParseOneRequestMessage] closed");
204 return;
205 }
206 int errCode = CheckPermissions(device, inMsg);
207 if (errCode != E_OK) {
208 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
209 return;
210 }
211 errCode = SendRemoteExecutorData(device, inMsg);
212 if (errCode != E_OK) {
213 (void)ResponseFailed(errCode, inMsg->GetSessionId(), inMsg->GetSequenceId(), device);
214 }
215 }
216
CheckPermissions(const std::string & device,Message * inMsg)217 int RemoteExecutor::CheckPermissions(const std::string &device, Message *inMsg)
218 {
219 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
220 if (storage == nullptr) {
221 LOGE("[RemoteExecutor][CheckPermissions] storage is nullptr.");
222 return -E_BUSY;
223 }
224 // permission check
225 std::string appId = storage->GetDbProperties().GetStringProp(DBProperties::APP_ID, "");
226 std::string userId = storage->GetDbProperties().GetStringProp(DBProperties::USER_ID, "");
227 std::string storeId = storage->GetDbProperties().GetStringProp(DBProperties::STORE_ID, "");
228 int32_t instanceId = syncInterface_->GetDbProperties().GetIntProp(DBProperties::INSTANCE_ID, 0);
229 int errCode = RuntimeContext::GetInstance()->RunPermissionCheck(
230 { userId, appId, storeId, device, instanceId }, CHECK_FLAG_SEND);
231 if (errCode != E_OK) {
232 LOGE("[RemoteExecutor][CheckPermissions] check permission errCode = %d.", errCode);
233 storage->DecRefCount();
234 return errCode;
235 }
236 const auto *packet = inMsg->GetObject<ISyncPacket>();
237 if (packet == nullptr) {
238 LOGE("[RemoteExecutor] get packet object failed");
239 storage->DecRefCount();
240 return -E_INVALID_ARGS;
241 }
242 const auto *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
243 errCode = CheckRemoteRecvData(device, storage, requestPacket->GetSecLabel(), requestPacket->GetVersion());
244 storage->DecRefCount();
245 return errCode;
246 }
247
SendRemoteExecutorData(const std::string & device,const Message * inMsg)248 int RemoteExecutor::SendRemoteExecutorData(const std::string &device, const Message *inMsg)
249 {
250 auto *syncInterface = GetAndIncSyncInterface();
251 if (syncInterface == nullptr) {
252 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is nullptr.");
253 return -E_INVALID_ARGS;
254 }
255 if (syncInterface->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
256 LOGE("[RemoteExecutor][ParseOneRequestMessage] storage is not relation.");
257 syncInterface->DecRefCount();
258 return -E_NOT_SUPPORT;
259 }
260 RelationalDBSyncInterface *storage = static_cast<RelationalDBSyncInterface *>(syncInterface);
261
262 const auto *packet = inMsg->GetObject<ISyncPacket>();
263 if (packet == nullptr) {
264 LOGE("[RemoteExecutor] get packet object failed");
265 storage->DecRefCount();
266 return -E_INVALID_ARGS;
267 }
268 const RemoteExecutorRequestPacket *requestPacket = static_cast<const RemoteExecutorRequestPacket *>(packet);
269 int errCode = ResponseRemoteQueryRequest(storage, requestPacket->GetPreparedStmt(), device, inMsg->GetSessionId());
270 storage->DecRefCount();
271 return errCode;
272 }
273
ReceiveRemoteExecutorAck(const std::string & targetDev,Message * inMsg)274 int RemoteExecutor::ReceiveRemoteExecutorAck(const std::string &targetDev, Message *inMsg)
275 {
276 const auto *ack = inMsg->GetObject<ISyncPacket>();
277 if (ack == nullptr) {
278 return -E_INVALID_ARGS;
279 }
280 const auto *packet = static_cast<const RemoteExecutorAckPacket *>(ack);
281 int errCode = packet->GetAckCode();
282 uint32_t sessionId = inMsg->GetSessionId();
283 uint32_t sequenceId = inMsg->GetSequenceId();
284 if (!IsPacketValid(sessionId)) {
285 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
286 return -E_INVALID_ARGS;
287 }
288 if (errCode == E_OK) {
289 auto storage = GetAndIncSyncInterface();
290 auto communicator = GetAndIncCommunicator();
291 errCode = CheckSecurityOption(storage, communicator, packet->GetSecurityOption());
292 if (storage != nullptr) {
293 storage->DecRefCount();
294 }
295 RefObject::DecObjRef(communicator);
296 }
297 if (errCode != E_OK) {
298 DoFinished(sessionId, errCode);
299 } else {
300 ReceiveDataWithValidSession(targetDev, sessionId, sequenceId, packet);
301 }
302 return E_OK;
303 }
304
CheckParamValid(const std::string & device,uint64_t timeout) const305 bool RemoteExecutor::CheckParamValid(const std::string &device, uint64_t timeout) const
306 {
307 if (timeout < DBConstant::MIN_TIMEOUT || timeout > DBConstant::MAX_TIMEOUT) {
308 LOGD("[RemoteExecutor][CheckParamValid] timeout=invalid %" PRIu64, timeout);
309 return false;
310 }
311 if (device.empty()) {
312 LOGD("[RemoteExecutor][CheckParamValid] device is empty");
313 return false;
314 }
315 if (device.length() > DBConstant::MAX_DEV_LENGTH) {
316 LOGE("[RemoteExecutor] dev is too long len=%zu", device.length());
317 return false;
318 }
319 ICommunicator *communicator = GetAndIncCommunicator();
320 if (communicator == nullptr) {
321 return false;
322 }
323 std::string localId;
324 int errCode = communicator->GetLocalIdentity(localId);
325 RefObject::DecObjRef(communicator);
326 if (errCode != E_OK) {
327 return false;
328 }
329 if (localId == device) {
330 LOGD("[RemoteExecutor][CheckParamValid] cannot sync to self");
331 return false;
332 }
333 return true;
334 }
335
CheckTaskExeStatus(const std::string & device)336 bool RemoteExecutor::CheckTaskExeStatus(const std::string &device)
337 {
338 uint32_t queueCount = 0u;
339 uint32_t exeTaskCount = 0u;
340 uint32_t totalCount = 0u; // waiting task count in all queue
341 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
342 queueCount = searchTaskQueue_.at(device).size();
343 }
344 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
345 exeTaskCount = deviceWorkingSet_.at(device).size();
346 }
347 for (auto &entry : searchTaskQueue_) {
348 int currentExeCount = static_cast<int>(deviceWorkingSet_[device].size());
349 int currentQueueCount = static_cast<int>(entry.second.size());
350 if ((currentQueueCount + currentExeCount) < static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE)) {
351 // all task in this queue can execute, no need calculate as waiting task count
352 continue;
353 }
354 totalCount += static_cast<uint32_t>(currentQueueCount + currentExeCount -
355 static_cast<int>(MAX_SEARCH_TASK_PER_DEVICE));
356 }
357 return (queueCount + exeTaskCount + 1 <= MAX_SEARCH_TASK_PER_DEVICE + MAX_SEARCH_TASK_EXECUTE) &&
358 (totalCount + 1 <= MAX_QUEUE_COUNT);
359 }
360
GenerateSessionId()361 uint32_t RemoteExecutor::GenerateSessionId()
362 {
363 uint32_t sessionId = Hash::Hash32Func(std::to_string(TimeHelper::GetSysCurrentTime()));
364 while (taskMap_.find(sessionId) != taskMap_.end()) { // LCOV_EXCL_BR_LINE
365 sessionId++;
366 if (sessionId == 0) { // if over flow start with 1
367 sessionId++;
368 }
369 }
370 lastSessionId_ = sessionId;
371 return sessionId;
372 }
373
GenerateTaskId()374 uint32_t RemoteExecutor::GenerateTaskId()
375 {
376 lastTaskId_++;
377 if (lastTaskId_ == 0) { // if over flow start with 1
378 lastTaskId_++;
379 }
380 return lastTaskId_;
381 }
382
RemoteQueryInner(const Task & task)383 int RemoteExecutor::RemoteQueryInner(const Task &task)
384 {
385 uint32_t sessionId = 0u;
386 {
387 // check task count and push into queue in lock
388 std::lock_guard<std::mutex> autoLock(taskLock_);
389 if (!CheckTaskExeStatus(task.target)) {
390 LOGE("[RemoteExecutor][RemoteQueryInner] queue size is over limit");
391 return -E_MAX_LIMITS;
392 }
393 sessionId = GenerateSessionId();
394 searchTaskQueue_[task.target].push_back(sessionId);
395 if (taskMap_.find(sessionId) != taskMap_.end()) {
396 LOGE("[RemoteExecutor][RemoteQueryInner] task already exist");
397 return -E_INTERNAL_ERROR; // should not happen
398 }
399 taskMap_[sessionId] = task;
400 taskMap_[sessionId].taskId = GenerateTaskId();
401 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery create task taskId=%" PRIu32 " target is %s",
402 taskMap_[sessionId].taskId, task.target.c_str());
403 }
404 std::string device = task.target;
405 RefObject::IncObjRef(this);
406 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
407 TryExecuteTaskInLock(device);
408 RefObject::DecObjRef(this);
409 });
410 if (errCode != E_OK) {
411 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed try to roll back");
412 DoRollBack(sessionId);
413 RefObject::DecObjRef(this);
414 }
415 return errCode;
416 }
417
TryExecuteTaskInLock(const std::string & device)418 void RemoteExecutor::TryExecuteTaskInLock(const std::string &device)
419 {
420 uint32_t sessionId = 0u;
421 {
422 std::lock_guard<std::mutex> autoLock(taskLock_);
423 if (deviceWorkingSet_[device].size() >= MAX_SEARCH_TASK_EXECUTE) {
424 return;
425 }
426 if (searchTaskQueue_[device].empty()) {
427 LOGD("[RemoteExecutor][TryExecuteTaskInLock] no task to execute");
428 return;
429 }
430 sessionId = searchTaskQueue_[device].front();
431 if (taskMap_.find(sessionId) == taskMap_.end()) {
432 searchTaskQueue_[device].pop_front();
433 LOGD("[RemoteExecutor][TryExecuteTaskInLock] task was removed no need execute");
434 return;
435 }
436 taskMap_[sessionId].status = Status::WORKING;
437 searchTaskQueue_[device].pop_front();
438 deviceWorkingSet_[device].insert(sessionId);
439 LOGD("[RemoteExecutor][RemoteQuery] RemoteQuery execute taskId=%" PRIu32, taskMap_[sessionId].taskId);
440 StartTimer(taskMap_[sessionId].timeout, sessionId);
441 }
442 int errCode = RequestStart(sessionId);
443 if (errCode != E_OK) {
444 DoFinished(sessionId, errCode);
445 }
446 }
447
DoRollBack(uint32_t sessionId)448 void RemoteExecutor::DoRollBack(uint32_t sessionId)
449 {
450 Task task;
451 std::lock_guard<std::mutex> autoLock(taskLock_);
452 if (taskMap_.find(sessionId) == taskMap_.end()) { // LCOV_EXCL_BR_LINE
453 return;
454 }
455 task = taskMap_[sessionId];
456 if (task.status != Status::WAITING) { // LCOV_EXCL_BR_LINE
457 // task is execute, abort roll back
458 return;
459 }
460 taskMap_.erase(sessionId);
461
462 auto iter = searchTaskQueue_[task.target].begin();
463 while (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
464 if ((*iter) == sessionId) { // LCOV_EXCL_BR_LINE
465 break;
466 }
467 iter++;
468 }
469 if (iter != searchTaskQueue_[task.target].end()) { // LCOV_EXCL_BR_LINE
470 searchTaskQueue_[task.target].erase(iter);
471 }
472 // this task should not in workingSet
473 deviceWorkingSet_[task.target].erase(sessionId);
474 }
475
RequestStart(uint32_t sessionId)476 int RemoteExecutor::RequestStart(uint32_t sessionId)
477 {
478 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
479 if (message == nullptr) {
480 LOGE("[RemoteExecutor][RequestStart] new message error");
481 return -E_OUT_OF_MEMORY;
482 }
483 message->SetSessionId(sessionId);
484 message->SetMessageType(TYPE_REQUEST);
485 auto *packet = new (std::nothrow) RemoteExecutorRequestPacket();
486 if (packet == nullptr) {
487 LOGE("[RemoteExecutor][RequestStart] new packet error");
488 ReleaseMessageAndPacket(message, nullptr);
489 return -E_OUT_OF_MEMORY;
490 }
491 std::string target;
492 int errCode = FillRequestPacket(packet, sessionId, target);
493 if (errCode != E_OK) {
494 ReleaseMessageAndPacket(message, packet);
495 return errCode;
496 }
497 auto exObj = static_cast<ISyncPacket *>(packet);
498 errCode = message->SetExternalObject(exObj);
499 if (errCode != E_OK) {
500 ReleaseMessageAndPacket(message, packet);
501 LOGE("[RemoteExecutor][RequestStart] set external object failed errCode=%d", errCode);
502 return errCode;
503 }
504 return SendRequestMessage(target, message, sessionId);
505 }
506
SendRequestMessage(const std::string & target,Message * message,uint32_t sessionId)507 int RemoteExecutor::SendRequestMessage(const std::string &target, Message *message, uint32_t sessionId)
508 {
509 auto communicator = GetAndIncCommunicator();
510 auto syncInterface = GetAndIncSyncInterface();
511 if (communicator == nullptr || syncInterface == nullptr) {
512 ReleaseMessageAndPacket(message, nullptr);
513 if (syncInterface != nullptr) {
514 syncInterface->DecRefCount();
515 }
516 RefObject::DecObjRef(communicator);
517 return -E_BUSY;
518 }
519 SendConfig sendConfig;
520 SetSendConfigParam(syncInterface->GetDbProperties(), target, false, REMOTE_EXECUTOR_SEND_TIME_OUT, sendConfig);
521 RefObject::IncObjRef(this);
522 int errCode = communicator->SendMessage(target, message, sendConfig,
523 [this, sessionId](int errCode, bool isDirectEnd) {
524 if (errCode != E_OK) {
525 DoSendFailed(sessionId, errCode);
526 }
527 RefObject::DecObjRef(this);
528 });
529 if (errCode != E_OK) {
530 ReleaseMessageAndPacket(message, nullptr);
531 RefObject::DecObjRef(this);
532 }
533 RefObject::DecObjRef(communicator);
534 syncInterface->DecRefCount();
535 return errCode;
536 }
537
ResponseFailed(int errCode,uint32_t sessionId,uint32_t sequenceId,const std::string & device)538 int RemoteExecutor::ResponseFailed(int errCode, uint32_t sessionId, uint32_t sequenceId,
539 const std::string &device)
540 {
541 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
542 if (packet == nullptr) {
543 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
544 return -E_OUT_OF_MEMORY;
545 }
546 packet->SetAckCode(errCode);
547 packet->SetLastAck();
548 return ResponseStart(packet, sessionId, sequenceId, device);
549 }
550
ResponseData(RelationalRowDataSet && dataSet,const SendMessage & sendMessage,const std::string & device)551 int RemoteExecutor::ResponseData(RelationalRowDataSet &&dataSet, const SendMessage &sendMessage,
552 const std::string &device)
553 {
554 RemoteExecutorAckPacket *packet = new (std::nothrow) RemoteExecutorAckPacket();
555 if (packet == nullptr) {
556 LOGE("[RemoteExecutor][ResponseFailed] new RemoteExecutorAckPacket error");
557 return -E_OUT_OF_MEMORY;
558 }
559 packet->SetAckCode(E_OK);
560 if (sendMessage.isLast) {
561 packet->SetLastAck();
562 }
563 packet->SetSecurityOption(sendMessage.option);
564 packet->MoveInRowDataSet(std::move(dataSet));
565 return ResponseStart(packet, sendMessage.sessionId, sendMessage.sequenceId, device);
566 }
567
ResponseStart(RemoteExecutorAckPacket * packet,uint32_t sessionId,uint32_t sequenceId,const std::string & device)568 int RemoteExecutor::ResponseStart(RemoteExecutorAckPacket *packet, uint32_t sessionId, uint32_t sequenceId,
569 const std::string &device)
570 {
571 SyncGenericInterface *storage = static_cast<SyncGenericInterface *>(GetAndIncSyncInterface());
572 if (storage == nullptr) {
573 ReleaseMessageAndPacket(nullptr, packet);
574 LOGE("[RemoteExecutor][ResponseStart] storage is nullptr.");
575 return -E_BUSY;
576 }
577 Message *message = new (std::nothrow) Message(REMOTE_EXECUTE_MESSAGE);
578 if (message == nullptr) {
579 LOGE("[RemoteExecutor][ResponseStart] new message error");
580 storage->DecRefCount();
581 ReleaseMessageAndPacket(nullptr, packet);
582 return -E_OUT_OF_MEMORY;
583 }
584 packet->SetVersion(RemoteExecutorAckPacket::RESPONSE_PACKET_VERSION_CURRENT);
585 auto exObj = static_cast<ISyncPacket *>(packet);
586 int errCode = message->SetExternalObject(exObj);
587 if (errCode != E_OK) {
588 ReleaseMessageAndPacket(message, packet);
589 storage->DecRefCount();
590 LOGE("[RemoteExecutor][ResponseStart] set external object failed errCode: %d", errCode);
591 return errCode;
592 }
593 auto *communicator = GetAndIncCommunicator();
594 if (communicator == nullptr) {
595 ReleaseMessageAndPacket(message, nullptr);
596 storage->DecRefCount();
597 LOGD("[RemoteExecutor][ResponseStart] communicator is nullptr");
598 return -E_BUSY;
599 }
600
601 message->SetTarget(device);
602 message->SetSessionId(sessionId);
603 message->SetSequenceId(sequenceId);
604 message->SetMessageType(TYPE_RESPONSE);
605 SendConfig sendConfig;
606 SetSendConfigParam(storage->GetDbProperties(), device, false, SEND_TIME_OUT, sendConfig);
607 errCode = communicator->SendMessage(device, message, sendConfig, nullptr);
608 RefObject::DecObjRef(communicator);
609 if (errCode != E_OK) {
610 ReleaseMessageAndPacket(message, nullptr);
611 LOGE("[RemoteExecutor][ParseOneRequestMessage] send message failed, errCode: %d", errCode);
612 }
613 storage->DecRefCount();
614 return errCode;
615 }
616
StartTimer(uint64_t timeout,uint32_t sessionId)617 void RemoteExecutor::StartTimer(uint64_t timeout, uint32_t sessionId)
618 {
619 TimerId timerId = 0u;
620 RefObject::IncObjRef(this);
621 TimerAction timeoutCallBack = [this](TimerId timerId) { return TimeoutCallBack(timerId); };
622 int errCode = RuntimeContext::GetInstance()->SetTimer(timeout, timeoutCallBack, [this]() {
623 RefObject::DecObjRef(this);
624 }, timerId);
625 if (errCode != E_OK) {
626 RefObject::DecObjRef(this);
627 LOGW("[RemoteExecutor][StartTimer] errCode=%d", errCode);
628 }
629 LOGD("[RemoteExecutor][StartTimer] timerId=%" PRIu64, timerId);
630 std::lock_guard<std::mutex> autoLock(timeoutLock_);
631 timeoutMap_[timerId] = sessionId;
632 taskFinishMap_[sessionId] = timerId;
633 }
634
RemoveTimer(uint32_t sessionId)635 void RemoteExecutor::RemoveTimer(uint32_t sessionId)
636 {
637 TimerId timerId = 0u;
638 {
639 std::lock_guard<std::mutex> autoLock(timeoutLock_);
640 if (taskFinishMap_.find(sessionId) == taskFinishMap_.end()) {
641 return;
642 }
643 timerId = taskFinishMap_[sessionId];
644 LOGD("[RemoteExecutor][RemoveTimer] timerId=%" PRIu32, timerId);
645 timeoutMap_.erase(timerId);
646 taskFinishMap_.erase(sessionId);
647 }
648 RuntimeContext::GetInstance()->RemoveTimer(timerId);
649 }
650
TimeoutCallBack(TimerId timerId)651 int RemoteExecutor::TimeoutCallBack(TimerId timerId)
652 {
653 RefObject::IncObjRef(this);
654 int errCode = RuntimeContext::GetInstance()->ScheduleTask([this, timerId]() {
655 DoTimeout(timerId);
656 RefObject::DecObjRef(this);
657 });
658 if (errCode != E_OK) {
659 LOGW("[RemoteExecutor][TimeoutCallBack] Schedule task failed");
660 RefObject::DecObjRef(this);
661 }
662 return -E_NO_NEED_TIMER;
663 }
664
DoTimeout(TimerId timerId)665 void RemoteExecutor::DoTimeout(TimerId timerId)
666 {
667 LOGD("[RemoteExecutor][DoTimeout] timerId=%" PRIu64, timerId);
668 uint32_t sessionId = 0u;
669 {
670 std::lock_guard<std::mutex> autoLock(timeoutLock_);
671 if (timeoutMap_.find(timerId) == timeoutMap_.end()) {
672 return;
673 }
674 sessionId = timeoutMap_[timerId];
675 }
676 DoFinished(sessionId, -E_TIMEOUT);
677 }
678
DoSendFailed(uint32_t sessionId,int errCode)679 void RemoteExecutor::DoSendFailed(uint32_t sessionId, int errCode)
680 {
681 LOGD("[RemoteExecutor][DoSendFailed] send failed errCode=%d", errCode);
682 DoFinished(sessionId, -E_PERIPHERAL_INTERFACE_FAIL);
683 }
684
DoFinished(uint32_t sessionId,int errCode)685 void RemoteExecutor::DoFinished(uint32_t sessionId, int errCode)
686 {
687 Task task;
688 if (ClearTaskInfo(sessionId, task) == E_OK) {
689 LOGD("[RemoteExecutor][DoFinished] taskId=%" PRIu32 " errCode=%d", task.taskId, errCode);
690 } else {
691 return;
692 }
693 RefObject::IncObjRef(this);
694 if (task.onFinished != nullptr) {
695 task.onFinished(errCode, task.result);
696 LOGD("[RemoteExecutor][DoFinished] onFinished");
697 }
698 std::string device = task.target;
699 int retCode = RuntimeContext::GetInstance()->ScheduleTask([this, device]() {
700 TryExecuteTaskInLock(device);
701 RefObject::DecObjRef(this);
702 });
703 if (retCode != E_OK) {
704 LOGD("[RemoteExecutor][RemoteQueryInner] Schedule task failed");
705 RefObject::DecObjRef(this);
706 }
707 }
708
ClearTaskInfo(uint32_t sessionId,Task & task)709 int RemoteExecutor::ClearTaskInfo(uint32_t sessionId, Task &task)
710 {
711 {
712 std::lock_guard<std::mutex> autoLock(taskLock_);
713 if (taskMap_.find(sessionId) == taskMap_.end()) {
714 return -E_NOT_FOUND;
715 }
716 task = taskMap_[sessionId];
717 taskMap_.erase(sessionId);
718 deviceWorkingSet_[task.target].erase(sessionId);
719 }
720 RemoveTimer(sessionId);
721 return E_OK;
722 }
723
ClearInnerSource()724 void RemoteExecutor::ClearInnerSource()
725 {
726 {
727 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
728 syncInterface_ = nullptr;
729 communicator_ = nullptr;
730 }
731 std::lock_guard<std::mutex> autoLock(msgQueueLock_);
732 LOGD("[RemoteExecutor][ClearInnerSource] clear message cache now");
733 while (!searchMessageQueue_.empty()) {
734 auto entry = searchMessageQueue_.front();
735 searchMessageQueue_.pop();
736 delete entry.second;
737 entry.second = nullptr;
738 }
739 }
740
FillRequestPacket(RemoteExecutorRequestPacket * packet,uint32_t sessionId,std::string & target)741 int RemoteExecutor::FillRequestPacket(RemoteExecutorRequestPacket *packet, uint32_t sessionId, std::string &target)
742 {
743 ISyncInterface *storage = GetAndIncSyncInterface();
744 if (storage == nullptr) {
745 return -E_BUSY;
746 }
747 SecurityOption localOption;
748 int errCode = storage->GetSecurityOption(localOption);
749 storage->DecRefCount();
750 storage = nullptr;
751 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
752 return -E_SECURITY_OPTION_CHECK_ERROR;
753 }
754 if (errCode == E_OK && localOption.securityLabel == NOT_SET) {
755 LOGE("[AbilitySync] Local not set security option");
756 return -E_SECURITY_OPTION_CHECK_ERROR;
757 }
758 Task task;
759 {
760 std::lock_guard<std::mutex> autoLock(taskLock_);
761 if (taskMap_.find(sessionId) == taskMap_.end()) {
762 LOGD("[RemoteExecutor][FillRequestPacket] this task has finished");
763 return -E_FINISHED;
764 }
765 task = taskMap_[sessionId];
766 }
767 packet->SetVersion(RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_CURRENT);
768 packet->SetOpCode(PreparedStmt::ExecutorOperation::QUERY);
769 packet->SetSql(task.condition.sql);
770 packet->SetBindArgs(task.condition.bindArgs);
771 packet->SetNeedResponse();
772 packet->SetSecLabel(errCode == -E_NOT_SUPPORT ? NOT_SUPPORT_SEC_CLASSIFICATION : localOption.securityLabel);
773 target = task.target;
774 return E_OK;
775 }
776
ReceiveMessageInner(const std::string & targetDev,Message * inMsg)777 void RemoteExecutor::ReceiveMessageInner(const std::string &targetDev, Message *inMsg)
778 {
779 int errCode = E_OK;
780 if (inMsg->IsFeedbackError() && IsPacketValid(inMsg->GetSessionId())) {
781 DoFinished(inMsg->GetSessionId(), -inMsg->GetErrorNo());
782 delete inMsg;
783 inMsg = nullptr;
784 return;
785 }
786 switch (inMsg->GetMessageType()) {
787 case TYPE_REQUEST:
788 errCode = ReceiveRemoteExecutorRequest(targetDev, inMsg);
789 break;
790 case TYPE_RESPONSE:
791 errCode = ReceiveRemoteExecutorAck(targetDev, inMsg);
792 break;
793 default:
794 LOGD("[RemoteExecutor][ReceiveMessageInner] Receive unknown message");
795 break;
796 }
797 if (errCode != -E_NOT_NEED_DELETE_MSG) {
798 delete inMsg;
799 inMsg = nullptr;
800 }
801 }
802
IsPacketValid(uint32_t sessionId)803 bool RemoteExecutor::IsPacketValid(uint32_t sessionId)
804 {
805 std::lock_guard<std::mutex> autoLock(taskLock_);
806 return taskMap_.find(sessionId) != taskMap_.end() && taskMap_[sessionId].status == Status::WORKING;
807 }
808
ReceiveDataWithValidSession(const std::string & targetDev,uint32_t sessionId,uint32_t sequenceId,const RemoteExecutorAckPacket * packet)809 void RemoteExecutor::ReceiveDataWithValidSession(const std::string &targetDev, uint32_t sessionId, uint32_t sequenceId,
810 const RemoteExecutorAckPacket *packet)
811 {
812 bool isReceiveFinished = false;
813 {
814 std::lock_guard<std::mutex> autoLock(taskLock_);
815 if (taskMap_.find(sessionId) == taskMap_.end() || taskMap_[sessionId].status != Status::WORKING) {
816 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] receive unknown ack");
817 return;
818 }
819 LOGD("[RemoteExecutor][ReceiveRemoteExecutorAck] taskId=%" PRIu32 " sequenceId=%" PRIu32,
820 taskMap_[sessionId].taskId, sequenceId);
821 taskMap_[sessionId].currentCount++;
822 if (packet->IsLastAck()) {
823 taskMap_[sessionId].targetCount = sequenceId;
824 }
825 taskMap_[sessionId].result->Put(targetDev, sequenceId, packet->MoveOutRowDataSet());
826 if (taskMap_[sessionId].currentCount == taskMap_[sessionId].targetCount) {
827 isReceiveFinished = true;
828 }
829 }
830 if (isReceiveFinished) {
831 DoFinished(sessionId, E_OK);
832 }
833 }
834
GetAndIncCommunicator() const835 ICommunicator *RemoteExecutor::GetAndIncCommunicator() const
836 {
837 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
838 ICommunicator *communicator = communicator_;
839 RefObject::IncObjRef(communicator);
840 return communicator;
841 }
842
GetAndIncSyncInterface() const843 ISyncInterface *RemoteExecutor::GetAndIncSyncInterface() const
844 {
845 std::lock_guard<std::mutex> autoLock(innerSourceLock_);
846 ISyncInterface *syncInterface = syncInterface_;
847 if (syncInterface != nullptr) {
848 syncInterface->IncRefCount();
849 }
850 return syncInterface;
851 }
852
RemoveTaskByDevice(const std::string & device,std::vector<uint32_t> & removeList)853 void RemoteExecutor::RemoveTaskByDevice(const std::string &device, std::vector<uint32_t> &removeList)
854 {
855 std::lock_guard<std::mutex> autoLock(taskLock_);
856 if (deviceWorkingSet_.find(device) != deviceWorkingSet_.end()) {
857 for (auto &sessionId : deviceWorkingSet_[device]) {
858 removeList.push_back(sessionId);
859 }
860 }
861 if (searchTaskQueue_.find(device) != searchTaskQueue_.end()) {
862 for (auto &sessionId : searchTaskQueue_[device]) {
863 removeList.push_back(sessionId);
864 }
865 }
866 }
867
RemoveAllTask(int errCode)868 void RemoteExecutor::RemoveAllTask(int errCode)
869 {
870 std::vector<OnFinished> waitToNotify;
871 std::vector<uint32_t> removeTimerList;
872 {
873 std::lock_guard<std::mutex> autoLock(taskLock_);
874 for (auto &taskEntry : taskMap_) {
875 if (taskEntry.second.onFinished != nullptr) {
876 waitToNotify.push_back(taskEntry.second.onFinished);
877 LOGD("[RemoteExecutor][RemoveAllTask] taskId=%" PRIu32 " result is %d",
878 taskEntry.second.taskId, errCode);
879 }
880 if (taskEntry.second.status == Status::WORKING) {
881 removeTimerList.push_back(taskEntry.first);
882 }
883 }
884 taskMap_.clear();
885 deviceWorkingSet_.clear();
886 searchTaskQueue_.clear();
887 }
888 for (const auto &callBack : waitToNotify) {
889 callBack(errCode, nullptr);
890 }
891 for (const auto &sessionId : removeTimerList) {
892 RemoveTimer(sessionId);
893 }
894 std::lock_guard<std::mutex> autoLock(timeoutLock_);
895 timeoutMap_.clear();
896 taskFinishMap_.clear();
897 }
898
RemoveTaskByConnection(uint64_t connectionId,std::vector<uint32_t> & removeList)899 void RemoteExecutor::RemoveTaskByConnection(uint64_t connectionId, std::vector<uint32_t> &removeList)
900 {
901 std::lock_guard<std::mutex> autoLock(taskLock_);
902 for (auto &entry : taskMap_) {
903 if (entry.second.connectionId == connectionId) {
904 removeList.push_back(entry.first);
905 }
906 }
907 }
908
GetPacketSize(const std::string & device,size_t & packetSize) const909 int RemoteExecutor::GetPacketSize(const std::string &device, size_t &packetSize) const
910 {
911 auto *communicator = GetAndIncCommunicator();
912 if (communicator == nullptr) {
913 LOGD("communicator is nullptr");
914 return -E_BUSY;
915 }
916
917 packetSize = communicator->GetCommunicatorMtuSize(device) * 9 / 10; // get the 9/10 of the size
918 RefObject::DecObjRef(communicator);
919 return E_OK;
920 }
921
CheckRemoteSecurityOption(const std::string & device,const SecurityOption & remoteOption,const SecurityOption & localOption)922 bool RemoteExecutor::CheckRemoteSecurityOption(const std::string &device, const SecurityOption &remoteOption,
923 const SecurityOption &localOption)
924 {
925 bool res = false;
926 if (remoteOption.securityLabel == localOption.securityLabel ||
927 (remoteOption.securityLabel == SecurityLabel::NOT_SET ||
928 localOption.securityLabel == SecurityLabel::NOT_SET)) {
929 res = RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, remoteOption);
930 }
931 if (!res) {
932 LOGE("[RemoteExecutor][CheckRemoteSecurityOption] check error remote:%d, %d local:%d, %d",
933 remoteOption.securityLabel, remoteOption.securityFlag,
934 localOption.securityLabel, localOption.securityFlag);
935 }
936 return res;
937 }
938
ResponseRemoteQueryRequest(RelationalDBSyncInterface * storage,const PreparedStmt & stmt,const std::string & device,uint32_t sessionId)939 int RemoteExecutor::ResponseRemoteQueryRequest(RelationalDBSyncInterface *storage, const PreparedStmt &stmt,
940 const std::string &device, uint32_t sessionId)
941 {
942 size_t packetSize = 0u;
943 int errCode = GetPacketSize(device, packetSize);
944 if (errCode != E_OK) {
945 return errCode;
946 }
947 SecurityOption option;
948 errCode = storage->GetSecurityOption(option);
949 if (errCode == -E_NOT_SUPPORT) {
950 option.securityLabel = NOT_SUPPORT_SEC_CLASSIFICATION;
951 errCode = E_OK;
952 }
953 if (errCode != E_OK) {
954 LOGD("GetSecurityOption errCode:%d", errCode);
955 return -E_SECURITY_OPTION_CHECK_ERROR;
956 }
957 ContinueToken token = nullptr;
958 uint32_t sequenceId = 1u;
959 do {
960 RelationalRowDataSet dataSet;
961 errCode = storage->ExecuteQuery(stmt, packetSize, dataSet, token);
962 if (errCode != E_OK) {
963 LOGE("[RemoteExecutor] call ExecuteQuery failed: %d", errCode);
964 break;
965 }
966 SendMessage sendMessage = { sessionId, sequenceId, token == nullptr, option };
967 errCode = ResponseData(std::move(dataSet), sendMessage, device);
968 if (errCode != E_OK) {
969 break;
970 }
971 sequenceId++;
972 } while (token != nullptr);
973 if (token != nullptr) {
974 storage->ReleaseRemoteQueryContinueToken(token);
975 }
976 return errCode;
977 }
978
CheckSecurityOption(ISyncInterface * storage,ICommunicator * communicator,const SecurityOption & remoteOption)979 int RemoteExecutor::CheckSecurityOption(ISyncInterface *storage, ICommunicator *communicator,
980 const SecurityOption &remoteOption)
981 {
982 if (storage == nullptr || communicator == nullptr) {
983 return -E_BUSY;
984 }
985 if (storage->GetInterfaceType() != ISyncInterface::SYNC_RELATION) {
986 return -E_NOT_SUPPORT;
987 }
988 std::string device;
989 communicator->GetLocalIdentity(device);
990 SecurityOption localOption;
991 int errCode = storage->GetSecurityOption(localOption);
992 if (errCode != E_OK && errCode != -E_NOT_SUPPORT) {
993 return -E_SECURITY_OPTION_CHECK_ERROR;
994 }
995 if (remoteOption.securityLabel == NOT_SUPPORT_SEC_CLASSIFICATION || errCode == -E_NOT_SUPPORT) {
996 return E_OK;
997 }
998 if (!CheckRemoteSecurityOption(device, remoteOption, localOption)) {
999 errCode = -E_SECURITY_OPTION_CHECK_ERROR;
1000 } else {
1001 errCode = E_OK;
1002 }
1003 return errCode;
1004 }
1005
CheckRemoteRecvData(const std::string & device,SyncGenericInterface * storage,int32_t remoteSecLabel,uint32_t remoteVersion)1006 int RemoteExecutor::CheckRemoteRecvData(const std::string &device, SyncGenericInterface *storage,
1007 int32_t remoteSecLabel, uint32_t remoteVersion)
1008 {
1009 SecurityOption localOption;
1010 int errCode = storage->GetSecurityOption(localOption);
1011 LOGI("[RemoteExecutor] remote label:%d local l:%d, f:%d, errCode:%d, remote ver %" PRIu32, remoteSecLabel,
1012 localOption.securityLabel, localOption.securityFlag, errCode, remoteVersion);
1013 if (remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION && errCode == -E_NOT_SUPPORT) {
1014 return E_OK;
1015 }
1016 if (errCode != -E_NOT_SUPPORT && localOption.securityLabel == SecurityLabel::NOT_SET) {
1017 LOGE("[RemoteExecutor] local security label not set!");
1018 return -E_SECURITY_OPTION_CHECK_ERROR;
1019 }
1020 if (remoteVersion >= RemoteExecutorRequestPacket::REQUEST_PACKET_VERSION_V4 && remoteSecLabel == NOT_SET) {
1021 LOGE("[RemoteExecutor] remote security label not set!");
1022 return -E_SECURITY_OPTION_CHECK_ERROR;
1023 }
1024 if (errCode == -E_NOT_SUPPORT) {
1025 return E_OK;
1026 }
1027 if (errCode != E_OK) {
1028 return -E_SECURITY_OPTION_CHECK_ERROR;
1029 }
1030 if (remoteSecLabel == UNKNOWN_SECURITY_LABEL || remoteSecLabel == NOT_SUPPORT_SEC_CLASSIFICATION) {
1031 return E_OK;
1032 }
1033 if (RuntimeContext::GetInstance()->CheckDeviceSecurityAbility(device, localOption)) {
1034 return E_OK;
1035 }
1036 return -E_SECURITY_OPTION_CHECK_ERROR;
1037 }
1038 }