1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "core/version_ctx.h"
17 #include "core/entity.h"
18 #include "util/slab.h"
19 #include "ffrt_trace.h"
20 
21 namespace ffrt {
BuildConsumeRelationship(VersionCtx * version,SCPUEUTask * consumer)22 static inline void BuildConsumeRelationship(VersionCtx* version, SCPUEUTask* consumer)
23 {
24     if (version->status == DataStatus::IDLE) {
25         consumer->IncDepRef();
26     }
27     version->consumers.insert(consumer);
28     if (version->status == DataStatus::CONSUMED) {
29         version->status = DataStatus::READY;
30     }
31 }
32 
BuildProducerProducerRelationship(VersionCtx * preVersion,SCPUEUTask * nextProducer)33 static inline void BuildProducerProducerRelationship(VersionCtx* preVersion, SCPUEUTask* nextProducer)
34 {
35     if (preVersion->status != DataStatus::CONSUMED) {
36         preVersion->nextProducer = nextProducer;
37         nextProducer->IncDepRef();
38     }
39 }
40 
AddConsumer(SCPUEUTask * consumer,NestType nestType)41 void VersionCtx::AddConsumer(SCPUEUTask* consumer, NestType nestType)
42 {
43     FFRT_TRACE_SCOPE(2, AddConsumer);
44     // Parent's VersionCtx
45     VersionCtx* beConsumeVersion = this;
46     if (nestType == NestType::PARENTOUT || nestType == NestType::DEFAULT) {
47         // Create READY version when last is nullptr
48         if (last == nullptr) {
49             CreateChildVersion(consumer, DataStatus::READY);
50         }
51         beConsumeVersion = last;
52     }
53     BuildConsumeRelationship(beConsumeVersion, consumer);
54     consumer->ins.insert(beConsumeVersion);
55 }
56 
AddProducer(SCPUEUTask * producer)57 void VersionCtx::AddProducer(SCPUEUTask* producer)
58 {
59     FFRT_TRACE_SCOPE(2, AddAddProducer);
60     // Parent's VersionCtx
61     auto parentVersion = this;
62     if (parentVersion->last != nullptr) {
63         VersionCtx* preVersion = parentVersion->last;
64         BuildProducerProducerRelationship(preVersion, producer);
65     }
66     parentVersion->CreateChildVersion(producer, DataStatus::IDLE);
67     producer->outs.insert(parentVersion->last);
68     parentVersion->last->myProducer = producer;
69 }
70 
onProduced()71 void VersionCtx::onProduced()
72 {
73     /* No merge operation, merge operation can only occur when the parent version is produced and
74      * the last child version is not in the CONSUMED state
75      */
76     if (last == nullptr || last->status == DataStatus::CONSUMED) {
77         // No consumers, directly into CONSUMED after being produced
78         if (consumers.empty()) {
79             status = DataStatus::CONSUMED;
80             NotifyNextProducer();
81             Entity::Instance()->versionTrashcan.push_back(this);
82         } else { // if have consumers,notify them
83             status = DataStatus::READY;
84             NotifyConsumers();
85         }
86         NotifyDataWaitTask();
87     } else { // Merge previous VersionCtx
88         MergeChildVersion();
89     }
90 }
91 
onConsumed(SCPUEUTask * consumer)92 void VersionCtx::onConsumed(SCPUEUTask* consumer)
93 {
94     auto it = std::as_const(consumers).find(consumer);
95     if (it != consumers.end()) {
96         consumers.erase(it);
97     }
98     if (consumers.empty()) {
99         status = DataStatus::CONSUMED;
100         NotifyNextProducer();
101         Entity::Instance()->versionTrashcan.push_back(this);
102     }
103 }
104 
CreateChildVersion(SCPUEUTask * task,DataStatus dataStatus)105 void VersionCtx::CreateChildVersion(SCPUEUTask* task __attribute__((unused)), DataStatus dataStatus)
106 {
107     // Add VersionCtx
108     auto prev = last;
109     last = new (SimpleAllocator<VersionCtx>::AllocMem()) VersionCtx(this->signature, this, prev);
110     last->status = dataStatus;
111     if (prev != nullptr) {
112         prev->next = last;
113     }
114 }
115 
MergeChildVersion()116 void VersionCtx::MergeChildVersion()
117 {
118     // Merge VersionCtx
119     auto versionToMerge = last;
120     status = versionToMerge->status;
121     if (status == DataStatus::READY) {
122         NotifyConsumers();
123         NotifyDataWaitTask();
124     }
125     MergeConsumerInDep(versionToMerge);
126     if (status == DataStatus::IDLE) {
127         consumers.insert(versionToMerge->consumers.cbegin(), versionToMerge->consumers.cend());
128         MergeProducerOutDep(versionToMerge);
129         myProducer = versionToMerge->myProducer;
130     }
131     Entity::Instance()->versionTrashcan.push_back(versionToMerge);
132 }
133 } /* namespace ffrt */