/*
* Copyright (c) 2024 Huawei Device Co., Ltd.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "future.h"
#include
META_BEGIN_NAMESPACE()
Future::StateType Future::GetState() const
{
std::unique_lock lock { mutex_ };
return state_;
}
Future::StateType Future::Wait() const
{
std::unique_lock lock { mutex_ };
while (state_ == IFuture::WAITING) {
cond_.wait(lock);
}
return state_;
}
Future::StateType Future::WaitFor(const TimeSpan& time) const
{
std::unique_lock lock { mutex_ };
while (state_ == IFuture::WAITING) {
if (cond_.wait_for(lock, std::chrono::microseconds(time.ToMicroseconds())) == std::cv_status::timeout) {
return IFuture::WAITING;
}
}
return state_;
}
IAny::Ptr Future::GetResult() const
{
std::unique_lock lock { mutex_ };
while (state_ == IFuture::WAITING) {
cond_.wait(lock);
}
return result_;
}
IFuture::Ptr Future::Then(const IFutureContinuation::Ptr& func, const ITaskQueue::Ptr& queue)
{
std::unique_lock lock { mutex_ };
IFuture::Ptr result;
if (state_ == IFuture::ABANDONED) {
BASE_NS::shared_ptr f(new Future);
f->SetAbandoned();
result = BASE_NS::move(f);
} else {
ContinuationData d { queue == nullptr, queue };
d.continuation.reset(new ContinuationQueueTask(func));
result = d.continuation->GetFuture();
continuations_.push_back(BASE_NS::move(d));
if (state_ == IFuture::COMPLETED) {
ActivateContinuation(lock);
}
}
return result;
}
void Future::Cancel()
{
bool notify = false;
ITaskQueue::Token token {};
{
std::unique_lock lock { mutex_ };
if (state_ != IFuture::COMPLETED) {
notify = true;
token = token_;
token_ = {};
for (auto&& v : continuations_) {
v.continuation->SetAbandoned();
}
continuations_.clear();
state_ = IFuture::ABANDONED;
}
}
if (token) {
if (auto q = queue_.lock()) {
q->CancelTask(token);
}
}
if (notify) {
cond_.notify_all();
}
}
void Future::ActivateContinuation(const ContinuationData& d, const IAny::Ptr& result)
{
if (auto q = d.queue.lock()) {
d.continuation->SetParam(result);
auto token = q->AddTask(d.continuation);
d.continuation->SetQueueInfo(q, token);
} else if (d.runInline) {
d.continuation->SetParam(result);
d.continuation->Invoke();
} else {
d.continuation->SetAbandoned();
}
}
void Future::ActivateContinuation(std::unique_lock& lock)
{
BASE_NS::vector cdata = BASE_NS::move(continuations_);
auto result = result_;
lock.unlock();
for (auto&& v : cdata) {
ActivateContinuation(v, result);
}
}
void Future::SetResult(IAny::Ptr p)
{
std::unique_lock lock { mutex_ };
token_ = {};
if (state_ == IFuture::WAITING) {
result_ = BASE_NS::move(p);
state_ = IFuture::COMPLETED;
ActivateContinuation(lock);
cond_.notify_all();
}
}
void Future::SetAbandoned()
{
std::unique_lock lock { mutex_ };
if (state_ == IFuture::WAITING) {
state_ = IFuture::ABANDONED;
token_ = {};
for (auto&& v : continuations_) {
v.continuation->SetAbandoned();
}
continuations_.clear();
cond_.notify_all();
}
}
void Future::SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token)
{
std::unique_lock lock { mutex_ };
if (state_ == IFuture::WAITING) {
queue_ = queue;
token_ = token;
}
}
Promise::~Promise()
{
SetAbandoned();
}
void Promise::Set(const IAny::Ptr& res)
{
if (future_) {
future_->SetResult(res);
future_ = nullptr;
}
}
void Promise::SetAbandoned()
{
if (future_) {
future_->SetAbandoned();
future_ = nullptr;
}
}
IFuture::Ptr Promise::GetFuture()
{
if (!future_) {
future_.reset(new Future);
}
return future_;
}
void Promise::SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token)
{
if (future_) {
future_->SetQueueInfo(queue, token);
}
}
namespace Internal {
IObjectFactory::Ptr GetPromiseFactory()
{
return Promise::GetFactory();
}
} // namespace Internal
META_END_NAMESPACE()