1# CPU密集型任务开发指导 (TaskPool和Worker)
2
3
4CPU密集型任务是指需要占用系统资源处理大量计算能力的任务,需要长时间运行,这段时间会阻塞线程其它事件的处理,不适宜放在UI主线程进行。例如图像处理、视频编码、数据分析等。
5
6
7基于多线程并发机制处理CPU密集型任务可以提高CPU利用率,提升应用程序响应速度。
8
9
10当任务不需要长时间(3分钟)占据后台线程,而是一个个独立的任务时,推荐使用TaskPool,反之推荐使用Worker。
11
12接下来将以图像直方图处理以及后台长时间的模型预测任务分别进行举例。
13
14
15## 使用TaskPool进行图像直方图处理
16
171. 实现图像处理的业务逻辑。
18
192. 数据分段,通过任务组发起关联任务调度。
20   创建[TaskGroup](../reference/apis-arkts/js-apis-taskpool.md#taskgroup10)并通过[addTask()](../reference/apis-arkts/js-apis-taskpool.md#addtask10)添加对应的任务,通过[execute()](../reference/apis-arkts/js-apis-taskpool.md#taskpoolexecute10)执行任务组,并指定为[高优先级](../reference/apis-arkts/js-apis-taskpool.md#priority),在当前任务组所有任务结束后,会将直方图处理结果同时返回。
21
223. 结果数组汇总处理。
23
24```ts
25import { taskpool } from '@kit.ArkTS';
26
27@Concurrent
28function imageProcessing(dataSlice: ArrayBuffer): ArrayBuffer {
29  // 步骤1: 具体的图像处理操作及其他耗时操作
30  return dataSlice;
31}
32
33function histogramStatistic(pixelBuffer: ArrayBuffer): void {
34  // 步骤2: 分成三段并发调度
35  let number: number = pixelBuffer.byteLength / 3;
36  let buffer1: ArrayBuffer = pixelBuffer.slice(0, number);
37  let buffer2: ArrayBuffer = pixelBuffer.slice(number, number * 2);
38  let buffer3: ArrayBuffer = pixelBuffer.slice(number * 2);
39
40  let group: taskpool.TaskGroup = new taskpool.TaskGroup();
41  group.addTask(imageProcessing, buffer1);
42  group.addTask(imageProcessing, buffer2);
43  group.addTask(imageProcessing, buffer3);
44
45  taskpool.execute(group, taskpool.Priority.HIGH).then((ret: Object) => {
46    // 步骤3: 结果数组汇总处理
47  })
48}
49
50@Entry
51@Component
52struct Index {
53  @State message: string = 'Hello World'
54
55  build() {
56    Row() {
57      Column() {
58        Text(this.message)
59          .fontSize(50)
60          .fontWeight(FontWeight.Bold)
61          .onClick(() => {
62            let buffer: ArrayBuffer = new ArrayBuffer(24);
63            histogramStatistic(buffer);
64          })
65      }
66      .width('100%')
67    }
68    .height('100%')
69  }
70}
71```
72
73
74## 使用Worker进行长时间数据分析
75
76本文通过某地区提供的房价数据训练一个简易的房价预测模型,该模型支持通过输入房屋面积和房间数量去预测该区域的房价,模型需要长时间运行,房价预测需要使用前面的模型运行结果,因此需要使用Worker。
77
781. DevEco Studio提供了Worker创建的模板,新建一个Worker线程,例如命名为“MyWorker”。
79
80   ![newWorker](figures/newWorker.png)
81
822. 在宿主线程中通过调用ThreadWorker的[constructor()](../reference/apis-arkts/js-apis-worker.md#constructor9)方法创建Worker对象。
83
84    ```ts
85    // Index.ets
86    import { worker } from '@kit.ArkTS';
87
88    const workerInstance: worker.ThreadWorker = new worker.ThreadWorker('entry/ets/workers/MyWorker.ts');
89    ```
90
913. 在宿主线程中通过调用[onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9)方法接收Worker线程发送过来的消息,并通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9)方法向Worker线程发送消息。
92   例如向Worker线程发送训练和预测的消息,同时接收Worker线程发送回来的消息。
93
94    ```ts
95    // Index.ets
96    let done = false;
97
98    // 接收Worker子线程的结果
99    workerInstance.onmessage = (() => {
100      console.info('MyWorker.ts onmessage');
101      if (!done) {
102        workerInstance.postMessage({ 'type': 1, 'value': 0 });
103        done = true;
104      }
105    })
106
107    workerInstance.onerror = (() => {
108      // 接收Worker子线程的错误信息
109    })
110
111    // 向Worker子线程发送训练消息
112    workerInstance.postMessage({ 'type': 0 });
113    ```
114
1154. 在MyWorker.ts文件中绑定Worker对象,当前线程为Worker线程。
116
117   ```ts
118   // MyWorker.ts
119   import { worker, ThreadWorkerGlobalScope, MessageEvents, ErrorEvent } from '@kit.ArkTS';
120
121   let workerPort: ThreadWorkerGlobalScope = worker.workerPort;
122   ```
123
1245. 在Worker线程中通过调用[onmessage()](../reference/apis-arkts/js-apis-worker.md#onmessage9-1)方法接收宿主线程发送的消息内容,并通过调用[postMessage()](../reference/apis-arkts/js-apis-worker.md#postmessage9-2)方法向宿主线程发送消息。
125    例如在Worker线程中定义预测模型及其训练过程,同时与宿主线程进行信息交互。
126
127    ```ts
128    // MyWorker.ts
129    // 定义训练模型及结果
130    let result: Array<number>;
131    // 定义预测函数
132    function predict(x: number): number {
133     return result[x];
134    }
135    // 定义优化器训练过程
136    function optimize(): void {
137     result = [0];
138    }
139    // Worker线程的onmessage逻辑
140    workerPort.onmessage = (e: MessageEvents): void => {
141     // 根据传输的数据的type选择进行操作
142     switch (e.data.type as number) {
143      case 0:
144      // 进行训练
145       optimize();
146      // 训练之后发送宿主线程训练成功的消息
147       workerPort.postMessage({ type: 'message', value: 'train success.' });
148       break;
149      case 1:
150      // 执行预测
151       const output: number = predict(e.data.value as number);
152      // 发送宿主线程预测的结果
153       workerPort.postMessage({ type: 'predict', value: output });
154       break;
155      default:
156       workerPort.postMessage({ type: 'message', value: 'send message is invalid' });
157       break;
158     }
159    }
160    ```
161
1626. 在Worker线程中完成任务之后,执行Worker线程销毁操作。销毁线程的方式主要有两种:根据需要可以在宿主线程中对Worker线程进行销毁;也可以在Worker线程中主动销毁Worker线程。
163
164    在宿主线程中通过调用[onexit()](../reference/apis-arkts/js-apis-worker.md#onexit9)方法定义Worker线程销毁后的处理逻辑。
165
166    ```ts
167    // Worker线程销毁后,执行onexit回调方法
168    workerInstance.onexit = (): void => {
169     console.info("main thread terminate");
170    }
171    ```
172
173    方式一:在宿主线程中通过调用[terminate()](../reference/apis-arkts/js-apis-worker.md#terminate9)方法销毁Worker线程,并终止Worker接收消息。
174
175    ```ts
176    // 销毁Worker线程
177    workerInstance.terminate();
178    ```
179
180    方式二:在Worker线程中通过调用[close()](../reference/apis-arkts/js-apis-worker.md#close9)方法主动销毁Worker线程,并终止Worker接收消息。
181
182    ```ts
183    // 销毁线程
184    workerPort.close();
185    ```