1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "event_loop_impl.h"
17 
18 #include <ctime>
19 
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "event_impl.h"
23 
24 namespace DistributedDB {
25 class EventRequest {
26 public:
27     enum {
28         ADD_EVENT = 1,
29         REMOVE_EVENT,
30         SET_TIMEOUT,
31         MOD_EVENTS_ADD,
32         MOD_EVENTS_REMOVE,
33     };
34 
EventRequest(int type,EventImpl * event,EventsMask events)35     EventRequest(int type, EventImpl *event, EventsMask events)
36         : type_(type),
37           event_(event),
38           events_(events),
39           timeout_(0)
40     {
41         if (event != nullptr) {
42             event->IncObjRef(event);
43         }
44     }
45 
EventRequest(int type,EventImpl * event,EventTime timeout)46     EventRequest(int type, EventImpl *event, EventTime timeout)
47         : type_(type),
48           event_(event),
49           events_(0),
50           timeout_(timeout)
51     {
52         if (event != nullptr) {
53             event->IncObjRef(event);
54         }
55     }
56 
~EventRequest()57     ~EventRequest()
58     {
59         if (event_ != nullptr) {
60             event_->DecObjRef(event_);
61             event_ = nullptr;
62         }
63     }
64 
IsValidType(int type)65     static bool IsValidType(int type)
66     {
67         if (type < ADD_EVENT || type > MOD_EVENTS_REMOVE) {
68             return false;
69         }
70         return true;
71     }
72 
GetType() const73     int GetType() const
74     {
75         return type_;
76     }
77 
GetEvent(EventImpl * & event) const78     void GetEvent(EventImpl *&event) const
79     {
80         event = event_;
81     }
82 
GetEvents() const83     EventsMask GetEvents() const
84     {
85         return events_;
86     }
87 
GetTimeout() const88     EventTime GetTimeout() const
89     {
90         return timeout_;
91     }
92 
93 private:
94     int type_;
95     EventImpl *event_;
96     EventsMask events_;
97     EventTime timeout_;
98 };
99 
EventLoopImpl()100 EventLoopImpl::EventLoopImpl()
101     : pollingSetChanged_(false),
102       running_(true)
103 {
104     OnKill([this]() { OnKillLoop(); });
105 }
106 
~EventLoopImpl()107 EventLoopImpl::~EventLoopImpl()
108 {}
109 
Add(IEvent * event)110 int EventLoopImpl::Add(IEvent *event)
111 {
112     if (event == nullptr) {
113         return -E_INVALID_ARGS;
114     }
115 
116     auto eventImpl = static_cast<EventImpl *>(event);
117     if (!eventImpl->SetLoop(this)) {
118         LOGE("Add ev to loop failed, already attached.");
119         return -E_INVALID_ARGS;
120     }
121 
122     EventTime timeout = 0;
123     int errCode = QueueRequest(EventRequest::ADD_EVENT, eventImpl, timeout);
124     if (errCode != E_OK) {
125         eventImpl->SetLoop(nullptr);
126         LOGE("Add ev to loop failed. err: '%d'.", errCode);
127     }
128     return errCode;
129 }
130 
Remove(IEvent * event)131 int EventLoopImpl::Remove(IEvent *event)
132 {
133     if (event == nullptr) {
134         return -E_INVALID_ARGS;
135     }
136 
137     auto eventImpl = static_cast<EventImpl *>(event);
138     bool isLoopConfused = false;
139     if (!eventImpl->Attached(this, isLoopConfused)) {
140         if (isLoopConfused) {
141             LOGE("Remove ev' from loop failed, loop confused.");
142             return -E_UNEXPECTED_DATA;
143         }
144         return E_OK;
145     }
146 
147     EventTime timeout = 0;
148     int errCode = QueueRequest(EventRequest::REMOVE_EVENT, eventImpl, timeout);
149     if (errCode != E_OK) {
150         LOGE("Remove ev from loop failed. err: '%d'.", errCode);
151     }
152     return errCode;
153 }
154 
Run()155 int EventLoopImpl::Run()
156 {
157     {
158         RefObject::AutoLock lockGuard(this);
159         if (IsKilled()) {
160             LOGE("Try to run a killed loop.");
161             return -E_OBJ_IS_KILLED;
162         }
163         if (loopThread_ != std::thread::id()) {
164             LOGE("Try to run a threaded loop.");
165             return -E_BUSY;
166         }
167         loopThread_ = std::this_thread::get_id();
168     }
169 
170     int errCode = E_OK;
171     IncObjRef(this);
172 
173     while (running_) {
174         errCode = ProcessRequest();
175         if (errCode != E_OK) {
176             break;
177         }
178 
179         errCode = Prepare(polling_);
180         if (errCode != E_OK) {
181             break;
182         }
183 
184         EventTime sleepTime = CalSleepTime();
185         errCode = Poll(sleepTime);
186         if (errCode != E_OK) {
187             break;
188         }
189 
190         errCode = ProcessRequest();
191         if (errCode != E_OK) {
192             break;
193         }
194 
195         errCode = DispatchAll();
196         if (errCode != E_OK) {
197             break;
198         }
199     }
200 
201     CleanLoop();
202     DecObjRef(this);
203     if (errCode == -E_OBJ_IS_KILLED) {
204         LOGD("Loop exited.");
205     } else {
206         LOGE("Loop exited, err:'%d'.", errCode);
207     }
208     return errCode;
209 }
210 
Stop()211 int EventLoopImpl::Stop()
212 {
213     running_ = false;
214     return E_OK;
215 }
216 
Modify(EventImpl * event,bool isAdd,EventsMask events)217 int EventLoopImpl::Modify(EventImpl *event, bool isAdd, EventsMask events)
218 {
219     if (event == nullptr) {
220         return -E_INVALID_ARGS;
221     }
222 
223     int type = isAdd ? EventRequest::MOD_EVENTS_ADD :
224         EventRequest::MOD_EVENTS_REMOVE;
225     int errCode = QueueRequest(type, event, events);
226     if (errCode != E_OK) {
227         LOGE("Modify loop ev events failed. err: '%d'.", errCode);
228     }
229     return errCode;
230 }
231 
Modify(EventImpl * event,EventTime time)232 int EventLoopImpl::Modify(EventImpl *event, EventTime time)
233 {
234     if (event == nullptr) {
235         return -E_INVALID_ARGS;
236     }
237 
238     int errCode = QueueRequest(EventRequest::SET_TIMEOUT, event, time);
239     if (errCode != E_OK) {
240         LOGE("Mod loop ev time failed. err: '%d'.", errCode);
241     }
242     return errCode;
243 }
244 
GetTime() const245 EventTime EventLoopImpl::GetTime() const
246 {
247     uint64_t microsecond = 0;
248     OS::GetMonotonicRelativeTimeInMicrosecond(microsecond); // It is not very possible to fail, if so use 0 as default
249     return static_cast<EventTime>(microsecond / 1000); // 1000 is the multiple between microsecond and millisecond
250 }
251 
SendRequestToLoop(EventRequest * eventRequest)252 int EventLoopImpl::SendRequestToLoop(EventRequest *eventRequest)
253 {
254     if (eventRequest == nullptr) {
255         return -E_INVALID_ARGS;
256     }
257 
258     RefObject::AutoLock lockGuard(this);
259     if (IsKilled()) {
260         return -E_OBJ_IS_KILLED;
261     }
262     requests_.push_back(eventRequest);
263     WakeUp();
264     return E_OK;
265 }
266 
267 template<typename T>
QueueRequest(int type,EventImpl * event,T argument)268 int EventLoopImpl::QueueRequest(int type, EventImpl *event, T argument)
269 {
270     if (!EventRequest::IsValidType(type)) {
271         return -E_INVALID_ARGS;
272     }
273     if (event == nullptr ||
274         !event->IsValidArg(argument)) {
275         return -E_INVALID_ARGS;
276     }
277 
278     if (IsKilled()) { // pre-check
279         return -E_OBJ_IS_KILLED;
280     }
281 
282     int errCode = event->CheckStatus();
283     if (errCode != E_OK) {
284         if (errCode != -E_OBJ_IS_KILLED ||
285             type != EventRequest::REMOVE_EVENT) {
286             return errCode;
287         }
288     }
289 
290     auto eventRequest = new (std::nothrow) EventRequest(type, event, argument);
291     if (eventRequest == nullptr) {
292         return -E_OUT_OF_MEMORY;
293     }
294 
295     errCode = SendRequestToLoop(eventRequest);
296     if (errCode != E_OK) {
297         delete eventRequest;
298         eventRequest = nullptr;
299     }
300     return errCode;
301 }
302 
IsInLoopThread(bool & started) const303 bool EventLoopImpl::IsInLoopThread(bool &started) const
304 {
305     if (loopThread_ == std::thread::id()) {
306         started = false;
307     } else {
308         started = true;
309     }
310     return std::this_thread::get_id() == loopThread_;
311 }
312 
EventObjectExists(EventImpl * event) const313 bool EventLoopImpl::EventObjectExists(EventImpl *event) const
314 {
315     return polling_.find(event) != polling_.end();
316 }
317 
EventFdExists(const EventImpl * event) const318 bool EventLoopImpl::EventFdExists(const EventImpl *event) const
319 {
320     if (!event->IsValidFd()) {
321         return false;
322     }
323     for (auto ev : polling_) {
324         if (ev->GetEventFd() == event->GetEventFd()) {
325             return true;
326         }
327     }
328     return false;
329 }
330 
AddEventObject(EventImpl * event,EventTime now)331 int EventLoopImpl::AddEventObject(EventImpl *event, EventTime now)
332 {
333     if (event == nullptr) {
334         return -E_INVALID_ARGS;
335     }
336     if (EventObjectExists(event)) {
337         LOGE("Add event object failed. ev already exists.");
338         return -EEXIST;
339     }
340     if (EventFdExists(event)) {
341         LOGE("Add event object failed. ev fd already exists.");
342         return -EEXIST;
343     }
344 
345     int errCode = E_OK;
346     if (!event->IsTimer()) {
347         errCode = AddEvent(event);
348     }
349 
350     if (errCode == E_OK) {
351         polling_.insert(event);
352         event->SetStartTime(now);
353         event->SetRevents(0);
354         event->IncObjRef(event);
355         pollingSetChanged_ = true;
356     } else {
357         LOGE("Add event failed. err: '%d'.", errCode);
358     }
359     return errCode;
360 }
361 
RemoveEventObject(EventImpl * event)362 int EventLoopImpl::RemoveEventObject(EventImpl *event)
363 {
364     if (event == nullptr) {
365         return -E_INVALID_ARGS;
366     }
367     if (!EventObjectExists(event)) {
368         return -E_NO_SUCH_ENTRY;
369     }
370 
371     int errCode = E_OK;
372     if (!event->IsTimer()) {
373         errCode = RemoveEvent(event);
374     }
375 
376     if (errCode == E_OK) {
377         polling_.erase(event);
378         event->SetLoop(nullptr);
379         event->DecObjRef(event);
380         pollingSetChanged_ = true;
381     } else {
382         LOGE("Remove event failed. err: '%d'.", errCode);
383     }
384     return errCode;
385 }
386 
ModifyEventObject(EventImpl * event,bool isAdd,EventsMask events)387 int EventLoopImpl::ModifyEventObject(EventImpl *event, bool isAdd, EventsMask events)
388 {
389     if (event == nullptr) {
390         return -E_INVALID_ARGS;
391     }
392     if (!EventObjectExists(event)) {
393         return -EEXIST;
394     }
395 
396     int errCode = E_OK;
397     if (!event->IsTimer()) {
398         EventsMask genericEvents = events & (~IEvent::ET_TIMEOUT);
399         if (genericEvents) {
400             errCode = ModifyEvent(event, isAdd, genericEvents);
401         }
402     }
403 
404     if (errCode == E_OK) {
405         event->SetEvents(isAdd, events);
406     } else {
407         LOGE("Modify event' failed. err: '%d'.", errCode);
408     }
409     return errCode;
410 }
411 
ModifyEventObject(EventImpl * event,EventTime timeout)412 int EventLoopImpl::ModifyEventObject(EventImpl *event, EventTime timeout)
413 {
414     if (event == nullptr) {
415         return -E_INVALID_ARGS;
416     }
417     if (!EventObjectExists(event)) {
418         return -E_NO_SUCH_ENTRY;
419     }
420     event->SetTimeoutPeriod(timeout);
421     return E_OK;
422 }
423 
ProcessRequest(std::list<EventRequest * > & requests)424 void EventLoopImpl::ProcessRequest(std::list<EventRequest *> &requests)
425 {
426     EventTime now = GetTime();
427     while (true) {
428         if (requests.empty()) {
429             break;
430         }
431 
432         EventRequest *request = requests.front();
433         requests.pop_front();
434         if (request == nullptr) {
435             continue;
436         }
437 
438         if (!IsKilled()) {
439             EventImpl *event = nullptr;
440             request->GetEvent(event);
441             EventsMask events = request->GetEvents();
442             EventTime timeout = request->GetTimeout();
443 
444             switch (request->GetType()) {
445                 case EventRequest::ADD_EVENT:
446                     (void)(AddEventObject(event, now));
447                     break;
448 
449                 case EventRequest::REMOVE_EVENT:
450                     (void)(RemoveEventObject(event));
451                     break;
452 
453                 case EventRequest::MOD_EVENTS_ADD:
454                     (void)(ModifyEventObject(event, true, events));
455                     break;
456 
457                 case EventRequest::MOD_EVENTS_REMOVE:
458                     (void)(ModifyEventObject(event, false, events));
459                     break;
460 
461                 case EventRequest::SET_TIMEOUT:
462                     (void)(ModifyEventObject(event, timeout));
463                     break;
464 
465                 default:
466                     break;
467             }
468         }
469 
470         delete request;
471         request = nullptr;
472     }
473 }
474 
ProcessRequest()475 int EventLoopImpl::ProcessRequest()
476 {
477     int errCode = E_OK;
478     std::list<EventRequest *> requests;
479     {
480         RefObject::AutoLock lockGuard(this);
481         if (IsKilled()) {
482             errCode = -E_OBJ_IS_KILLED;
483         }
484         if (requests_.empty()) {
485             return errCode;
486         }
487         std::swap(requests, requests_);
488     }
489 
490     ProcessRequest(requests);
491     return errCode;
492 }
493 
CalSleepTime() const494 EventTime EventLoopImpl::CalSleepTime() const
495 {
496     EventTime now = GetTime();
497     EventTime minInterval = EventImpl::MAX_TIME_VALUE;
498 
499     for (auto event : polling_) {
500         if (event == nullptr) {
501             continue;
502         }
503 
504         EventTime t;
505         bool valid = event->GetTimeoutPoint(t);
506         if (!valid) {
507             continue;
508         }
509 
510         if (t <= now) {
511             return 0;
512         }
513 
514         EventTime interval = t - now;
515         if (interval < minInterval) {
516             minInterval = interval;
517         }
518     }
519 
520     return minInterval;
521 }
522 
DispatchAll()523 int EventLoopImpl::DispatchAll()
524 {
525     do {
526         EventTime now = GetTime();
527         pollingSetChanged_ = false;
528 
529         for (auto event : polling_) {
530             if (IsKilled()) {
531                 return -E_OBJ_IS_KILLED;
532             }
533             if (event == nullptr) {
534                 continue;
535             }
536 
537             event->IncObjRef(event);
538             event->UpdateElapsedTime(now);
539             int errCode = event->Dispatch();
540             if (errCode != E_OK) {
541                 RemoveEventObject(event);
542             } else {
543                 event->SetRevents(0);
544             }
545             event->DecObjRef(event);
546 
547             if (pollingSetChanged_) {
548                 break;
549             }
550         }
551     } while (pollingSetChanged_);
552     return E_OK;
553 }
554 
CleanLoop()555 void EventLoopImpl::CleanLoop()
556 {
557     if (!IsKilled()) {
558         return;
559     }
560 
561     ProcessRequest();
562     std::set<EventImpl *> polling = std::move(polling_);
563     int errCode = Exit(polling);
564     if (errCode != E_OK) {
565         LOGE("Exit loop failed when cleanup, err:'%d'.", errCode);
566     }
567 
568     for (auto event : polling) {
569         if (event != nullptr) {
570             event->KillAndDecObjRef(event);
571         }
572     }
573 }
574 
OnKillLoop()575 void EventLoopImpl::OnKillLoop()
576 {
577     bool started = true;
578     if (IsInLoopThread(started)) {
579         // Loop object is set to state: killed,
580         // everything will be done in loop.Run()
581         return;
582     }
583 
584     if (started) {
585         // Ditto
586         WakeUp();
587     } else {
588         // Drop the lock.
589         UnlockObj();
590         CleanLoop();
591         LockObj();
592     }
593 }
594 }
595