1 /*
2  * Copyright (c) 2023-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "video_job_repository.h"
17 
18 namespace OHOS {
19 namespace CameraStandard {
20 namespace DeferredProcessing {
VideoJobRepository(const int32_t userId)21 VideoJobRepository::VideoJobRepository(const int32_t userId) : userId_(userId)
22 {
23     DP_DEBUG_LOG("entered, userid: %{public}d", userId_);
24     jobQueue_ = std::make_shared<VideoJobQueue>([] (DeferredVideoJobPtr a, DeferredVideoJobPtr b) {return *a > *b;});
25 }
26 
~VideoJobRepository()27 VideoJobRepository::~VideoJobRepository()
28 {
29     DP_DEBUG_LOG("entered, userid: %{public}d", userId_);
30     ClearCatch();
31 }
32 
AddVideoJob(const std::string & videoId,const sptr<IPCFileDescriptor> & srcFd,const sptr<IPCFileDescriptor> & dstFd)33 void VideoJobRepository::AddVideoJob(const std::string& videoId,
34     const sptr<IPCFileDescriptor>& srcFd, const sptr<IPCFileDescriptor>& dstFd)
35 {
36     std::lock_guard<std::recursive_mutex> lock(mutex_);
37     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
38     DP_CHECK_RETURN_LOG(jobPtrFind != nullptr, "already existed, videoId: %{public}s", videoId.c_str());
39 
40     DeferredVideoJobPtr jobPtr = std::make_shared<DeferredVideoJob>(videoId, srcFd, dstFd);
41     jobPtr->SetJobStatus(VideoJobStatus::PENDING);
42     jobMap_.emplace(videoId, jobPtr);
43     jobQueue_->Push(jobPtr);
44     DP_INFO_LOG("add video job size: %{public}d, videoId: %{public}s, srcFd: %{public}d",
45         static_cast<int>(jobQueue_->GetSize()), videoId.c_str(), srcFd->GetFd());
46 }
47 
RemoveVideoJob(const std::string & videoId,bool restorable)48 bool VideoJobRepository::RemoveVideoJob(const std::string& videoId, bool restorable)
49 {
50     DP_INFO_LOG("entered, videoId: %{public}s, restorable: %{public}d", videoId.c_str(), restorable);
51     std::lock_guard<std::recursive_mutex> lock(mutex_);
52     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
53     bool isNeedStop = false;
54     DP_CHECK_RETURN_RET_LOG(jobPtrFind == nullptr, isNeedStop,
55         "does not existed, videoId: %{public}s", videoId.c_str());
56 
57     isNeedStop = jobPtrFind->GetCurStatus() == VideoJobStatus::RUNNING;
58     if (!restorable) {
59         DP_INFO_LOG("remove video job size: %{public}d, videoId: %{public}s",
60             static_cast<int>(jobQueue_->GetSize()), videoId.c_str());
61         jobMap_.erase(videoId);
62         jobQueue_->Remove(jobPtrFind);
63     }
64     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::DELETED);
65     UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
66     return isNeedStop;
67 }
68 
RestoreVideoJob(const std::string & videoId)69 void VideoJobRepository::RestoreVideoJob(const std::string& videoId)
70 {
71     DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
72     std::lock_guard<std::recursive_mutex> lock(mutex_);
73     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
74     DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
75 
76     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::PENDING);
77     DP_CHECK_EXECUTE(statusChanged, jobQueue_->Update(jobPtrFind));
78 }
79 
SetJobPending(const std::string & videoId)80 void VideoJobRepository::SetJobPending(const std::string& videoId)
81 {
82     DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
83     std::lock_guard<std::recursive_mutex> lock(mutex_);
84     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
85     DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
86 
87     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::PENDING);
88     UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
89     NotifyJobChangedUnLocked(statusChanged, jobPtrFind);
90 }
91 
SetJobRunning(const std::string & videoId)92 void VideoJobRepository::SetJobRunning(const std::string& videoId)
93 {
94     DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
95     std::lock_guard<std::recursive_mutex> lock(mutex_);
96     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
97     DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
98 
99     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::RUNNING);
100     UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
101 }
102 
SetJobCompleted(const std::string & videoId)103 void VideoJobRepository::SetJobCompleted(const std::string& videoId)
104 {
105     DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
106     std::lock_guard<std::recursive_mutex> lock(mutex_);
107     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
108     DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
109 
110     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::COMPLETED);
111     UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
112     NotifyJobChangedUnLocked(statusChanged, jobPtrFind);
113 }
114 
SetJobFailed(const std::string & videoId)115 void VideoJobRepository::SetJobFailed(const std::string& videoId)
116 {
117     DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
118     std::lock_guard<std::recursive_mutex> lock(mutex_);
119     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
120     DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
121 
122     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::FAILED);
123     UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
124     NotifyJobChangedUnLocked(statusChanged, jobPtrFind);
125 }
126 
SetJobPause(const std::string & videoId)127 void VideoJobRepository::SetJobPause(const std::string& videoId)
128 {
129     DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
130     std::lock_guard<std::recursive_mutex> lock(mutex_);
131     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
132     DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
133 
134     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::PAUSE);
135     UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
136 }
137 
SetJobError(const std::string & videoId)138 void VideoJobRepository::SetJobError(const std::string& videoId)
139 {
140     DP_INFO_LOG("entered, videoId: %{public}s", videoId.c_str());
141     std::lock_guard<std::recursive_mutex> lock(mutex_);
142     DeferredVideoJobPtr jobPtrFind = GetJobUnLocked(videoId);
143     DP_CHECK_RETURN_LOG(jobPtrFind == nullptr, "does not existed, videoId: %{public}s", videoId.c_str());
144 
145     bool statusChanged = jobPtrFind->SetJobStatus(VideoJobStatus::ERROR);
146     UpdateRunningCountUnLocked(statusChanged, jobPtrFind);
147 }
148 
GetJob()149 DeferredVideoJobPtr VideoJobRepository::GetJob()
150 {
151     DP_INFO_LOG("entered, video job size: %{public}d, running num: %{public}d",
152         jobQueue_->GetSize(), static_cast<int32_t>(runningSet_.size()));
153     std::lock_guard<std::recursive_mutex> lock(mutex_);
154     auto jobPtr = jobQueue_->Peek();
155     DP_CHECK_RETURN_RET(jobPtr == nullptr || jobPtr->GetCurStatus() == VideoJobStatus::COMPLETED ||
156         jobPtr->GetCurStatus() == VideoJobStatus::ERROR, nullptr);
157 
158     if (jobPtr->GetCurStatus() == VideoJobStatus::FAILED) {
159         jobPtr->SetJobStatus(VideoJobStatus::PENDING);
160         jobQueue_->Update(jobPtr);
161     }
162     return jobPtr;
163 }
164 
165 
GetRunningJobCounts()166 int32_t VideoJobRepository::GetRunningJobCounts()
167 {
168     std::lock_guard<std::recursive_mutex> lock(mutex_);
169     DP_DEBUG_LOG("video running jobs num: %{public}d", static_cast<int32_t>(runningSet_.size()));
170     return static_cast<int32_t>(runningSet_.size());
171 }
172 
GetRunningJobList(std::vector<std::string> & list)173 void VideoJobRepository::GetRunningJobList(std::vector<std::string>& list)
174 {
175     std::lock_guard<std::recursive_mutex> lock(mutex_);
176     DP_DEBUG_LOG("video running jobs num: %{public}d", static_cast<int32_t>(runningSet_.size()));
177     list.clear();
178     list.reserve(runningSet_.size());
179     for (auto& item : runningSet_) {
180         list.emplace_back(item);
181     }
182 }
183 
RegisterJobListener(const std::weak_ptr<IVideoJobRepositoryListener> & listener)184 void VideoJobRepository::RegisterJobListener(const std::weak_ptr<IVideoJobRepositoryListener>& listener)
185 {
186     DP_INFO_LOG("entered");
187     jobListener_ = listener;
188 }
189 
GetJobUnLocked(const std::string & videoId)190 DeferredVideoJobPtr VideoJobRepository::GetJobUnLocked(const std::string& videoId)
191 {
192     DeferredVideoJobPtr jobPtr = nullptr;
193     if (jobMap_.count(videoId) == 1) {
194         DP_DEBUG_LOG("video job, videoId: %{public}s", videoId.c_str());
195         jobPtr = jobMap_.find(videoId)->second;
196     }
197     return jobPtr;
198 }
199 
NotifyJobChangedUnLocked(bool statusChanged,DeferredVideoJobPtr jobPtr)200 void VideoJobRepository::NotifyJobChangedUnLocked(bool statusChanged, DeferredVideoJobPtr jobPtr)
201 {
202     DP_DEBUG_LOG("entered, statusChanged: %{public}d, videoId: %{public}s",
203         statusChanged, jobPtr->GetVideoId().c_str());
204     if (auto listenerSptr = jobListener_.lock()) {
205         listenerSptr->OnVideoJobChanged(jobPtr);
206     }
207 }
208 
UpdateRunningCountUnLocked(bool statusChanged,const DeferredVideoJobPtr & jobPtr)209 void VideoJobRepository::UpdateRunningCountUnLocked(bool statusChanged, const DeferredVideoJobPtr& jobPtr)
210 {
211     DP_CHECK_EXECUTE(statusChanged, jobQueue_->Update(jobPtr));
212 
213     if (statusChanged && (jobPtr->GetPreStatus() == VideoJobStatus::RUNNING)) {
214         runningSet_.erase(jobPtr->GetVideoId());
215     }
216     if (statusChanged && (jobPtr->GetCurStatus() == VideoJobStatus::RUNNING)) {
217         runningSet_.emplace(jobPtr->GetVideoId());
218     }
219     DP_INFO_LOG("video running jobs num: %{public}d, videoId: %{public}s",
220         static_cast<int32_t>(runningSet_.size()), jobPtr->GetVideoId().c_str());
221 }
222 
ClearCatch()223 void VideoJobRepository::ClearCatch()
224 {
225     jobQueue_->Clear();
226     jobMap_.clear();
227     runningSet_.clear();
228 }
229 } // namespace DeferredProcessing
230 } // namespace CameraStandard
231 } // namespace OHOS