1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "communication_adapter/include/client_listener_handler.h"
17 
18 #include "ipc_skeleton.h"
19 #include "rpc_errno.h"
20 #include "platform/os_wrapper/ipc/include/aie_ipc.h"
21 #include "protocol/ipc_interface/ai_service.h"
22 #include "protocol/retcode_inner/aie_retcode_inner.h"
23 #include "protocol/struct_definition/aie_info_define.h"
24 #include "utils/aie_guard.h"
25 #include "utils/aie_macros.h"
26 #include "utils/log/aie_log.h"
27 
28 namespace OHOS {
29 namespace AI {
namespace {
// Name returned by AsyncProcessWorker::GetName().
constexpr const char *ASYNC_PROCESS_WORKER = "AsyncProcessWorker";
// Timeout, in milliseconds, passed to the event wait in FetchCallbackRecord().
constexpr int EVENT_WAIT_TIME_MS = 1000;
} // anonymous namespace
34 
AsyncProcessWorker(ClientListenerHandler * handler,int clientId,SaServerAdapter * adapter)35 AsyncProcessWorker::AsyncProcessWorker(ClientListenerHandler *handler, int clientId, SaServerAdapter *adapter)
36     : handler_(handler), clientId_(clientId), adapter_(adapter)
37 {
38 }
39 
GetName() const40 const char *AsyncProcessWorker::GetName() const
41 {
42     return ASYNC_PROCESS_WORKER;
43 }
44 
IpcIoResponse(IResponse * response,IpcIo & io,char * data,int length)45 void AsyncProcessWorker::IpcIoResponse(IResponse *response, IpcIo &io, char *data, int length)
46 {
47     if (response == nullptr) {
48         HILOGE("[ClientListenerHandler]Input param response is nullptr.");
49         return;
50     }
51     IpcIoInit(&io, data, length, IPC_OBJECT_COUNTS);
52 
53     int retCode = response->GetRetCode();
54     WriteInt32(&io, retCode);
55 
56     int requestId = response->GetRequestId();
57     WriteInt32(&io, requestId);
58 
59     long long transactionId = response->GetTransactionId();
60     int sessionId = adapter_->GetSessionId(transactionId);
61     WriteInt32(&io, sessionId);
62 
63     DataInfo result = response->GetResult();
64     ParcelDataInfo(&io, &result, response->GetClientUid());
65 }
66 
OneAction()67 bool AsyncProcessWorker::OneAction()
68 {
69     IResponse *response = handler_->FetchCallbackRecord();
70     CHK_RET(response == nullptr, true);
71     ResGuard<IResponse> guard(response);
72 
73     IpcIo io;
74     char tmpData[MAX_IO_SIZE];
75     IpcIoResponse(response, io, tmpData, MAX_IO_SIZE);
76 
77     SvcIdentity *svcIdentity = adapter_->GetEngineListener();
78     if (svcIdentity == nullptr) {
79         HILOGE("[ClientListenerHandler]Fail to get engine listener, clientId: %d.", clientId_);
80         return true;
81     }
82 
83     IpcIo reply;
84     MessageOption option;
85     MessageOptionInit(&option);
86     option.flags = TF_OP_ASYNC;
87     int32_t retCode = SendRequest(*svcIdentity, ON_ASYNC_PROCESS_CODE, &io, &reply, option, nullptr);
88     if (retCode != ERR_NONE) {
89         HILOGI("[ClientListenerHandler]End to deal response, ret is %d, clientId: %d.", retCode, clientId_);
90     }
91     return retCode == ERR_NONE;
92 }
93 
Initialize()94 bool AsyncProcessWorker::Initialize()
95 {
96     return true;
97 }
98 
Uninitialize()99 void AsyncProcessWorker::Uninitialize()
100 {
101 }
102 
ClientListenerHandler()103 ClientListenerHandler::ClientListenerHandler()
104 {
105     event_ = IEvent::MakeShared();
106 }
107 
~ClientListenerHandler()108 ClientListenerHandler::~ClientListenerHandler()
109 {
110     std::lock_guard<std::mutex> guard(mutex_);
111     for (auto &response : responses_) {
112         IResponse::Destroy(response);
113     }
114 }
115 
FetchCallbackRecord()116 IResponse *ClientListenerHandler::FetchCallbackRecord()
117 {
118     IResponse *response = nullptr;
119     {
120         std::lock_guard<std::mutex> guard(mutex_);
121         if (!responses_.empty()) {
122             response = responses_.front();
123             responses_.pop_front();
124             if (responses_.empty()) { // if it's empty now, block thread.
125                 event_->Reset();
126             }
127             return response;
128         }
129     }
130 
131     // Active the thread every 1s. complete blocking will prevent from joining thread.
132     if ((event_->Wait(EVENT_WAIT_TIME_MS)) && (!responses_.empty())) {
133         std::lock_guard<std::mutex> guard(mutex_);
134         response = responses_.front();
135         responses_.pop_front();
136         if (responses_.empty()) { // if it's empty now, block thread.
137             event_->Reset();
138         }
139     }
140     return response;
141 }
142 
AddCallbackRecord(IResponse * response)143 void ClientListenerHandler::AddCallbackRecord(IResponse *response)
144 {
145     std::lock_guard<std::mutex> guard(mutex_);
146     bool empty = responses_.empty();
147     responses_.push_back(response);
148     CHK_RET_NONE(!empty);
149 
150     // If it was empty and new response is coming, stop blocking.
151     event_->Signal();
152 }
153 
StartAsyncProcessThread(int clientId,SaServerAdapter * adapter)154 int ClientListenerHandler::StartAsyncProcessThread(int clientId, SaServerAdapter *adapter)
155 {
156     ThreadPool *threadPool = ThreadPool::GetInstance();
157     CHK_RET(threadPool == nullptr, RETCODE_OUT_OF_MEMORY);
158 
159     CHK_RET(asyncProcessThread_ != nullptr, RETCODE_ASYNC_CB_STARTED);
160     CHK_RET(asyncProcessWorker_ != nullptr, RETCODE_ASYNC_CB_STARTED);
161 
162     asyncProcessThread_ = threadPool->Pop();
163     CHK_RET(asyncProcessThread_ == nullptr, RETCODE_OUT_OF_MEMORY);
164 
165     AIE_NEW(asyncProcessWorker_, AsyncProcessWorker(this, clientId, adapter));
166     if (asyncProcessWorker_ == nullptr) {
167         threadPool->Push(asyncProcessThread_);
168         asyncProcessThread_ = nullptr;
169         return RETCODE_OUT_OF_MEMORY;
170     }
171 
172     bool startSuccess = asyncProcessThread_->StartThread(asyncProcessWorker_);
173     if (!startSuccess) {
174         threadPool->Push(asyncProcessThread_);
175         asyncProcessThread_ = nullptr;
176         AIE_DELETE(asyncProcessWorker_);
177         return RETCODE_START_THREAD_FAILED;
178     }
179 
180     HILOGI("[ClientListenerHandler]Async process thread run to succeed.");
181     return RETCODE_SUCCESS;
182 }
183 
StopAsyncProcessThread()184 void ClientListenerHandler::StopAsyncProcessThread()
185 {
186     if (asyncProcessThread_ != nullptr) {
187         asyncProcessThread_->StopThread();
188         ThreadPool *threadPool = ThreadPool::GetInstance();
189         CHK_RET_NONE(threadPool == nullptr);
190         threadPool->Push(asyncProcessThread_);
191         asyncProcessThread_ = nullptr;
192     }
193     AIE_DELETE(asyncProcessWorker_);
194 }
195 } // namespace AI
196 } // namespace OHOS
197