1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "generic_virtual_device.h"
16 
17 #include "kv_store_errno.h"
18 #include "multi_ver_sync_task_context.h"
19 #include "single_ver_kv_sync_task_context.h"
20 #include "single_ver_relational_sync_task_context.h"
21 
22 namespace DistributedDB {
GenericVirtualDevice(std::string deviceId)23 GenericVirtualDevice::GenericVirtualDevice(std::string deviceId)
24     : communicateHandle_(nullptr),
25       communicatorAggregator_(nullptr),
26       storage_(nullptr),
27       metadata_(nullptr),
28       deviceId_(std::move(deviceId)),
29       remoteDeviceId_("real_device"),
30       context_(nullptr),
31       onRemoteDataChanged_(nullptr),
32       subManager_(nullptr),
33       executor_(nullptr)
34 {
35 }
36 
~GenericVirtualDevice()37 GenericVirtualDevice::~GenericVirtualDevice()
38 {
39     std::mutex cvMutex;
40     std::condition_variable cv;
41     bool finished = false;
42     Offline();
43 
44     if (communicateHandle_ != nullptr) {
45         communicateHandle_->RegOnMessageCallback(nullptr, nullptr);
46         communicatorAggregator_->ReleaseCommunicator(communicateHandle_);
47         communicateHandle_ = nullptr;
48     }
49     communicatorAggregator_ = nullptr;
50 
51     if (context_ != nullptr) {
52         ISyncInterface *storage = storage_;
53         context_->OnLastRef([storage, &cv, &cvMutex, &finished]() {
54             delete storage;
55             {
56                 std::lock_guard<std::mutex> lock(cvMutex);
57                 finished = true;
58             }
59             cv.notify_one();
60         });
61         RefObject::KillAndDecObjRef(context_);
62         std::unique_lock<std::mutex> lock(cvMutex);
63         cv.wait(lock, [&finished] { return finished; });
64     } else {
65         delete storage_;
66     }
67     context_ = nullptr;
68     metadata_ = nullptr;
69     storage_ = nullptr;
70     if (executor_ != nullptr) {
71         RefObject::KillAndDecObjRef(executor_);
72         executor_ = nullptr;
73     }
74 }
75 
Initialize(VirtualCommunicatorAggregator * comAggregator,ISyncInterface * syncInterface)76 int GenericVirtualDevice::Initialize(VirtualCommunicatorAggregator *comAggregator, ISyncInterface *syncInterface)
77 {
78     if ((comAggregator == nullptr) || (syncInterface == nullptr)) {
79         return -E_INVALID_ARGS;
80     }
81 
82     communicatorAggregator_ = comAggregator;
83     int errCode = E_OK;
84     communicateHandle_ = communicatorAggregator_->AllocCommunicator(deviceId_, errCode);
85     if (communicateHandle_ == nullptr) {
86         return errCode;
87     }
88 
89     storage_ = syncInterface;
90     metadata_ = std::make_shared<Metadata>();
91     if (metadata_->Initialize(storage_) != E_OK) {
92         LOGE("metadata_ init failed");
93         return -E_NOT_SUPPORT;
94     }
95     if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_SVD) {
96         context_ = new (std::nothrow) SingleVerKvSyncTaskContext;
97         subManager_ = std::make_shared<SubscribeManager>();
98         static_cast<SingleVerSyncTaskContext *>(context_)->SetSubscribeManager(subManager_);
99     } else if (storage_->GetInterfaceType() == IKvDBSyncInterface::SYNC_RELATION) {
100         context_ = new (std::nothrow) SingleVerRelationalSyncTaskContext;
101     } else {
102 #ifndef OMIT_MULTI_VER
103         context_ = new (std::nothrow) MultiVerSyncTaskContext;
104 #else
105 	return -E_NOT_SUPPORT;
106 #endif // OMIT_MULTI_VER
107     }
108     if (context_ == nullptr) {
109         return -E_OUT_OF_MEMORY;
110     }
111     communicateHandle_->RegOnMessageCallback(
112         std::bind(&GenericVirtualDevice::MessageCallback, this, std::placeholders::_1, std::placeholders::_2), []() {});
113     context_->Initialize(remoteDeviceId_, storage_, metadata_, communicateHandle_);
114     context_->SetRetryStatus(SyncTaskContext::NO_NEED_RETRY);
115     context_->RegOnSyncTask(std::bind(&GenericVirtualDevice::StartResponseTask, this));
116 
117     executor_ = new (std::nothrow) RemoteExecutor();
118     if (executor_ == nullptr) {
119         return -E_OUT_OF_MEMORY;
120     }
121     executor_->Initialize(syncInterface, communicateHandle_);
122     return E_OK;
123 }
124 
SetDeviceId(const std::string & deviceId)125 void GenericVirtualDevice::SetDeviceId(const std::string &deviceId)
126 {
127     deviceId_ = deviceId;
128 }
129 
GetDeviceId() const130 std::string GenericVirtualDevice::GetDeviceId() const
131 {
132     return deviceId_;
133 }
134 
MessageCallback(const std::string & deviceId,Message * inMsg)135 int GenericVirtualDevice::MessageCallback(const std::string &deviceId, Message *inMsg)
136 {
137     if (inMsg->GetMessageId() == LOCAL_DATA_CHANGED) {
138         if (onRemoteDataChanged_) {
139             onRemoteDataChanged_(deviceId);
140             delete inMsg;
141             inMsg = nullptr;
142             return E_OK;
143         }
144         delete inMsg;
145         inMsg = nullptr;
146         return -E_INVALID_ARGS;
147     }
148 
149     LOGD("[GenericVirtualDevice] onMessage, src %s id %u", deviceId.c_str(), inMsg->GetMessageId());
150     if (inMsg->GetMessageId() == REMOTE_EXECUTE_MESSAGE && executor_ != nullptr) {
151         RefObject::IncObjRef(executor_);
152         executor_->ReceiveMessage(deviceId, inMsg);
153         RefObject::DecObjRef(executor_);
154         return E_OK;
155     }
156 
157     RefObject::IncObjRef(context_);
158     RefObject::IncObjRef(communicateHandle_);
159     SyncTaskContext *context = context_;
160     ICommunicator *communicateHandle = communicateHandle_;
161     std::thread thread([context, communicateHandle, inMsg]() {
162         int errCode = context->ReceiveMessageCallback(inMsg);
163         if (errCode != -E_NOT_NEED_DELETE_MSG) {
164             delete inMsg;
165         }
166         RefObject::DecObjRef(context);
167         RefObject::DecObjRef(communicateHandle);
168     });
169     thread.detach();
170     return E_OK;
171 }
172 
OnRemoteDataChanged(const std::function<void (const std::string &)> & callback)173 void GenericVirtualDevice::OnRemoteDataChanged(const std::function<void(const std::string &)> &callback)
174 {
175     onRemoteDataChanged_ = callback;
176 }
177 
Online()178 void GenericVirtualDevice::Online()
179 {
180     static_cast<VirtualCommunicator *>(communicateHandle_)->Enable();
181     communicatorAggregator_->OnlineDevice(deviceId_);
182 }
183 
Offline()184 void GenericVirtualDevice::Offline()
185 {
186     static_cast<VirtualCommunicator *>(communicateHandle_)->Disable();
187     communicatorAggregator_->OfflineDevice(deviceId_);
188 }
189 
StartResponseTask()190 int GenericVirtualDevice::StartResponseTask()
191 {
192     LOGD("[KvVirtualDevice] StartResponseTask");
193     RefObject::AutoLock lockGuard(context_);
194     int status = context_->GetTaskExecStatus();
195     if ((status == SyncTaskContext::RUNNING) || context_->IsKilled()) {
196         LOGD("[KvVirtualDevice] StartResponseTask status:%d", status);
197         return -E_NOT_SUPPORT;
198     }
199     if (context_->IsTargetQueueEmpty()) {
200         LOGD("[KvVirtualDevice] StartResponseTask IsTargetQueueEmpty is empty");
201         return E_OK;
202     }
203     context_->SetTaskExecStatus(ISyncTaskContext::RUNNING);
204     context_->MoveToNextTarget();
205     LOGI("[KvVirtualDevice] machine StartSync");
206     context_->UnlockObj();
207     int errCode = context_->StartStateMachine();
208     context_->LockObj();
209     if (errCode != E_OK) {
210         LOGE("[KvVirtualDevice] machine StartSync failed");
211         context_->SetOperationStatus(SyncOperation::OP_FAILED);
212     }
213     return errCode;
214 }
215 
GetLocalTimeOffset() const216 TimeOffset GenericVirtualDevice::GetLocalTimeOffset() const
217 {
218     return metadata_->GetLocalTimeOffset();
219 }
220 
Sync(SyncMode mode,bool wait)221 int GenericVirtualDevice::Sync(SyncMode mode, bool wait)
222 {
223     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, nullptr, wait);
224     if (operation == nullptr) {
225         return -E_OUT_OF_MEMORY;
226     }
227     operation->Initialize();
228     operation->SetOnSyncFinished([operation](int id) {
229         operation->NotifyIfNeed();
230     });
231     context_->AddSyncOperation(operation);
232     operation->WaitIfNeed();
233     RefObject::KillAndDecObjRef(operation);
234     return E_OK;
235 }
236 
Sync(SyncMode mode,const Query & query,bool wait)237 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query, bool wait)
238 {
239     return Sync(mode, query, nullptr, wait);
240 }
241 
Sync(SyncMode mode,const Query & query,const SyncOperation::UserCallback & callBack,bool wait)242 int GenericVirtualDevice::Sync(SyncMode mode, const Query &query,
243     const SyncOperation::UserCallback &callBack, bool wait)
244 {
245     auto operation = new (std::nothrow) SyncOperation(1, {remoteDeviceId_}, mode, callBack, wait);
246     if (operation == nullptr) {
247         return -E_OUT_OF_MEMORY;
248     }
249     operation->Initialize();
250     operation->SetOnSyncFinished([operation](int id) {
251         operation->NotifyIfNeed();
252     });
253     QuerySyncObject querySyncObject(query);
254     int errCode = querySyncObject.Init();
255     if (errCode != E_OK) {
256         return errCode;
257     }
258     operation->SetQuery(querySyncObject);
259     context_->AddSyncOperation(operation);
260     operation->WaitIfNeed();
261     RefObject::KillAndDecObjRef(operation);
262     return errCode;
263 }
264 
RemoteQuery(const std::string & device,const RemoteCondition & condition,uint64_t timeout,std::shared_ptr<ResultSet> & result)265 int GenericVirtualDevice::RemoteQuery(const std::string &device, const RemoteCondition &condition,
266     uint64_t timeout, std::shared_ptr<ResultSet> &result)
267 {
268     if (executor_ == nullptr) {
269         result = nullptr;
270         return TransferDBErrno(-E_BUSY);
271     }
272     int errCode = executor_->RemoteQuery(device, condition, timeout, 1u, result);
273     if (errCode != E_OK) {
274         result = nullptr;
275     }
276     return TransferDBErrno(errCode);
277 }
278 
SetClearRemoteStaleData(bool isStaleData)279 void GenericVirtualDevice::SetClearRemoteStaleData(bool isStaleData)
280 {
281     if (context_ != nullptr) {
282         static_cast<SingleVerSyncTaskContext *>(context_)->EnableClearRemoteStaleData(isStaleData);
283         LOGD("set clear remote stale data mark");
284     }
285 }
286 } // DistributedDB
287