1 /*
2  * Copyright (c) 2020-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "pub_sub_implement.h"
17 #include "securec.h"
18 #include "ohos_errno.h"
19 #include "memory_adapter.h"
20 #include "thread_adapter.h"
21 
22 static int AddTopic(IUnknown *iUnknown, const Topic *topic);
23 static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer);
24 static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer);
25 static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer);
26 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len);
27 static void DefaultHandle(const Request *request, const Response *response);
28 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request);
29 
30 static PubSubImplement g_pubSubImplement = {
31     DEFAULT_IUNKNOWN_ENTRY_BEGIN,
32     .subscriber.AddTopic = AddTopic,
33     .subscriber.Subscribe = Subscribe,
34     .subscriber.ModifyConsumer = ModifyConsumer,
35     .subscriber.Unsubscribe = Unsubscribe,
36     .provider.Publish = Publish,
37     DEFAULT_IUNKNOWN_ENTRY_END,
38     .feature = NULL
39 };
BCE_CreateInstance(Feature * feature)40 PubSubImplement *BCE_CreateInstance(Feature *feature)
41 {
42     g_pubSubImplement.feature = (PubSubFeature *)feature;
43     return &g_pubSubImplement;
44 }
45 
46 
AddTopic(IUnknown * iUnknown,const Topic * topic)47 static int AddTopic(IUnknown *iUnknown, const Topic *topic)
48 {
49     if (iUnknown == NULL || topic == NULL) {
50         return EC_INVALID;
51     }
52 
53     PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
54     if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
55         return EC_FAILURE;
56     }
57 
58     if (broadcast->feature->GetRelation(broadcast->feature, topic) != NULL) {
59         return EC_FAILURE;
60     }
61 
62     Relation *head = &broadcast->feature->relations;
63     Relation *newRelation = (Relation *)SAMGR_Malloc(sizeof(Relation));
64     if (newRelation == NULL) {
65         return EC_NOMEMORY;
66     }
67     newRelation->topic = *topic;
68     newRelation->callbacks.consumer = NULL;
69     UtilsListInit(&newRelation->callbacks.node);
70 
71     MUTEX_Lock(broadcast->feature->mutex);
72     UtilsListAdd(&head->node, &(newRelation->node));
73     MUTEX_Unlock(broadcast->feature->mutex);
74     return EC_SUCCESS;
75 }
76 
Subscribe(IUnknown * iUnknown,const Topic * topic,Consumer * consumer)77 static int Subscribe(IUnknown *iUnknown, const Topic *topic, Consumer *consumer)
78 {
79     if (iUnknown == NULL || topic == NULL || consumer == NULL) {
80         return EC_INVALID;
81     }
82 
83     PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
84     if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
85         return EC_FAILURE;
86     }
87 
88     Relation *relation = broadcast->feature->GetRelation(broadcast->feature, topic);
89     if (relation == NULL) {
90         return EC_FAILURE;
91     }
92 
93     MUTEX_Lock(broadcast->feature->mutex);
94     ConsumerNode *item = NULL;
95     UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
96         if (item->consumer->Equal(item->consumer, consumer)) {
97             MUTEX_Unlock(broadcast->feature->mutex);
98             return EC_ALREADY_SUBSCRIBED;
99         }
100     }
101     MUTEX_Unlock(broadcast->feature->mutex);
102     ConsumerNode *consumerNode = (ConsumerNode *)SAMGR_Malloc(sizeof(ConsumerNode));
103     if (consumerNode == NULL) {
104         return EC_NOMEMORY;
105     }
106 
107     UtilsListInit(&consumerNode->node);
108     consumerNode->consumer = consumer;
109     MUTEX_Lock(broadcast->feature->mutex);
110     ConsumerNode *head = &relation->callbacks;
111     UtilsListAdd(&head->node, &consumerNode->node);
112     MUTEX_Unlock(broadcast->feature->mutex);
113     return EC_SUCCESS;
114 }
115 
ModifyConsumer(IUnknown * iUnknown,const Topic * topic,Consumer * oldConsumer,Consumer * newConsumer)116 static Consumer *ModifyConsumer(IUnknown *iUnknown, const Topic *topic, Consumer *oldConsumer, Consumer *newConsumer)
117 {
118     if (iUnknown == NULL || topic == NULL || oldConsumer == NULL || newConsumer == NULL) {
119         return NULL;
120     }
121 
122     PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
123     if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
124         return NULL;
125     }
126 
127     Relation *relation = broadcast->feature->GetRelation(broadcast->feature, topic);
128     if (relation == NULL) {
129         return NULL;
130     }
131 
132     MUTEX_Lock(broadcast->feature->mutex);
133     ConsumerNode *item = NULL;
134     UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
135         if (item->consumer->Equal(item->consumer, oldConsumer)) {
136             Consumer *older = item->consumer;
137             item->consumer = newConsumer;
138             MUTEX_Unlock(broadcast->feature->mutex);
139             return older;
140         }
141     }
142     MUTEX_Unlock(broadcast->feature->mutex);
143     return NULL;
144 }
145 
Unsubscribe(IUnknown * iUnknown,const Topic * topic,const Consumer * consumer)146 static Consumer *Unsubscribe(IUnknown *iUnknown, const Topic *topic, const Consumer *consumer)
147 {
148     if (iUnknown == NULL || topic == NULL || consumer == NULL) {
149         return NULL;
150     }
151 
152     PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
153     if (broadcast->feature == NULL || broadcast->feature->GetRelation == NULL) {
154         return NULL;
155     }
156 
157     Relation *relation = broadcast->feature->GetRelation(broadcast->feature, topic);
158     if (relation == NULL) {
159         return NULL;
160     }
161     MUTEX_Lock(broadcast->feature->mutex);
162     ConsumerNode *item = NULL;
163     UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
164         if (item->consumer->Equal(item->consumer, consumer)) {
165             UtilsListDelete(&item->node);
166             break;
167         }
168     }
169     MUTEX_Unlock(broadcast->feature->mutex);
170     if (item == &relation->callbacks || item == NULL) {
171         return NULL;
172     }
173     Consumer *oldConsumer = item->consumer;
174     SAMGR_Free(item);
175     return oldConsumer;
176 }
177 
Publish(IUnknown * iUnknown,const Topic * topic,uint8 * data,int16 len)178 static BOOL Publish(IUnknown *iUnknown, const Topic *topic, uint8 *data, int16 len)
179 {
180     PubSubImplement *broadcast = GET_OBJECT(iUnknown, PubSubImplement, iUnknown);
181     PubSubFeature *feature = broadcast->feature;
182     if (feature == NULL) {
183         return FALSE;
184     }
185 
186     Request request = {MSG_PUBLISH, 0, NULL, *(uint32 *)topic};
187     if (data != NULL && len > 0) {
188         request.data = (uint8 *)SAMGR_Malloc(len);
189         if (request.data == NULL) {
190             return FALSE;
191         }
192         request.len = len;
193         // There is no problem, the request.data length is equal the input data length.
194         (void)memcpy_s(request.data, request.len, data, len);
195     }
196 
197     if (!ImmediatelyPublish(feature, topic, &request)) {
198         (void)SAMGR_Free(request.data);
199         request.data = NULL;
200         request.len = 0;
201         return FALSE;
202     }
203     return TRUE;
204 }
205 
ImmediatelyPublish(PubSubFeature * feature,const Topic * topic,const Request * request)206 static BOOL ImmediatelyPublish(PubSubFeature *feature, const Topic *topic, const Request *request)
207 {
208     if (feature->GetRelation == NULL) {
209         return FALSE;
210     }
211 
212     Relation *relation = feature->GetRelation(feature, topic);
213     if (relation == NULL) {
214         return FALSE;
215     }
216 
217     if (UtilsListEmpty(&relation->callbacks.node)) {
218         return FALSE;
219     }
220 
221     BOOL needAync = FALSE;
222     ConsumerNode *item = NULL;
223     uint32 *token = NULL;
224     MUTEX_Lock(feature->mutex);
225     UTILS_DL_LIST_FOR_EACH_ENTRY(item, &relation->callbacks.node, ConsumerNode, node) {
226         if (item->consumer->identity == NULL) {
227             needAync = TRUE;
228             continue;
229         }
230 
231         Response response = {item->consumer, 0};
232         int ret = SAMGR_SendSharedDirectRequest(item->consumer->identity, request, &response, &token, DefaultHandle);
233         if (ret != EC_SUCCESS) {
234             needAync = FALSE;
235             break;
236         }
237     }
238     if (needAync) {
239         token = SAMGR_SendSharedRequest(&feature->identity, request, token, NULL);
240     }
241     MUTEX_Unlock(feature->mutex);
242     return (token != NULL);
243 }
244 
DefaultHandle(const Request * request,const Response * response)245 static void DefaultHandle(const Request *request, const Response *response)
246 {
247     Consumer *consumer = (Consumer *)response->data;
248     if (consumer == NULL || consumer->Notify == NULL || g_pubSubImplement.feature == NULL) {
249         return;
250     }
251 
252     // wait ImmediatelyPublish finished.
253     MUTEX_Lock(g_pubSubImplement.feature->mutex);
254     MUTEX_Unlock(g_pubSubImplement.feature->mutex);
255     Topic topic = request->msgValue;
256     consumer->Notify(consumer, &topic, request);
257 }