Luồng — Hướng dẫn chính thức

Tìm hiểu cách sử dụng dữ liệu dễ đọc, có thể ghi và biến đổi luồng bằng API Luồng.

API Luồng cho phép bạn truy cập các luồng dữ liệu nhận được qua mạng theo phương thức lập trình hoặc được tạo bằng bất kỳ cách nào cục bộ và hãy xử lý chúng bằng JavaScript. Truyền trực tuyến bao gồm việc chia nhỏ tài nguyên mà bạn muốn nhận, gửi hoặc chuyển đổi thành các đoạn nhỏ rồi xử lý từng đoạn. Trong khi phát trực tiếp là một điều gì đó trình duyệt vẫn làm khi nhận được nội dung như HTML hoặc video hiển thị trên trang web. chưa từng có cho JavaScript trước fetch với các luồng được ra mắt vào năm 2015.

Trước đây, nếu bạn muốn xử lý tài nguyên ở dạng nào đó (ví dụ: video hoặc tệp văn bản, v.v.), bạn sẽ phải tải toàn bộ tệp xuống, chờ cho tệp được giải tuần tự sang định dạng phù hợp, rồi xử lý dữ liệu đó. Với luồng khả dụng cho JavaScript, tất cả điều này đều thay đổi. Giờ đây, bạn có thể xử lý dữ liệu thô bằng JavaScript dần dần dưới dạng ngay khi có trên máy khách mà không cần tạo vùng đệm, chuỗi hoặc blob. Điều này mở ra một số trường hợp sử dụng, một số trường hợp trong số đó tôi liệt kê bên dưới:

  • Hiệu ứng video: dẫn luồng video có thể đọc được thông qua luồng biến đổi có áp dụng hiệu ứng theo thời gian thực.
  • Nén dữ liệu: định tuyến luồng tệp thông qua luồng biến đổi có chọn lọc (de) nén tệp đó.
  • Giải mã hình ảnh: dẫn luồng phản hồi HTTP thông qua luồng biến đổi giải mã byte thành dữ liệu bitmap, rồi thông qua một luồng biến đổi khác giúp dịch bitmap thành PNG. Nếu được cài đặt bên trong trình xử lý fetch của một trình chạy dịch vụ, điều này cho phép bạn polyfill một cách minh bạch các định dạng hình ảnh mới như AVIF.

Hỗ trợ trình duyệt

ReadableStream và WritableStream

Hỗ trợ trình duyệt

  • Chrome: 43.
  • Cạnh: 14.
  • Firefox: 65.
  • Safari: 10.1.

Nguồn

TransformStream

Hỗ trợ trình duyệt

  • Chrome: 67.
  • Cạnh: 79.
  • Firefox: 102.
  • Safari: 14.1.

Nguồn

Các khái niệm chính

Trước khi tìm hiểu chi tiết về các loại sự kiện phát trực tiếp, tôi xin được giới thiệu một số khái niệm chính.

Viên nhỏ

Phân đoạn là một phần dữ liệu được ghi hoặc đọc từ một luồng. Mật khẩu có thể là bất kỳ nhãn nào loại; luồng thậm chí có thể chứa các đoạn thuộc nhiều loại khác nhau. Trong hầu hết trường hợp, phân đoạn sẽ không phải là phân đoạn có tính nguyên tử cao nhất đơn vị dữ liệu cho một luồng nhất định. Ví dụ: một luồng byte có thể chứa các phân đoạn gồm 16 Đơn vị KiB Uint8Array, thay vì một byte.

Luồng có thể đọc

Luồng có thể đọc được đại diện cho nguồn dữ liệu mà bạn có thể đọc từ đó. Nói cách khác, dữ liệu xuất hiện khỏi một luồng có thể đọc được. Cụ thể, luồng có thể đọc được là một thực thể của ReadableStream .

Luồng có thể ghi

Luồng có thể ghi đại diện cho đích đến của dữ liệu mà bạn có thể ghi vào đó. Nói cách khác, dữ liệu truy cập vào luồng có thể ghi. Cụ thể, luồng có thể ghi là một thực thể của Lớp WritableStream.

Biến đổi luồng

Luồng chuyển đổi bao gồm một cặp luồng: một luồng có thể ghi, còn gọi là phía có thể ghi, và luồng có thể đọc được, còn gọi là mặt có thể đọc được. Một phép ẩn dụ trong thế giới thực cho điều này sẽ là thông dịch viên đồng thời giúp dịch từ ngôn ngữ này sang ngôn ngữ khác một cách nhanh chóng. Theo cách dành riêng cho dòng chuyển đổi, việc viết dẫn đến kết quả là dữ liệu mới được cung cấp để đọc từ cạnh có thể đọc được. Cụ thể, mọi đối tượng có thuộc tính writable và thuộc tính readable đều có thể phân phát làm luồng biến đổi. Tuy nhiên, lớp TransformStream tiêu chuẩn giúp tạo dễ dàng hơn một cặp được gắn vào với nhau đúng cách.

Chuỗi ống

Các luồng chủ yếu được dùng bằng cách kết nối các luồng với nhau. Có thể truyền trực tiếp một luồng có thể đọc được vào một luồng có thể ghi, bằng cách sử dụng phương thức pipeTo() của luồng có thể đọc được hoặc có thể được dẫn qua một hoặc nhiều hơn là biến đổi luồng trước tiên, sử dụng phương thức pipeThrough() của luồng có thể đọc được. Tập hợp các luồng được nối với nhau theo cách này được gọi là chuỗi ống.

Áp lực ngược

Sau khi xây dựng một chuỗi ống, chuỗi này sẽ truyền các tín hiệu về tốc độ lưu chuyển của các đoạn thông qua đó. Nếu có bất kỳ bước nào trong chuỗi chưa thể chấp nhận các phân đoạn, thì bước đó sẽ truyền tín hiệu ngược qua chuỗi ống, cho đến khi cuối cùng nguồn ban đầu được yêu cầu ngừng sản xuất đoạn sao cho một cách nhanh chóng. Quá trình chuẩn hoá luồng này được gọi là áp suất ngược.

Phát bóng

Bạn có thể chèn một luồng có thể đọc được (được đặt tên theo hình dạng của một chữ "T" viết hoa) bằng cách sử dụng phương thức tee(). Thao tác này sẽ khoá luồng, tức là khiến luồng không thể sử dụng trực tiếp được nữa; tuy nhiên, việc này sẽ tạo ra hai nguồn mới luồng (được gọi là nhánh) có thể được sử dụng độc lập. Việc phát bóng cũng rất quan trọng vì bạn không thể tua lại hoặc khởi động lại sự kiện phát trực tiếp, hãy nói thêm về điều này sau.

Sơ đồ về chuỗi ống bao gồm một luồng có thể đọc được đến từ lệnh gọi đến API tìm nạp. Sau đó, luồng này được dẫn qua luồng biến đổi có đầu ra được truyền và sau đó được gửi đến trình duyệt để có luồng kết quả đầu tiên có thể đọc được và tới bộ nhớ đệm của Service worker cho luồng kết quả thứ hai có thể đọc được.
Chuỗi ống.

Cơ chế của một luồng có thể đọc được

Luồng có thể đọc được là nguồn dữ liệu được biểu thị bằng JavaScript bởi một đối tượng ReadableStream từ một nguồn cơ bản. Chiến lược phát hành đĩa đơn ReadableStream() hàm khởi tạo sẽ tạo và trả về một đối tượng luồng có thể đọc được từ các trình xử lý đã cho. Có hai của nguồn cơ bản:

  • Nguồn đẩy liên tục đẩy dữ liệu đến bạn khi bạn đã truy cập vào chúng và bạn có quyền quyết định bắt đầu, tạm dừng hoặc huỷ quyền truy cập vào luồng. Ví dụ: luồng video trực tiếp, sự kiện do máy chủ gửi, hoặc WebSockets.
  • Nguồn lấy dữ liệu yêu cầu bạn yêu cầu rõ ràng dữ liệu từ những nguồn đó sau khi kết nối. Ví dụ bao gồm cả hoạt động HTTP qua lệnh gọi fetch() hoặc XMLHttpRequest.

Dữ liệu trong luồng được đọc tuần tự trong các phần nhỏ gọi là phân đoạn. Các đoạn dữ liệu được đưa vào một luồng được coi là đã được đưa vào hàng đợi. Điều này có nghĩa là họ đang đợi trong hàng đợi sẵn sàng để đọc. Hàng đợi nội bộ theo dõi các đoạn chưa được đọc.

Chiến lược xếp hàng là một đối tượng xác định cách một sự kiện phát trực tiếp sẽ báo hiệu áp lực ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng chỉ định kích thước cho mỗi đoạn và so sánh tổng kích thước của tất cả các phần trong hàng đợi thành một số được chỉ định, được gọi là dấu nước cao.

Trình đọc sẽ đọc các đoạn trong luồng. Trình đọc này truy xuất dữ liệu từng đoạn tại một thời gian, cho phép bạn thực hiện bất kỳ loại thao tác nào bạn muốn đối với máy chủ đó. Người đọc và người khác mã xử lý đi kèm với mã đó được gọi là người tiêu dùng (consumer).

Cấu trúc tiếp theo trong ngữ cảnh này được gọi là bộ điều khiển. Mỗi luồng có thể đọc được có bộ điều khiển, đúng như tên gọi, cho phép bạn kiểm soát luồng.

Mỗi lần, chỉ một người đọc có thể đọc một luồng; khi một trình đọc được tạo và bắt đầu đọc luồng (tức là trở thành người đọc đang hoạt động), thì trình đọc đó sẽ bị khoá. Nếu bạn muốn một độc giả khác tiếp quản đọc luồng, thông thường, bạn cần phải huỷ trình đọc đầu tiên trước khi làm bất cứ việc gì khác (mặc dù bạn có thể phát trực tiếp).

Tạo luồng dễ đọc

Bạn tạo một luồng có thể đọc được bằng cách gọi hàm khởi tạo của luồng đó ReadableStream(). Hàm khởi tạo có một đối số không bắt buộc là underlyingSource, đại diện cho một đối tượng bằng các phương thức và thuộc tính xác định cách hoạt động của thực thể luồng đã tạo.

underlyingSource

Bạn có thể sử dụng các phương thức không bắt buộc do nhà phát triển xác định sau đây:

  • start(controller): Được gọi ngay lập tức khi đối tượng được tạo. Chiến lược phát hành đĩa đơn có thể truy cập vào nguồn của luồng và thực hiện bất kỳ thao tác nào khác cần thiết để thiết lập chức năng phát trực tiếp. Nếu quá trình này được thực hiện không đồng bộ, phương thức này có thể trả lời hứa hẹn để cho biết thành công hay thất bại. Tham số controller được truyền đến phương thức này là một ReadableStreamDefaultController.
  • pull(controller): Có thể dùng để kiểm soát luồng khi tìm nạp nhiều đoạn hơn. Nó được gọi nhiều lần miễn là hàng đợi phân đoạn nội bộ của luồng chưa đầy, cho đến khi hàng đợi này đạt đến mức nước cao. Nếu kết quả gọi pull() là một lời hứa, pull() sẽ không được gọi lại cho đến khi thực hiện lời hứa đã nêu. Nếu lời hứa bị từ chối, luồng sẽ bị lỗi.
  • cancel(reason): Được gọi khi người sử dụng luồng huỷ sự kiện phát trực tiếp.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController hỗ trợ các phương thức sau:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Đối số thứ hai, tương tự như đối số không bắt buộc, của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng tuỳ ý xác định chiến lược xếp hàng cho luồng, trong đó mất hai thông số:

  • highWaterMark: Một số không âm cho biết vạch nước cao của dòng suối bằng chiến lược xếp hàng này.
  • size(chunk): Một hàm tính toán và trả về kích thước hữu hạn không âm của giá trị đoạn đã cho. Kết quả này được dùng để xác định áp lực ngược, biểu thị thông qua thuộc tính ReadableStreamDefaultController.desiredSize thích hợp. Miền này cũng chi phối thời điểm phương thức pull() của nguồn cơ bản được gọi.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Phương thức getReader()read()

Để đọc từ một luồng có thể đọc được, bạn cần một thiết bị đọc ReadableStreamDefaultReader. Phương thức getReader() của giao diện ReadableStream tạo một trình đọc và khoá luồng để nó. Khi luồng bị khoá, bạn không thể thu nạp trình đọc nào khác cho đến khi luồng này ra mắt.

read() của giao diện ReadableStreamDefaultReader trả về lời hứa cung cấp quyền truy cập vào phân đoạn trong hàng đợi nội bộ của luồng. Phương thức này đáp ứng hoặc từ chối kèm theo kết quả tuỳ thuộc vào trạng thái của luồng. Có các khả năng khác nhau như sau:

  • Nếu có một phân đoạn, lời hứa sẽ được thực hiện bằng một đối tượng của biểu mẫu
    { value: chunk, done: false }
  • Nếu luồng bị đóng, lời hứa sẽ được thực hiện bằng một đối tượng của biểu mẫu
    { value: undefined, done: true }
  • Nếu luồng bị lỗi, lời hứa sẽ bị từ chối cùng với lỗi liên quan.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Thuộc tính locked

Bạn có thể kiểm tra xem một luồng có thể đọc được có bị khoá hay không bằng cách truy cập vào ReadableStream.locked thuộc tính này.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Mẫu mã luồng có thể đọc

Mã mẫu dưới đây cho thấy tất cả các bước trong thực tế. Trước tiên, bạn cần tạo một ReadableStream trong Đối số underlyingSource (tức là lớp TimestampSource) xác định phương thức start(). Phương thức này yêu cầu controller của luồng dữ liệu enqueue() một dấu thời gian mỗi giây trong vòng 10 giây. Cuối cùng, thao tác này sẽ yêu cầu người điều khiển close() truyền trực tuyến. Bạn tiêu thụ phát trực tuyến bằng cách tạo một trình đọc thông qua phương thức getReader() và gọi read() cho đến khi luồng được done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result  = value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Lặp lại không đồng bộ

Kiểm tra mỗi lần lặp lại vòng lặp read() nếu luồng là done có thể không phải là API thuận tiện nhất. May mắn thay, sẽ sớm có một cách tốt hơn để làm điều này: lặp lại không đồng bộ.

for await (const chunk of stream) {
  console.log(chunk);
}

Giải pháp để sử dụng lặp lại không đồng bộ ngay hôm nay là triển khai hành vi bằng một polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Cách phát trực tiếp có thể đọc được

Phương thức tee() của Giao diện ReadableStream chuyển đến luồng có thể đọc được hiện tại, trả về một mảng 2 phần tử chứa 2 nhánh kết quả dưới dạng thực thể ReadableStream mới. Điều này cho phép hai trình đọc để đọc luồng cùng lúc. Bạn có thể thực hiện việc này, chẳng hạn như trong một trình chạy dịch vụ nếu bạn muốn tìm nạp phản hồi từ máy chủ và truyền phản hồi đó đến trình duyệt, đồng thời truyền trực tuyến phản hồi đến bộ nhớ đệm của trình chạy dịch vụ. Vì không thể tiêu thụ một nội dung phản hồi nhiều lần, nên bạn cần hai bản sao để làm việc này. Để huỷ luồng, bạn cần phải huỷ cả hai nhánh thu được. Xem sự kiện phát trực tiếp thường sẽ khoá khoá trong một khoảng thời gian để ngăn các độc giả khác khoá nó.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Luồng byte có thể đọc

Đối với những luồng biểu thị byte, một phiên bản mở rộng của luồng dễ đọc được cung cấp để xử lý byte hiệu quả, đặc biệt là bằng cách giảm thiểu số lượng bản sao. Các luồng byte cho phép tự tạo bộ đệm (BYOB) độc giả cần thu nạp. Cách triển khai mặc định có thể cung cấp một loạt kết quả, chẳng hạn như làm chuỗi hoặc vùng đệm mảng trong trường hợp WebSockets, trong khi luồng byte đảm bảo đầu ra byte. Ngoài ra, độc giả BYOB có lợi ích về tính ổn định. Đây là bởi vì nếu một bộ đệm tách ra, nó có thể đảm bảo rằng một bộ đệm sẽ không ghi hai lần vào cùng một bộ đệm, do đó cần tránh tình trạng tương tranh. Trình đọc BYOB có thể giảm số lần trình duyệt cần chạy thu gom rác vì có thể sử dụng lại vùng đệm.

Tạo luồng byte có thể đọc được

Bạn có thể tạo luồng byte có thể đọc được bằng cách truyền thêm tham số type vào phương thức Hàm khởi tạo ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Nguồn cơ bản của luồng byte có thể đọc được cấp ReadableByteStreamController cho thao túng. Phương thức ReadableByteStreamController.enqueue() nhận một đối số chunk có giá trị là một ArrayBufferView. Thuộc tính ReadableByteStreamController.byobRequest trả về giá trị hiện tại Yêu cầu kéo BYOB hoặc rỗng nếu không có. Cuối cùng, ReadableByteStreamController.desiredSize sẽ trả về kích thước mong muốn để lấp đầy hàng đợi nội bộ của luồng được kiểm soát.

queuingStrategy

Đối số thứ hai, tương tự như đối số không bắt buộc, của hàm khởi tạo ReadableStream()queuingStrategy. Đây là một đối tượng tuỳ ý xác định chiến lược xếp hàng cho luồng, trong đó lấy một tham số:

  • highWaterMark: Một số byte không âm cho biết dấu nước cao của luồng bằng cách sử dụng chiến lược xếp hàng này. Tham số này dùng để xác định áp lực trở lại, biểu thị thông qua thuộc tính ReadableByteStreamController.desiredSize thích hợp. Miền này cũng chi phối thời điểm phương thức pull() của nguồn cơ bản được gọi.

Phương thức getReader()read()

Sau đó, bạn có thể truy cập vào ReadableStreamBYOBReader bằng cách đặt tham số mode tương ứng: ReadableStream.getReader({ mode: "byob" }). Tính năng này cho phép kiểm soát chính xác hơn đối với vùng đệm để tránh sao chép. Để đọc từ luồng byte, bạn cần gọi ReadableStreamBYOBReader.read(view), trong đó view là một ArrayBufferView.

Mẫu mã luồng byte có thể đọc

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset  = view.byteLength;
  }

  return buffer;
}

Hàm sau đây trả về các luồng byte đọc được, cho phép đọc dữ liệu không sao chép một cách hiệu quả mảng được tạo ngẫu nhiên. Thay vì sử dụng kích thước đoạn được xác định trước là 1.024, hàm này sẽ cố gắng lấp đầy vùng đệm do nhà phát triển cung cấp, cho phép kiểm soát hoàn toàn.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Cơ chế của một luồng có thể ghi

Luồng có thể ghi là một đích đến mà bạn có thể ghi dữ liệu vào đó, được biểu thị trong JavaScript bằng một Đối tượng WritableStream. Chiến dịch này đóng vai trò là phần trừu tượng nằm trên phần lưu trữ cơ bản — một phần I/O cấp thấp hơn mà dữ liệu thô được ghi.

Dữ liệu được ghi vào luồng thông qua một trình ghi, mỗi lần một đoạn. Mỗi phân đoạn có thể mất vô số biểu mẫu, giống như các mẩu thông tin trong một độc giả. Bạn có thể sử dụng bất kỳ mã nào mình thích để tạo các đoạn sẵn sàng để viết; trình ghi cùng mã liên kết được gọi là nhà sản xuất (production).

Khi một tác giả được tạo và bắt đầu ghi vào luồng (người viết đang hoạt động), thì tác giả đó được gọi là bị khoá vào đó. Tại một thời điểm, chỉ một người ghi có thể ghi vào luồng có thể ghi. Nếu bạn muốn một để bắt đầu ghi vào luồng của mình, thông thường bạn cần phát hành nó trước khi đính kèm của một tác giả khác.

Hàng đợi nội bộ theo dõi những phần đã được ghi vào luồng nhưng chưa được ghi đã được xử lý bởi bồn lưu trữ dữ liệu cơ bản.

Chiến lược xếp hàng là một đối tượng xác định cách một sự kiện phát trực tiếp sẽ báo hiệu áp lực ngược dựa trên trạng thái của hàng đợi nội bộ. Chiến lược xếp hàng chỉ định kích thước cho mỗi đoạn và so sánh tổng kích thước của tất cả các phần trong hàng đợi thành một số được chỉ định, được gọi là dấu nước cao.

Cấu trúc cuối cùng được gọi là bộ điều khiển. Mỗi luồng có thể ghi đều có một bộ điều khiển được liên kết cho phép bạn kiểm soát luồng (ví dụ: huỷ luồng đó).

Tạo luồng có thể ghi

Giao diện WritableStream của API Luồng cung cấp một mô hình trừu tượng tiêu chuẩn để ghi dữ liệu truyền trực tuyến vào một đích đến, đã biết như một bồn rửa. Đối tượng này đi kèm với hệ thống áp suất ngược và tính năng xếp hàng tích hợp sẵn. Bạn tạo một luồng có thể ghi bằng cách đang gọi hàm khởi tạo WritableStream(). Lớp này có tham số underlyingSink (không bắt buộc), đại diện cho một đối tượng bằng các phương thức và thuộc tính xác định cách hoạt động của thực thể luồng đã tạo.

underlyingSink

underlyingSink có thể bao gồm các phương thức không bắt buộc do nhà phát triển xác định sau đây. controller được chuyển đến một số phương thức. WritableStreamDefaultController.

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Chiến lược phát hành đĩa đơn nội dung của phương thức này nên nhằm mục đích truy cập vào bồn lưu trữ dữ liệu cơ bản. Nếu quá trình này muốn được thực hiện không đồng bộ, nó có thể trả về hứa hẹn cho biết thành công hoặc thất bại.
  • write(chunk, controller): Phương thức này sẽ được gọi khi một đoạn dữ liệu mới (được chỉ định trong chunk) đã sẵn sàng được ghi vào bồn lưu trữ dữ liệu cơ bản. Có thể trả lại lời hứa cho báo hiệu sự thành công hay thất bại của thao tác ghi. Phương thức này sẽ chỉ được gọi sau phương thức trước đó đã ghi thành công và không bao giờ được ghi sau khi luồng bị đóng hoặc bị huỷ.
  • close(controller): Phương thức này sẽ được gọi nếu ứng dụng báo hiệu rằng ứng dụng đã ghi xong đoạn dữ liệu vào luồng. Nội dung phải làm bất cứ điều gì cần thiết để hoàn tất quá trình ghi vào bồn lưu trữ dữ liệu cơ bản và giải phóng quyền truy cập vào đó. Nếu quá trình này không đồng bộ, nó có thể trả về một giá trị hứa hẹn sẽ cho thấy thành công hay thất bại. Phương thức này sẽ chỉ được gọi sau khi tất cả các lượt ghi vào hàng đợi đã thành công.
  • abort(reason): Phương thức này sẽ được gọi nếu ứng dụng báo hiệu muốn đóng đột ngột luồng và đặt luồng đó ở trạng thái lỗi. Công cụ này có thể dọn dẹp mọi tài nguyên bị giữ lại, giống như close(), nhưng abort() sẽ được gọi ngay cả khi các lượt ghi đã được đưa vào hàng đợi. Những phần đó sẽ bị ném ngay lập tức. Nếu quá trình này không đồng bộ, nó có thể trả về một thông báo hứa hẹn thành công hoặc không thành công. Chiến lược phát hành đĩa đơn Tham số reason chứa DOMString mô tả lý do luồng bị huỷ.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Chiến lược phát hành đĩa đơn WritableStreamDefaultController giao diện của API Luồng thể hiện một đơn vị điều khiển cho phép kiểm soát trạng thái của WritableStream trong quá trình thiết lập, vì có nhiều phần được gửi để viết hoặc ở cuối quá trình viết. Khi xây dựng một WritableStream, bồn lưu trữ dữ liệu cơ bản sẽ được cấp một WritableStreamDefaultController tương ứng thực thể để thao tác. WritableStreamDefaultController chỉ có một phương thức: WritableStreamDefaultController.error(), khiến mọi tương tác trong tương lai với luồng được liên kết gặp lỗi. WritableStreamDefaultController cũng hỗ trợ thuộc tính signal trả về một phiên bản của AbortSignal! cho phép dừng thao tác WritableStream nếu cần.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Đối số thứ hai, tương tự như đối số không bắt buộc, của hàm khởi tạo WritableStream()queuingStrategy. Đây là một đối tượng tuỳ ý xác định chiến lược xếp hàng cho luồng, trong đó mất hai thông số:

  • highWaterMark: Một số không âm cho biết vạch nước cao của dòng suối bằng chiến lược xếp hàng này.
  • size(chunk): Một hàm tính toán và trả về kích thước hữu hạn không âm của giá trị đoạn đã cho. Kết quả này được dùng để xác định áp lực ngược, biểu thị thông qua thuộc tính WritableStreamDefaultWriter.desiredSize thích hợp.

Phương thức getWriter()write()

Để ghi vào một luồng có thể ghi, bạn cần một người viết, WritableStreamDefaultWriter. Phương thức getWriter() của giao diện WritableStream trả về một thực thể mới của WritableStreamDefaultWriter và khoá luồng đối với thực thể đó. Trong khi luồng bị khoá, không thể có được tác giả khác cho đến khi luồng hiện tại được phát hành.

write() của WritableStreamDefaultWriter giao diện ghi một đoạn dữ liệu đã truyền vào WritableStream và bồn lưu trữ dữ liệu cơ bản, sau đó trả về lời hứa giải quyết để cho biết thành công hay thất bại của thao tác ghi. Xin lưu ý rằng những gì "thành công" có nghĩa là lên bồn lưu trữ dữ liệu bên dưới; nó có thể cho biết rằng phân đoạn đã được chấp nhận, và không nhất thiết phải lưu an toàn vào đích đến cuối cùng.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Thuộc tính locked

Bạn có thể kiểm tra xem luồng có thể ghi có bị khoá hay không bằng cách truy cập vào WritableStream.locked thuộc tính này.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Mẫu mã luồng có thể ghi

Mã mẫu dưới đây cho thấy tất cả các bước đang hoạt động.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent  = chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Chuyển từ luồng có thể đọc sang luồng có thể ghi

Một luồng đọc được có thể được dẫn tới luồng có thể ghi thông qua Phương thức pipeTo(). ReadableStream.pipeTo() lấy ReadableStream hiện tại đến một WritableStream nhất định và trả về một hứa hẹn sẽ thực hiện khi quy trình định hướng hoàn tất thành công hoặc từ chối nếu có lỗi gặp phải.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent  = chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tạo luồng biến đổi

Giao diện TransformStream của API Luồng biểu thị một tập hợp dữ liệu có thể biến đổi. Bạn tạo một luồng biến đổi bằng cách gọi hàm khởi tạo TransformStream(). Luồng này sẽ tạo và trả về một đối tượng luồng chuyển đổi từ các trình xử lý đã cho. Hàm khởi tạo TransformStream() chấp nhận là đối số đầu tiên của nó là một đối tượng JavaScript không bắt buộc đại diện cho transformer. Những đối tượng như vậy có thể chứa bất kỳ phương thức nào sau đây:

transformer

  • start(controller): Phương thức này được gọi ngay khi đối tượng được tạo. Giá thông thường thao tác này dùng để đưa các phân đoạn có tiền tố vào hàng đợi bằng cách sử dụng controller.enqueue(). Các phần đó sẽ được đọc từ phía đọc được nhưng không phụ thuộc vào bất kỳ lượt ghi nào vào phía có thể ghi. Nếu đây là tên viết tắt quá trình này không đồng bộ, ví dụ: vì mất chút công sức để thu nạp các đoạn tiền tố, hàm có thể trả về một lời hứa để báo hiệu thành công hay thất bại; lời hứa bị từ chối sẽ gây ra lỗi luồng. Mọi trường hợp ngoại lệ được gửi sẽ được hàm khởi tạo TransformStream() gửi lại.
  • transform(chunk, controller): Phương thức này được gọi khi một đoạn mới ban đầu được ghi vào dữ liệu có thể ghi đã sẵn sàng được chuyển đổi. Việc triển khai luồng đảm bảo rằng chức năng này sẽ chỉ được gọi sau khi các phép biến đổi trước đó thành công và không bao giờ được gọi trước khi start() hoàn tất hoặc sau khi flush() được gọi. Hàm này thực hiện phép biến đổi thực tế công của dòng chuyển đổi. Hàm này có thể thêm các kết quả vào hàng đợi bằng cách sử dụng controller.enqueue(). Chiến dịch này cho phép một đoạn dữ liệu được ghi vào phía có thể ghi để không gây ra hoặc nhiều đoạn trên phía đọc được, tuỳ thuộc vào số lần controller.enqueue() được gọi. Nếu quá trình biến đổi không đồng bộ, hàm này có thể trả về hứa hẹn báo hiệu thành công hay thất bại phép biến đổi. Lời hứa bị từ chối sẽ gây ra lỗi ở cả phần có thể đọc và có thể ghi của biến đổi luồng. Nếu không có phương thức transform() nào được cung cấp, thì biến đổi nhận dạng sẽ được sử dụng. Điều này xếp hàng các đoạn không thay đổi từ phía có thể ghi sang phía có thể đọc.
  • flush(controller): Phương thức này được gọi sau khi tất cả các đoạn được ghi vào phía có thể ghi đã được chuyển đổi bằng cách truyền thành công qua transform(), và phía có thể ghi sắp được đã đóng. Thông thường, thuộc tính này dùng để xếp các đoạn hậu tố vào hàng đợi phía có thể đọc, trước đó sẽ bị đóng. Nếu quá trình xả dữ liệu không đồng bộ, hàm này có thể trả về một lời hứa (promise) đối với báo hiệu thành công hay thất bại; kết quả sẽ được thông báo đến phương thức gọi của stream.writable.write(). Ngoài ra, lời hứa bị từ chối sẽ gây ra lỗi cả chế độ có thể đọc và các khía cạnh có thể ghi trong luồng. Việc gửi một trường hợp ngoại lệ được xử lý giống như việc trả về một trường hợp bị từ chối hứa hẹn.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Chiến lược xếp hàng writableStrategyreadableStrategy

Tham số không bắt buộc thứ hai và thứ ba của hàm khởi tạo TransformStream() là không bắt buộc Chiến lược xếp hàng writableStrategyreadableStrategy. Các điều khoản này được định nghĩa như được nêu trong có thể đọc và luồng có thể ghi .

Biến đổi mã mẫu trong luồng

Mã mẫu sau đây cho thấy một luồng biến đổi đơn giản đang hoạt động.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Đường ống có thể đọc được qua dòng biến đổi

pipeThrough() phương thức của giao diện ReadableStream cung cấp một cách có thể tạo chuỗi để dẫn luồng hiện tại thông qua luồng biến đổi hoặc bất kỳ cặp có thể ghi/đọc nào khác. Việc tạo đường dẫn cho luồng thường sẽ khoá nó trong suốt thời gian ống dẫn, giúp các độc giả khác không khoá được.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Mã mẫu tiếp theo (một chút khó hiểu) cho thấy cách bạn có thể triển khai một "lời cảm ơn" phiên bản fetch() viết hoa tất cả văn bản bằng cách sử dụng hứa hẹn phản hồi được trả về dưới dạng luồng và viết hoa từng đoạn. Ưu điểm của phương pháp này là bạn không cần phải đợi toàn bộ tài liệu cần tải xuống. Điều này có thể tạo ra sự khác biệt lớn khi xử lý các tệp có kích thước lớn.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Bản minh hoạ

Bản minh hoạ dưới đây cho thấy các luồng có thể đọc, có thể ghi và biến đổi trong thực tế. Bài viết cũng bao gồm các ví dụ của chuỗi ống pipeThrough()pipeTo(), đồng thời minh hoạ tee(). Bạn có thể tuỳ ý chạy bản minh hoạ trong cửa sổ riêng hoặc xem mã nguồn.

Các luồng hữu ích có trong trình duyệt

Có một số luồng hữu ích được tích hợp ngay trong trình duyệt. Bạn có thể dễ dàng tạo ReadableStream khỏi một blob. Blob phương thức stream() của giao diện trả về ReadableStream khi đọc sẽ trả về dữ liệu có trong blob. Ngoài ra, hãy nhớ rằng một Đối tượng File là một loại cụ thể của Blob và có thể dùng trong mọi ngữ cảnh mà blob có thể sử dụng.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Các biến thể truyền trực tuyến của TextDecoder.decode()TextEncoder.encode() được gọi TextDecoderStreamTextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Việc nén hoặc giải nén tệp thật dễ dàng bằng CompressionStreamDecompressionStream biến đổi sự kiện phát trực tiếp . Mã mẫu dưới đây cho bạn biết cách tải xuống thông số Luồng, nén (gzip) ngay trong trình duyệt và ghi tệp nén trực tiếp vào ổ đĩa.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

API File System Access FileSystemWritableFileStreamfetch() luồng yêu cầu thử nghiệm đang ví dụ về dòng dữ liệu có thể ghi trong tự nhiên.

API nối tiếp sử dụng nhiều luồng có thể đọc và có thể ghi.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Cuối cùng, API WebSocketStream tích hợp luồng với API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Tài nguyên hữu ích

Xác nhận

Bài viết này được đánh giá bởi Jake Archibald, François Beaufort, Sam Dutton thân mến! Mattias Buelens, Surma thân mến! Joe MedleyCơm Adam. Các bài đăng trên blog của Jake Archibald đã giúp tôi rất nhiều trong việc hiểu phát trực tuyến. Một số mã mẫu lấy cảm hứng từ người dùng GitHub Khám phá của @bellbind và các phần của văn bản được xây dựng chủ yếu dựa trên Tài liệu web MDN trên các luồng. Chiến lược phát hành đĩa đơn Chuẩn của luồng các tác giả đã đóng góp rất nhiều vào việc viết thông số này. Hình ảnh chính của Ryan Lara đang bật Không giật gân.