1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "rs_profiler_network.h"
17 
18 #include <fstream>
19 #include <memory>
20 #include <thread>
21 
22 #include "rs_profiler_archive.h"
23 #include "rs_profiler_cache.h"
24 #include "rs_profiler_capturedata.h"
25 #include "rs_profiler_file.h"
26 #include "rs_profiler_packet.h"
27 #include "rs_profiler_socket.h"
28 #include "rs_profiler_utils.h"
29 
30 #include "pipeline/rs_main_thread.h"
31 
32 namespace OHOS::Rosen {
33 
34 bool Network::isRunning_ = false;
35 std::atomic<bool> Network::forceShutdown_ = false;
36 
37 std::mutex Network::incomingMutex_ {};
38 std::queue<std::vector<std::string>> Network::incoming_ {};
39 
40 std::mutex Network::outgoingMutex_ {};
41 std::queue<std::vector<char>> Network::outgoing_ {};
42 
AwakeRenderServiceThread()43 static void AwakeRenderServiceThread()
44 {
45     RSMainThread::Instance()->SetAccessibilityConfigChanged();
46     RSMainThread::Instance()->RequestNextVSync();
47     RSMainThread::Instance()->PostTask([]() {
48         RSMainThread::Instance()->SetAccessibilityConfigChanged();
49         RSMainThread::Instance()->RequestNextVSync();
50     });
51 }
52 
OnBinaryPrepare(RSFile & file,const char * data,size_t size)53 static uint32_t OnBinaryPrepare(RSFile& file, const char* data, size_t size)
54 {
55     file.Create(RSFile::GetDefaultPath());
56     return BinaryHelper::BinaryCount(data);
57 }
58 
OnBinaryHeader(RSFile & file,const char * data,size_t size)59 static void OnBinaryHeader(RSFile& file, const char* data, size_t size)
60 {
61     if (!file.IsOpen()) {
62         return;
63     }
64 
65     std::stringstream stream(std::ios::in | std::ios::out | std::ios::binary);
66     stream.write(reinterpret_cast<const char*>(data + 1), size - 1);
67     stream.seekg(0);
68 
69     double writeStartTime = 0.0;
70     stream.read(reinterpret_cast<char*>(&writeStartTime), sizeof(writeStartTime));
71     file.SetWriteTime(writeStartTime);
72 
73     uint32_t pidCount = 0u;
74     stream.read(reinterpret_cast<char*>(&pidCount), sizeof(pidCount));
75     for (uint32_t i = 0; i < pidCount; i++) {
76         pid_t pid = 0u;
77         stream.read(reinterpret_cast<char*>(&pid), sizeof(pid));
78         file.AddHeaderPid(pid);
79     }
80     file.AddLayer();
81 
82     std::string dataFirstFrame;
83     size_t sizeDataFirstFrame = 0;
84     stream.read(reinterpret_cast<char*>(&sizeDataFirstFrame), sizeof(sizeDataFirstFrame));
85     dataFirstFrame.resize(sizeDataFirstFrame);
86     stream.read(reinterpret_cast<char*>(&dataFirstFrame[0]), sizeDataFirstFrame);
87     file.AddHeaderFirstFrame(dataFirstFrame);
88 
89     ImageCache::Deserialize(stream);
90 }
91 
OnBinaryChunk(RSFile & file,const char * data,size_t size)92 static void OnBinaryChunk(RSFile& file, const char* data, size_t size)
93 {
94     constexpr size_t timeOffset = 8 + 1;
95     if (file.IsOpen() && (size >= timeOffset)) {
96         const double time = *(reinterpret_cast<const double*>(data + 1));
97         file.WriteRSData(time, const_cast<char*>(data) + timeOffset, size - timeOffset);
98     }
99 }
100 
OnBinaryFinish(RSFile & file,const char * data,size_t size)101 static void OnBinaryFinish(RSFile& file, const char* data, size_t size)
102 {
103     file.Close();
104 }
105 
Run()106 void Network::Run()
107 {
108     const uint16_t port = 5050;
109     const uint32_t sleepTimeout = 500000u;
110 
111     Socket* socket = nullptr;
112 
113     isRunning_ = true;
114     forceShutdown_ = false;
115 
116     while (isRunning_) {
117         if (!socket) {
118             socket = new Socket();
119         }
120 
121         const SocketState state = socket->GetState();
122         if (forceShutdown_) {
123             Shutdown(socket);
124             forceShutdown_ = false;
125         } else if (state == SocketState::INITIAL) {
126             socket->Open(port);
127         } else if (state == SocketState::CREATE) {
128             socket->AcceptClient();
129             usleep(sleepTimeout);
130         } else if (state == SocketState::ACCEPT) {
131             bool readyToReceive = false;
132             bool readyToSend = false;
133             socket->GetStatus(readyToReceive, readyToSend);
134 
135             if (readyToReceive) {
136                 ProcessIncoming(*socket);
137             }
138             if (readyToSend) {
139                 ProcessOutgoing(*socket);
140             }
141         } else if (state == SocketState::SHUTDOWN) {
142             Shutdown(socket);
143         }
144     }
145 
146     delete socket;
147 }
148 
Stop()149 void Network::Stop()
150 {
151     isRunning_ = false;
152 }
153 
GetStats(const std::string & interface)154 std::vector<NetworkStats> Network::GetStats(const std::string& interface)
155 {
156     static const uint32_t INTERFACE_COLUMN = 0u;
157     static const uint32_t RECV_BYTES_COLUMN = 1u;
158     static const uint32_t SENT_BYTES_COLUMN = 9u;
159 
160     std::ifstream netdev("/proc/net/dev");
161     if (!netdev.good()) {
162         return {};
163     }
164 
165     std::vector<NetworkStats> results;
166 
167     std::string data;
168     // skip the first two lines (headers)
169     std::getline(netdev, data);
170     std::getline(netdev, data);
171 
172     while (netdev.good()) {
173         std::getline(netdev, data);
174         std::vector<std::string> parts = Utils::Split(data);
175         if (parts.empty()) {
176             continue;
177         }
178 
179         std::string candidate = parts[INTERFACE_COLUMN];
180         // remove the trailing ':' so we can compare against the provided interface
181         candidate.pop_back();
182 
183         if (("*" == interface) || (candidate == interface)) {
184             const uint64_t recvBytes = std::stoull(parts[RECV_BYTES_COLUMN]);
185             const uint64_t sentBytes = std::stoull(parts[SENT_BYTES_COLUMN]);
186 
187             results.push_back({ .interface = candidate, .receivedBytes = recvBytes, .transmittedBytes = sentBytes });
188         }
189     };
190 
191     return results;
192 }
193 
SendPacket(const Packet & packet)194 void Network::SendPacket(const Packet& packet)
195 {
196     if (isRunning_) {
197         const std::lock_guard<std::mutex> guard(outgoingMutex_);
198         outgoing_.emplace(const_cast<Packet&>(packet).Release());
199     }
200 }
201 
SendPath(const std::string & path,PackageID id)202 void Network::SendPath(const std::string& path, PackageID id)
203 {
204     if (!path.empty()) {
205         std::string out;
206         out += static_cast<char>(id);
207         out += path;
208         SendBinary(out);
209     }
210 }
211 
SendRdcPath(const std::string & path)212 void Network::SendRdcPath(const std::string& path)
213 {
214     SendPath(path, PackageID::RS_PROFILER_RDC_BINARY);
215 }
216 
SendDclPath(const std::string & path)217 void Network::SendDclPath(const std::string& path)
218 {
219     SendPath(path, PackageID::RS_PROFILER_DCL_BINARY);
220 }
221 
SendMskpPath(const std::string & path)222 void Network::SendMskpPath(const std::string& path)
223 {
224     SendPath(path, PackageID::RS_PROFILER_MSKP_FILEPATH);
225 }
226 
SendBetaRecordPath(const std::string & path)227 void Network::SendBetaRecordPath(const std::string& path)
228 {
229     SendPath(path, PackageID::RS_PROFILER_BETAREC_FILEPATH);
230 }
231 
SendSkp(const void * data,size_t size)232 void Network::SendSkp(const void* data, size_t size)
233 {
234     if (data && (size > 0)) {
235         std::vector<char> buffer;
236         buffer.reserve(size + 1);
237         buffer.push_back(static_cast<char>(PackageID::RS_PROFILER_SKP_BINARY));
238         buffer.insert(buffer.end(), static_cast<const char*>(data), static_cast<const char*>(data) + size);
239         SendBinary(buffer);
240     }
241 }
242 
SendCaptureData(const RSCaptureData & data)243 void Network::SendCaptureData(const RSCaptureData& data)
244 {
245     std::vector<char> out;
246     DataWriter archive(out);
247     char headerType = static_cast<char>(PackageID::RS_PROFILER_GFX_METRICS);
248     archive.Serialize(headerType);
249 
250     const_cast<RSCaptureData&>(data).Serialize(archive);
251 
252     // if no data is serialized, we end up with just 1 char header
253     if (out.size() > 1) {
254         SendBinary(out);
255     }
256 }
257 
SendRSTreeDumpJSON(const std::string & jsonstr)258 void Network::SendRSTreeDumpJSON(const std::string& jsonstr)
259 {
260     Packet packet { Packet::BINARY };
261     packet.Write(static_cast<char>(PackageID::RS_PROFILER_RSTREE_DUMP_JSON));
262     packet.Write(jsonstr);
263     SendPacket(packet);
264 }
265 
SendRSTreePerfNodeList(const std::unordered_set<uint64_t> & perfNodesList)266 void Network::SendRSTreePerfNodeList(const std::unordered_set<uint64_t>& perfNodesList)
267 {
268     Packet packet { Packet::BINARY };
269     packet.Write(static_cast<char>(PackageID::RS_PROFILER_RSTREE_PERF_NODE_LIST));
270     packet.Write(perfNodesList);
271     SendPacket(packet);
272 }
273 
SendRSTreeSingleNodePerf(uint64_t id,uint64_t nanosec)274 void Network::SendRSTreeSingleNodePerf(uint64_t id, uint64_t nanosec)
275 {
276     Packet packet { Packet::BINARY };
277     packet.Write(static_cast<char>(PackageID::RS_PROFILER_RSTREE_SINGLE_NODE_PERF));
278     packet.Write(id);
279     packet.Write(nanosec);
280     SendPacket(packet);
281 }
282 
SendBinary(const void * data,size_t size)283 void Network::SendBinary(const void* data, size_t size)
284 {
285     if (data && (size > 0)) {
286         Packet packet { Packet::BINARY };
287         packet.Write(data, size);
288         SendPacket(packet);
289     }
290 }
291 
SendBinary(const std::vector<char> & data)292 void Network::SendBinary(const std::vector<char>& data)
293 {
294     SendBinary(data.data(), data.size());
295 }
296 
SendBinary(const std::string & data)297 void Network::SendBinary(const std::string& data)
298 {
299     SendBinary(data.data(), data.size());
300 }
301 
SendMessage(const std::string & message)302 void Network::SendMessage(const std::string& message)
303 {
304     if (!message.empty()) {
305         Packet packet { Packet::LOG };
306         packet.Write(message);
307         SendPacket(packet);
308     }
309 }
310 
PushCommand(const std::vector<std::string> & args)311 void Network::PushCommand(const std::vector<std::string>& args)
312 {
313     if (!args.empty()) {
314         const std::lock_guard<std::mutex> guard(incomingMutex_);
315         incoming_.emplace(args);
316     }
317 }
318 
PopCommand(std::vector<std::string> & args)319 bool Network::PopCommand(std::vector<std::string>& args)
320 {
321     args.clear();
322 
323     incomingMutex_.lock();
324     if (!incoming_.empty()) {
325         args.swap(incoming_.front());
326         incoming_.pop();
327     }
328     incomingMutex_.unlock();
329 
330     return !args.empty();
331 }
332 
ProcessCommand(const char * data,size_t size)333 void Network::ProcessCommand(const char* data, size_t size)
334 {
335     const std::vector<std::string> args = Utils::Split({ data, size });
336     if (args.empty()) {
337         return;
338     }
339 
340     PushCommand(args);
341     AwakeRenderServiceThread();
342 }
343 
ProcessOutgoing(Socket & socket)344 void Network::ProcessOutgoing(Socket& socket)
345 {
346     std::vector<char> data;
347 
348     bool nothingToSend = false;
349     while (!nothingToSend) {
350         outgoingMutex_.lock();
351         nothingToSend = outgoing_.empty();
352         if (!nothingToSend) {
353             data.swap(outgoing_.front());
354             outgoing_.pop();
355         }
356         outgoingMutex_.unlock();
357 
358         if (!nothingToSend) {
359             socket.SendWhenReady(data.data(), data.size());
360         }
361     }
362 }
363 
ProcessBinary(const char * data,size_t size)364 void Network::ProcessBinary(const char* data, size_t size)
365 {
366     static uint32_t chunks = 0u;
367     static RSFile file;
368 
369     const PackageID id = BinaryHelper::Type(data);
370     if (id == PackageID::RS_PROFILER_PREPARE) {
371         // ping/pong for connection speed measurement
372         const char type = static_cast<char>(PackageID::RS_PROFILER_PREPARE);
373         SendBinary(&type, sizeof(type));
374         // amount of binary packages will be sent
375         chunks = OnBinaryPrepare(file, data, size);
376     } else if (id == PackageID::RS_PROFILER_HEADER) {
377         OnBinaryHeader(file, data, size);
378     } else if (id == PackageID::RS_PROFILER_BINARY) {
379         OnBinaryChunk(file, data, size);
380 
381         chunks--;
382         if (chunks == 0) {
383             OnBinaryFinish(file, data, size);
384             const char type = static_cast<char>(PackageID::RS_PROFILER_PREPARE_DONE);
385             SendBinary(&type, sizeof(type));
386         }
387     }
388 }
389 
ForceShutdown()390 void Network::ForceShutdown()
391 {
392     forceShutdown_ = true;
393 }
394 
Shutdown(Socket * & socket)395 void Network::Shutdown(Socket*& socket)
396 {
397     delete socket;
398     socket = nullptr;
399 
400     std::string command = "rsrecord_stop";
401     ProcessCommand(command.c_str(), command.size());
402     command = "rsrecord_replay_stop";
403     ProcessCommand(command.c_str(), command.size());
404     AwakeRenderServiceThread();
405 }
406 
ProcessIncoming(Socket & socket)407 void Network::ProcessIncoming(Socket& socket)
408 {
409     const uint32_t sleepTimeout = 500000u;
410 
411     Packet packetIncoming { Packet::UNKNOWN };
412     auto wannaReceive = Packet::HEADER_SIZE;
413     socket.Receive(packetIncoming.Begin(), wannaReceive);
414 
415     if (wannaReceive == 0) {
416         socket.SetState(SocketState::SHUTDOWN);
417         usleep(sleepTimeout);
418         return;
419     }
420 
421     const size_t size = packetIncoming.GetPayloadLength();
422     if (size == 0) {
423         return;
424     }
425 
426     std::vector<char> data;
427     data.resize(size);
428     socket.ReceiveWhenReady(data.data(), data.size());
429 
430     if (packetIncoming.IsBinary()) {
431         ProcessBinary(data.data(), data.size());
432     } else if (packetIncoming.IsCommand()) {
433         ProcessCommand(data.data(), data.size());
434     }
435 }
436 
ReportStats()437 void Network::ReportStats()
438 {
439     constexpr uint32_t bytesToBits = 8u;
440     const std::string interface("wlan0");
441     const std::vector<NetworkStats> stats = Network::GetStats(interface);
442 
443     std::string out = "Interface: " + interface;
444     for (const NetworkStats& stat : stats) {
445         out += "Transmitted: " + std::to_string(stat.transmittedBytes * bytesToBits);
446         out += "Received: " + std::to_string(stat.transmittedBytes * bytesToBits);
447     }
448 
449     SendMessage(out);
450 }
451 
452 } // namespace OHOS::Rosen