1  /*
2   * Copyright (c) 2021 Huawei Device Co., Ltd.
3   * Licensed under the Apache License, Version 2.0 (the "License");
4   * you may not use this file except in compliance with the License.
5   * You may obtain a copy of the License at
6   *
7   *     http://www.apache.org/licenses/LICENSE-2.0
8   *
9   * Unless required by applicable law or agreed to in writing, software
10   * distributed under the License is distributed on an "AS IS" BASIS,
11   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12   * See the License for the specific language governing permissions and
13   * limitations under the License.
14   */
15  
16  #include <gtest/gtest.h>
17  #include <thread>
18  #include "db_errno.h"
19  #include "distributeddb_communicator_common.h"
20  #include "distributeddb_tools_unit_test.h"
21  #include "log_print.h"
22  #include "message.h"
23  #include "protocol_proto.h"
24  #include "time_sync.h"
25  #include "sync_types.h"
26  
27  using namespace std;
28  using namespace testing::ext;
29  using namespace DistributedDB;
30  
31  namespace {
32      constexpr int SEND_COUNT_GOAL = 20; // Send 20 times
33  
34      EnvHandle g_envDeviceA;
35      EnvHandle g_envDeviceB;
36      ICommunicator *g_commAA = nullptr;
37      ICommunicator *g_commBA = nullptr;
38      ICommunicator *g_commBB = nullptr;
39  }
40  
41  class DistributedDBCommunicatorSendReceiveTest : public testing::Test {
42  public:
43      static void SetUpTestCase(void);
44      static void TearDownTestCase(void);
45      void SetUp();
46      void TearDown();
47  };
48  
SetUpTestCase(void)49  void DistributedDBCommunicatorSendReceiveTest::SetUpTestCase(void)
50  {
51      /**
52       * @tc.setup: Create and init CommunicatorAggregator and AdapterStub
53       */
54      LOGI("[UT][SendRecvTest][SetUpTestCase] Enter.");
55      bool errCode = SetUpEnv(g_envDeviceA, DEVICE_NAME_A);
56      ASSERT_EQ(errCode, true);
57      errCode = SetUpEnv(g_envDeviceB, DEVICE_NAME_B);
58      ASSERT_EQ(errCode, true);
59      DoRegTransformFunction();
60      CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
61  }
62  
TearDownTestCase(void)63  void DistributedDBCommunicatorSendReceiveTest::TearDownTestCase(void)
64  {
65      /**
66       * @tc.teardown: Finalize and release CommunicatorAggregator and AdapterStub
67       */
68      LOGI("[UT][SendRecvTest][TearDownTestCase] Enter.");
69      std::this_thread::sleep_for(std::chrono::seconds(7)); // Wait 7 s to make sure all thread quiet and memory released
70      TearDownEnv(g_envDeviceA);
71      TearDownEnv(g_envDeviceB);
72      CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
73  }
74  
SetUp()75  void DistributedDBCommunicatorSendReceiveTest::SetUp()
76  {
77      DistributedDBUnitTest::DistributedDBToolsUnitTest::PrintTestCaseInfo();
78      /**
79       * @tc.setup: Alloc communicator AA, BA, BB
80       */
81      int errorNo = E_OK;
82      g_commAA = g_envDeviceA.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
83      ASSERT_EQ(errorNo, E_OK);
84      ASSERT_NOT_NULL_AND_ACTIVATE(g_commAA);
85  
86      errorNo = E_OK;
87      g_commBA = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_A, errorNo);
88      ASSERT_EQ(errorNo, E_OK);
89      ASSERT_NOT_NULL_AND_ACTIVATE(g_commBA);
90  
91      errorNo = E_OK;
92      g_commBB = g_envDeviceB.commAggrHandle->AllocCommunicator(LABEL_B, errorNo);
93      ASSERT_EQ(errorNo, E_OK);
94      ASSERT_NOT_NULL_AND_ACTIVATE(g_commBB);
95  }
96  
TearDown()97  void DistributedDBCommunicatorSendReceiveTest::TearDown()
98  {
99      /**
100       * @tc.teardown: Release communicator AA, BA, BB
101       */
102      g_envDeviceA.commAggrHandle->ReleaseCommunicator(g_commAA);
103      g_commAA = nullptr;
104      g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBA);
105      g_commBA = nullptr;
106      g_envDeviceB.commAggrHandle->ReleaseCommunicator(g_commBB);
107      g_commBA = nullptr;
108      std::this_thread::sleep_for(std::chrono::milliseconds(200)); // Wait 200 ms to make sure all thread quiet
109  }
110  
BuildAppLayerFrameMessage()111  static Message *BuildAppLayerFrameMessage()
112  {
113      DistributedDBUnitTest::DataSyncMessageInfo info;
114      info.messageId_ = DistributedDB::TIME_SYNC_MESSAGE;
115      info.messageType_ = TYPE_REQUEST;
116      DistributedDB::Message *message = nullptr;
117      DistributedDBUnitTest::DistributedDBToolsUnitTest::BuildMessage(info, message);
118      return message;
119  }
120  
121  /**
122   * @tc.name: Send And Receive 001
123   * @tc.desc: Test send and receive based on equipment communicator
124   * @tc.type: FUNC
125   * @tc.require: AR000BVDGI AR000CQE0M
126   * @tc.author: xiaozhenjian
127   */
128  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive001, TestSize.Level1)
129  {
130      // Preset
131      string srcTargetForAA;
132      Message *recvMsgForAA = nullptr;
133      string srcTargetForBA;
134      Message *recvMsgForBA = nullptr;
135      string srcTargetForBB;
136      Message *recvMsgForBB = nullptr;
__anon922778e00202(const std::string &srcTarget, Message *inMsg) 137      g_commAA->RegOnMessageCallback([&srcTargetForAA, &recvMsgForAA](const std::string &srcTarget, Message *inMsg) {
138          srcTargetForAA = srcTarget;
139          recvMsgForAA = inMsg;
140      }, nullptr);
__anon922778e00302(const std::string &srcTarget, Message *inMsg) 141      g_commBA->RegOnMessageCallback([&srcTargetForBA, &recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
142          srcTargetForBA = srcTarget;
143          recvMsgForBA = inMsg;
144      }, nullptr);
__anon922778e00402(const std::string &srcTarget, Message *inMsg) 145      g_commBB->RegOnMessageCallback([&srcTargetForBB, &recvMsgForBB](const std::string &srcTarget, Message *inMsg) {
146          srcTargetForBB = srcTarget;
147          recvMsgForBB = inMsg;
148      }, nullptr);
149  
150      /**
151       * @tc.steps: step1. connect device A with device B
152       */
153      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
154  
155      /**
156       * @tc.steps: step2. device A send message(registered and tiny) to device B using communicator AA
157       * @tc.expected: step2. communicator BA received the message
158       */
159      Message *msgForAA = BuildRegedTinyMessage();
160      ASSERT_NE(msgForAA, nullptr);
161      SendConfig conf = {false, false, 0};
162      int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
163      EXPECT_EQ(errCode, E_OK);
164      std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
165      EXPECT_EQ(recvMsgForBB, nullptr);
166      EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
167      ASSERT_NE(recvMsgForBA, nullptr);
168      EXPECT_EQ(recvMsgForBA->GetMessageId(), REGED_TINY_MSG_ID);
169      EXPECT_EQ(recvMsgForBA->GetMessageType(), TYPE_REQUEST);
170      EXPECT_EQ(recvMsgForBA->GetSessionId(), FIXED_SESSIONID);
171      EXPECT_EQ(recvMsgForBA->GetSequenceId(), FIXED_SEQUENCEID);
172      EXPECT_EQ(recvMsgForBA->GetErrorNo(), NO_ERROR);
173      delete recvMsgForBA;
174      recvMsgForBA = nullptr;
175  
176      /**
177       * @tc.steps: step3. device B send message(registered and tiny) to device A using communicator BB
178       * @tc.expected: step3. communicator AA did not receive the message
179       */
180      Message *msgForBB = BuildRegedTinyMessage();
181      ASSERT_NE(msgForBB, nullptr);
182      conf = {true, 0};
183      errCode = g_commBB->SendMessage(DEVICE_NAME_A, msgForBB, conf);
184      EXPECT_EQ(errCode, E_OK);
185      std::this_thread::sleep_for(std::chrono::milliseconds(100));
186      EXPECT_EQ(srcTargetForAA, "");
187  
188      // CleanUp
189      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
190  }
191  
192  /**
193   * @tc.name: Send And Receive 002
194   * @tc.desc: Test send oversize message will fail
195   * @tc.type: FUNC
196   * @tc.require: AR000BVDGK AR000CQE0O
197   * @tc.author: xiaozhenjian
198   */
199  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive002, TestSize.Level1)
200  {
201      /**
202       * @tc.steps: step1. connect device A with device B
203       */
204      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
205  
206      /**
207       * @tc.steps: step2. device A send message(registered and oversize) to device B using communicator AA
208       * @tc.expected: step2. send fail
209       */
210      Message *msgForAA = BuildRegedOverSizeMessage();
211      ASSERT_NE(msgForAA, nullptr);
212      SendConfig conf = {true, false, 0};
213      int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
214      EXPECT_NE(errCode, E_OK);
215      delete msgForAA;
216      msgForAA = nullptr;
217  
218      // CleanUp
219      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
220  }
221  
222  /**
223   * @tc.name: Send And Receive 003
224   * @tc.desc: Test send unregistered message will fail
225   * @tc.type: FUNC
226   * @tc.require: AR000BVDGK AR000CQE0O
227   * @tc.author: xiaozhenjian
228   */
229  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceive003, TestSize.Level1)
230  {
231      /**
232       * @tc.steps: step1. connect device A with device B
233       */
234      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
235  
236      /**
237       * @tc.steps: step2. device A send message(unregistered and tiny) to device B using communicator AA
238       * @tc.expected: step2. send fail
239       */
240      Message *msgForAA = BuildUnRegedTinyMessage();
241      ASSERT_NE(msgForAA, nullptr);
242      SendConfig conf = {true, false, 0};
243      int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
244      EXPECT_NE(errCode, E_OK);
245      delete msgForAA;
246      msgForAA = nullptr;
247  
248      // CleanUp
249      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
250  }
251  
252  /**
253   * @tc.name: Send Flow Control 001
254   * @tc.desc: Test send in nonblock way
255   * @tc.type: FUNC
256   * @tc.require: AR000BVDGI AR000CQE0M
257   * @tc.author: xiaozhenjian
258   */
259  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl001, TestSize.Level1)
260  {
261      // Preset
262      int countForBA = 0;
263      int countForBB = 0;
__anon922778e00502()264      g_commBA->RegOnSendableCallback([&countForBA](){ countForBA++; }, nullptr);
__anon922778e00602()265      g_commBB->RegOnSendableCallback([&countForBB](){ countForBB++; }, nullptr);
266  
267      /**
268       * @tc.steps: step1. connect device A with device B
269       */
270      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
271      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure send cause by online done
272      countForBA = 0;
273      countForBB = 0;
274  
275      /**
276       * @tc.steps: step2. device B simulates send block
277       */
278      g_envDeviceB.adapterHandle->SimulateSendBlock();
279  
280      /**
281       * @tc.steps: step3. device B send as much as possible message(unregistered and huge) in nonblock way
282       *                   to device A using communicator BA until send fail;
283       * @tc.expected: step3. send fail will happen.
284       */
285      int sendCount = 0;
286      while (true) {
287          Message *msgForBA = BuildRegedHugeMessage();
288          ASSERT_NE(msgForBA, nullptr);
289          SendConfig conf = {true, false, 0};
290          int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
291          if (errCode == E_OK) {
292              sendCount++;
293          } else {
294              delete msgForBA;
295              msgForBA = nullptr;
296              break;
297          }
298      }
299  
300      /**
301       * @tc.steps: step4. device B simulates send block terminate
302       * @tc.expected: step4. send count before fail is equal as expected. sendable callback happened.
303       */
304      g_envDeviceB.adapterHandle->SimulateSendBlockClear();
305      int expectSendCount = MAX_CAPACITY / (HUGE_SIZE + HEADER_SIZE) +
306          (MAX_CAPACITY % (HUGE_SIZE + HEADER_SIZE) == 0 ? 0 : 1);
307      EXPECT_EQ(sendCount, expectSendCount);
308      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
309      EXPECT_GE(countForBA, 1);
310      EXPECT_GE(countForBB, 1);
311  
312      // CleanUp
313      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
314  }
315  
316  /**
317   * @tc.name: Send Flow Control 002
318   * @tc.desc: Test send in block(without timeout) way
319   * @tc.type: FUNC
320   * @tc.require: AR000BVDGI AR000CQE0M
321   * @tc.author: xiaozhenjian
322   */
323  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl002, TestSize.Level1)
324  {
325      // Preset
326      int cntForBA = 0;
327      int cntForBB = 0;
__anon922778e00702()328      g_commBA->RegOnSendableCallback([&cntForBA](){ cntForBA++; }, nullptr);
__anon922778e00802()329      g_commBB->RegOnSendableCallback([&cntForBB](){ cntForBB++; }, nullptr);
330  
331      /**
332       * @tc.steps: step1. connect device A with device B
333       */
334      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
335      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Wait 100 ms to make sure send cause by online done
336      cntForBA = 0;
337      cntForBB = 0;
338  
339      /**
340       * @tc.steps: step2. device B simulates send block
341       */
342      g_envDeviceB.adapterHandle->SimulateSendBlock();
343  
344      /**
345       * @tc.steps: step3. device B send a certain message(unregistered and huge) in block way
346       *                   without timeout to device A using communicator BA;
347       */
348      int sendCount = 0;
349      int sendFailCount = 0;
__anon922778e00902() 350      std::thread sendThread([&sendCount, &sendFailCount]() {
351          while (sendCount < SEND_COUNT_GOAL) {
352              Message *msgForBA = BuildRegedHugeMessage();
353              ASSERT_NE(msgForBA, nullptr);
354              SendConfig conf = {false, false, 0};
355              int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
356              if (errCode != E_OK) {
357                  delete msgForBA;
358                  msgForBA = nullptr;
359                  sendFailCount++;
360              }
361              sendCount++;
362          }
363      });
364  
365      /**
366       * @tc.steps: step4. device B simulates send block terminate
367       * @tc.expected: step4. send fail count is zero. sendable callback happened.
368       */
369      std::this_thread::sleep_for(std::chrono::milliseconds(200));
370      g_envDeviceB.adapterHandle->SimulateSendBlockClear();
371      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
372      sendThread.join();
373      EXPECT_EQ(sendCount, SEND_COUNT_GOAL);
374      EXPECT_EQ(sendFailCount, 0);
375      EXPECT_GE(cntForBA, 1);
376      EXPECT_GE(cntForBB, 1);
377  
378      // CleanUp
379      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
380  }
381  
382  /**
383   * @tc.name: Send Flow Control 003
384   * @tc.desc: Test send in block(with timeout) way
385   * @tc.type: FUNC
386   * @tc.require: AR000BVDGI AR000CQE0M
387   * @tc.author: xiaozhenjian
388   */
389  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendFlowControl003, TestSize.Level1)
390  {
391      // Preset
392      int cntsForBA = 0;
393      int cntsForBB = 0;
__anon922778e00a02()394      g_commBA->RegOnSendableCallback([&cntsForBA](){ cntsForBA++; }, nullptr);
__anon922778e00b02()395      g_commBB->RegOnSendableCallback([&cntsForBB](){ cntsForBB++; }, nullptr);
396  
397      /**
398       * @tc.steps: step1. connect device A with device B
399       */
400      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
401      std::this_thread::sleep_for(std::chrono::milliseconds(100));
402      cntsForBA = 0;
403      cntsForBB = 0;
404  
405      /**
406       * @tc.steps: step2. device B simulates send block
407       */
408      g_envDeviceB.adapterHandle->SimulateSendBlock();
409  
410       /**
411       * @tc.steps: step3. device B send a certain message(unregistered and huge) in block way
412       *                   with timeout to device A using communicator BA;
413       */
414      int sendCnt = 0;
415      int sendFailCnt = 0;
__anon922778e00c02() 416      std::thread sendThread([&sendCnt, &sendFailCnt]() {
417          while (sendCnt < SEND_COUNT_GOAL) {
418              Message *msgForBA = BuildRegedHugeMessage();
419              ASSERT_NE(msgForBA, nullptr);
420              SendConfig conf = {false, false, 100};
421              int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf); // 100 ms timeout
422              if (errCode != E_OK) {
423                  delete msgForBA;
424                  msgForBA = nullptr;
425                  sendFailCnt++;
426              }
427              sendCnt++;
428          }
429      });
430  
431      /**
432       * @tc.steps: step4. device B simulates send block terminate
433       * @tc.expected: step4. send fail count is no more than expected. sendable callback happened.
434       */
435      std::this_thread::sleep_for(std::chrono::milliseconds(300)); // wait 300 ms
436      g_envDeviceB.adapterHandle->SimulateSendBlockClear();
437      std::this_thread::sleep_for(std::chrono::milliseconds(1200)); // wait 1200 ms
438      sendThread.join();
439      EXPECT_EQ(sendCnt, SEND_COUNT_GOAL);
440      EXPECT_LE(sendFailCnt, 4);
441      EXPECT_GE(cntsForBA, 1);
442      EXPECT_GE(cntsForBB, 1);
443  
444      // CleanUp
445      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
446  }
447  
448  /**
449   * @tc.name: Receive Check 001
450   * @tc.desc: Receive packet field check
451   * @tc.type: FUNC
452   * @tc.require: AR000BVRNU AR000CQE0J
453   * @tc.author: xiaozhenjian
454   */
455  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck001, TestSize.Level1)
456  {
457      // Preset
458      int recvCount = 0;
__anon922778e00d02(const std::string &srcTarget, Message *inMsg) 459      g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
460          recvCount++;
461          if (inMsg != nullptr) {
462              delete inMsg;
463              inMsg = nullptr;
464          }
465      }, nullptr);
466      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
467  
468      /**
469       * @tc.steps: step1. create packet with magic field error
470       * @tc.expected: step1. no message callback
471       */
472      g_envDeviceB.adapterHandle->SimulateSendBitErrorInMagicField(true, 0xFFFF);
473      Message *msgForBA = BuildRegedTinyMessage();
474      SendConfig conf = {true, false, 0};
475      int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
476      EXPECT_EQ(errCode, E_OK);
477      std::this_thread::sleep_for(std::chrono::milliseconds(100));
478      EXPECT_EQ(recvCount, 0);
479      g_envDeviceB.adapterHandle->SimulateSendBitErrorInMagicField(false, 0);
480  
481      /**
482       * @tc.steps: step2. create packet with version field error
483       * @tc.expected: step2. no message callback
484       */
485      g_envDeviceB.adapterHandle->SimulateSendBitErrorInVersionField(true, 0xFFFF);
486      msgForBA = BuildRegedTinyMessage();
487      errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
488      EXPECT_EQ(errCode, E_OK);
489      std::this_thread::sleep_for(std::chrono::milliseconds(100));
490      EXPECT_EQ(recvCount, 0);
491      g_envDeviceB.adapterHandle->SimulateSendBitErrorInVersionField(false, 0);
492  
493      /**
494       * @tc.steps: step3. create packet with checksum field error
495       * @tc.expected: step3. no message callback
496       */
497      g_envDeviceB.adapterHandle->SimulateSendBitErrorInCheckSumField(true, 0xFFFF);
498      msgForBA = BuildRegedTinyMessage();
499      errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
500      EXPECT_EQ(errCode, E_OK);
501      std::this_thread::sleep_for(std::chrono::milliseconds(100));
502      EXPECT_EQ(recvCount, 0);
503      g_envDeviceB.adapterHandle->SimulateSendBitErrorInCheckSumField(false, 0);
504  
505      // CleanUp
506      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
507  }
508  
509  /**
510   * @tc.name: Receive Check 002
511   * @tc.desc: Receive packet field check
512   * @tc.type: FUNC
513   * @tc.require: AR000BVRNU AR000CQE0J
514   * @tc.author: xiaozhenjian
515   */
516  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, ReceiveCheck002, TestSize.Level1)
517  {
518      // Preset
519      int recvCount = 0;
__anon922778e00e02(const std::string &srcTarget, Message *inMsg) 520      g_commAA->RegOnMessageCallback([&recvCount](const std::string &srcTarget, Message *inMsg) {
521          recvCount++;
522          if (inMsg != nullptr) {
523              delete inMsg;
524              inMsg = nullptr;
525          }
526      }, nullptr);
527      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
528  
529      /**
530       * @tc.steps: step1. create packet with packetLen field error
531       * @tc.expected: step1. no message callback
532       */
533      g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketLenField(true, 0xFFFF);
534      Message *msgForBA = BuildRegedTinyMessage();
535      SendConfig conf = {true, false, 0};
536      int errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
537      EXPECT_EQ(errCode, E_OK);
538      std::this_thread::sleep_for(std::chrono::milliseconds(100));
539      EXPECT_EQ(recvCount, 0);
540      g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketLenField(false, 0);
541  
542      /**
543       * @tc.steps: step1. create packet with packetType field error
544       * @tc.expected: step1. no message callback
545       */
546      g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketTypeField(true, 0xFF);
547      msgForBA = BuildRegedTinyMessage();
548      errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
549      EXPECT_EQ(errCode, E_OK);
550      std::this_thread::sleep_for(std::chrono::milliseconds(100));
551      EXPECT_EQ(recvCount, 0);
552      g_envDeviceB.adapterHandle->SimulateSendBitErrorInPacketTypeField(false, 0);
553  
554      /**
555       * @tc.steps: step1. create packet with paddingLen field error
556       * @tc.expected: step1. no message callback
557       */
558      g_envDeviceB.adapterHandle->SimulateSendBitErrorInPaddingLenField(true, 0xFF);
559      msgForBA = BuildRegedTinyMessage();
560      errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
561      EXPECT_EQ(errCode, E_OK);
562      std::this_thread::sleep_for(std::chrono::milliseconds(100));
563      EXPECT_EQ(recvCount, 0);
564      g_envDeviceB.adapterHandle->SimulateSendBitErrorInPaddingLenField(false, 0);
565  
566      // CleanUp
567      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
568  }
569  
570  /**
571   * @tc.name: Send Result Notify 001
572   * @tc.desc: Test send result notify
573   * @tc.type: FUNC
574   * @tc.require: AR000CQE0M
575   * @tc.author: xiaozhenjian
576   */
577  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendResultNotify001, TestSize.Level1)
578  {
579      // preset
580      std::vector<int> sendResult;
__anon922778e00f02(int result, bool isDirectEnd) 581      auto sendResultNotifier = [&sendResult](int result, bool isDirectEnd) {
582          sendResult.push_back(result);
583      };
584  
585      /**
586       * @tc.steps: step1. connect device A with device B
587       */
588      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
589  
590      /**
591       * @tc.steps: step2. device A send message to device B using communicator AA
592       * @tc.expected: step2. notify send done and success
593       */
594      Message *msgForAA = BuildRegedTinyMessage();
595      ASSERT_NE(msgForAA, nullptr);
596      SendConfig conf = {false, false, 0};
597      int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf, sendResultNotifier);
598      EXPECT_EQ(errCode, E_OK);
599      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
600      ASSERT_EQ(sendResult.size(), static_cast<size_t>(1)); // 1 notify
601      EXPECT_EQ(sendResult[0], E_OK);
602  
603      /**
604       * @tc.steps: step3. disconnect device A with device B
605       */
606      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
607  
608      /**
609       * @tc.steps: step4. device A send message to device B using communicator AA
610       * @tc.expected: step2. notify send done and fail
611       */
612      msgForAA = BuildRegedTinyMessage();
613      ASSERT_NE(msgForAA, nullptr);
614      errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf, sendResultNotifier);
615      EXPECT_EQ(errCode, E_OK);
616      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
617      ASSERT_EQ(sendResult.size(), static_cast<size_t>(2)); // 2 notify
618      EXPECT_NE(sendResult[1], E_OK); // 1 for second element
619  }
620  
// Declares the locals srcTargetFor<src><label> and recvMsgFor<src><label> in the enclosing scope and
// registers a message callback on g_comm<src><label> that stores the source device and the received
// message into them. The locals are captured by reference and nothing here releases the stored message.
#define REG_MESSAGE_CALLBACK(src, label) \
    std::string srcTargetFor##src##label; \
    Message *recvMsgFor##src##label = nullptr; \
    g_comm##src##label->RegOnMessageCallback( \
        [&srcTargetFor##src##label, &recvMsgFor##src##label](const std::string &fromDevice, Message *arrived) { \
            srcTargetFor##src##label = fromDevice; \
            recvMsgFor##src##label = arrived; \
        }, \
        nullptr);
629  
630  /**
631   * @tc.name: Message Feedback 001
632   * @tc.desc: Test feedback not support messageid and communicator not found
633   * @tc.type: FUNC
634   * @tc.require: AR000CQE0M
635   * @tc.author: xiaozhenjian
636   */
637  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, MessageFeedback001, TestSize.Level1)
638  {
639      CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(true);
640      // preset
641      REG_MESSAGE_CALLBACK(A, A);
642      REG_MESSAGE_CALLBACK(B, A);
643      REG_MESSAGE_CALLBACK(B, B);
644  
645      /**
646       * @tc.steps: step1. connect device A with device B
647       */
648      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
649  
650      /**
651       * @tc.steps: step2. device B send message to device A using communicator BB
652       * @tc.expected: step2. communicator BB receive communicator not found feedback
653       */
654      Message *msgForBB = BuildRegedTinyMessage();
655      ASSERT_NE(msgForBB, nullptr);
656      SendConfig conf = {false, false, 0};
657      int errCode = g_commBB->SendMessage(DEVICE_NAME_A, msgForBB, conf);
658      EXPECT_EQ(errCode, E_OK);
659      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
660      ASSERT_NE(recvMsgForBB, nullptr);
661      EXPECT_EQ(srcTargetForBB, DEVICE_NAME_A);
662      EXPECT_EQ(recvMsgForBB->GetMessageId(), REGED_TINY_MSG_ID);
663      EXPECT_EQ(recvMsgForBB->GetMessageType(), TYPE_RESPONSE);
664      EXPECT_EQ(recvMsgForBB->GetSessionId(), FIXED_SESSIONID);
665      EXPECT_EQ(recvMsgForBB->GetSequenceId(), FIXED_SEQUENCEID);
666      EXPECT_EQ(recvMsgForBB->GetErrorNo(), static_cast<uint32_t>(E_FEEDBACK_COMMUNICATOR_NOT_FOUND));
667      EXPECT_EQ(recvMsgForBB->GetObject<RegedTinyObject>(), nullptr);
668      delete recvMsgForBB;
669      recvMsgForBB = nullptr;
670  
671      /**
672       * @tc.steps: step3. simulate messageid not registered
673       */
674      g_envDeviceB.adapterHandle->SimulateSendBitErrorInMessageIdField(true, UNREGED_TINY_MSG_ID);
675  
676      /**
677       * @tc.steps: step4. device B send message to device A using communicator BA
678       * @tc.expected: step4. communicator BA receive messageid not register feedback
679       */
680      Message *msgForBA = BuildRegedTinyMessage();
681      ASSERT_NE(msgForBA, nullptr);
682      errCode = g_commBA->SendMessage(DEVICE_NAME_A, msgForBA, conf);
683      EXPECT_EQ(errCode, E_OK);
684      std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Sleep 100 ms
685      ASSERT_NE(recvMsgForBA, nullptr);
686      EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
687      EXPECT_EQ(recvMsgForBA->GetMessageId(), UNREGED_TINY_MSG_ID);
688      EXPECT_EQ(recvMsgForBA->GetMessageType(), TYPE_RESPONSE);
689      EXPECT_EQ(recvMsgForBA->GetSessionId(), FIXED_SESSIONID);
690      EXPECT_EQ(recvMsgForBA->GetSequenceId(), FIXED_SEQUENCEID);
691      EXPECT_EQ(recvMsgForBA->GetErrorNo(), static_cast<uint32_t>(E_FEEDBACK_UNKNOWN_MESSAGE));
692      EXPECT_EQ(recvMsgForBA->GetObject<RegedTinyObject>(), nullptr);
693      delete recvMsgForBA;
694      recvMsgForBA = nullptr;
695  
696      // CleanUp
697      g_envDeviceB.adapterHandle->SimulateSendBitErrorInMessageIdField(false, 0);
698      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
699      CommunicatorAggregator::EnableCommunicatorNotFoundFeedback(false);
700  }
701  
702  /**
703   * @tc.name: SendAndReceiveWithExtendHead001
704   * @tc.desc: Test fill extendHead func
705   * @tc.type: FUNC
706   * @tc.require: AR000BVDGI AR000CQE0M
707   * @tc.author: zhuwentao
708   */
709  HWTEST_F(DistributedDBCommunicatorSendReceiveTest, SendAndReceiveWithExtendHead001, TestSize.Level1)
710  {
711      // Preset
712      string srcTargetForAA;
713      Message *recvMsgForAA = nullptr;
714      string srcTargetForBA;
715      Message *recvMsgForBA = nullptr;
716      TimeSync::RegisterTransformFunc();
__anon922778e01002(const std::string &srcTarget, Message *inMsg) 717      g_commAA->RegOnMessageCallback([&srcTargetForAA, &recvMsgForAA](const std::string &srcTarget, Message *inMsg) {
718          srcTargetForAA = srcTarget;
719          recvMsgForAA = inMsg;
720      }, nullptr);
__anon922778e01102(const std::string &srcTarget, Message *inMsg) 721      g_commBA->RegOnMessageCallback([&srcTargetForBA, &recvMsgForBA](const std::string &srcTarget, Message *inMsg) {
722          srcTargetForBA = srcTarget;
723          recvMsgForBA = inMsg;
724      }, nullptr);
725  
726      /**
727       * @tc.steps: step1. connect device A with device B
728       */
729      AdapterStub::ConnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
730  
731      /**
732       * @tc.steps: step2. device A send ApplayerFrameMessage to device B using communicator AA with extednHead
733       * @tc.expected: step2. communicator BA received the message
734       */
735      Message *msgForAA = BuildAppLayerFrameMessage();
736      ASSERT_NE(msgForAA, nullptr);
737      SendConfig conf = {false, true, 0, {"appId", "storeId", "userId", "deviceB"}};
738      int errCode = g_commAA->SendMessage(DEVICE_NAME_B, msgForAA, conf);
739      EXPECT_EQ(errCode, E_OK);
740      std::this_thread::sleep_for(std::chrono::milliseconds(200)); // sleep 200 ms
741      EXPECT_EQ(srcTargetForBA, DEVICE_NAME_A);
742      ASSERT_NE(recvMsgForBA, nullptr);
743      delete recvMsgForBA;
744      recvMsgForBA = nullptr;
745      DistributedDB::ProtocolProto::UnRegTransformFunction(DistributedDB::TIME_SYNC_MESSAGE);
746      // CleanUp
747      AdapterStub::DisconnectAdapterStub(g_envDeviceA.adapterHandle, g_envDeviceB.adapterHandle);
748  }