1 /*
2 * Copyright (c) 2020-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "pub_sub_implement.h"
17 #include "securec.h"
18 #include "ohos_errno.h"
19 #include "memory_adapter.h"
20 #include "thread_adapter.h"
21
22 static int AddTopic(IUnknown *iUnknown, const Topic *topic);
23 static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer);
24 static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer);
25 static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer);
26 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len);
27 static void DefaultHandle(const Request *request, const Response *response);
28 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request);
29
30 static PubSubImplement g_pubSubImplement = {
31 DEFAULT_IUNKNOWN_ENTRY_BEGIN,
32 .subscriber.AddTopic = AddTopic,
33 .subscriber.Subscribe = Subscribe,
34 .subscriber.ModifyConsumer = ModifyConsumer,
35 .subscriber.Unsubscribe = Unsubscribe,
36 .provider.Publish = Publish,
37 DEFAULT_IUNKNOWN_ENTRY_END,
38 .feature = NULL
39 };
BCE_CreateInstance(Feature * feature)40 PubSubImplement *BCE_CreateInstance(Feature *feature)
41 {
42 g_pubSubImplement.feature = (PubSubFeature *)feature;
43 return &g_pubSubImplement;
44 }
45
46
/*
 * Registers a new topic by appending a Relation with an empty consumer list
 * to the feature's relation list.
 *
 * Returns:
 *   EC_INVALID  - iUnknown or topic is NULL.
 *   EC_FAILURE  - no feature / no GetRelation bound, or GetRelation already
 *                 finds a relation for this topic.
 *   EC_NOMEMORY - the Relation could not be allocated.
 *   EC_SUCCESS  - the relation was linked into the list.
 */
static int AddTopic(IUnknown *iUnknown, const Topic *topic)
{
    if (iUnknown == NULL || topic == NULL) {
        return EC_INVALID;
    }

    PubSubImplement *impl = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    PubSubFeature *feature = impl->feature;
    if (feature == NULL || feature->GetRelation == NULL) {
        return EC_FAILURE;
    }

    /* A topic may only be registered once. */
    if (feature->GetRelation(feature, topic) != NULL) {
        return EC_FAILURE;
    }

    Relation *entry = (Relation *)SAMGR_Malloc(sizeof(Relation));
    if (entry == NULL) {
        return EC_NOMEMORY;
    }

    /* Fully initialise the entry before it becomes reachable from the list. */
    entry->topic = *topic;
    entry->callbacks.consumer = NULL;
    UtilsListInit(&entry->callbacks.node);

    /* Only the list insertion itself is performed under the feature mutex. */
    MUTEX_Lock(feature->mutex);
    UtilsListAdd(&feature->relations.node, &entry->node);
    MUTEX_Unlock(feature->mutex);
    return EC_SUCCESS;
}
76
/*
 * Adds a consumer to the consumer list of an already registered topic.
 *
 * The duplicate scan, the node allocation and the list insertion are all
 * performed while holding the feature mutex. Releasing the mutex between the
 * scan and the insertion would let two concurrent callers with equal
 * consumers both pass the scan and both be inserted.
 *
 * Returns:
 *   EC_INVALID            - iUnknown, topic or consumer is NULL.
 *   EC_FAILURE            - no feature / no GetRelation bound, or the topic
 *                           has no relation (it was never added).
 *   EC_ALREADY_SUBSCRIBED - an existing consumer's Equal callback reports a
 *                           match with the given consumer.
 *   EC_NOMEMORY           - the list node could not be allocated.
 *   EC_SUCCESS            - the consumer was linked into the list.
 *
 * The consumer pointer itself is stored (not copied), so the object it points
 * to must stay valid for as long as it remains subscribed.
 */
static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer)
{
    if (iUnknown == NULL || topic == NULL || consumer == NULL) {
        return EC_INVALID;
    }

    PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    PubSubFeature *feature = broadcast->feature;
    if (feature == NULL || feature->GetRelation == NULL) {
        return EC_FAILURE;
    }

    Relation *relation = feature->GetRelation(feature, topic);
    if (relation == NULL) {
        return EC_FAILURE;
    }

    MUTEX_Lock(feature->mutex);

    /* Reject a consumer that is already present for this topic. */
    ConsumerNode *item = NULL;
    UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
        if (item->consumer->Equal(item->consumer, consumer)) {
            MUTEX_Unlock(feature->mutex);
            return EC_ALREADY_SUBSCRIBED;
        }
    }

    /* Allocate while still holding the lock so check + insert stay atomic. */
    ConsumerNode *consumerNode = (ConsumerNode *)SAMGR_Malloc(sizeof(ConsumerNode));
    if (consumerNode == NULL) {
        MUTEX_Unlock(feature->mutex);
        return EC_NOMEMORY;
    }

    UtilsListInit(&consumerNode->node);
    consumerNode->consumer = consumer;
    UtilsListAdd(&relation->callbacks.node, &consumerNode->node);

    MUTEX_Unlock(feature->mutex);
    return EC_SUCCESS;
}
115
/*
 * Replaces a subscribed consumer of a topic with another one.
 *
 * The topic's consumer list is walked under the feature mutex; the first
 * entry whose Equal callback matches oldConsumer has its consumer pointer
 * swapped for newConsumer.
 *
 * Returns: the consumer pointer that was stored in the list before the swap,
 *          or NULL when an argument is NULL, no feature / GetRelation is
 *          bound, the topic has no relation, or no entry matches.
 */
static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer)
{
    if (iUnknown == NULL || topic == NULL || oldConsumer == NULL || newConsumer == NULL) {
        return NULL;
    }

    PubSubImplement *impl = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    PubSubFeature *feature = impl->feature;
    if (feature == NULL || feature->GetRelation == NULL) {
        return NULL;
    }

    Relation *relation = feature->GetRelation(feature, topic);
    if (relation == NULL) {
        return NULL;
    }

    Consumer *replaced = NULL;
    ConsumerNode *cursor = NULL;

    MUTEX_Lock(feature->mutex);
    UTILS_DL_LIST_FOR_EACH_ENTRY(cursor, &relation->callbacks.node, ConsumerNode, node) {
        if (cursor->consumer->Equal(cursor->consumer, oldConsumer)) {
            /* Hand back the pointer that was actually stored in the list. */
            replaced = cursor->consumer;
            cursor->consumer = newConsumer;
            break;
        }
    }
    MUTEX_Unlock(feature->mutex);

    return replaced;
}
145
/*
 * Removes a consumer from the consumer list of a topic.
 *
 * The list is walked under the feature mutex; the first entry whose Equal
 * callback matches the given consumer is unlinked. The list node is freed
 * after the mutex has been released.
 *
 * Returns: the consumer pointer that was stored in the removed node, or NULL
 *          when an argument is NULL, no feature / GetRelation is bound, the
 *          topic has no relation, or no entry matches.
 */
static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer)
{
    if (iUnknown == NULL || topic == NULL || consumer == NULL) {
        return NULL;
    }

    PubSubImplement *impl = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
    PubSubFeature *feature = impl->feature;
    if (feature == NULL || feature->GetRelation == NULL) {
        return NULL;
    }

    Relation *relation = feature->GetRelation(feature, topic);
    if (relation == NULL) {
        return NULL;
    }

    ConsumerNode *removed = NULL;
    ConsumerNode *cursor = NULL;

    MUTEX_Lock(feature->mutex);
    UTILS_DL_LIST_FOR_EACH_ENTRY(cursor, &relation->callbacks.node, ConsumerNode, node) {
        if (cursor->consumer->Equal(cursor->consumer, consumer)) {
            UtilsListDelete(&cursor->node);
            removed = cursor;
            break;
        }
    }
    MUTEX_Unlock(feature->mutex);

    if (removed == NULL) {
        return NULL;
    }

    /* The node is no longer linked into the list; release it and return its payload. */
    Consumer *oldConsumer = removed->consumer;
    SAMGR_Free(removed);
    return oldConsumer;
}
177
Publish(IUnknown * iUnknown,const Topic * topic,uint8 * data,int16 len)178 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len)
179 {
180 PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
181 PubSubFeature *feature = broadcast->feature;
182 if (feature == NULL) {
183 return FALSE;
184 }
185
186 Request request = {MSG_PUBLISH, 0, NULL, *(uint32 *)topic};
187 if (data != NULL && len > 0) {
188 request.data = (uint8 *)SAMGR_Malloc(len);
189 if (request.data == NULL) {
190 return FALSE;
191 }
192 request.len = len;
193 // There is no problem, the request.data length is equal the input data length.
194 (void)memcpy_s(request.data, request.len, data, len);
195 }
196
197 if (!ImmediatelyPublish(feature, topic, &request)) {
198 (void)SAMGR_Free(request.data);
199 request.data = NULL;
200 request.len = 0;
201 return FALSE;
202 }
203 return TRUE;
204 }
205
ImmediatelyPublish(PubSubFeature * feature,const Topic * topic,const Request * request)206 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request)
207 {
208 if (feature->GetRelation == NULL) {
209 return FALSE;
210 }
211
212 Relation *relation = feature->GetRelation(feature, topic);
213 if (relation == NULL) {
214 return FALSE;
215 }
216
217 if (UtilsListEmpty(&relation->callbacks.node)) {
218 return FALSE;
219 }
220
221 BOOL needAync = FALSE;
222 ConsumerNode *item = NULL;
223 uint32 *token = NULL;
224 MUTEX_Lock(feature->mutex);
225 UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
226 if (item->consumer->identity == NULL) {
227 needAync = TRUE;
228 continue;
229 }
230
231 Response response = {item->consumer, 0};
232 int ret = SAMGR_SendSharedDirectRequest(item->consumer->identity, request, &response, &token, DefaultHandle);
233 if (ret != EC_SUCCESS) {
234 needAync = FALSE;
235 break;
236 }
237 }
238 if (needAync) {
239 token = SAMGR_SendSharedRequest(&feature->identity, request, token, NULL);
240 }
241 MUTEX_Unlock(feature->mutex);
242 return (token != NULL);
243 }
244
DefaultHandle(const Request * request,const Response * response)245 static void DefaultHandle(const Request *request, const Response *response)
246 {
247 Consumer *consumer = (Consumer *)response->data;
248 if (consumer == NULL || consumer->Notify == NULL || g_pubSubImplement.feature == NULL) {
249 return;
250 }
251
252 // wait ImmediatelyPublish finished.
253 MUTEX_Lock(g_pubSubImplement.feature->mutex);
254 MUTEX_Unlock(g_pubSubImplement.feature->mutex);
255 Topic topic = request->msgValue;
256 consumer->Notify(consumer, &topic, request);
257 }