1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "session_manager.h"
17 
18 #include <uv.h>
19 
20 #include "node_api.h"
21 
22 #include "client_helper.h"
23 #include "update_define.h"
24 #include "update_session.h"
25 
26 using namespace std;
27 
28 namespace OHOS::UpdateEngine {
SessionManager(napi_env env,napi_ref thisReference)29 SessionManager::SessionManager(napi_env env, napi_ref thisReference) : env_(env), thisReference_(thisReference)
30 {
31     ENGINE_LOGI("SessionManager constructor");
32 }
33 
~SessionManager()34 SessionManager::~SessionManager()
35 {
36     ENGINE_LOGI("SessionManager destructor");
37     if (thisReference_ != nullptr) {
38         napi_delete_reference(env_, thisReference_);
39         thisReference_ = nullptr;
40     }
41 }
42 
AddSession(std::shared_ptr<BaseSession> session)43 void SessionManager::AddSession(std::shared_ptr<BaseSession> session)
44 {
45     PARAM_CHECK(session != nullptr, return, "Invalid param");
46     std::lock_guard<std::recursive_mutex> guard(sessionMutex_);
47     sessions_.insert(make_pair(session->GetSessionId(), session));
48 }
49 
RemoveSession(uint32_t sessionId)50 void SessionManager::RemoveSession(uint32_t sessionId)
51 {
52     std::lock_guard<std::recursive_mutex> guard(sessionMutex_);
53     sessions_.erase(sessionId);
54 }
55 
GetFirstSessionId(uint32_t & sessionId)56 bool SessionManager::GetFirstSessionId(uint32_t &sessionId)
57 {
58     std::lock_guard<std::recursive_mutex> guard(sessionMutex_);
59     {
60         if (sessions_.empty()) {
61             return false;
62         }
63         sessionId = sessions_.begin()->second->GetSessionId();
64         return true;
65     }
66 }
67 
GetNextSessionId(uint32_t & sessionId)68 bool SessionManager::GetNextSessionId(uint32_t &sessionId)
69 {
70     std::lock_guard<std::recursive_mutex> guard(sessionMutex_);
71     {
72         auto iter = sessions_.find(sessionId);
73         if (iter == sessions_.end()) {
74             return false;
75         }
76         iter++;
77         if (iter == sessions_.end()) {
78             return false;
79         }
80         sessionId = iter->second->GetSessionId();
81     }
82     return true;
83 }
84 
ProcessUnsubscribe(const std::string & eventType,size_t argc,napi_value arg)85 int32_t SessionManager::ProcessUnsubscribe(const std::string &eventType, size_t argc, napi_value arg)
86 {
87     napi_handle_scope scope;
88     napi_status status = napi_open_handle_scope(env_, &scope);
89     PARAM_CHECK(status == napi_ok, return -1, "Error open handle");
90 
91     uint32_t nextSessId = 0;
92     bool hasNext = GetFirstSessionId(nextSessId);
93     while (hasNext) {
94         uint32_t currSessId = nextSessId;
95         auto iter = sessions_.find(currSessId);
96         if (iter == sessions_.end()) {
97             break;
98         }
99         hasNext = GetNextSessionId(nextSessId);
100 
101         auto listener = (std::shared_ptr<UpdateListener> &)(iter->second);
102         if (listener == nullptr) {
103             iter = sessions_.erase(iter);
104             continue;
105         }
106         if (listener->GetType() != SessionType::SESSION_SUBSCRIBE ||
107             eventType.compare(listener->GetEventType()) != 0) {
108             continue;
109         }
110         ENGINE_LOGI("ProcessUnsubscribe remove session");
111         if (argc == 1) {
112             listener->RemoveHandlerRef(env_);
113             RemoveSession(currSessId);
114         } else if (listener->CheckEqual(env_, arg, eventType)) {
115             listener->RemoveHandlerRef(env_);
116             RemoveSession(currSessId);
117             break;
118         }
119     }
120     napi_close_handle_scope(env_, scope);
121     return 0;
122 }
123 
Unsubscribe(const EventClassifyInfo & eventClassifyInfo,napi_value handle)124 void SessionManager::Unsubscribe(const EventClassifyInfo &eventClassifyInfo, napi_value handle)
125 {
126     std::lock_guard<std::recursive_mutex> guard(sessionMutex_);
127     for (auto iter = sessions_.begin(); iter != sessions_.end();) {
128         if (iter->second == nullptr) {
129             iter = sessions_.erase(iter); // erase nullptr
130             continue;
131         }
132 
133         if (iter->second->GetType() != SessionType::SESSION_SUBSCRIBE) {
134             ++iter;
135             continue;
136         }
137 
138         auto listener = (std::shared_ptr<UpdateListener> &)(iter->second);
139         if (handle == nullptr && listener->IsSubscribeEvent(eventClassifyInfo)) {
140             ENGINE_LOGI("Unsubscribe, remove session %{public}d without handle", listener->GetSessionId());
141             listener->RemoveHandlerRef(env_);
142             iter = sessions_.erase(iter);
143             continue;
144         }
145 
146         if (listener->IsSameListener(env_, eventClassifyInfo, handle)) {
147             ENGINE_LOGI("Unsubscribe, remove session %{public}d", listener->GetSessionId());
148             listener->RemoveHandlerRef(env_);
149             iter = sessions_.erase(iter);
150             continue;
151         }
152 
153         ++iter;
154     }
155 }
156 
FindSessionByHandle(napi_env env,const std::string & eventType,napi_value arg)157 BaseSession *SessionManager::FindSessionByHandle(napi_env env, const std::string &eventType, napi_value arg)
158 {
159     uint32_t nextSessId = 0;
160     bool hasNext = GetFirstSessionId(nextSessId);
161     while (hasNext) {
162         uint32_t currSessId = nextSessId;
163         auto iter = sessions_.find(currSessId);
164         if (iter == sessions_.end()) {
165             break;
166         }
167         hasNext = GetNextSessionId(nextSessId);
168 
169         auto listener = (std::shared_ptr<UpdateListener> &)(iter->second);
170         if (listener == nullptr) {
171             iter = sessions_.erase(iter);
172             continue;
173         }
174         if (listener->GetType() != SessionType::SESSION_SUBSCRIBE) {
175             continue;
176         }
177         if ((eventType.compare(listener->GetEventType()) == 0) && listener->CheckEqual(env_, arg, eventType)) {
178             return listener.get();
179         }
180     }
181     return nullptr;
182 }
183 
FindSessionByHandle(napi_env env,const EventClassifyInfo & eventClassifyInfo,napi_value arg)184 BaseSession *SessionManager::FindSessionByHandle(napi_env env, const EventClassifyInfo &eventClassifyInfo,
185     napi_value arg)
186 {
187     std::lock_guard<std::recursive_mutex> guard(sessionMutex_);
188     for (auto &iter : sessions_) {
189         if (iter.second == nullptr) {
190             continue;
191         }
192 
193         if (iter.second->GetType() != SessionType::SESSION_SUBSCRIBE) {
194             continue;
195         }
196 
197         auto listener = (std::shared_ptr<UpdateListener> &)(iter.second);
198         if (listener->IsSameListener(env, eventClassifyInfo, arg)) {
199             return listener.get();
200         }
201     }
202     return nullptr;
203 }
204 
PublishToJS(const EventClassifyInfo & eventClassifyInfo,const EventInfo & eventInfo)205 void SessionManager::PublishToJS(const EventClassifyInfo &eventClassifyInfo, const EventInfo &eventInfo)
206 {
207     napi_handle_scope scope;
208     napi_status status = napi_open_handle_scope(env_, &scope);
209     PARAM_CHECK_NAPI_CALL(env_, status == napi_ok, return, "Error open_handle_scope");
210     napi_value thisVar = nullptr;
211     status = napi_get_reference_value(env_, thisReference_, &thisVar);
212     PARAM_CHECK_NAPI_CALL(env_, status == napi_ok, napi_close_handle_scope(env_, scope); return,
213         "Error get_reference");
214 
215     std::lock_guard<std::recursive_mutex> guard(sessionMutex_);
216     for (auto &iter : sessions_) {
217         if (iter.second == nullptr) {
218             continue;
219         }
220 
221         if (iter.second->GetType() != SessionType::SESSION_SUBSCRIBE) {
222             continue;
223         }
224 
225         auto listener = (std::shared_ptr<UpdateListener> &)(iter.second);
226         if (!listener->IsSubscribeEvent(eventClassifyInfo)) {
227             continue;
228         }
229 
230         listener->NotifyJS(env_, thisVar, eventInfo);
231     }
232     napi_close_handle_scope(env_, scope);
233 }
234 
Emit(const EventClassifyInfo & eventClassifyInfo,const EventInfo & eventInfo)235 void SessionManager::Emit(const EventClassifyInfo &eventClassifyInfo, const EventInfo &eventInfo)
236 {
237     ENGINE_LOGI("SessionManager::Emit 0x%{public}x", CAST_INT(eventClassifyInfo.eventClassify));
238     uv_loop_s *loop = nullptr;
239     napi_get_uv_event_loop(env_, &loop);
240     PARAM_CHECK(loop != nullptr, return, "get event loop failed.");
241 
242     using UvWorkData = std::tuple<SessionManager*, EventClassifyInfo, EventInfo>;
243     UvWorkData *data = new (std::nothrow) std::tuple(this, eventClassifyInfo, eventInfo);
244     PARAM_CHECK(data != nullptr, return, "alloc data failed.");
245 
246     uv_work_t *work = new (std::nothrow) uv_work_t;
247     PARAM_CHECK(work != nullptr, delete data; return, "alloc work failed.");
248 
249     work->data = static_cast<void *>(data);
250     uv_queue_work_with_qos(
251         loop,
252         work,
253         [](uv_work_t *work) { ENGINE_LOGI("print job info"); },
254         [](uv_work_t *work, int status) {
255             UvWorkData *data = static_cast<UvWorkData*>(work->data);
256             auto &[mgr, eventClassifyInfo, eventInfo] = *data;
257             mgr->PublishToJS(eventClassifyInfo, eventInfo);
258             delete data;
259             delete work;
260         },
261         uv_qos_default);
262 }
263 } // namespace OHOS::UpdateEngine