1/* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15type AnyType = Object | null | undefined | unknown; 16declare function requireNapi(napiModuleName: string): AnyType; 17const emitter = requireNapi('events.emitter'); 18// @ts-ignore 19const { TextEncoder, StringDecoder } = requireNapi('util'); 20 21const DEFAULT_HIGH_WATER_MARK = 16 * 1024; 22const DEFAULT_ENCODING = 'utf-8'; 23const TypeErrorCodeId = 401; 24class BusinessError extends Error { 25 code: number | string; 26 27 constructor(msg: string, code?: number) { 28 super(msg); 29 this.name = 'BusinessError'; 30 this.code = code ? code : TypeErrorCodeId; 31 } 32} 33const ERR_DOWRITE_NOT_IMPLEMENTED:BusinessError = new BusinessError('The doWrite() method is not implemented', 10200035); 34const ERR_DOWRITEV_NOT_IMPLEMENTED:BusinessError = new BusinessError('The doWritev() method is not implemented', 10200035); 35const ERR_MULTIPLE_CALLBACK:BusinessError = new BusinessError('Callback called multiple times', 10200037); 36const ERR_STREAM_ALREADY_FINISHED:BusinessError = new BusinessError('stream already finished', 10200036); 37const ERR_WRITE_AFTER_END:BusinessError = new BusinessError('write after end', 10200036); 38const ENCODING_SET: Array<string> = ['ascii', 'utf-8', 'UTF-8', 'gbk', 'GBK', 'GB2312', 'gb2312', 39 'GB18030', 'gb18030', 'ibm866', 'iso-8859-2', 'iso-8859-3', 40 'iso-8859-4', 'iso-8859-5', 'iso-8859-6', 'iso-8859-7', 41 'iso-8859-8', 'iso-8859-8-i', 'iso-8859-10', 'iso-8859-13', 42 'iso-8859-14', 'iso-8859-15', 'koi8-r', 'koi8-u', 'macintosh', 43 'windows-874', 'windows-1250', 'windows-1251', 'windows-1252', 44 'windows-1253', 'windows-1254', 'windows-1255', 'windows-1256', 45 'windows-1257', 'windows-1258', 'big5', 'euc-jp', 'iso-2022-jp', 46 'shift_jis', 'euc-kr', 'x-mac-cyrillic', 'utf-16be', 47 'utf-16le']; 48class EventEmitter { 49 handlers: { [key: string]: Function[] }; 50 51 constructor() { 52 this.handlers = {}; 53 } 54 55 on(event: string, callback: Function): void { 56 if (!this.handlers[event]) { 57 this.handlers[event] = []; 58 } 59 this.handlers[event].push(callback); 60 } 61 62 off(event: string, callback: Function): void { 63 if (this.handlers[event]) { 64 const idx = this.handlers[event].findIndex((value: Function): boolean => value === callback); 65 if (idx !== -1) { 66 this.handlers[event].splice(idx, 1); 67 } 68 } 69 } 70 71 emit(event: string, ...args: AnyType[]): void { 72 if (this.handlers[event]) { 73 this.handlers[event].forEach((item: any) => { 74 if (args.length > 0) { 75 item({ data: args[0] }); 76 } else { 77 item({}); 78 } 79 }); 80 } 81 } 82 83 isOn(event: string): boolean { 84 const handler:Function[] = this.handlers[event]; 85 return handler && handler.length > 0; 86 } 87 88 listenerCount(event: string): number { 89 return this.handlers[event]?.length || 0; 90 } 91} 92function runOnce(runFn: Function, callback?: (multipleTimes: boolean, error: Error) => void): Function { 93 let executed = false; 94 return function (...args: Function[]) { 95 if (!executed) { 96 executed = true; 97 return runFn(...args); 98 } else { 99 if (callback) { 100 Promise.resolve().then(():void => { 101 // @ts-ignore 102 callback(); 103 }); 104 } 105 } 106 }; 107} 108function asyncFn(asyncFn: Function) { 109 return function (...args: Function[]): void { 110 setTimeout(() => asyncFn(...args)); 111 }; 112} 113enum ReadableEvent { 114 CLOSE = 'close', 115 DATA = 'data', 116 END = 'end', 117 ERROR = 'error', 118 READABLE = 'readable', 119 PAUSE = 'pause', 120 RESUME = 'resume', 121} 122enum WritableEvent { 123 CLOSE = 'close', 124 DRAIN = 'drain', 125 ERROR = 'error', 126 FINISH = 'finish', 127 PIPE = 'pipe', 128 UNPIPE = 'unpipe', 129} 130 131interface ReadablePipeStream { 132 write: Writable; 133 dataCallback: Function; 134 drainCallback: Function; 135 endCallback: Function; 136} 137class Readable { 138 private buf: Array<number>; 139 private listener: EventEmitter | undefined; 140 private callbacks: { [key: string]: Function[] } = {}; 141 protected encoder = new TextEncoder(); 142 protected stringDecoder = new StringDecoder(); 143 private isInitialized: boolean = false; 144 private pauseInner: boolean; 145 private pipeWritableArrayInner: ReadablePipeStream[] = []; 146 private readableObjectModeInner: boolean | undefined; 147 private readableInner: boolean; 148 private readableHighWatermarkInner: number; 149 private readableFlowingInner: boolean; 150 private readableLengthInner: number; 151 private readableEncodingInner: string; 152 private readableEndedInner: boolean; 153 private erroredInner: Error | undefined; 154 private closedInner: boolean | undefined; 155 156 /** 157 * The Readable constructor. 158 * 159 * @syscap SystemCapability.Utils.Lang 160 * @throws { BusinessError } 401 - Parameter error. Possible causes: 161 * 1.Incorrect parameter types. 162 * @crossplatform 163 * @since 12 164 */ 165 constructor(options?: { encoding: string | null; highWatermark?: number; doRead?: (size: number) => void }) { 166 this.readableEncodingInner = options?.encoding || DEFAULT_ENCODING; 167 this.readableHighWatermarkInner = options?.highWatermark || DEFAULT_HIGH_WATER_MARK; 168 this.readableObjectModeInner = false; 169 this.readableLengthInner = 0; 170 this.pauseInner = false; 171 this.readableFlowingInner = true; 172 this.readableInner = true; 173 this.readableEndedInner = false; 174 this.listener = new EventEmitter(); 175 this.buf = []; 176 if (arguments.length !== 0) { 177 if (typeof options?.doRead === 'function') { 178 this.doRead = options.doRead; 179 } 180 if (this.readableEncodingInner.toLowerCase() === 'utf8') { 181 this.readableEncodingInner = DEFAULT_ENCODING; 182 } 183 if (ENCODING_SET.indexOf(this.readableEncodingInner.toLowerCase()) === -1) { 184 let error = new BusinessError('Parameter error. Incorrect parameter types.', 401); 185 throw error; 186 } 187 this.stringDecoder = new StringDecoder(this.readableEncodingInner); 188 this.encoder = new TextEncoder(this.readableEncodingInner); 189 } 190 } 191 192 /** 193 * Returns boolean indicating whether it is in ObjectMode. 194 * 195 * @syscap SystemCapability.Utils.Lang 196 * @crossplatform 197 * @since 12 198 */ 199 get readableObjectMode(): boolean | undefined { 200 return this.readableObjectModeInner; 201 } 202 203 /** 204 * Is true if it is safe to call readable.read(), which means 205 * the stream has not been destroyed or emitted 'error' or 'end'. 206 * 207 * @syscap SystemCapability.Utils.Lang 208 * @crossplatform 209 * @since 12 210 */ 211 get readable(): boolean { 212 if (this.readableInner) { 213 return true; 214 } else if (!this.readableInner && this.readableEndedInner) { 215 return false; 216 } 217 return true; 218 } 219 220 /** 221 * Returns the value of highWatermark passed when creating this Readable. 222 * 223 * @syscap SystemCapability.Utils.Lang 224 * @crossplatform 225 * @since 12 226 */ 227 get readableHighWatermark(): number { 228 return this.readableHighWatermarkInner; 229 } 230 231 /** 232 * This property reflects the current state of the readable stream null/true/false. 233 * 234 * @syscap SystemCapability.Utils.Lang 235 * @crossplatform 236 * @since 12 237 */ 238 get readableFlowing(): boolean { 239 return this.readableFlowingInner; 240 } 241 242 /** 243 * Size of the data that can be read, in bytes or objects. 244 * 245 * @syscap SystemCapability.Utils.Lang 246 * @crossplatform 247 * @since 12 248 */ 249 get readableLength(): number { 250 return this.readableLengthInner; 251 } 252 253 /** 254 * Getter for the property encoding of a given Readable stream. The encoding property can be set using the 255 * readable.setEncoding() method. 256 * 257 * @syscap SystemCapability.Utils.Lang 258 * @crossplatform 259 * @since 12 260 */ 261 get readableEncoding(): string | null { 262 return this.readableEncodingInner; 263 } 264 265 /** 266 * Whether all data has been generated. 267 * 268 * @syscap SystemCapability.Utils.Lang 269 * @crossplatform 270 * @since 12 271 */ 272 get readableEnded(): boolean { 273 return this.readableEndedInner; 274 } 275 276 /** 277 * Returns error if the stream has been destroyed with an error. 278 * 279 * @syscap SystemCapability.Utils.Lang 280 * @crossplatform 281 * @since 12 282 */ 283 get errored(): Error | undefined { 284 return this.erroredInner; 285 } 286 287 /** 288 * Readable completes destroyfile and returns true, otherwise returns false. 289 * 290 * @syscap SystemCapability.Utils.Lang 291 * @crossplatform 292 * @since 12 293 */ 294 get closed(): boolean { 295 return this.closedInner || false; 296 } 297 298 private computeNewReadableHighWatermark(readSize: number): number { 299 readSize--; 300 readSize |= readSize >>> 1; 301 readSize |= readSize >>> 2; 302 readSize |= readSize >>> 4; 303 readSize |= readSize >>> 8; 304 readSize |= readSize >>> 16; 305 readSize++; 306 return readSize; 307 } 308 309 setEndType(): void { 310 Promise.resolve().then((): void => { 311 this.readableInner = false; 312 this.readableEndedInner = true; 313 this.listener?.emit(ReadableEvent.END); 314 }); 315 } 316 317 /** 318 * Reads a buffer of a specified size from the buffer. If the available buffer is sufficient, the result 319 * of the specified size is returned. Otherwise, if Readable has ended, all remaining buffers are returned. 320 * 321 * @param { number } size - Expected length of the data to be read. 322 * @returns { string | null } If no data is available to read, null is returned. 323 * @throws { BusinessError } 401 - if the input parameters are invalid. 324 * @throws { BusinessError } 10200038 - if the doRead method has not been implemented, an error will be thrown. 325 * @syscap SystemCapability.Utils.Lang 326 * @crossplatform 327 * @since 12 328 */ 329 read(size?: number): string | null { 330 if (size && typeof size !== 'number') { 331 this.throwError(new BusinessError(`Parameter error. The type of ${size} must be object`, 401)); 332 return null; 333 } 334 if (this.doRead === null && this.readableInner) { 335 this.readableInner = false; 336 Promise.resolve().then(() => { 337 this.closedInner = true; 338 this.erroredInner = new BusinessError('The doRead() method is not implemented', 10200038); 339 this.listener?.emit(ReadableEvent.ERROR, this.erroredInner); 340 this.listener?.emit(ReadableEvent.CLOSE); 341 }); 342 return null; 343 } 344 size = size ?? this.readableLengthInner; 345 if (size > this.readableHighWatermarkInner) { 346 this.readableHighWatermarkInner = this.computeNewReadableHighWatermark(size); 347 } 348 if (size > this.readableLengthInner) { 349 if (!this.readableFlowingInner) { 350 return null; 351 } else { 352 size = this.readableLengthInner; 353 } 354 } 355 let buffer = null; 356 if (size > 0 && size <= this.readableLengthInner) { 357 this.readableLengthInner -= size; 358 buffer = this.stringDecoder.write(new Uint8Array(this.buf.splice(0, size))); 359 this.doRead !== null && this.listener?.emit(ReadableEvent.DATA, buffer); 360 } 361 if ((!this.readableInner || size <= -1) && this.readableFlowingInner) { 362 return null; 363 } 364 if (this.readableFlowingInner) { 365 try { 366 this.doRead(this.readableHighWatermarkInner); 367 } catch (error) { 368 this.readableInner = false; 369 this.readableEndedInner = true; 370 this.listener?.emit(ReadableEvent.ERROR, error); 371 this.listener?.emit(ReadableEvent.CLOSE); 372 } 373 } 374 return buffer; 375 }; 376 377 /** 378 * Switch Readable to Streaming Mode. 379 * 380 * @syscap SystemCapability.Utils.Lang 381 * @crossplatform 382 * @since 12 383 */ 384 resume(): Readable { 385 if (this.readableLengthInner === 0) { 386 Promise.resolve().then((): void => { 387 this.read(this.readableHighWatermarkInner < this.readableLengthInner ? -1 : this.readableLengthInner); 388 }); 389 } 390 this.pauseInner = false; 391 this.readableFlowingInner = true; 392 this.listener?.emit(ReadableEvent.RESUME); 393 return this; 394 } 395 396 /** 397 * Toggle Readable to Suspend Mode. 398 * 399 * @syscap SystemCapability.Utils.Lang 400 * @crossplatform 401 * @since 12 402 */ 403 pause(): Readable { 404 this.pauseInner = true; 405 Promise.resolve().then((): void => { 406 this.readableFlowingInner = false; 407 }); 408 this.listener?.emit(ReadableEvent.PAUSE); 409 return this; 410 } 411 412 /** 413 * Sets the encoding format of the input binary data.Default: utf8. 414 * 415 * @param { string } [encoding] - Original Data Encoding Type. 416 * @returns { boolean } Setting successful returns true, setting failed returns false. 417 * @throws { BusinessError } 401 - if the input parameters are invalid. 418 * @syscap SystemCapability.Utils.Lang 419 * @crossplatform 420 * @since 12 421 */ 422 setEncoding(encoding?: string): boolean { 423 if(this.readableEncodingInner === encoding) { 424 return true; 425 } 426 if (!encoding) { 427 this.readableEncodingInner = DEFAULT_ENCODING; 428 this.encoder = new TextEncoder(this.readableEncodingInner); 429 this.stringDecoder = new StringDecoder(this.readableEncodingInner); 430 return false; 431 } 432 if (encoding.toLowerCase() === 'utf8') { 433 encoding = 'utf-8'; 434 } 435 if (this.buf.length !== 0) { 436 console.error('stream: The buffer also has data, and encoding is not allowed'); 437 return false; 438 } 439 if (ENCODING_SET.indexOf(encoding.toLowerCase()) !== -1) { 440 try { 441 this.encoder = new TextEncoder(encoding); 442 this.stringDecoder = new StringDecoder(encoding); 443 this.readableEncodingInner = encoding.toLowerCase(); 444 } catch (e) { 445 this.throwError(e as Error); 446 return false; 447 } 448 return true; 449 } else { 450 const err: BusinessError = new BusinessError(`Parameter error. The type of ${encoding} must be string.`); 451 this.throwError(err); 452 return false; 453 } 454 } 455 456 /** 457 * Query whether it is in pause state. 458 * 459 * @returns { boolean } Pause state returns true, otherwise returns false. 460 * @syscap SystemCapability.Utils.Lang 461 * @crossplatform 462 * @since 12 463 */ 464 isPaused(): boolean { 465 return this.pauseInner; 466 } 467 468 /** 469 * Concatenates a Writable to a Readable and switches the Readable to stream mode. 470 * 471 * @param { Writable } destination - Output writable stream. 472 * @param { Object } [option] - Pipeline Options. 473 * @returns { Writable } Returns the Writable object. 474 * @throws { BusinessError } 401 - if the input parameters are invalid. 475 * @syscap SystemCapability.Utils.Lang 476 * @crossplatform 477 * @since 12 478 */ 479 pipe(destination: Writable, option?: Object): Writable { 480 this.pauseInner = false; 481 const obj: ReadablePipeStream = { 482 write: destination, 483 dataCallback: (data: { data: string | Uint8Array }): void => { 484 destination.write(data.data); 485 if ((destination.writableLength || 0) > (destination.writableHighWatermark || DEFAULT_HIGH_WATER_MARK)) { 486 this.pauseInner = true; 487 this.readableFlowingInner = false; 488 } 489 }, 490 drainCallback: (): void => { 491 this.pauseInner = false; 492 this.readableFlowingInner = true; 493 this.read(this.readableLengthInner); 494 }, 495 endCallback: (): void => { 496 destination.end(); 497 } 498 }; 499 this.pipeWritableArrayInner.push(obj); 500 this.on(ReadableEvent.DATA, (data: { data: Function }) => { 501 obj.dataCallback(data); 502 }); 503 destination.on('drain', (): void => { 504 obj.drainCallback(); 505 }); 506 this.on(ReadableEvent.END, (): void => { 507 obj.endCallback(); 508 }); 509 destination?.listener?.emit('pipe', this); 510 return destination; 511 } 512 513 /** 514 * Disconnect Writable from Readable. 515 * 516 * @param { Writable } [destination] - Writable Streams Needing to Be Disconnected. 517 * @returns { Readable } Returns the Readable object. 518 * @throws { BusinessError } 401 - if the input parameters are invalid. 519 * @syscap SystemCapability.Utils.Lang 520 * @crossplatform 521 * @since 12 522 */ 523 unpipe(destination?: Writable): Readable { 524 const objIdx: number = this.pipeWritableArrayInner.findIndex((value: ReadablePipeStream) => value.write === destination); 525 if (objIdx !== -1) { 526 this.readableInner = false; 527 const obj: ReadablePipeStream = this.pipeWritableArrayInner[objIdx]; 528 this.listener?.off(ReadableEvent.DATA, obj.dataCallback); 529 destination?.listener?.off('drain', obj.drainCallback); 530 this.listener?.off(ReadableEvent.END, obj.endCallback); 531 destination?.listener?.emit('unpipe', this); 532 } 533 return this; 534 } 535 536 /** 537 * Registering Event Messages. 538 * 539 * @param { string } event - Registering Events. 540 * @param { Callback } callback - Event callback. 541 * @throws { BusinessError } 401 - if the input parameters are invalid. 542 * @syscap SystemCapability.Utils.Lang 543 * @crossplatform 544 * @since 12 545 */ 546 on(event: string, callback: Function): void { 547 const that = this; 548 if (!this.isInitialized) { 549 this.isInitialized = true; 550 this.doInitialize?.(() => { 551 }); 552 } 553 this.callbacks[event] = this.callbacks[event] ?? []; 554 const callbackFn = callback; 555 this.callbacks[event].push(callbackFn); 556 this.listener?.on(event, callbackFn); 557 Promise.resolve().then((): void => { 558 if (event === ReadableEvent.READABLE) { 559 this.readableFlowingInner = false; 560 if (this.readableInner) { 561 this.doRead?.(this.readableHighWatermarkInner); 562 } 563 } else if (event === ReadableEvent.DATA) { 564 this.readableFlowingInner = true; 565 if (!this.pauseInner) { 566 this.read(); 567 } 568 } 569 }); 570 } 571 572 /** 573 * Cancel event message. 574 * 575 * @param { string } event - Registering Events. 576 * @param { Callback<emitter.EventData> } callback - Event callback. 577 * @throws { BusinessError } 401 - if the input parameters are invalid. 578 * @syscap SystemCapability.Utils.Lang 579 * @crossplatform 580 * @since 12 581 */ 582 off(event: string, callback?: Function): void { 583 if (!event) { 584 this.throwError(new BusinessError(`Parameter error. The value of ${event} is null`, 401)); 585 return; 586 } 587 if (event && typeof event !== 'string') { 588 this.throwError(new BusinessError(`Parameter error. The type of ${event} must be string`, 401)); 589 return; 590 } 591 if (callback) { 592 this.callbacks[event]?.forEach((it: Function): void => { 593 if (callback === it) { 594 this.listener?.off(event, it); 595 } 596 }); 597 } else { 598 this.callbacks[event]?.forEach((it : Function) => this.listener?.off(event, it)); 599 } 600 } 601 602 /** 603 * Called by the Readable during initialization. It should not be called actively. Call callback () after the 604 * resource has been initialized within the doInitialize, or call callback (err) when an error occurs. 605 * 606 * @param { Function } callback - Callback when the stream has completed the initial. 607 * @throws { BusinessError } 401 - if the input parameters are invalid. 608 * @syscap SystemCapability.Utils.Lang 609 * @crossplatform 610 * @since 12 611 */ 612 doInitialize(callback: Function): void { 613 } 614 615 /** 616 * The specific implementation of data production should not be actively called. Readable.read controls the 617 * calling. After data production, Readable.push should be called to push the produced data into the buffer. 618 * If push is not called, doRead will not be called again. 619 * 620 * @param { number } size -Expected length of the data to be read. 621 * @throws { BusinessError } 401 - if the input parameters are invalid. 622 * @syscap SystemCapability.Utils.Lang 623 * @crossplatform 624 * @since 12 625 */ 626 doRead(size: number): void { 627 }; 628 629 /** 630 * Adds the generated data to the buffer. The return value indicates whether the data in the buffer has not 631 * reached the highWaterMark (similar to Writable.write). If the chunk is null, all data has been generated. 632 * 633 * @param { Uint8Array | string | null } chunk - Binary data to be stored in the buffer. 634 * @param { string } [encoding] - Binary data encoding type. 635 * @returns { boolean } If true is returned, the data in the buffer reaches the highWaterMark. Otherwise, the 636 * data in the buffer does not reach the highWaterMark. 637 * @throws { BusinessError } 401 - if the input parameters are invalid. 638 * @syscap SystemCapability.Utils.Lang 639 * @crossplatform 640 * @since 12 641 */ 642 push(chunk: Uint8Array | string | null, encoding?: string): boolean { 643 let bufferArr: Uint8Array; 644 if (encoding) { 645 this.setEncoding(encoding); 646 } 647 if (typeof chunk === 'string' || chunk instanceof Uint8Array) { 648 if (typeof chunk === 'string') { 649 bufferArr = this.encoder.encodeInto(chunk); 650 this.buf.push(...bufferArr); 651 this.readableLengthInner += bufferArr.length; 652 } else if (chunk instanceof Uint8Array) { 653 this.buf.push(...chunk); 654 this.readableLengthInner += chunk.length; 655 } 656 const highWaterMark = this.readableLengthInner <= this.readableHighWatermarkInner; 657 Promise.resolve().then((): void => { 658 try { 659 if (this.readableFlowingInner) { 660 !this.pauseInner && this.read(highWaterMark ? this.readableLengthInner : -1); 661 } else { 662 if (highWaterMark) { 663 this.doRead?.(this.readableHighWatermarkInner); 664 } 665 } 666 } catch (error) { 667 this.listener?.emit(ReadableEvent.ERROR, error); 668 this.listener?.emit(ReadableEvent.CLOSE); 669 } 670 this.listener?.emit(ReadableEvent.READABLE); 671 }); 672 return this.readableLengthInner < this.readableHighWatermarkInner; 673 } else if (chunk === null) { 674 if (!this.readableEndedInner && this.readableInner) { 675 !this.readableFlowingInner && this.listener?.emit(ReadableEvent.READABLE); 676 this.readableInner = false; 677 Promise.resolve().then((): void => { 678 this.readableEndedInner = true; 679 this.pauseInner = true; 680 this.closedInner = true; 681 this.listener?.emit(ReadableEvent.END); 682 this.listener?.emit(ReadableEvent.CLOSE); 683 }); 684 } 685 return false; 686 } else { 687 this.readableInner = false; 688 this.erroredInner = new BusinessError('ERR_INVALID_ARG_TYPE'); 689 this.listener?.emit(ReadableEvent.ERROR, this.erroredInner); 690 return false; 691 } 692 }; 693 694 throwError(error: Error): void { 695 this.erroredInner = error; 696 if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { 697 setTimeout((): void => { 698 this.listener?.emit(WritableEvent.ERROR, error); 699 }); 700 } else { 701 throw error; 702 } 703 } 704} 705 706// @ts-ignore 707Readable.prototype.doRead = null; 708 709class Writable { 710 public listener: EventEmitter | undefined; 711 private callbacks: { [key: string]: Function[] } = {}; 712 private buffer: ({ encoding?: string, chunk: string | Uint8Array, callback: Function })[] = []; 713 private writing: boolean = false; 714 private encoding: string | undefined; 715 protected encoder = new TextEncoder(); 716 private ending: boolean = false; 717 private writableObjectModeInner: boolean | undefined; 718 private writableHighWatermarkInner: number; 719 private writableInner: boolean | undefined; 720 private writableLengthInner: number | undefined; 721 private writableNeedDrainInner: boolean | undefined; 722 private writableCorkedInner: number = 0; 723 private writableEndedInner: boolean | undefined; 724 private writableFinishedInner: boolean | undefined; 725 private erroredInner: Error | undefined | null; 726 private closedInner: boolean | undefined; 727 728 /** 729 * The Writable constructor. 730 * 731 * @syscap SystemCapability.Utils.Lang 732 * @crossplatform 733 * @since 12 734 */ 735 constructor(options?: { 736 highWaterMark?: number | undefined; 737 objectMode?: boolean | undefined; 738 }) { 739 this.listener = new EventEmitter(); 740 if (!options) { 741 options = { 742 highWaterMark: DEFAULT_HIGH_WATER_MARK, 743 objectMode: false, 744 }; 745 } 746 this.writableHighWatermarkInner = options.highWaterMark ?? DEFAULT_HIGH_WATER_MARK; 747 this.writableObjectModeInner = options.objectMode || false; 748 this.writableLengthInner = 0; 749 this.writableEndedInner = false; 750 this.writableNeedDrainInner = false; 751 this.writableInner = true; 752 this.writableCorkedInner = 0; 753 this.writableFinishedInner = false; 754 this.erroredInner = null; 755 this.encoding = 'utf8'; 756 this.closedInner = false; 757 this.doInitialize((error: Error): void => { 758 if (error) { 759 this.listener?.emit(WritableEvent.ERROR, error); 760 } 761 }); 762 } 763 764 /** 765 * Returns boolean indicating whether it is in ObjectMode. 766 * 767 * @syscap SystemCapability.Utils.Lang 768 * @crossplatform 769 * @since 12 770 */ 771 get writableObjectMode(): boolean | undefined { 772 return this.writableObjectModeInner; 773 } 774 775 /** 776 * Value of highWaterMark. 777 * 778 * @syscap SystemCapability.Utils.Lang 779 * @crossplatform 780 * @since 12 781 */ 782 get writableHighWatermark(): number | undefined { 783 return this.writableHighWatermarkInner; 784 } 785 786 /** 787 * Is true if it is safe to call writable.write(), which means the stream has not been destroyed, errored, or ended. 788 * 789 * @syscap SystemCapability.Utils.Lang 790 * @crossplatform 791 * @since 12 792 */ 793 get writable(): boolean | undefined { 794 return this.writableInner; 795 } 796 797 /** 798 * Size of data this can be flushed, in bytes or objects. 799 * 800 * @syscap SystemCapability.Utils.Lang 801 * @crossplatform 802 * @since 12 803 */ 804 get writableLength(): number | undefined { 805 return this.writableLengthInner; 806 } 807 808 /** 809 * If the buffer of the stream is full and true, otherwise it is false. 810 * 811 * @syscap SystemCapability.Utils.Lang 812 * @crossplatform 813 * @since 12 814 */ 815 get writableNeedDrain(): boolean | undefined { 816 return this.writableNeedDrainInner; 817 } 818 819 /** 820 * Number of times writable.uncork() needs to be called in order to fully uncork the stream. 821 * 822 * @syscap SystemCapability.Utils.Lang 823 * @crossplatform 824 * @since 12 825 */ 826 get writableCorked(): number | undefined { 827 return this.writableCorkedInner; 828 }; 829 830 /** 831 * Whether Writable.end has been called. 832 * 833 * @syscap SystemCapability.Utils.Lang 834 * @crossplatform 835 * @since 12 836 */ 837 get writableEnded(): boolean | undefined { 838 return this.writableEndedInner; 839 } 840 841 /** 842 * Whether Writable.end has been called and all buffers have been flushed. 843 * 844 * @syscap SystemCapability.Utils.Lang 845 * @crossplatform 846 * @since 12 847 */ 848 get writableFinished(): boolean | undefined { 849 return this.writableFinishedInner; 850 } 851 852 /** 853 * Returns error if the stream has been destroyed with an error. 854 * 855 * @syscap SystemCapability.Utils.Lang 856 * @crossplatform 857 * @since 12 858 */ 859 get errored(): Error | undefined | null { 860 return this.erroredInner; 861 } 862 863 /** 864 * Writable completes destroyfile and returns true, otherwise returns false. 865 * 866 * @syscap SystemCapability.Utils.Lang 867 * @crossplatform 868 * @since 12 869 */ 870 get closed(): boolean | undefined { 871 return this.closedInner; 872 } 873 874 /** 875 * writes a chunk to Writable and invokes callback when the chunk is flushed. The return value indicates 876 * whether the internal buffer of the Writable reaches the hightWaterMark. If true is returned, the buffer 877 * does not reach the hightWaterMark. If false is returned, the buffer has been reached. The write function 878 * should be called after the drain event is triggered. If the write function is called continuously, 879 * the chunk is still added to the buffer until the memory overflows 880 * 881 * @param { string | Uint8Array } [chunk] - Data to be written. 882 * @param { string } [encoding] - Encoding type. 883 * @param { Function } [callback] - Callback after writing. 884 * @returns { boolean } Write success returns true, write failure returns false. 885 * @throws { BusinessError } 401 - if the input parameters are invalid. 886 * @throws { BusinessError } 10200035 - if doWrite not implemented, an exception will be thrown. 887 * @throws { BusinessError } 10200036 - if stream has been ended, writing data to it will throw an error. 888 * @throws { BusinessError } 10200037 - if the callback is called multiple times consecutively, an error will be thrown. 889 * @syscap SystemCapability.Utils.Lang 890 * @crossplatform 891 * @since 12 892 */ 893 write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { 894 if (encoding) { 895 this.setDefaultEncoding(encoding); 896 } 897 if (chunk === null) { 898 throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); 899 } 900 if (typeof chunk !== 'string' && !(chunk instanceof Uint8Array)) { 901 throw new BusinessError(`Parameter error. The type of ${chunk} must be string or UintArray`, 401); 902 } 903 if (this.ending && !this.writing) { 904 setTimeout((): void => { 905 this.erroredInner = new BusinessError('write after end', 10200036); 906 callback?.(this.erroredInner); 907 this.throwError(this.erroredInner); 908 }); 909 return false; 910 } 911 if (this.erroredInner) { 912 return false; 913 } 914 let flag = false; 915 if (chunk instanceof Uint8Array) { 916 flag = this.writeUint8Array(chunk, encoding ?? this.encoding, callback); 917 } else { 918 flag = this.writeString(chunk!, encoding ?? this.encoding, callback); 919 } 920 if (!flag) { 921 this.writableNeedDrainInner = true; 922 } 923 return flag; 924 } 925 926 private getChunkLength(chunk: string | Uint8Array): number { 927 if (chunk instanceof Uint8Array) { 928 return chunk.byteLength; 929 } else { 930 return this.encoder.encodeInto(chunk).byteLength; 931 } 932 } 933 934 private writeUint8Array(chunk: Uint8Array, encoding?: string, callback?: Function): boolean { 935 this.writableLengthInner! += this.getChunkLength(chunk); 936 const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; 937 const fnBack = runOnce((error?: Error): void => { 938 if (error && error instanceof Error) { 939 this.writableInner = false; 940 this.throwError(error); 941 return; 942 } 943 callback?.(error ?? null); 944 this.freshCache(); 945 }, (multipleTimes: boolean, err: Error): void => { 946 this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); 947 }); 948 if (this.writableCorkedInner === 0) { 949 if (!this.writing) { 950 this.writing = true; 951 this.doWrite(chunk, encoding ?? 'utf8', fnBack); 952 } else { 953 this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); 954 } 955 } else { 956 this.buffer.push({ chunk: chunk, encoding: encoding, callback: fnBack }); 957 } 958 return hasRemaining; 959 } 960 961 private writeString(chunk: string, encoding?: string, callback?: Function): boolean { 962 this.writableLengthInner! += this.getChunkLength(chunk); 963 const hasRemaining = this.writableLengthInner! < this.writableHighWatermark!; 964 const fb = runOnce((error?: Error): void => { 965 if (error) { 966 this.erroredInner = error; 967 } 968 callback?.(error ?? null); 969 this.freshCache(); 970 if (error && error instanceof Error) { 971 this.writableInner = false; 972 this.erroredInner = error; 973 Promise.resolve().then((): void => { 974 if (this.isOnError()) { 975 this.emitErrorOnce(error); 976 } else { 977 this.emitErrorOnce(error); 978 throw error; 979 } 980 }); 981 return; 982 } 983 }, () => { 984 this.emitErrorOnce(ERR_MULTIPLE_CALLBACK, true); 985 }); 986 987 if (this.writableCorkedInner === 0) { 988 if (!this.writing) { 989 this.writing = true; 990 this.doWrite?.(chunk, encoding ?? 'utf8', fb); 991 if (!this.doWrite && !hasRemaining) { 992 Promise.resolve().then(() => { 993 this.writableLengthInner = 0; 994 this.listener?.emit(WritableEvent.DRAIN); 995 }); 996 } 997 } else { 998 this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); 999 } 1000 } else { 1001 this.buffer.push({ chunk: chunk, encoding: encoding, callback: fb }); 1002 } 1003 return this.erroredInner ? false : hasRemaining; 1004 } 1005 1006 private freshCache(): void { 1007 const current = this.buffer.shift(); 1008 if (current) { 1009 this.doWrite?.(current.chunk, current.encoding ?? 'utf8', current.callback); 1010 this.writableLengthInner! -= this.getChunkLength(current.chunk); 1011 } else { 1012 this.writing = false; 1013 this.writableLengthInner = 0; 1014 if (!this.finishMayBe()) { 1015 this.writableNeedDrainInner = false; 1016 this.listener?.emit(WritableEvent.DRAIN); 1017 } 1018 } 1019 } 1020 1021 private freshCacheV(): void { 1022 if (this.buffer.length > 0) { 1023 if (this.doWritev) { 1024 const funCallback = runOnce((error?: Error): void => { 1025 if (error && error instanceof Error) { 1026 this.erroredInner = error; 1027 this.listener?.emit(WritableEvent.ERROR, error); 1028 return; 1029 } 1030 this.buffer = []; 1031 }, () => { 1032 this.listener?.emit(WritableEvent.ERROR, ERR_MULTIPLE_CALLBACK); 1033 }); 1034 // @ts-ignore 1035 this.doWritev(this.buffer.map((item: { encoding?: string; chunk: string | Uint8Array; callback: Function }) => { 1036 return item.chunk; 1037 }), funCallback); 1038 if (!this.finishMayBe()) { 1039 this.writableNeedDrainInner = true; 1040 this.listener?.emit(WritableEvent.DRAIN); 1041 } 1042 } else { 1043 this.freshCache(); 1044 } 1045 } 1046 } 1047 1048 endInner(chunk?: string | Uint8Array, encoding?: string, callback?: Function): void { 1049 if (chunk) { 1050 if (this.writing) { 1051 this.write(chunk, encoding, callback); 1052 } else { 1053 this.doWrite?.(chunk!, encoding ?? 'utf8', (error?: Error): void => { 1054 if (error && error instanceof Error) { 1055 this.erroredInner = error; 1056 this.listener?.emit(WritableEvent.ERROR, error); 1057 } else { 1058 this.writableLengthInner! -= this.getChunkLength(chunk); 1059 this.writing = false; 1060 this.finishMayBe(); 1061 } 1062 this.writableEndedInner = true; 1063 this.writableInner = false; 1064 asyncFn((): void => { 1065 callback?.(this.erroredInner ?? error ?? null); 1066 })(); 1067 if (!this.writableFinishedInner) { 1068 this.writableFinishedInner = true; 1069 asyncFn((): void => { 1070 if ((!this.erroredInner || this.erroredInner.message === 'write after end') && !this.isOnError()) { 1071 this.listener?.emit(WritableEvent.FINISH); 1072 } 1073 })(); 1074 } 1075 }); 1076 } 1077 } else { 1078 if (this.writableEndedInner) { 1079 this.erroredInner = new BusinessError('write after end', 10200036); 1080 callback?.(this.erroredInner); 1081 } else { 1082 setTimeout(() => callback?.(this.erroredInner)); 1083 } 1084 if (!this.writableFinishedInner && !this.writableEndedInner) { 1085 this.writableFinishedInner = true; 1086 asyncFn((): void => { 1087 if (!this.erroredInner || this.erroredInner.message === 'write after end') { 1088 this.listener?.emit(WritableEvent.FINISH); 1089 } 1090 })(); 1091 } 1092 } 1093 } 1094 1095 /** 1096 * Write the last chunk to Writable. 1097 * 1098 * @param { string | Uint8Array } [chunk] - Data to be written. 1099 * @param { string } [encoding] - Encoding type. 1100 * @param { Function } [callback] - Callback after writing. 1101 * @returns { Writable } Returns the Writable object. 1102 * @throws { BusinessError } 401 - if the input parameters are invalid. 1103 * @throws { BusinessError } 10200035 - if doWrite not implemented, an exception will be thrown. 1104 * @syscap SystemCapability.Utils.Lang 1105 * @crossplatform 1106 * @since 12 1107 */ 1108 end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { 1109 if (this.writableFinishedInner) { 1110 this.erroredInner = ERR_STREAM_ALREADY_FINISHED; 1111 setTimeout(() => callback?.(this.erroredInner)); 1112 this.emitErrorOnce(this.erroredInner); 1113 return this; 1114 } else if (this.writableEndedInner) { 1115 this.erroredInner = ERR_WRITE_AFTER_END; 1116 setTimeout(() => callback?.(this.erroredInner)); 1117 this.emitErrorOnce(this.erroredInner); 1118 return this; 1119 } 1120 if (this.erroredInner) { 1121 setTimeout(() => callback?.(this.erroredInner)); 1122 return this; 1123 } 1124 this.writableNeedDrainInner = false; 1125 this.closedInner = true; 1126 this.ending = true; 1127 this.writableInner = false; 1128 if (!this.writableEndedInner) { 1129 if (this.writableCorkedInner === 0) { 1130 this.endInner(chunk, encoding, callback); 1131 } else { 1132 this.writableCorkedInner = 1; 1133 this.uncork(); 1134 } 1135 } 1136 this.writableEndedInner = true; 1137 this.listener?.emit(WritableEvent.CLOSE); 1138 return this; 1139 } 1140 1141 private finishMayBe(): boolean { 1142 return !this.writing && this.writableCorkedInner === 0 && this.ending; 1143 } 1144 1145 /** 1146 * Set the default encoding mode. 1147 * 1148 * @param { string } [encoding] - Encoding type.Default: utf8. 1149 * @returns { boolean } Setting successful returns true, setting failed returns false. 1150 * @throws { BusinessError } 401 - if the input parameters are invalid. 1151 * @syscap SystemCapability.Utils.Lang 1152 * @crossplatform 1153 * @since 12 1154 */ 1155 1156 setDefaultEncoding(encoding?: string): boolean { 1157 if (!encoding) { 1158 return false; 1159 } 1160 if (encoding.toLowerCase() === 'utf8') { 1161 encoding = 'utf-8'; 1162 } 1163 if (ENCODING_SET.indexOf(encoding.toLowerCase()) !== -1) { 1164 this.encoding = encoding.toLowerCase(); 1165 try { 1166 if (encoding.toLowerCase() !== 'ascii') { 1167 this.encoder = new TextEncoder(encoding); 1168 } 1169 } catch (e) { 1170 this.throwError(e as Error); 1171 return false; 1172 } 1173 return true; 1174 } else { 1175 const err: BusinessError = new BusinessError(`Unknown encoding: ${encoding}`); 1176 this.throwError(err); 1177 return false; 1178 } 1179 } 1180 1181 /** 1182 * After the call, all Write operations will be forced to write to the buffer instead of being flushed. 1183 * 1184 * @returns { boolean } Setting successful returns true, setting failed returns false. 1185 * @syscap SystemCapability.Utils.Lang 1186 * @crossplatform 1187 * @since 12 1188 */ 1189 cork(): boolean { 1190 this.writableCorkedInner += 1; 1191 return true; 1192 } 1193 1194 /** 1195 * After calling, flush all buffers. 1196 * 1197 * @returns { boolean } Setting successful returns true, setting failed returns false. 1198 * @syscap SystemCapability.Utils.Lang 1199 * @crossplatform 1200 * @since 12 1201 */ 1202 uncork(): boolean { 1203 if (this.writableCorkedInner > 0) { 1204 this.writableCorkedInner -= 1; 1205 } 1206 if (this.writableCorkedInner === 0) { 1207 this.freshCacheV(); 1208 } 1209 return true; 1210 } 1211 1212 /** 1213 * Registering Event Messages. 1214 * 1215 * @param { string } event - Register Event. 1216 * @param { Callback<emitter.EventData> } callback - event callbacks. 1217 * @throws { BusinessError } 401 - if the input parameters are invalid. 1218 * @syscap SystemCapability.Utils.Lang 1219 * @crossplatform 1220 * @since 12 1221 */ 1222 on(event: string, callback: Function): void { 1223 this.callbacks[event] = this.callbacks[event] ?? []; 1224 const callbackFn = callback.bind(this); 1225 this.callbacks[event].push(callbackFn); 1226 this.listener?.on(event, callbackFn); 1227 } 1228 1229 /** 1230 * Cancel event message. 1231 * 1232 * @param { string } event - Register Event. 1233 * @param { Callback<emitter.EventData> } callback - event callbacks. 1234 * @throws { BusinessError } 401 - if the input parameters are invalid. 1235 * @syscap SystemCapability.Utils.Lang 1236 * @crossplatform 1237 * @since 12 1238 */ 1239 off(event: string, callback?: Function): void { 1240 if (!event) { 1241 this.throwError(new BusinessError(`Parameter error. The value of event is null `, 401)); 1242 return; 1243 } 1244 if (callback) { 1245 this.callbacks[event]?.forEach((it: Function): void => { 1246 if (callback && callback === it) { 1247 this.listener?.off(event, it); 1248 } 1249 }); 1250 } else { 1251 this.callbacks[event]?.forEach((it: Function) => this.listener?.off(event, it)); 1252 } 1253 } 1254 1255 noWriteOpes(chunk: string | Uint8Array, encoding: string, callback: Function): void { 1256 if (this.doWritev === null) { 1257 this.throwError(ERR_DOWRITE_NOT_IMPLEMENTED); 1258 } else { 1259 // @ts-ignore 1260 this.doWritev([chunk], callback); 1261 } 1262 } 1263 1264 /** 1265 * This method is invoked by the Writable method during initialization and should not be invoked actively. 1266 * After the resource is initialized in the doInitialize method, the callback () method is invoked. 1267 * 1268 * @param { Function } callback - Callback when the stream has completed the initial. 1269 * @throws { BusinessError } 401 - if the input parameters are invalid. 1270 * @syscap SystemCapability.Utils.Lang 1271 * @crossplatform 1272 * @since 12 1273 */ 1274 doInitialize(callback: Function): void { 1275 } 1276 1277 /** 1278 * Implemented by subclass inheritance. The implementation logic of flushing chunks in the buffer should not be 1279 * actively called. The call is controlled by Writable.write. 1280 * 1281 * @param { string | Uint8Array } chunk - Data to be written. 1282 * @param { string } encoding - Encoding type. 1283 * @param { Function } callback - Callback after writing. 1284 * @throws { BusinessError } 401 - if the input parameters are invalid. 1285 * @syscap SystemCapability.Utils.Lang 1286 * @crossplatform 1287 * @since 12 1288 */ 1289 doWrite(chunk: string | Uint8Array, encoding: string, callback: Function): void { 1290 throw ERR_DOWRITE_NOT_IMPLEMENTED; 1291 } 1292 1293 /** 1294 * The implementation logic of flushing chunks in the buffer in batches should not be actively called. 1295 * The call is controlled by Writable.write. 1296 * 1297 * @param { string[] | Uint8Array[] } chunk - Data to be written. 1298 * @param { Function } callback - Callback after writing. 1299 * @returns { Writable } Returns the Writable object. 1300 * @throws { BusinessError } 401 - if the input parameters are invalid. 1301 * @syscap SystemCapability.Utils.Lang 1302 * @crossplatform 1303 * @since 12 1304 */ 1305 doWritev(chunk: string[] | Uint8Array[], callback: Function): void { 1306 throw ERR_DOWRITEV_NOT_IMPLEMENTED; 1307 } 1308 1309 throwError(error: Error): void { 1310 this.erroredInner = error; 1311 if (this.listener && this.listener.listenerCount(WritableEvent.ERROR) > 0) { 1312 setTimeout(() => { 1313 this.listener?.emit(WritableEvent.ERROR, error); 1314 }); 1315 } else { 1316 throw error; 1317 } 1318 } 1319 1320 private isOnError(): boolean { 1321 return this.listener?.isOn(WritableEvent.ERROR) || false; 1322 } 1323 1324 private emitErrorExecutedInner = false; 1325 // @ts-ignore 1326 private emitErrorIdInner: number; 1327 1328 private emitErrorOnce(error: Error, reset?: boolean): void { 1329 if (reset) { 1330 this.emitErrorExecutedInner = false; 1331 clearTimeout(this.emitErrorIdInner); 1332 } 1333 if (!this.emitErrorExecutedInner) { 1334 this.emitErrorExecutedInner = true; 1335 // @ts-ignore 1336 this.emitErrorIdInner = setTimeout((): void => { 1337 this.listener?.emit(WritableEvent.ERROR, this.erroredInner ?? error); 1338 }); 1339 } 1340 } 1341} 1342 1343// @ts-ignore 1344Writable.prototype.doWritev = null; 1345Writable.prototype.doWrite = Writable.prototype.noWriteOpes; 1346 1347class Duplex extends Readable { 1348 private _writable: Writable; 1349 1350 constructor() { 1351 super(); 1352 this._writable = new Writable(); 1353 const that = this; 1354 if (this.doWrite) { 1355 this._writable.doWrite = this.doWrite?.bind(that); 1356 } 1357 this._writable.doWritev = this.doWritev?.bind(that); 1358 Object.defineProperties(that, { 1359 doWrite: { 1360 get(): Function { 1361 return that._writable.doWrite?.bind(that); 1362 }, 1363 set(value: Function):void { 1364 that._writable.doWrite = value.bind(that); 1365 } 1366 }, 1367 doWritev: { 1368 get(): Function { 1369 return that._writable.doWritev?.bind(that); 1370 }, 1371 set(value: Function):void { 1372 // @ts-ignore 1373 that._writable.doWritev = value?.bind(that); 1374 } 1375 } 1376 }); 1377 } 1378 1379 /** 1380 * writes a chunk to Writable and invokes callback when the chunk is flushed. The return value indicates 1381 * whether the internal buffer of the Writable reaches the hightWaterMark. If true is returned, the buffer 1382 * does not reach the hightWaterMark. If false is returned, the buffer has been reached. The write function 1383 * should be called after the drain event is triggered. If the write function is called continuously, 1384 * the chunk is still added to the buffer until the memory overflows 1385 * 1386 * @param { string | Uint8Array } [chunk] - Data to be written. 1387 * @param { string } [encoding] - Encoding type. 1388 * @param { Function } [callback] - Callback after writing. 1389 * @returns { boolean } Write success returns true, write failure returns false. 1390 * @throws { BusinessError } 401 - if the input parameters are invalid. 1391 * @throws { BusinessError } 10200036 - if stream has been ended, writing data to it will throw an error. 1392 * @throws { BusinessError } 10200037 - if the callback is called multiple times consecutively, an error will be thrown. 1393 * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. 1394 * @syscap SystemCapability.Utils.Lang 1395 * @crossplatform 1396 * @since 12 1397 */ 1398 write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { 1399 return this._writable.write(chunk, encoding, callback); 1400 } 1401 1402 /** 1403 * Write the last chunk to Writable. 1404 * 1405 * @param { string | Uint8Array } [chunk] - Data to be written. 1406 * @param { string } [encoding] - Encoding type. 1407 * @param { Function } [callback] - Callback after writing. 1408 * @returns { Writable } Returns the Writable object. 1409 * @throws { BusinessError } 401 - if the input parameters are invalid. 1410 * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. 1411 * @syscap SystemCapability.Utils.Lang 1412 * @crossplatform 1413 * @since 12 1414 */ 1415 end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { 1416 super.setEndType(); 1417 return this._writable.end(chunk, encoding, callback); 1418 } 1419 1420 on(event: string, callback: Function): void { 1421 super.on(event, callback); 1422 this._writable.on(event, callback); 1423 } 1424 1425 off(event: string, callback?: Function): void { 1426 super.off(event); 1427 this._writable.off(event, callback); 1428 } 1429 1430 getListener(): EventEmitter | undefined { 1431 return this._writable.listener; 1432 } 1433 1434 setDefaultEncoding(encoding?: string): boolean { 1435 return this._writable.setDefaultEncoding(encoding); 1436 } 1437 1438 cork(): boolean { 1439 return this._writable.cork(); 1440 } 1441 1442 uncork(): boolean { 1443 return this._writable.uncork(); 1444 } 1445 1446 doInitialize(callback: Function): void { 1447 super.doInitialize(callback); 1448 this._writable.doInitialize(callback); 1449 } 1450 1451 doWrite(chunk: string | Uint8Array, encoding: string, callback: Function): void { 1452 } 1453 1454 doWritev(chunk: string[] | Uint8Array[], callback: Function): void { 1455 this._writable.doWritev?.(chunk, callback); 1456 } 1457 1458 get writableObjectMode(): boolean { 1459 return this._writable.writableObjectMode || false; 1460 } 1461 1462 get writableHighWatermark(): number { 1463 return this._writable.writableHighWatermark || 0; 1464 } 1465 1466 get writable(): boolean { 1467 return this._writable.writable || false; 1468 } 1469 1470 get writableLength(): number { 1471 return this._writable.writableLength || 0; 1472 } 1473 1474 get writableNeedDrain(): boolean { 1475 return this._writable.writableNeedDrain || false; 1476 } 1477 1478 get writableCorked(): number { 1479 return this._writable.writableCorked || 0; 1480 } 1481 1482 get writableEnded(): boolean { 1483 return this._writable.writableEnded || false; 1484 } 1485 1486 get writableFinished(): boolean { 1487 return this._writable.writableFinished || false; 1488 } 1489} 1490 1491// @ts-ignore 1492Duplex.prototype.doWrite = null; 1493// @ts-ignore 1494Duplex.prototype.doWritev = null; 1495 1496class Transform extends Duplex { 1497 /** 1498 * The Transform constructor. 1499 * 1500 * @syscap SystemCapability.Utils.Lang 1501 * @crossplatform 1502 * @since 12 1503 */ 1504 constructor() { 1505 super(); 1506 } 1507 1508 on(event: string, callback: Function): void { 1509 super.on(event, callback); 1510 } 1511 1512 /** 1513 * Write the last chunk to Writable. 1514 * 1515 * @param { string | Uint8Array } [chunk] - Data to be written. 1516 * @param { string } [encoding] - Encoding type. 1517 * @param { Function } [callback] - Callback after writing. 1518 * @returns { Writable } Returns the Writable object. 1519 * @throws { BusinessError } 401 - if the input parameters are invalid. 1520 * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. 1521 * @syscap SystemCapability.Utils.Lang 1522 * @crossplatform 1523 * @since 12 1524 */ 1525 end(chunk?: string | Uint8Array, encoding?: string, callback?: Function): Writable { 1526 if (!this.doTransform) { 1527 throw new BusinessError('The doTransform() method is not implemented', 10200039); 1528 } 1529 if (chunk instanceof Uint8Array) { 1530 const chunkString = this.stringDecoder.write(chunk); 1531 this.doTransform(chunkString, encoding || 'utf8', callback || ((): void => { 1532 })); 1533 } else if (typeof chunk === 'string') { 1534 this.doTransform(chunk, encoding || 'utf8', callback || ((): void => { 1535 })); 1536 } 1537 this.doFlush?.((...args: (string | Uint8Array)[]) => { 1538 args.forEach((it: string | Uint8Array) => { 1539 if (it) { 1540 this.push(it ?? '', encoding); 1541 } 1542 }); 1543 }); 1544 const write:Writable = super.end(chunk, encoding, callback); 1545 return write; 1546 } 1547 1548 push(chunk: Uint8Array | string | null, encoding?: string): boolean { 1549 return super.push(chunk, encoding); 1550 } 1551 1552 /** 1553 * Convert the input data. After the conversion, Transform.push can be called to send the input to the read stream. 1554 * Transform.push should not be called Transform.write to call. 1555 * 1556 * @param { string } chunk - Input data to be converted. 1557 * @param { string } encoding - If the chunk is a string, then this is the encoding type. If chunk is a buffer, 1558 * then this is the special value 'buffer'. Ignore it in that case. 1559 * @param { Function } callback - Callback after conversion. 1560 * @throws { BusinessError } 401 - if the input parameters are invalid. 1561 * @syscap SystemCapability.Utils.Lang 1562 * @crossplatform 1563 * @since 12 1564 */ 1565 doTransform(chunk: string, encoding: string, callback: Function): void { 1566 throw new BusinessError('The doTransform() method is not implemented'); 1567 } 1568 1569 /** 1570 * After all data is flushed to the write stream, you can use the Transform.doFlush writes some extra data, should 1571 * not be called actively, only called by Writable after flushing all data. 1572 * 1573 * @param { Function } callback - Callback after flush completion. 1574 * @throws { BusinessError } 401 - if the input parameters are invalid. 1575 * @syscap SystemCapability.Utils.Lang 1576 * @crossplatform 1577 * @since 12 1578 */ 1579 doFlush(callback: Function): void { 1580 } 1581 1582 /** 1583 * writes a chunk to Writable and invokes callback when the chunk is flushed. The return value indicates 1584 * whether the internal buffer of the Writable reaches the hightWaterMark. If true is returned, the buffer 1585 * does not reach the hightWaterMark. If false is returned, the buffer has been reached. The write function 1586 * should be called after the drain event is triggered. If the write function is called continuously, 1587 * the chunk is still added to the buffer until the memory overflows 1588 * 1589 * @param { string | Uint8Array } [chunk] - Data to be written. 1590 * @param { string } [encoding] - Encoding type. 1591 * @param { Function } [callback] - Callback after writing. 1592 * @returns { boolean } Write success returns true, write failure returns false. 1593 * @throws { BusinessError } 401 - if the input parameters are invalid. 1594 * @throws { BusinessError } 10200036 - if stream has been ended, writing data to it will throw an error. 1595 * @throws { BusinessError } 10200037 - if the callback is called multiple times consecutively, an error will be thrown. 1596 * @throws { BusinessError } 10200039 - if a class inherits from Transform, it must implement doTransform; otherwise, an error will be raised. 1597 * @syscap SystemCapability.Utils.Lang 1598 * @crossplatform 1599 * @since 12 1600 */ 1601 write(chunk?: string | Uint8Array, encoding?: string, callback?: Function): boolean { 1602 if (typeof chunk === 'string') { 1603 const callBackFunction = runOnce((error: Error) => { 1604 if (error) { 1605 this.getListener()?.emit(WritableEvent.ERROR, error); 1606 } 1607 }, () => { 1608 const err:BusinessError = new BusinessError('Callback called multiple times', 10200037); 1609 this.getListener()?.emit(WritableEvent.ERROR, err); 1610 }); 1611 this.doTransform?.(chunk ?? '', encoding ?? 'utf8', callBackFunction); 1612 } 1613 return super.write(chunk, encoding, callback); 1614 } 1615 1616 doRead(size: number): void { 1617 } 1618 1619 doWrite(chunk: string | Uint8Array, encoding: string, callback: Function):void { 1620 super.doWrite?.(chunk, encoding, callback); 1621 } 1622} 1623 1624// @ts-ignore 1625Transform.prototype.doTransform = null; 1626// @ts-ignore 1627Transform.prototype.doFlush = null; 1628 1629export default { 1630 Readable: Readable, 1631 Writable: Writable, 1632 Duplex: Duplex, 1633 Transform: Transform, 1634}; 1635