1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "communication_adapter/include/client_listener_handler.h"
17
18 #include "ipc_skeleton.h"
19 #include "rpc_errno.h"
20 #include "platform/os_wrapper/ipc/include/aie_ipc.h"
21 #include "protocol/ipc_interface/ai_service.h"
22 #include "protocol/retcode_inner/aie_retcode_inner.h"
23 #include "protocol/struct_definition/aie_info_define.h"
24 #include "utils/aie_guard.h"
25 #include "utils/aie_macros.h"
26 #include "utils/log/aie_log.h"
27
28 namespace OHOS {
29 namespace AI {
namespace {
// Name returned by AsyncProcessWorker::GetName().
constexpr const char *ASYNC_PROCESS_WORKER = "AsyncProcessWorker";

// Timeout, in milliseconds, handed to event_->Wait() in
// ClientListenerHandler::FetchCallbackRecord(), so the worker wakes up
// periodically instead of blocking forever.
constexpr int EVENT_WAIT_TIME_MS = 1000;
} // anonymous namespace
34
AsyncProcessWorker(ClientListenerHandler * handler,int clientId,SaServerAdapter * adapter)35 AsyncProcessWorker::AsyncProcessWorker(ClientListenerHandler *handler, int clientId, SaServerAdapter *adapter)
36 : handler_(handler), clientId_(clientId), adapter_(adapter)
37 {
38 }
39
GetName() const40 const char *AsyncProcessWorker::GetName() const
41 {
42 return ASYNC_PROCESS_WORKER;
43 }
44
IpcIoResponse(IResponse * response,IpcIo & io,char * data,int length)45 void AsyncProcessWorker::IpcIoResponse(IResponse *response, IpcIo &io, char *data, int length)
46 {
47 if (response == nullptr) {
48 HILOGE("[ClientListenerHandler]Input param response is nullptr.");
49 return;
50 }
51 IpcIoInit(&io, data, length, IPC_OBJECT_COUNTS);
52
53 int retCode = response->GetRetCode();
54 WriteInt32(&io, retCode);
55
56 int requestId = response->GetRequestId();
57 WriteInt32(&io, requestId);
58
59 long long transactionId = response->GetTransactionId();
60 int sessionId = adapter_->GetSessionId(transactionId);
61 WriteInt32(&io, sessionId);
62
63 DataInfo result = response->GetResult();
64 ParcelDataInfo(&io, &result, response->GetClientUid());
65 }
66
OneAction()67 bool AsyncProcessWorker::OneAction()
68 {
69 IResponse *response = handler_->FetchCallbackRecord();
70 CHK_RET(response == nullptr, true);
71 ResGuard<IResponse> guard(response);
72
73 IpcIo io;
74 char tmpData[MAX_IO_SIZE];
75 IpcIoResponse(response, io, tmpData, MAX_IO_SIZE);
76
77 SvcIdentity *svcIdentity = adapter_->GetEngineListener();
78 if (svcIdentity == nullptr) {
79 HILOGE("[ClientListenerHandler]Fail to get engine listener, clientId: %d.", clientId_);
80 return true;
81 }
82
83 IpcIo reply;
84 MessageOption option;
85 MessageOptionInit(&option);
86 option.flags = TF_OP_ASYNC;
87 int32_t retCode = SendRequest(*svcIdentity, ON_ASYNC_PROCESS_CODE, &io, &reply, option, nullptr);
88 if (retCode != ERR_NONE) {
89 HILOGI("[ClientListenerHandler]End to deal response, ret is %d, clientId: %d.", retCode, clientId_);
90 }
91 return retCode == ERR_NONE;
92 }
93
Initialize()94 bool AsyncProcessWorker::Initialize()
95 {
96 return true;
97 }
98
Uninitialize()99 void AsyncProcessWorker::Uninitialize()
100 {
101 }
102
ClientListenerHandler()103 ClientListenerHandler::ClientListenerHandler()
104 {
105 event_ = IEvent::MakeShared();
106 }
107
~ClientListenerHandler()108 ClientListenerHandler::~ClientListenerHandler()
109 {
110 std::lock_guard<std::mutex> guard(mutex_);
111 for (auto &response : responses_) {
112 IResponse::Destroy(response);
113 }
114 }
115
FetchCallbackRecord()116 IResponse *ClientListenerHandler::FetchCallbackRecord()
117 {
118 IResponse *response = nullptr;
119 {
120 std::lock_guard<std::mutex> guard(mutex_);
121 if (!responses_.empty()) {
122 response = responses_.front();
123 responses_.pop_front();
124 if (responses_.empty()) { // if it's empty now, block thread.
125 event_->Reset();
126 }
127 return response;
128 }
129 }
130
131 // Active the thread every 1s. complete blocking will prevent from joining thread.
132 if ((event_->Wait(EVENT_WAIT_TIME_MS)) && (!responses_.empty())) {
133 std::lock_guard<std::mutex> guard(mutex_);
134 response = responses_.front();
135 responses_.pop_front();
136 if (responses_.empty()) { // if it's empty now, block thread.
137 event_->Reset();
138 }
139 }
140 return response;
141 }
142
AddCallbackRecord(IResponse * response)143 void ClientListenerHandler::AddCallbackRecord(IResponse *response)
144 {
145 std::lock_guard<std::mutex> guard(mutex_);
146 bool empty = responses_.empty();
147 responses_.push_back(response);
148 CHK_RET_NONE(!empty);
149
150 // If it was empty and new response is coming, stop blocking.
151 event_->Signal();
152 }
153
StartAsyncProcessThread(int clientId,SaServerAdapter * adapter)154 int ClientListenerHandler::StartAsyncProcessThread(int clientId, SaServerAdapter *adapter)
155 {
156 ThreadPool *threadPool = ThreadPool::GetInstance();
157 CHK_RET(threadPool == nullptr, RETCODE_OUT_OF_MEMORY);
158
159 CHK_RET(asyncProcessThread_ != nullptr, RETCODE_ASYNC_CB_STARTED);
160 CHK_RET(asyncProcessWorker_ != nullptr, RETCODE_ASYNC_CB_STARTED);
161
162 asyncProcessThread_ = threadPool->Pop();
163 CHK_RET(asyncProcessThread_ == nullptr, RETCODE_OUT_OF_MEMORY);
164
165 AIE_NEW(asyncProcessWorker_, AsyncProcessWorker(this, clientId, adapter));
166 if (asyncProcessWorker_ == nullptr) {
167 threadPool->Push(asyncProcessThread_);
168 asyncProcessThread_ = nullptr;
169 return RETCODE_OUT_OF_MEMORY;
170 }
171
172 bool startSuccess = asyncProcessThread_->StartThread(asyncProcessWorker_);
173 if (!startSuccess) {
174 threadPool->Push(asyncProcessThread_);
175 asyncProcessThread_ = nullptr;
176 AIE_DELETE(asyncProcessWorker_);
177 return RETCODE_START_THREAD_FAILED;
178 }
179
180 HILOGI("[ClientListenerHandler]Async process thread run to succeed.");
181 return RETCODE_SUCCESS;
182 }
183
StopAsyncProcessThread()184 void ClientListenerHandler::StopAsyncProcessThread()
185 {
186 if (asyncProcessThread_ != nullptr) {
187 asyncProcessThread_->StopThread();
188 ThreadPool *threadPool = ThreadPool::GetInstance();
189 CHK_RET_NONE(threadPool == nullptr);
190 threadPool->Push(asyncProcessThread_);
191 asyncProcessThread_ = nullptr;
192 }
193 AIE_DELETE(asyncProcessWorker_);
194 }
195 } // namespace AI
196 } // namespace OHOS
197