1 /*
2  * Copyright (c) 2023-2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "output_controller.h"
17 
18 #include <cstdlib>
19 #include <sys/prctl.h>
20 
21 #include "av_trans_constants.h"
22 #include "av_trans_errno.h"
23 
24 namespace OHOS {
25 namespace DistributedHardware {
~OutputController()26 OutputController::~OutputController()
27 {
28     ReleaseControl();
29 }
30 
PushData(std::shared_ptr<Plugin::Buffer> & data)31 void OutputController::PushData(std::shared_ptr<Plugin::Buffer>& data)
32 {
33     int64_t pushTime = GetCurrentTime();
34     TRUE_RETURN((GetControlStatus() != ControlStatus::START),
35         "Control status wrong, push data failed.");
36     {
37         std::lock_guard<std::mutex> lock(queueMutex_);
38         if (dataQueue_.size() > QUEUE_MAX_SIZE) {
39             AVTRANS_LOGE("DataQueue is greater than QUEUE_MAX_SIZE %{public}zu.", QUEUE_MAX_SIZE);
40             dataQueue_.pop();
41         }
42         CheckSyncInfo(data);
43         if (data == nullptr || data->GetBufferMeta() == nullptr) {
44             AVTRANS_LOGE("data or data getbuffermeta is nullptr");
45             return;
46         }
47         data->GetBufferMeta()->SetMeta(Tag::USER_PUSH_DATA_TIME, pushTime);
48         dataQueue_.push(data);
49         AVTRANS_LOGD("Push Data, dataQueue size is %{public}zu.", dataQueue_.size());
50     }
51     if (GetControlMode() == ControlMode::SYNC) {
52         clockCon_.notify_one();
53     }
54     controlCon_.notify_one();
55 }
56 
StartControl()57 OutputController::ControlStatus OutputController::StartControl()
58 {
59     TRUE_RETURN_V_MSG_D((GetControlStatus() == ControlStatus::START), ControlStatus::STARTED,
60         "Control status is started.");
61     SetControlStatus(ControlStatus::START);
62     if (!handlerThread_) {
63         AVTRANS_LOGD("Init handler thread.");
64         handlerThread_ = std::make_unique<std::thread>(&OutputController::LooperHandle, this);
65         std::unique_lock<std::mutex> lock(handlerMutex_);
66         handlerCon_.wait(lock, [this] {
67             return handler_;
68         });
69     }
70     if (!controlThread_) {
71         AVTRANS_LOGD("Init control thread.");
72         controlThread_ = std::make_unique<std::thread>(&OutputController::LooperControl, this);
73     }
74     AVTRANS_LOGI("Start control success.");
75     return ControlStatus::START;
76 }
77 
PrepareControl()78 void OutputController::PrepareControl()
79 {
80     if (GetControlMode() == ControlMode::SMOOTH) {
81         PrepareSmooth();
82     } else {
83         PrepareSync();
84     }
85     InitTimeStatistician();
86     AVTRANS_LOGI("Prepare control success.");
87 }
88 
StopControl()89 OutputController::ControlStatus OutputController::StopControl()
90 {
91     TRUE_RETURN_V_MSG_D((GetControlStatus() == ControlStatus::STOP), ControlStatus::STOPPED,
92         "Control status is stopped.");
93     SetControlStatus(ControlStatus::STOP);
94     ClearQueue(dataQueue_);
95     AVTRANS_LOGI("Stop control success.");
96     return ControlStatus::STOP;
97 }
98 
ReleaseControl()99 OutputController::ControlStatus OutputController::ReleaseControl()
100 {
101     TRUE_RETURN_V_MSG_D((GetControlStatus() == ControlStatus::RELEASE), ControlStatus::RELEASED,
102         "Control status is released.");
103     SetControlStatus(ControlStatus::RELEASE);
104     statistician_ = nullptr;
105     UnregisterListener();
106     controlCon_.notify_one();
107     sleepCon_.notify_one();
108     clockCon_.notify_one();
109     if (handler_ && handler_->GetEventRunner()) {
110         handler_->GetEventRunner()->Stop();
111     }
112     if (handlerThread_) {
113         handlerThread_->join();
114         handlerThread_ = nullptr;
115     }
116     if (controlThread_) {
117         controlThread_->join();
118         controlThread_ = nullptr;
119     }
120     ClearQueue(dataQueue_);
121     paramsMap_.clear();
122     AVTRANS_LOGI("Release control success.");
123     return ControlStatus::RELEASE;
124 }
125 
GetParameter(Tag tag,ValueType & value)126 Status OutputController::GetParameter(Tag tag, ValueType& value)
127 {
128     {
129         std::lock_guard<std::mutex> lock(paramMapMutex_);
130         auto iter = paramsMap_.find(tag);
131         if (iter != paramsMap_.end()) {
132             value = iter->second;
133             return Status::OK;
134         }
135         return Status::ERROR_NOT_EXISTED;
136     }
137 }
138 
SetParameter(Tag tag,const ValueType & value)139 Status OutputController::SetParameter(Tag tag, const ValueType& value)
140 {
141     {
142         std::lock_guard<std::mutex> lock(paramMapMutex_);
143         switch (tag) {
144             case Tag::USER_AV_SYNC_GROUP_INFO: {
145                 std::string jsonStr = Plugin::AnyCast<std::string>(value);
146                 AVTRANS_LOGD("Set parameter USER_AV_SYNC_GROUP_INFO: %{public}s", jsonStr.c_str());
147                 break;
148             }
149             case Tag::USER_SHARED_MEMORY_FD: {
150                 std::string jsonStr = Plugin::AnyCast<std::string>(value);
151                 sharedMem_ = UnmarshalSharedMemory(jsonStr);
152                 AVTRANS_LOGD("Set parameter USER_SHARED_MEMORY_FD: %{public}s, unmarshal sharedMem fd: %{public}d, "
153                     "size: %{public}d, name: %{public}s", jsonStr.c_str(), sharedMem_.fd, sharedMem_.size,
154                     sharedMem_.name.c_str());
155                 break;
156             }
157             case Tag::USER_TIME_SYNC_RESULT: {
158                 std::string jsonStr = Plugin::AnyCast<std::string>(value);
159                 int32_t devClockDiff = atoi(jsonStr.c_str());
160                 SetDevClockDiff(devClockDiff);
161                 AVTRANS_LOGD("Set parameter USER_TIME_SYNC_RESULT: %{public}s, devClockDiff is %{public}d.",
162                     jsonStr.c_str(), devClockDiff);
163                 break;
164             }
165             default:
166                 AVTRANS_LOGE("Invalid tag.");
167         }
168         paramsMap_.insert(std::make_pair(tag, value));
169         return Status::OK;
170     }
171 }
172 
PrepareSync()173 void OutputController::PrepareSync()
174 {
175     AVTRANS_LOGI("Prepare sync.");
176     SetProcessDynamicBalanceState(true);
177     SetTimeInitState(false);
178     SetBaselineInitState(false);
179     SetAllowControlState(true);
180     SetBufferTime(0);
181     SetAdjustSleepFactor(0);
182     SetWaitClockThre(0);
183     SetTrackClockThre(0);
184     SetSleepThre(0);
185     SetAudioFrontTime(0);
186     SetAudioBackTime(0);
187     SetVideoFrontTime(0);
188     SetVideoBackTime(0);
189 }
190 
PrepareSmooth()191 void OutputController::PrepareSmooth()
192 {
193     AVTRANS_LOGI("Prepare smooth.");
194     SetProcessDynamicBalanceState(false);
195     SetTimeInitState(false);
196     SetBaselineInitState(false);
197     SetAllowControlState(true);
198     SetBufferTime(0);
199     SetAdjustSleepFactor(0);
200     SetWaitClockFactor(0);
201     SetTrackClockFactor(0);
202     SetSleepThre(0);
203     SetDynamicBalanceThre(0);
204     SetAverIntervalDiffThre(0);
205     SetPushOnceDiffThre(0);
206     SetTimeStampOnceDiffThre(0);
207 }
208 
LooperHandle()209 void OutputController::LooperHandle()
210 {
211     prctl(PR_SET_NAME, OUTPUT_HANDLE_THREAD_NAME.c_str());
212     auto runner = AppExecFwk::EventRunner::Create(false);
213     {
214         std::lock_guard<std::mutex> lock(handlerMutex_);
215         handler_ = std::make_shared<AppExecFwk::EventHandler>(runner);
216     }
217     handlerCon_.notify_one();
218     runner->Run();
219 }
220 
LooperControl()221 void OutputController::LooperControl()
222 {
223     prctl(PR_SET_NAME, LOOPER_CONTROL_THREAD_NAME.c_str());
224     while (GetControlStatus() != ControlStatus::RELEASE) {
225         std::shared_ptr<Plugin::Buffer> data = nullptr;
226         {
227             std::unique_lock<std::mutex> lock(queueMutex_);
228             controlCon_.wait(lock, [this] {
229                 return ((!dataQueue_.empty() && GetControlStatus() == ControlStatus::START)
230                     || (GetControlStatus() == ControlStatus::RELEASE));
231             });
232             if (GetControlStatus() == ControlStatus::RELEASE) {
233                 continue;
234             }
235             data = dataQueue_.front();
236         }
237         HandleControlResult(data, ControlOutput(data));
238     }
239 }
240 
ControlOutput(const std::shared_ptr<Plugin::Buffer> & data)241 int32_t OutputController::ControlOutput(const std::shared_ptr<Plugin::Buffer>& data)
242 {
243     if (data == nullptr) {
244         AVTRANS_LOGE("data is nullptr");
245         return OUTPUT_FRAME;
246     }
247     enterTime_ = GetCurrentTime();
248     int64_t timeStamp = data->pts;
249     TRUE_RETURN_V_MSG_D((timeStamp == INVALID_TIMESTAMP || !CheckIsAllowControl()), OUTPUT_FRAME,
250         "Direct output.");
251     if (CheckIsClockInvalid(data) || !CheckIsProcessInDynamicBalance(data)) {
252         CalProcessTime(data);
253         RecordTime(enterTime_, timeStamp);
254         return OUTPUT_FRAME;
255     }
256     CalProcessTime(data);
257     if (!CheckIsTimeInit()) {
258         InitTime(enterTime_, timeStamp);
259     }
260     if (!CheckIsBaselineInit()) {
261         InitBaseline(timeStamp, GetClockTime());
262     }
263     CalSleepTime(timeStamp);
264     SyncClock(data);
265     RecordTime(enterTime_, timeStamp);
266     return OUTPUT_FRAME;
267 }
268 
CheckSyncInfo(const std::shared_ptr<Plugin::Buffer> & data)269 void OutputController::CheckSyncInfo(const std::shared_ptr<Plugin::Buffer>& data)
270 {
271     if (data == nullptr || data->GetBufferMeta() == nullptr) {
272         AVTRANS_LOGE("data or getbuffermeta is nullptr");
273         return;
274     }
275     auto bufferMeta = data->GetBufferMeta();
276     bool isAFrameNumberExist = bufferMeta->IsExist(Tag::AUDIO_SAMPLE_PER_FRAME);
277     bool isAPtsExist = bufferMeta->IsExist(Tag::MEDIA_START_TIME);
278     if (GetControlMode() == ControlMode::SYNC && (!isAFrameNumberExist || !isAPtsExist)) {
279         ClearQueue(dataQueue_);
280         sleepCon_.notify_one();
281         clockCon_.notify_one();
282         SetControlMode(ControlMode::SMOOTH);
283         AVTRANS_LOGI("Stop sync and start smooth, aFrameNumberExist: %{public}d, aPtsExist: %{public}d.",
284             isAFrameNumberExist, isAPtsExist);
285         return;
286     }
287     if (GetControlMode() == ControlMode::SMOOTH && isAFrameNumberExist && isAPtsExist) {
288         ClearQueue(dataQueue_);
289         sleepCon_.notify_one();
290         clockCon_.notify_one();
291         SetControlMode(ControlMode::SYNC);
292         AVTRANS_LOGI("Stop smooth and start sync, aFrameNumberExist: %{public}d, aPtsExist: %{public}d.",
293             isAFrameNumberExist, isAPtsExist);
294     }
295 }
296 
CalProcessTime(const std::shared_ptr<Plugin::Buffer> & data)297 void OutputController::CalProcessTime(const std::shared_ptr<Plugin::Buffer>& data)
298 {
299     if (statistician_) {
300         statistician_->CalProcessTime(data);
301     }
302 }
303 
InitTimeStatistician()304 void OutputController::InitTimeStatistician()
305 {
306     if (!statistician_) {
307         AVTRANS_LOGD("Init time statistician.");
308         statistician_ = std::make_shared<TimeStatistician>();
309     }
310     statistician_->ClearStatistics();
311     AVTRANS_LOGI("Start time statistics.");
312 }
313 
RecordTime(const int64_t enterTime,const int64_t timeStamp)314 void OutputController::RecordTime(const int64_t enterTime, const int64_t timeStamp)
315 {
316     lastEnterTime_ = enterTime;
317     lastTimeStamp_ = timeStamp;
318     leaveTime_ = GetCurrentTime();
319     SetTimeInitState(true);
320 }
321 
InitTime(const int64_t enterTime,const int64_t timeStamp)322 void OutputController::InitTime(const int64_t enterTime, const int64_t timeStamp)
323 {
324     delta_ = 0;
325     sleep_ = 0;
326     lastEnterTime_ = enterTime;
327     lastTimeStamp_ = timeStamp;
328     leaveTime_ = enterTime;
329     SetTimeInitState(true);
330 }
331 
CheckIsClockInvalid(const std::shared_ptr<Plugin::Buffer> & data)332 bool OutputController::CheckIsClockInvalid(const std::shared_ptr<Plugin::Buffer>& data)
333 {
334     if (GetControlMode() == ControlMode::SMOOTH) {
335         SetClockTime(enterTime_);
336         AVTRANS_LOGD("Control mode is smooth, clock is valid.");
337         return false;
338     }
339     int32_t ret = AcquireSyncClockTime(data);
340     if (ret == ERR_DH_AVT_MASTER_NOT_READY) {
341         AVTRANS_LOGD("Master clock not ready, wait reread clock.");
342         return WaitRereadClockFailed(data);
343     }
344     TRUE_RETURN_V_MSG_E((ret != DH_AVT_SUCCESS), true, "Read unit clock is invalid.");
345     return false;
346 }
347 
AcquireSyncClockTime(const std::shared_ptr<Plugin::Buffer> & data)348 int32_t OutputController::AcquireSyncClockTime(const std::shared_ptr<Plugin::Buffer>& data)
349 {
350     if (data == nullptr || data->GetBufferMeta() == nullptr) {
351         AVTRANS_LOGE("data or getbuffermeta is nullptr");
352         return ERR_DH_AVT_INVALID_PARAM;
353     }
354     TRUE_RETURN_V_MSG_E(((sharedMem_.fd <= 0) || (sharedMem_.size <= 0) || sharedMem_.name.empty()),
355         ERR_DH_AVT_SHARED_MEMORY_FAILED, "Parameter USER_SHARED_MEMORY_FD info error.");
356     AVTransSharedMemory sharedMem;
357     sharedMem.fd = sharedMem_.fd;
358     sharedMem.size = sharedMem_.size;
359     sharedMem.name = sharedMem_.name;
360     AVSyncClockUnit clockUnit;
361     clockUnit.pts = INVALID_TIMESTAMP;
362     auto bufferMeta = data->GetBufferMeta();
363     clockUnit.frameNum = Plugin::AnyCast<uint32_t>(bufferMeta->GetMeta(Tag::AUDIO_SAMPLE_PER_FRAME));
364     int32_t ret = ReadClockUnitFromMemory(sharedMem, clockUnit);
365     if (ret == DH_AVT_SUCCESS) {
366         TRUE_RETURN_V_MSG_D((clockUnit.pts == INVALID_TIMESTAMP), ERR_DH_AVT_SHARED_MEMORY_FAILED,
367             "Read invalid clock.");
368         clockUnit_.pts = clockUnit.pts;
369         clockUnit_.frameNum = clockUnit.frameNum;
370         SetClockTime(clockUnit_.pts);
371         AVTRANS_LOGD("Acquire sync clock success, pts: %{public}lld.", clockUnit_.pts);
372     }
373     return ret;
374 }
375 
WaitRereadClockFailed(const std::shared_ptr<Plugin::Buffer> & data)376 bool OutputController::WaitRereadClockFailed(const std::shared_ptr<Plugin::Buffer>& data)
377 {
378     const uint32_t halfQueueSize = QUEUE_MAX_SIZE / 2;
379     while (GetQueueSize() < QUEUE_MAX_SIZE) {
380         if (GetQueueSize() < halfQueueSize) {
381             {
382                 AVTRANS_LOGD("Dataqueue size is less than half size, wait notify.");
383                 std::unique_lock<std::mutex> lock(clockMutex_);
384                 int64_t rereadTime = LESS_HALF_REREAD_TIME;
385                 if (statistician_) {
386                     rereadTime = statistician_->GetAverPushInterval() * FACTOR_DOUBLE;
387                 }
388                 clockCon_.wait_for(lock, std::chrono::nanoseconds(rereadTime),
389                     [this] { return (GetControlStatus() != ControlStatus::START); });
390             }
391         } else if (GetQueueSize() >= halfQueueSize) {
392             {
393                 AVTRANS_LOGD("Dataqueue size is greater than half size, scheduled %{public}lld query.",
394                     GREATER_HALF_REREAD_TIME);
395                 std::unique_lock<std::mutex> lock(clockMutex_);
396                 clockCon_.wait_for(lock, std::chrono::nanoseconds(GREATER_HALF_REREAD_TIME),
397                     [this] { return (GetControlStatus() != ControlStatus::START); });
398             }
399         }
400         int32_t ret = AcquireSyncClockTime(data);
401         TRUE_RETURN_V_MSG_D((ret == DH_AVT_SUCCESS || GetControlMode() == ControlMode::SMOOTH), false,
402             "Wait reread clock success.");
403     }
404     return true;
405 }
406 
CheckIsProcessInDynamicBalance(const std::shared_ptr<Plugin::Buffer> & data)407 bool OutputController::CheckIsProcessInDynamicBalance(const std::shared_ptr<Plugin::Buffer>& data)
408 {
409     TRUE_RETURN_V_MSG_D((GetControlMode() == ControlMode::SYNC || GetProcessDynamicBalanceState()), true,
410         "Process in dynamic balance.");
411     if (CheckIsProcessInDynamicBalanceOnce(data)) {
412         dynamicBalanceCount_++;
413     } else {
414         dynamicBalanceCount_ = 0;
415     }
416     if (dynamicBalanceCount_ >= dynamicBalanceThre_) {
417         AVTRANS_LOGD("Process meet dynamic balance condition.");
418         {
419             std::lock_guard<std::mutex> lock(queueMutex_);
420             ClearQueue(dataQueue_);
421         }
422         SetProcessDynamicBalanceState(true);
423         return true;
424     }
425     return false;
426 }
427 
CheckIsProcessInDynamicBalanceOnce(const std::shared_ptr<Plugin::Buffer> & data)428 bool OutputController::CheckIsProcessInDynamicBalanceOnce(const std::shared_ptr<Plugin::Buffer>& data)
429 {
430     TRUE_RETURN_V_MSG_E((!statistician_), false, "Statistician is nullptr.");
431     int64_t pushInterval = statistician_->GetPushInterval();
432     int64_t timeStampInterval = statistician_->GetTimeStampInterval();
433     int64_t averPushInterval = statistician_->GetAverPushInterval();
434     int64_t averTimeStamapInterval = statistician_->GetAverTimeStampInterval();
435     int64_t averIntervalDiff = averPushInterval - averTimeStamapInterval;
436     int64_t pushOnceDiff = pushInterval - averPushInterval;
437     int64_t timeStampOnceDiff = timeStampInterval - averTimeStamapInterval;
438     return (averPushInterval != 0) && (averTimeStamapInterval != 0) &&
439         (llabs(averIntervalDiff) < averIntervalDiffThre_) && (llabs(pushOnceDiff) < pushOnceDiffThre_)
440         && (llabs(timeStampOnceDiff) < timeStampOnceDiffThre_);
441 }
442 
InitBaseline(const int64_t timeStampBaseline,const int64_t clockBaseline)443 void OutputController::InitBaseline(const int64_t timeStampBaseline, const int64_t clockBaseline)
444 {
445     SetTimeStampBaseline(timeStampBaseline);
446     SetClockBaseline(clockBaseline + bufferTime_);
447     SetBaselineInitState(true);
448 }
449 
CalSleepTime(const int64_t timeStamp)450 void OutputController::CalSleepTime(const int64_t timeStamp)
451 {
452     int64_t interval = timeStamp - lastTimeStamp_;
453     int64_t elapse = enterTime_ - leaveTime_;
454     int64_t render = enterTime_ - lastEnterTime_;
455     int64_t delta = render - sleep_ - elapse;
456     delta_ += delta;
457     sleep_ = interval - elapse;
458     AVTRANS_LOGD("Control frame pts: %{public}lld, interval: %{public}lld, elapse: %{public}lld, render: %{public}lld,"
459         " delta: %{public}lld, delat count: %{public}lld, sleep: %{public}lld.",
460         timeStamp, interval, elapse, render, delta, delta_, sleep_);
461     TRUE_RETURN((interval == INVALID_INTERVAL), "Interverl is Invalid.");
462     const int64_t adjustThre = interval * adjustSleepFactor_;
463     if (delta_ > adjustThre && sleep_ > 0) {
464         int64_t sleep = sleep_ - adjustThre;
465         delta_ -= (sleep < 0) ? sleep_ : adjustThre;
466         sleep_ = sleep;
467         AVTRANS_LOGD("Delta greater than thre, adjust sleep to %{public}lld.", sleep_);
468     } else if (delta_ < -adjustThre) {
469         sleep_ += delta_;
470         delta_ = 0;
471         AVTRANS_LOGD("Delta less than negative thre, adjust sleep to %{public}lld.", sleep_);
472     }
473 }
474 
SyncClock(const std::shared_ptr<Plugin::Buffer> & data)475 void OutputController::SyncClock(const std::shared_ptr<Plugin::Buffer>& data)
476 {
477     if (GetControlMode() == ControlMode::SMOOTH) {
478         HandleSmoothTime(data);
479     } else {
480         HandleSyncTime(data);
481     }
482     if (sleep_ > sleepThre_) {
483         sleep_ = sleepThre_;
484         AVTRANS_LOGD("Sleep is more than sleepThre %{public}lld, adjust sleep to %{public}lld", sleepThre_, sleep_);
485     }
486     if (sleep_ < 0) {
487         sleep_ = 0;
488         AVTRANS_LOGD("Sleep less than zero, adjust sleep to zero.");
489     }
490     AVTRANS_LOGD("After sync clock, sleep is %{public}lld.", sleep_);
491     {
492         std::unique_lock<std::mutex> lock(sleepMutex_);
493         sleepCon_.wait_for(lock, std::chrono::nanoseconds(sleep_),
494             [this] { return (GetControlStatus() != ControlStatus::START); });
495     }
496 }
497 
HandleSmoothTime(const std::shared_ptr<Plugin::Buffer> & data)498 void OutputController::HandleSmoothTime(const std::shared_ptr<Plugin::Buffer>& data)
499 {
500     TRUE_RETURN(!statistician_, "Statistician is nullptr.");
501     int64_t interval = statistician_->GetTimeStampInterval();
502     TRUE_RETURN((interval == INVALID_INTERVAL), "Interval is Invalid.");
503     int64_t averTimeStampInterval = statistician_->GetAverTimeStampInterval();
504     TRUE_RETURN((averTimeStampInterval == INVALID_INTERVAL), "AverTimeStampInterval is Invalid.");
505     int64_t waitClockThre = averTimeStampInterval * waitClockFactor_;
506     int64_t trackClockThre = averTimeStampInterval * trackClockFactor_;
507     int64_t vTimeStamp = data->pts;
508     int64_t vcts = (sleep_ > 0) ? (vTimeStamp - sleep_) : vTimeStamp;
509     int64_t offset = vcts - timeStampBaseline_ - (GetClockTime() - clockBaseline_);
510     AVTRANS_LOGD("Smooth vTimeStamp: %{public}lld, offset: %{public}lld, averTimeStampInterval: %{public}lld,"
511         " waitClockThre: %{public}lld, trackClockThre: %{public}lld.",
512         vTimeStamp, offset, averTimeStampInterval, waitClockThre, trackClockThre);
513     if (offset > waitClockThre || offset < -trackClockThre) {
514         sleep_ += offset;
515         AVTRANS_LOGD("Smooth offset %{public}lld is over than thre, adjust sleep to %{public}lld.",
516             offset, sleep_);
517     }
518 }
519 
HandleSyncTime(const std::shared_ptr<Plugin::Buffer> & data)520 void OutputController::HandleSyncTime(const std::shared_ptr<Plugin::Buffer>& data)
521 {
522     if (data == nullptr || data->GetBufferMeta() == nullptr) {
523         AVTRANS_LOGE("data or getbuffermeta is nullptr");
524         return;
525     }
526     auto bufferMeta = data->GetBufferMeta();
527     int64_t vTimeStamp = data->pts;
528     uint32_t vFrameNumber = Plugin::AnyCast<uint32_t>(bufferMeta->GetMeta(Tag::USER_FRAME_NUMBER));
529     int64_t vcts = (sleep_ > 0) ? (vTimeStamp - sleep_) : vTimeStamp;
530     int64_t aTimeStamp = clockUnit_.pts;
531     uint32_t aFrameNumber = clockUnit_.frameNum;
532     int64_t acts = GetClockTime();
533     int64_t ctsDiff = vcts - acts;
534     int64_t offset = (vcts - vFront_ - vBack_) - (acts - aFront_ - aBack_) - GetDevClockDiff();
535     AVTRANS_LOGD("Sync vTimeStamp: %{public}lld, vFrameNumber: %{public}" PRIu32 " vcts: %{public}lld,"
536         " aTimeStamp: %{public}lld, aFrameNumber: %{public}" PRIu32 " acts: %{public}lld, ctsDiff: %{public}lld,"
537         " offset: %{public}lld", vTimeStamp, vFrameNumber, vcts, aTimeStamp, aFrameNumber, acts, ctsDiff, offset);
538     const int64_t append = (trackClockThre_ + waitClockThre_) / 2;
539     if (offset > waitClockThre_) {
540         sleep_ += offset - waitClockThre_ + append;
541         AVTRANS_LOGD("Sync offset %{public}lld is over than wait thre %{public}lld, adjust sleep to %{public}lld.",
542             offset, waitClockThre_, sleep_);
543     } else if (offset < -trackClockThre_) {
544         sleep_ += offset - trackClockThre_ - append;
545         AVTRANS_LOGD("Sync offset %{public}lld is over than track thre %{public}lld, adjust sleep to %{public}lld.",
546             offset, -trackClockThre_, sleep_);
547     }
548 }
549 
HandleControlResult(const std::shared_ptr<Plugin::Buffer> & data,int32_t result)550 void OutputController::HandleControlResult(const std::shared_ptr<Plugin::Buffer>& data, int32_t result)
551 {
552     switch (result) {
553         case OUTPUT_FRAME: {
554             int32_t ret = PostOutputEvent(data);
555             TRUE_RETURN((ret == HANDLE_FAILED), "Handle result OUTPUT_FRAME failed.");
556             {
557                 std::lock_guard<std::mutex> lock(queueMutex_);
558                 if (!dataQueue_.empty() && data == dataQueue_.front()) {
559                     dataQueue_.pop();
560                 }
561             }
562             break;
563         }
564         case DROP_FRAME: {
565             {
566                 std::lock_guard<std::mutex> lock(queueMutex_);
567                 if (!dataQueue_.empty() && data == dataQueue_.front()) {
568                     dataQueue_.pop();
569                 }
570             }
571             break;
572         }
573         case REPEAT_FREAM: {
574             int32_t ret = PostOutputEvent(data);
575             TRUE_RETURN((ret == HANDLE_FAILED), "Handle result REPEAT_FREAM failed.");
576             break;
577         }
578         default:
579             AVTRANS_LOGE("Invalid result.");
580     }
581 }
582 
PostOutputEvent(const std::shared_ptr<Plugin::Buffer> & data)583 int32_t OutputController::PostOutputEvent(const std::shared_ptr<Plugin::Buffer>& data)
584 {
585     TRUE_RETURN_V_MSG_E((!handler_), HANDLE_FAILED, "Handler is nullptr.");
586     auto outputFunc = [this, data]() {
587         int32_t ret = NotifyOutput(data);
588         TRUE_RETURN_V_MSG_E((ret == NOTIFY_FAILED), HANDLE_FAILED, "Notify failed.");
589         return HANDLE_SUCCESS;
590     };
591     TRUE_RETURN_V_MSG_E((!handler_->PostTask(outputFunc)), HANDLE_FAILED, "Handler post task failed.");
592     return HANDLE_SUCCESS;
593 }
594 
NotifyOutput(const std::shared_ptr<Plugin::Buffer> & data)595 int32_t OutputController::NotifyOutput(const std::shared_ptr<Plugin::Buffer>& data)
596 {
597     TRUE_RETURN_V_MSG_E((!listener_), NOTIFY_FAILED, "Listener is nullptr.");
598     return listener_->OnOutput(data);
599 }
600 
CheckIsBaselineInit()601 bool OutputController::CheckIsBaselineInit()
602 {
603     return GetBaselineInitState();
604 }
605 
CheckIsTimeInit()606 bool OutputController::CheckIsTimeInit()
607 {
608     return GetTimeInitState();
609 }
610 
CheckIsAllowControl()611 bool OutputController::CheckIsAllowControl()
612 {
613     return GetAllowControlState();
614 }
615 
RegisterListener(const std::shared_ptr<OutputControllerListener> & listener)616 void OutputController::RegisterListener(const std::shared_ptr<OutputControllerListener>& listener)
617 {
618     listener_ = listener;
619 }
620 
UnregisterListener()621 void OutputController::UnregisterListener()
622 {
623     listener_ = nullptr;
624 }
625 
SetBufferTime(const uint32_t time)626 void OutputController::SetBufferTime(const uint32_t time)
627 {
628     bufferTime_ = time;
629 }
630 
GetBufferTime()631 uint32_t OutputController::GetBufferTime()
632 {
633     return bufferTime_;
634 }
635 
SetTimeInitState(const bool state)636 void OutputController::SetTimeInitState(const bool state)
637 {
638     isTimeInit_.store(state);
639 }
640 
GetTimeInitState()641 bool OutputController::GetTimeInitState()
642 {
643     return isTimeInit_.load();
644 }
645 
SetBaselineInitState(const bool state)646 void OutputController::SetBaselineInitState(const bool state)
647 {
648     isBaselineInit_.store(state);
649 }
650 
GetBaselineInitState()651 bool OutputController::GetBaselineInitState()
652 {
653     return isBaselineInit_.load();
654 }
655 
SetProcessDynamicBalanceState(const bool state)656 void OutputController::SetProcessDynamicBalanceState(const bool state)
657 {
658     isInDynamicBalance_.store(state);
659 }
660 
GetProcessDynamicBalanceState()661 bool OutputController::GetProcessDynamicBalanceState()
662 {
663     return isInDynamicBalance_.load();
664 }
665 
SetAllowControlState(const bool state)666 void OutputController::SetAllowControlState(const bool state)
667 {
668     isAllowControl_.store(state);
669 }
670 
GetAllowControlState()671 bool OutputController::GetAllowControlState()
672 {
673     return isAllowControl_.load();
674 }
675 
SetClockBaseline(const int64_t clockBaseline)676 void OutputController::SetClockBaseline(const int64_t clockBaseline)
677 {
678     clockBaseline_ = clockBaseline;
679 }
680 
SetTimeStampBaseline(const int64_t timeStmapBaseline)681 void OutputController::SetTimeStampBaseline(const int64_t timeStmapBaseline)
682 {
683     timeStampBaseline_ = timeStmapBaseline;
684 }
685 
SetAverIntervalDiffThre(const uint32_t thre)686 void OutputController::SetAverIntervalDiffThre(const uint32_t thre)
687 {
688     averIntervalDiffThre_ = thre;
689 }
690 
SetDynamicBalanceThre(const uint8_t thre)691 void OutputController::SetDynamicBalanceThre(const uint8_t thre)
692 {
693     dynamicBalanceThre_ = thre;
694 }
695 
SetPushOnceDiffThre(const uint32_t thre)696 void OutputController::SetPushOnceDiffThre(const uint32_t thre)
697 {
698     pushOnceDiffThre_ = thre;
699 }
700 
SetTimeStampOnceDiffThre(const uint32_t thre)701 void OutputController::SetTimeStampOnceDiffThre(const uint32_t thre)
702 {
703     timeStampOnceDiffThre_ = thre;
704 }
705 
SetAdjustSleepFactor(const float factor)706 void OutputController::SetAdjustSleepFactor(const float factor)
707 {
708     adjustSleepFactor_ = factor;
709 }
710 
SetWaitClockFactor(const float factor)711 void OutputController::SetWaitClockFactor(const float factor)
712 {
713     waitClockFactor_ = factor;
714 }
715 
SetTrackClockFactor(const float factor)716 void OutputController::SetTrackClockFactor(const float factor)
717 {
718     trackClockFactor_ = factor;
719 }
720 
SetWaitClockThre(const int64_t thre)721 void OutputController::SetWaitClockThre(const int64_t thre)
722 {
723     waitClockThre_ = thre;
724 }
725 
SetTrackClockThre(const int64_t thre)726 void OutputController::SetTrackClockThre(const int64_t thre)
727 {
728     trackClockThre_ = thre;
729 }
730 
SetSleepThre(const int64_t thre)731 void OutputController::SetSleepThre(const int64_t thre)
732 {
733     sleepThre_ = thre;
734 }
735 
SetVideoFrontTime(const int64_t time)736 void OutputController::SetVideoFrontTime(const int64_t time)
737 {
738     vFront_ = time;
739 }
740 
SetVideoBackTime(const int64_t time)741 void OutputController::SetVideoBackTime(const int64_t time)
742 {
743     vBack_ = time;
744 }
745 
SetAudioFrontTime(const int64_t time)746 void OutputController::SetAudioFrontTime(const int64_t time)
747 {
748     aFront_ = time;
749 }
750 
SetAudioBackTime(const int64_t time)751 void OutputController::SetAudioBackTime(const int64_t time)
752 {
753     aBack_ = time;
754 }
755 
SetClockTime(const int64_t clockTime)756 void OutputController::SetClockTime(const int64_t clockTime)
757 {
758     clockTime_.store(clockTime);
759 }
760 
GetClockTime()761 int64_t OutputController::GetClockTime()
762 {
763     return clockTime_.load();
764 }
765 
SetDevClockDiff(int32_t diff)766 void OutputController::SetDevClockDiff(int32_t diff)
767 {
768     devClockDiff_.store(diff);
769 }
770 
GetDevClockDiff()771 int32_t OutputController::GetDevClockDiff()
772 {
773     return devClockDiff_.load();
774 }
775 
GetControlMode()776 OutputController::ControlMode OutputController::GetControlMode()
777 {
778     return mode_.load();
779 }
780 
GetControlStatus()781 OutputController::ControlStatus OutputController::GetControlStatus()
782 {
783     return status_.load();
784 }
785 
SetControlMode(ControlMode mode)786 void OutputController::SetControlMode(ControlMode mode)
787 {
788     mode_.store(mode);
789     PrepareControl();
790 }
791 
SetControlStatus(ControlStatus status)792 void OutputController::SetControlStatus(ControlStatus status)
793 {
794     status_.store(status);
795 }
796 
GetQueueSize()797 size_t OutputController::GetQueueSize()
798 {
799     {
800         std::lock_guard<std::mutex> lock(queueMutex_);
801         return dataQueue_.size();
802     }
803 }
804 
ClearQueue(std::queue<std::shared_ptr<Plugin::Buffer>> & queue)805 void OutputController::ClearQueue(std::queue<std::shared_ptr<Plugin::Buffer>>& queue)
806 {
807     std::queue<std::shared_ptr<Plugin::Buffer>> empty;
808     swap(empty, queue);
809 }
810 } // namespace DistributedHardware
811 } // namespace OHOS