1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "event_loop_impl.h"
17
18 #include <ctime>
19
20 #include "db_errno.h"
21 #include "log_print.h"
22 #include "event_impl.h"
23
24 namespace DistributedDB {
25 class EventRequest {
26 public:
27 enum {
28 ADD_EVENT = 1,
29 REMOVE_EVENT,
30 SET_TIMEOUT,
31 MOD_EVENTS_ADD,
32 MOD_EVENTS_REMOVE,
33 };
34
EventRequest(int type,EventImpl * event,EventsMask events)35 EventRequest(int type, EventImpl *event, EventsMask events)
36 : type_(type),
37 event_(event),
38 events_(events),
39 timeout_(0)
40 {
41 if (event != nullptr) {
42 event->IncObjRef(event);
43 }
44 }
45
EventRequest(int type,EventImpl * event,EventTime timeout)46 EventRequest(int type, EventImpl *event, EventTime timeout)
47 : type_(type),
48 event_(event),
49 events_(0),
50 timeout_(timeout)
51 {
52 if (event != nullptr) {
53 event->IncObjRef(event);
54 }
55 }
56
~EventRequest()57 ~EventRequest()
58 {
59 if (event_ != nullptr) {
60 event_->DecObjRef(event_);
61 event_ = nullptr;
62 }
63 }
64
IsValidType(int type)65 static bool IsValidType(int type)
66 {
67 if (type < ADD_EVENT || type > MOD_EVENTS_REMOVE) {
68 return false;
69 }
70 return true;
71 }
72
GetType() const73 int GetType() const
74 {
75 return type_;
76 }
77
GetEvent(EventImpl * & event) const78 void GetEvent(EventImpl *&event) const
79 {
80 event = event_;
81 }
82
GetEvents() const83 EventsMask GetEvents() const
84 {
85 return events_;
86 }
87
GetTimeout() const88 EventTime GetTimeout() const
89 {
90 return timeout_;
91 }
92
93 private:
94 int type_;
95 EventImpl *event_;
96 EventsMask events_;
97 EventTime timeout_;
98 };
99
EventLoopImpl()100 EventLoopImpl::EventLoopImpl()
101 : pollingSetChanged_(false),
102 running_(true)
103 {
104 OnKill([this]() { OnKillLoop(); });
105 }
106
~EventLoopImpl()107 EventLoopImpl::~EventLoopImpl()
108 {}
109
Add(IEvent * event)110 int EventLoopImpl::Add(IEvent *event)
111 {
112 if (event == nullptr) {
113 return -E_INVALID_ARGS;
114 }
115
116 auto eventImpl = static_cast<EventImpl *>(event);
117 if (!eventImpl->SetLoop(this)) {
118 LOGE("Add ev to loop failed, already attached.");
119 return -E_INVALID_ARGS;
120 }
121
122 EventTime timeout = 0;
123 int errCode = QueueRequest(EventRequest::ADD_EVENT, eventImpl, timeout);
124 if (errCode != E_OK) {
125 eventImpl->SetLoop(nullptr);
126 LOGE("Add ev to loop failed. err: '%d'.", errCode);
127 }
128 return errCode;
129 }
130
Remove(IEvent * event)131 int EventLoopImpl::Remove(IEvent *event)
132 {
133 if (event == nullptr) {
134 return -E_INVALID_ARGS;
135 }
136
137 auto eventImpl = static_cast<EventImpl *>(event);
138 bool isLoopConfused = false;
139 if (!eventImpl->Attached(this, isLoopConfused)) {
140 if (isLoopConfused) {
141 LOGE("Remove ev' from loop failed, loop confused.");
142 return -E_UNEXPECTED_DATA;
143 }
144 return E_OK;
145 }
146
147 EventTime timeout = 0;
148 int errCode = QueueRequest(EventRequest::REMOVE_EVENT, eventImpl, timeout);
149 if (errCode != E_OK) {
150 LOGE("Remove ev from loop failed. err: '%d'.", errCode);
151 }
152 return errCode;
153 }
154
Run()155 int EventLoopImpl::Run()
156 {
157 {
158 RefObject::AutoLock lockGuard(this);
159 if (IsKilled()) {
160 LOGE("Try to run a killed loop.");
161 return -E_OBJ_IS_KILLED;
162 }
163 if (loopThread_ != std::thread::id()) {
164 LOGE("Try to run a threaded loop.");
165 return -E_BUSY;
166 }
167 loopThread_ = std::this_thread::get_id();
168 }
169
170 int errCode = E_OK;
171 IncObjRef(this);
172
173 while (running_) {
174 errCode = ProcessRequest();
175 if (errCode != E_OK) {
176 break;
177 }
178
179 errCode = Prepare(polling_);
180 if (errCode != E_OK) {
181 break;
182 }
183
184 EventTime sleepTime = CalSleepTime();
185 errCode = Poll(sleepTime);
186 if (errCode != E_OK) {
187 break;
188 }
189
190 errCode = ProcessRequest();
191 if (errCode != E_OK) {
192 break;
193 }
194
195 errCode = DispatchAll();
196 if (errCode != E_OK) {
197 break;
198 }
199 }
200
201 CleanLoop();
202 DecObjRef(this);
203 if (errCode == -E_OBJ_IS_KILLED) {
204 LOGD("Loop exited.");
205 } else {
206 LOGE("Loop exited, err:'%d'.", errCode);
207 }
208 return errCode;
209 }
210
Stop()211 int EventLoopImpl::Stop()
212 {
213 running_ = false;
214 return E_OK;
215 }
216
Modify(EventImpl * event,bool isAdd,EventsMask events)217 int EventLoopImpl::Modify(EventImpl *event, bool isAdd, EventsMask events)
218 {
219 if (event == nullptr) {
220 return -E_INVALID_ARGS;
221 }
222
223 int type = isAdd ? EventRequest::MOD_EVENTS_ADD :
224 EventRequest::MOD_EVENTS_REMOVE;
225 int errCode = QueueRequest(type, event, events);
226 if (errCode != E_OK) {
227 LOGE("Modify loop ev events failed. err: '%d'.", errCode);
228 }
229 return errCode;
230 }
231
Modify(EventImpl * event,EventTime time)232 int EventLoopImpl::Modify(EventImpl *event, EventTime time)
233 {
234 if (event == nullptr) {
235 return -E_INVALID_ARGS;
236 }
237
238 int errCode = QueueRequest(EventRequest::SET_TIMEOUT, event, time);
239 if (errCode != E_OK) {
240 LOGE("Mod loop ev time failed. err: '%d'.", errCode);
241 }
242 return errCode;
243 }
244
GetTime() const245 EventTime EventLoopImpl::GetTime() const
246 {
247 uint64_t microsecond = 0;
248 OS::GetMonotonicRelativeTimeInMicrosecond(microsecond); // It is not very possible to fail, if so use 0 as default
249 return static_cast<EventTime>(microsecond / 1000); // 1000 is the multiple between microsecond and millisecond
250 }
251
SendRequestToLoop(EventRequest * eventRequest)252 int EventLoopImpl::SendRequestToLoop(EventRequest *eventRequest)
253 {
254 if (eventRequest == nullptr) {
255 return -E_INVALID_ARGS;
256 }
257
258 RefObject::AutoLock lockGuard(this);
259 if (IsKilled()) {
260 return -E_OBJ_IS_KILLED;
261 }
262 requests_.push_back(eventRequest);
263 WakeUp();
264 return E_OK;
265 }
266
267 template<typename T>
QueueRequest(int type,EventImpl * event,T argument)268 int EventLoopImpl::QueueRequest(int type, EventImpl *event, T argument)
269 {
270 if (!EventRequest::IsValidType(type)) {
271 return -E_INVALID_ARGS;
272 }
273 if (event == nullptr ||
274 !event->IsValidArg(argument)) {
275 return -E_INVALID_ARGS;
276 }
277
278 if (IsKilled()) { // pre-check
279 return -E_OBJ_IS_KILLED;
280 }
281
282 int errCode = event->CheckStatus();
283 if (errCode != E_OK) {
284 if (errCode != -E_OBJ_IS_KILLED ||
285 type != EventRequest::REMOVE_EVENT) {
286 return errCode;
287 }
288 }
289
290 auto eventRequest = new (std::nothrow) EventRequest(type, event, argument);
291 if (eventRequest == nullptr) {
292 return -E_OUT_OF_MEMORY;
293 }
294
295 errCode = SendRequestToLoop(eventRequest);
296 if (errCode != E_OK) {
297 delete eventRequest;
298 eventRequest = nullptr;
299 }
300 return errCode;
301 }
302
IsInLoopThread(bool & started) const303 bool EventLoopImpl::IsInLoopThread(bool &started) const
304 {
305 if (loopThread_ == std::thread::id()) {
306 started = false;
307 } else {
308 started = true;
309 }
310 return std::this_thread::get_id() == loopThread_;
311 }
312
EventObjectExists(EventImpl * event) const313 bool EventLoopImpl::EventObjectExists(EventImpl *event) const
314 {
315 return polling_.find(event) != polling_.end();
316 }
317
EventFdExists(const EventImpl * event) const318 bool EventLoopImpl::EventFdExists(const EventImpl *event) const
319 {
320 if (!event->IsValidFd()) {
321 return false;
322 }
323 for (auto ev : polling_) {
324 if (ev->GetEventFd() == event->GetEventFd()) {
325 return true;
326 }
327 }
328 return false;
329 }
330
AddEventObject(EventImpl * event,EventTime now)331 int EventLoopImpl::AddEventObject(EventImpl *event, EventTime now)
332 {
333 if (event == nullptr) {
334 return -E_INVALID_ARGS;
335 }
336 if (EventObjectExists(event)) {
337 LOGE("Add event object failed. ev already exists.");
338 return -EEXIST;
339 }
340 if (EventFdExists(event)) {
341 LOGE("Add event object failed. ev fd already exists.");
342 return -EEXIST;
343 }
344
345 int errCode = E_OK;
346 if (!event->IsTimer()) {
347 errCode = AddEvent(event);
348 }
349
350 if (errCode == E_OK) {
351 polling_.insert(event);
352 event->SetStartTime(now);
353 event->SetRevents(0);
354 event->IncObjRef(event);
355 pollingSetChanged_ = true;
356 } else {
357 LOGE("Add event failed. err: '%d'.", errCode);
358 }
359 return errCode;
360 }
361
RemoveEventObject(EventImpl * event)362 int EventLoopImpl::RemoveEventObject(EventImpl *event)
363 {
364 if (event == nullptr) {
365 return -E_INVALID_ARGS;
366 }
367 if (!EventObjectExists(event)) {
368 return -E_NO_SUCH_ENTRY;
369 }
370
371 int errCode = E_OK;
372 if (!event->IsTimer()) {
373 errCode = RemoveEvent(event);
374 }
375
376 if (errCode == E_OK) {
377 polling_.erase(event);
378 event->SetLoop(nullptr);
379 event->DecObjRef(event);
380 pollingSetChanged_ = true;
381 } else {
382 LOGE("Remove event failed. err: '%d'.", errCode);
383 }
384 return errCode;
385 }
386
ModifyEventObject(EventImpl * event,bool isAdd,EventsMask events)387 int EventLoopImpl::ModifyEventObject(EventImpl *event, bool isAdd, EventsMask events)
388 {
389 if (event == nullptr) {
390 return -E_INVALID_ARGS;
391 }
392 if (!EventObjectExists(event)) {
393 return -EEXIST;
394 }
395
396 int errCode = E_OK;
397 if (!event->IsTimer()) {
398 EventsMask genericEvents = events & (~IEvent::ET_TIMEOUT);
399 if (genericEvents) {
400 errCode = ModifyEvent(event, isAdd, genericEvents);
401 }
402 }
403
404 if (errCode == E_OK) {
405 event->SetEvents(isAdd, events);
406 } else {
407 LOGE("Modify event' failed. err: '%d'.", errCode);
408 }
409 return errCode;
410 }
411
ModifyEventObject(EventImpl * event,EventTime timeout)412 int EventLoopImpl::ModifyEventObject(EventImpl *event, EventTime timeout)
413 {
414 if (event == nullptr) {
415 return -E_INVALID_ARGS;
416 }
417 if (!EventObjectExists(event)) {
418 return -E_NO_SUCH_ENTRY;
419 }
420 event->SetTimeoutPeriod(timeout);
421 return E_OK;
422 }
423
ProcessRequest(std::list<EventRequest * > & requests)424 void EventLoopImpl::ProcessRequest(std::list<EventRequest *> &requests)
425 {
426 EventTime now = GetTime();
427 while (true) {
428 if (requests.empty()) {
429 break;
430 }
431
432 EventRequest *request = requests.front();
433 requests.pop_front();
434 if (request == nullptr) {
435 continue;
436 }
437
438 if (!IsKilled()) {
439 EventImpl *event = nullptr;
440 request->GetEvent(event);
441 EventsMask events = request->GetEvents();
442 EventTime timeout = request->GetTimeout();
443
444 switch (request->GetType()) {
445 case EventRequest::ADD_EVENT:
446 (void)(AddEventObject(event, now));
447 break;
448
449 case EventRequest::REMOVE_EVENT:
450 (void)(RemoveEventObject(event));
451 break;
452
453 case EventRequest::MOD_EVENTS_ADD:
454 (void)(ModifyEventObject(event, true, events));
455 break;
456
457 case EventRequest::MOD_EVENTS_REMOVE:
458 (void)(ModifyEventObject(event, false, events));
459 break;
460
461 case EventRequest::SET_TIMEOUT:
462 (void)(ModifyEventObject(event, timeout));
463 break;
464
465 default:
466 break;
467 }
468 }
469
470 delete request;
471 request = nullptr;
472 }
473 }
474
ProcessRequest()475 int EventLoopImpl::ProcessRequest()
476 {
477 int errCode = E_OK;
478 std::list<EventRequest *> requests;
479 {
480 RefObject::AutoLock lockGuard(this);
481 if (IsKilled()) {
482 errCode = -E_OBJ_IS_KILLED;
483 }
484 if (requests_.empty()) {
485 return errCode;
486 }
487 std::swap(requests, requests_);
488 }
489
490 ProcessRequest(requests);
491 return errCode;
492 }
493
CalSleepTime() const494 EventTime EventLoopImpl::CalSleepTime() const
495 {
496 EventTime now = GetTime();
497 EventTime minInterval = EventImpl::MAX_TIME_VALUE;
498
499 for (auto event : polling_) {
500 if (event == nullptr) {
501 continue;
502 }
503
504 EventTime t;
505 bool valid = event->GetTimeoutPoint(t);
506 if (!valid) {
507 continue;
508 }
509
510 if (t <= now) {
511 return 0;
512 }
513
514 EventTime interval = t - now;
515 if (interval < minInterval) {
516 minInterval = interval;
517 }
518 }
519
520 return minInterval;
521 }
522
DispatchAll()523 int EventLoopImpl::DispatchAll()
524 {
525 do {
526 EventTime now = GetTime();
527 pollingSetChanged_ = false;
528
529 for (auto event : polling_) {
530 if (IsKilled()) {
531 return -E_OBJ_IS_KILLED;
532 }
533 if (event == nullptr) {
534 continue;
535 }
536
537 event->IncObjRef(event);
538 event->UpdateElapsedTime(now);
539 int errCode = event->Dispatch();
540 if (errCode != E_OK) {
541 RemoveEventObject(event);
542 } else {
543 event->SetRevents(0);
544 }
545 event->DecObjRef(event);
546
547 if (pollingSetChanged_) {
548 break;
549 }
550 }
551 } while (pollingSetChanged_);
552 return E_OK;
553 }
554
CleanLoop()555 void EventLoopImpl::CleanLoop()
556 {
557 if (!IsKilled()) {
558 return;
559 }
560
561 ProcessRequest();
562 std::set<EventImpl *> polling = std::move(polling_);
563 int errCode = Exit(polling);
564 if (errCode != E_OK) {
565 LOGE("Exit loop failed when cleanup, err:'%d'.", errCode);
566 }
567
568 for (auto event : polling) {
569 if (event != nullptr) {
570 event->KillAndDecObjRef(event);
571 }
572 }
573 }
574
OnKillLoop()575 void EventLoopImpl::OnKillLoop()
576 {
577 bool started = true;
578 if (IsInLoopThread(started)) {
579 // Loop object is set to state: killed,
580 // everything will be done in loop.Run()
581 return;
582 }
583
584 if (started) {
585 // Ditto
586 WakeUp();
587 } else {
588 // Drop the lock.
589 UnlockObj();
590 CleanLoop();
591 LockObj();
592 }
593 }
594 }
595