1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "rs_profiler_network.h"
17
18 #include <fstream>
19 #include <memory>
20 #include <thread>
21
22 #include "rs_profiler_archive.h"
23 #include "rs_profiler_cache.h"
24 #include "rs_profiler_capturedata.h"
25 #include "rs_profiler_file.h"
26 #include "rs_profiler_packet.h"
27 #include "rs_profiler_socket.h"
28 #include "rs_profiler_utils.h"
29
30 #include "pipeline/rs_main_thread.h"
31
32 namespace OHOS::Rosen {
33
34 bool Network::isRunning_ = false;
35 std::atomic<bool> Network::forceShutdown_ = false;
36
37 std::mutex Network::incomingMutex_ {};
38 std::queue<std::vector<std::string>> Network::incoming_ {};
39
40 std::mutex Network::outgoingMutex_ {};
41 std::queue<std::vector<char>> Network::outgoing_ {};
42
AwakeRenderServiceThread()43 static void AwakeRenderServiceThread()
44 {
45 RSMainThread::Instance()->SetAccessibilityConfigChanged();
46 RSMainThread::Instance()->RequestNextVSync();
47 RSMainThread::Instance()->PostTask([]() {
48 RSMainThread::Instance()->SetAccessibilityConfigChanged();
49 RSMainThread::Instance()->RequestNextVSync();
50 });
51 }
52
OnBinaryPrepare(RSFile & file,const char * data,size_t size)53 static uint32_t OnBinaryPrepare(RSFile& file, const char* data, size_t size)
54 {
55 file.Create(RSFile::GetDefaultPath());
56 return BinaryHelper::BinaryCount(data);
57 }
58
OnBinaryHeader(RSFile & file,const char * data,size_t size)59 static void OnBinaryHeader(RSFile& file, const char* data, size_t size)
60 {
61 if (!file.IsOpen()) {
62 return;
63 }
64
65 std::stringstream stream(std::ios::in | std::ios::out | std::ios::binary);
66 stream.write(reinterpret_cast<const char*>(data + 1), size - 1);
67 stream.seekg(0);
68
69 double writeStartTime = 0.0;
70 stream.read(reinterpret_cast<char*>(&writeStartTime), sizeof(writeStartTime));
71 file.SetWriteTime(writeStartTime);
72
73 uint32_t pidCount = 0u;
74 stream.read(reinterpret_cast<char*>(&pidCount), sizeof(pidCount));
75 for (uint32_t i = 0; i < pidCount; i++) {
76 pid_t pid = 0u;
77 stream.read(reinterpret_cast<char*>(&pid), sizeof(pid));
78 file.AddHeaderPid(pid);
79 }
80 file.AddLayer();
81
82 std::string dataFirstFrame;
83 size_t sizeDataFirstFrame = 0;
84 stream.read(reinterpret_cast<char*>(&sizeDataFirstFrame), sizeof(sizeDataFirstFrame));
85 dataFirstFrame.resize(sizeDataFirstFrame);
86 stream.read(reinterpret_cast<char*>(&dataFirstFrame[0]), sizeDataFirstFrame);
87 file.AddHeaderFirstFrame(dataFirstFrame);
88
89 ImageCache::Deserialize(stream);
90 }
91
OnBinaryChunk(RSFile & file,const char * data,size_t size)92 static void OnBinaryChunk(RSFile& file, const char* data, size_t size)
93 {
94 constexpr size_t timeOffset = 8 + 1;
95 if (file.IsOpen() && (size >= timeOffset)) {
96 const double time = *(reinterpret_cast<const double*>(data + 1));
97 file.WriteRSData(time, const_cast<char*>(data) + timeOffset, size - timeOffset);
98 }
99 }
100
OnBinaryFinish(RSFile & file,const char * data,size_t size)101 static void OnBinaryFinish(RSFile& file, const char* data, size_t size)
102 {
103 file.Close();
104 }
105
Run()106 void Network::Run()
107 {
108 const uint16_t port = 5050;
109 const uint32_t sleepTimeout = 500000u;
110
111 Socket* socket = nullptr;
112
113 isRunning_ = true;
114 forceShutdown_ = false;
115
116 while (isRunning_) {
117 if (!socket) {
118 socket = new Socket();
119 }
120
121 const SocketState state = socket->GetState();
122 if (forceShutdown_) {
123 Shutdown(socket);
124 forceShutdown_ = false;
125 } else if (state == SocketState::INITIAL) {
126 socket->Open(port);
127 } else if (state == SocketState::CREATE) {
128 socket->AcceptClient();
129 usleep(sleepTimeout);
130 } else if (state == SocketState::ACCEPT) {
131 bool readyToReceive = false;
132 bool readyToSend = false;
133 socket->GetStatus(readyToReceive, readyToSend);
134
135 if (readyToReceive) {
136 ProcessIncoming(*socket);
137 }
138 if (readyToSend) {
139 ProcessOutgoing(*socket);
140 }
141 } else if (state == SocketState::SHUTDOWN) {
142 Shutdown(socket);
143 }
144 }
145
146 delete socket;
147 }
148
Stop()149 void Network::Stop()
150 {
151 isRunning_ = false;
152 }
153
GetStats(const std::string & interface)154 std::vector<NetworkStats> Network::GetStats(const std::string& interface)
155 {
156 static const uint32_t INTERFACE_COLUMN = 0u;
157 static const uint32_t RECV_BYTES_COLUMN = 1u;
158 static const uint32_t SENT_BYTES_COLUMN = 9u;
159
160 std::ifstream netdev("/proc/net/dev");
161 if (!netdev.good()) {
162 return {};
163 }
164
165 std::vector<NetworkStats> results;
166
167 std::string data;
168 // skip the first two lines (headers)
169 std::getline(netdev, data);
170 std::getline(netdev, data);
171
172 while (netdev.good()) {
173 std::getline(netdev, data);
174 std::vector<std::string> parts = Utils::Split(data);
175 if (parts.empty()) {
176 continue;
177 }
178
179 std::string candidate = parts[INTERFACE_COLUMN];
180 // remove the trailing ':' so we can compare against the provided interface
181 candidate.pop_back();
182
183 if (("*" == interface) || (candidate == interface)) {
184 const uint64_t recvBytes = std::stoull(parts[RECV_BYTES_COLUMN]);
185 const uint64_t sentBytes = std::stoull(parts[SENT_BYTES_COLUMN]);
186
187 results.push_back({ .interface = candidate, .receivedBytes = recvBytes, .transmittedBytes = sentBytes });
188 }
189 };
190
191 return results;
192 }
193
SendPacket(const Packet & packet)194 void Network::SendPacket(const Packet& packet)
195 {
196 if (isRunning_) {
197 const std::lock_guard<std::mutex> guard(outgoingMutex_);
198 outgoing_.emplace(const_cast<Packet&>(packet).Release());
199 }
200 }
201
SendPath(const std::string & path,PackageID id)202 void Network::SendPath(const std::string& path, PackageID id)
203 {
204 if (!path.empty()) {
205 std::string out;
206 out += static_cast<char>(id);
207 out += path;
208 SendBinary(out);
209 }
210 }
211
SendRdcPath(const std::string & path)212 void Network::SendRdcPath(const std::string& path)
213 {
214 SendPath(path, PackageID::RS_PROFILER_RDC_BINARY);
215 }
216
SendDclPath(const std::string & path)217 void Network::SendDclPath(const std::string& path)
218 {
219 SendPath(path, PackageID::RS_PROFILER_DCL_BINARY);
220 }
221
SendMskpPath(const std::string & path)222 void Network::SendMskpPath(const std::string& path)
223 {
224 SendPath(path, PackageID::RS_PROFILER_MSKP_FILEPATH);
225 }
226
SendBetaRecordPath(const std::string & path)227 void Network::SendBetaRecordPath(const std::string& path)
228 {
229 SendPath(path, PackageID::RS_PROFILER_BETAREC_FILEPATH);
230 }
231
SendSkp(const void * data,size_t size)232 void Network::SendSkp(const void* data, size_t size)
233 {
234 if (data && (size > 0)) {
235 std::vector<char> buffer;
236 buffer.reserve(size + 1);
237 buffer.push_back(static_cast<char>(PackageID::RS_PROFILER_SKP_BINARY));
238 buffer.insert(buffer.end(), static_cast<const char*>(data), static_cast<const char*>(data) + size);
239 SendBinary(buffer);
240 }
241 }
242
SendCaptureData(const RSCaptureData & data)243 void Network::SendCaptureData(const RSCaptureData& data)
244 {
245 std::vector<char> out;
246 DataWriter archive(out);
247 char headerType = static_cast<char>(PackageID::RS_PROFILER_GFX_METRICS);
248 archive.Serialize(headerType);
249
250 const_cast<RSCaptureData&>(data).Serialize(archive);
251
252 // if no data is serialized, we end up with just 1 char header
253 if (out.size() > 1) {
254 SendBinary(out);
255 }
256 }
257
SendRSTreeDumpJSON(const std::string & jsonstr)258 void Network::SendRSTreeDumpJSON(const std::string& jsonstr)
259 {
260 Packet packet { Packet::BINARY };
261 packet.Write(static_cast<char>(PackageID::RS_PROFILER_RSTREE_DUMP_JSON));
262 packet.Write(jsonstr);
263 SendPacket(packet);
264 }
265
SendRSTreePerfNodeList(const std::unordered_set<uint64_t> & perfNodesList)266 void Network::SendRSTreePerfNodeList(const std::unordered_set<uint64_t>& perfNodesList)
267 {
268 Packet packet { Packet::BINARY };
269 packet.Write(static_cast<char>(PackageID::RS_PROFILER_RSTREE_PERF_NODE_LIST));
270 packet.Write(perfNodesList);
271 SendPacket(packet);
272 }
273
SendRSTreeSingleNodePerf(uint64_t id,uint64_t nanosec)274 void Network::SendRSTreeSingleNodePerf(uint64_t id, uint64_t nanosec)
275 {
276 Packet packet { Packet::BINARY };
277 packet.Write(static_cast<char>(PackageID::RS_PROFILER_RSTREE_SINGLE_NODE_PERF));
278 packet.Write(id);
279 packet.Write(nanosec);
280 SendPacket(packet);
281 }
282
SendBinary(const void * data,size_t size)283 void Network::SendBinary(const void* data, size_t size)
284 {
285 if (data && (size > 0)) {
286 Packet packet { Packet::BINARY };
287 packet.Write(data, size);
288 SendPacket(packet);
289 }
290 }
291
SendBinary(const std::vector<char> & data)292 void Network::SendBinary(const std::vector<char>& data)
293 {
294 SendBinary(data.data(), data.size());
295 }
296
SendBinary(const std::string & data)297 void Network::SendBinary(const std::string& data)
298 {
299 SendBinary(data.data(), data.size());
300 }
301
SendMessage(const std::string & message)302 void Network::SendMessage(const std::string& message)
303 {
304 if (!message.empty()) {
305 Packet packet { Packet::LOG };
306 packet.Write(message);
307 SendPacket(packet);
308 }
309 }
310
PushCommand(const std::vector<std::string> & args)311 void Network::PushCommand(const std::vector<std::string>& args)
312 {
313 if (!args.empty()) {
314 const std::lock_guard<std::mutex> guard(incomingMutex_);
315 incoming_.emplace(args);
316 }
317 }
318
PopCommand(std::vector<std::string> & args)319 bool Network::PopCommand(std::vector<std::string>& args)
320 {
321 args.clear();
322
323 incomingMutex_.lock();
324 if (!incoming_.empty()) {
325 args.swap(incoming_.front());
326 incoming_.pop();
327 }
328 incomingMutex_.unlock();
329
330 return !args.empty();
331 }
332
ProcessCommand(const char * data,size_t size)333 void Network::ProcessCommand(const char* data, size_t size)
334 {
335 const std::vector<std::string> args = Utils::Split({ data, size });
336 if (args.empty()) {
337 return;
338 }
339
340 PushCommand(args);
341 AwakeRenderServiceThread();
342 }
343
ProcessOutgoing(Socket & socket)344 void Network::ProcessOutgoing(Socket& socket)
345 {
346 std::vector<char> data;
347
348 bool nothingToSend = false;
349 while (!nothingToSend) {
350 outgoingMutex_.lock();
351 nothingToSend = outgoing_.empty();
352 if (!nothingToSend) {
353 data.swap(outgoing_.front());
354 outgoing_.pop();
355 }
356 outgoingMutex_.unlock();
357
358 if (!nothingToSend) {
359 socket.SendWhenReady(data.data(), data.size());
360 }
361 }
362 }
363
ProcessBinary(const char * data,size_t size)364 void Network::ProcessBinary(const char* data, size_t size)
365 {
366 static uint32_t chunks = 0u;
367 static RSFile file;
368
369 const PackageID id = BinaryHelper::Type(data);
370 if (id == PackageID::RS_PROFILER_PREPARE) {
371 // ping/pong for connection speed measurement
372 const char type = static_cast<char>(PackageID::RS_PROFILER_PREPARE);
373 SendBinary(&type, sizeof(type));
374 // amount of binary packages will be sent
375 chunks = OnBinaryPrepare(file, data, size);
376 } else if (id == PackageID::RS_PROFILER_HEADER) {
377 OnBinaryHeader(file, data, size);
378 } else if (id == PackageID::RS_PROFILER_BINARY) {
379 OnBinaryChunk(file, data, size);
380
381 chunks--;
382 if (chunks == 0) {
383 OnBinaryFinish(file, data, size);
384 const char type = static_cast<char>(PackageID::RS_PROFILER_PREPARE_DONE);
385 SendBinary(&type, sizeof(type));
386 }
387 }
388 }
389
ForceShutdown()390 void Network::ForceShutdown()
391 {
392 forceShutdown_ = true;
393 }
394
Shutdown(Socket * & socket)395 void Network::Shutdown(Socket*& socket)
396 {
397 delete socket;
398 socket = nullptr;
399
400 std::string command = "rsrecord_stop";
401 ProcessCommand(command.c_str(), command.size());
402 command = "rsrecord_replay_stop";
403 ProcessCommand(command.c_str(), command.size());
404 AwakeRenderServiceThread();
405 }
406
ProcessIncoming(Socket & socket)407 void Network::ProcessIncoming(Socket& socket)
408 {
409 const uint32_t sleepTimeout = 500000u;
410
411 Packet packetIncoming { Packet::UNKNOWN };
412 auto wannaReceive = Packet::HEADER_SIZE;
413 socket.Receive(packetIncoming.Begin(), wannaReceive);
414
415 if (wannaReceive == 0) {
416 socket.SetState(SocketState::SHUTDOWN);
417 usleep(sleepTimeout);
418 return;
419 }
420
421 const size_t size = packetIncoming.GetPayloadLength();
422 if (size == 0) {
423 return;
424 }
425
426 std::vector<char> data;
427 data.resize(size);
428 socket.ReceiveWhenReady(data.data(), data.size());
429
430 if (packetIncoming.IsBinary()) {
431 ProcessBinary(data.data(), data.size());
432 } else if (packetIncoming.IsCommand()) {
433 ProcessCommand(data.data(), data.size());
434 }
435 }
436
ReportStats()437 void Network::ReportStats()
438 {
439 constexpr uint32_t bytesToBits = 8u;
440 const std::string interface("wlan0");
441 const std::vector<NetworkStats> stats = Network::GetStats(interface);
442
443 std::string out = "Interface: " + interface;
444 for (const NetworkStats& stat : stats) {
445 out += "Transmitted: " + std::to_string(stat.transmittedBytes * bytesToBits);
446 out += "Received: " + std::to_string(stat.transmittedBytes * bytesToBits);
447 }
448
449 SendMessage(out);
450 }
451
452 } // namespace OHOS::Rosen