分离物

Last updated: ... / Reads: 32 Edit

本页讨论一些使用 Isolate API 实现隔离的示例。

每当您的应用程序处理的计算量大到足以暂时阻止其他计算时,您就应该使用隔离。最常见的示例是在 Flutter 应用程序中,当您需要执行大型计算时,否则可能会导致 UI 无响应。

没有任何规则规定何时必须使用分离物,但在以下一些情况下它们可能会很有用:

  • 解析和解码特别大的 JSON blob。
  • 处理和压缩照片、音频和视频。
  • 转换音频和视频文件。
  • 在大型列表或文件系统内执行复杂的搜索和过滤。
  • 执行 I/O,例如与数据库通信。
  • 处理大量网络请求。

实现一个简单的工人隔离

这些示例实现了一个主隔离,它生成一个简单的工作隔离。 Isolate.run() 简化了设置和管理工作隔离的步骤:

  1. 生成(启动并创建)一个隔离体。
  2. 在生成的隔离上运行函数。
  3. 捕获结果。
  4. 将结果返回到主隔离。
  5. 工作完成后终止隔离。
  6. 检查、捕获异常和错误并将其抛出回主隔离区。

如果您使用 Flutter,则可以使用 Flutter 的 compute 函数而不是 Isolate.run() 。

在新隔离中运行现有方法

  1. 调用 run() 直接在主隔离中生成一个新的隔离(后台工作人员),同时 main() 等待结果:
const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(_readAndParseJson);

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

将您希望其执行的函数作为其第一个参数传递给工作程序隔离。在此示例中,它是现有函数 _readAndParseJson() :

Future<Map<String, dynamic>> _readAndParseJson() async {
  final fileData = await File(filename).readAsString();
  final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
  return jsonData;
}
  1. Isolate.run() 获取 _readAndParseJson() 返回的结果并将该值发送回主隔离,从而关闭工作隔离。

  2. 工作隔离将保存结果的内存传输到主隔离。它不复制数据。工作隔离执行验证过程以确保允许传输对象。

_readAndParseJson() 是一个现有的异步函数,可以轻松地直接在主隔离中运行。使用 Isolate.run() 来运行它可以启用并发性。工作隔离完全抽象了 _readAndParseJson() 的计算。它可以在不阻塞主要隔离物的情况下完成。

Isolate.run() 的结果始终是 Future,因为主隔离中的代码继续运行。无论工作隔离执行的计算是同步还是异步,都不会影响主隔离,因为无论哪种方式它都会同时运行。

对于完整的程序,请查看 send_and_receive.dart 示例。

发送带有隔离物的闭包

您还可以直接在主隔离中使用函数文字或闭包使用 run() 创建一个简单的工作隔离。

const String filename = 'with_keys.json';

void main() async {
  // Read some data.
  final jsonData = await Isolate.run(() async {
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
  });

  // Use that data.
  print('Number of JSON keys: ${jsonData.length}');
}

此示例完成与前一个相同的任务。一个新的隔离产生,计算一些东西,然后发回结果。

然而,现在隔离发送了一个关闭。闭包比典型的命名函数受到的限制更少,无论是在功能上还是在代码中的编写方式上。在此示例中, Isolate.run() 同时执行看似本地代码的内容。从这个意义上说,您可以想象 run() 像“并行运行”的控制流运算符一样工作。

使用端口在隔离之间发送多条消息

短暂的隔离使用起来很方便,但需要性能开销来生成新的隔离并将对象从一个隔离复制到另一个隔离。如果您的代码依赖于使用 Isolate.run 重复运行相同的计算,您可以通过创建不会立即退出的长期隔离来提高性能。

为此,您可以使用 Isolate.run 抽象的一些低级隔离 API:

  • Isolate.spawn() 和 Isolate.exit()
  • ReceivePort 和 SendPort
  • SendPort.send() 方法

本节介绍在新生成的隔离和主隔离之间建立双向通信所需的步骤。第一个示例“基本端口”概括介绍了该过程。第二个示例是“鲁棒端口”,它逐渐在第一个示例的基础上添加了更实用、更真实的功能。

ReceivePort 和 SendPort

在隔离之间建立长期通信需要两个类(除了 Isolate 之外): ReceivePort 和 SendPort 。这些端口是隔离体之间相互通信的唯一方式。

ReceivePort 是一个处理从其他隔离区发送的消息的对象。这些消息通过 SendPort 发送。

一个 SendPort 对象恰好与一个 ReceivePort 关联,但单个 ReceivePort 可以有多个 SendPorts 。当您创建 ReceivePort 时,它会为自己创建一个 SendPort 。您可以创建额外的 SendPorts 来将消息发送到现有的 ReceivePort 。

端口的行为类似于 Stream 对象(事实上,接收端口实现 Stream !)您可以将 SendPort 和 ReceivePort 想象成 Stream 的分别是 StreamController 和侦听器。 SendPort 类似于 StreamController ,因为您使用 SendPort.send() 方法向它们“添加”消息,并且这些消息由侦听器处理,在本例中为 ReceivePort 。然后, ReceivePort 通过将消息作为参数传递给您提供的回调来处理它收到的消息。

设置端口

新生成的隔离仅具有通过 Isolate.spawn 调用接收的信息。如果您需要主隔离在初始创建后继续与生成的隔离进行通信,则必须设置一个通信通道,以便生成的隔离可以向主隔离发送消息。隔离只能通过消息传递进行通信。他们无法“看到”彼此的记忆,这就是“隔离”这个名字的由来。

要设置这种双向通信,首先在主隔离中创建一个 ReceivePort ,然后在使用 Isolate.spawn ,并将其 SendPort 发送回主隔离传递的 SendPort 上。主隔离收到这个 SendPort ,现在双方都有一个开放的通道来发送和接收消息。

本节中的图表是高级图表,旨在传达使用端口进行隔离的概念。实际实现需要更多代码,您将在本页后面找到这些代码。

ports-setup

  1. 在主隔离中创建一个 ReceivePort 。 SendPort 作为 ReceivePort 的属性自动创建。
  2. 使用 Isolate.spawn() 生成工人隔离
  3. 将对 ReceivePort.sendPort 的引用作为第一条消息传递给工作隔离。
  4. 在工作隔离中创建另一个新的 ReceivePort 。
  5. 将对工作隔离的 ReceivePort.sendPort 的引用作为第一条消息传递回主隔离。

除了创建端口和设置通信之外,您还需要告诉端口在收到消息时要做什么。这是通过在每个相应的 ReceivePort 上使用 listen 方法来完成的。

ports-passing-messages

  1. 通过主隔离区的引用发送消息到工作隔离区的 SendPort 。
  2. 通过工作隔离的 ReceivePort 上的侦听器接收并处理消息。这是执行您想要从主隔离中移出的计算的地方。
  3. 通过工作隔离对主隔离的 SendPort 的引用发送返回消息。
  4. 通过主隔离区 ReceivePort 上的侦听器接收消息。

基本端口示例

此示例演示了如何设置长期工作隔离区,并在其与主隔离区之间进行双向通信。该代码使用将 JSON 文本发送到新隔离的示例,其中 JSON 将被解析和解码,然后再发送回主隔离。

此示例旨在教授生成一个可以随时间发送和接收多条消息的新隔离所需的最低限度。 它不涵盖生产软件中预期的重要功能,例如错误处理、关闭端口和消息排序。 下一节中的稳健端口示例将介绍此功能,并讨论如果没有此功能可能会出现的一些问题。

步骤1:定义工人阶级 首先,为您的后台工作者隔离创建一个类。此类包含您需要的所有功能:

  • 产生一个隔离体。
  • 向该隔离区发送消息。
  • 让隔离解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离。

该类公开了两种公共方法:一种用于生成工作隔离,另一种用于处理向该工作隔离发送消息。

本示例的其余部分将向您展示如何一一填写类方法。

class Worker {
  Future<void> spawn() async {
    // TODO: Add functionality to spawn a worker isolate.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Define code that should be executed on the worker isolate.
  }

  static void _startRemoteIsolate(SendPort port) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  Future<void> parseJson(String message) async {
    // TODO: Define a public method that can
    // be used to send messages to the worker isolate.
  }
}

第 2 步:产生工人隔离体 Worker.spawn 方法是您将用于创建工作隔离并确保它可以接收和发送消息的代码分组的地方。

首先,创建一个 ReceivePort 。这允许主隔离区接收从新生成的工作隔离区发送的消息。

接下来,向接收端口添加一个侦听器以处理工作隔离将发回的消息。传递给侦听器的回调 _handleResponsesFromIsolate 将在步骤 4 中介绍。

最后,使用 Isolate.spawn 生成工作隔离。它需要两个参数:要在工作隔离上执行的函数(在步骤 3 中介绍)和接收端口的 sendPort 属性。

Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

当在工作隔离上调用时, receivePort.sendPort 参数将作为参数传递给回调( _startRemoteIsolate )。这是确保工作隔离可以将消息发送回主隔离的第一步。

第 3 步:在工作隔离上执行代码 在此步骤中,您定义发送到工作隔离以在其生成时执行的方法 _startRemoteIsolate 。此方法类似于工作隔离的“主要”方法。

首先,创建另一个新的 ReceivePort 。该端口接收来自主隔离的未来消息。

接下来,将该端口的 SendPort 发送回主隔离。

最后,为新的 ReceivePort 添加一个监听器。该侦听器处理主隔离发送到工作隔离的消息。

static void _startRemoteIsolate(SendPort port) {
  final receivePort = ReceivePort();
  port.send(receivePort.sendPort);

  receivePort.listen((dynamic message) async {
    if (message is String) {
      final transformed = jsonDecode(message);
      port.send(transformed);
    }
  });
}

Worker ReceivePort 上的侦听器对从主隔离传递的 JSON 进行解码,然后将解码后的 JSON 发送回主隔离。

此侦听器是从主隔离发送到工作隔离的消息的入口点。这是您必须告诉工作人员隔离将来要执行哪些代码的唯一机会。

步骤 4:处理主隔离上的消息 最后,您需要告诉主隔离如何处理从工作隔离发送回主隔离的消息。为此,您需要填写 _handleResponsesFromIsolate 方法。回想一下,此方法被传递给 receivePort.listen 方法,如步骤 2 中所述:

Future<void> spawn() async {
  final receivePort = ReceivePort();
  receivePort.listen(_handleResponsesFromIsolate);
  await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
}

另请记住,您在步骤 3 中将 SendPort 发送回主隔离。此方法处理 SendPort 的接收,以及处理未来的消息(将被解码为 JSON) 。

首先,检查消息是否为 SendPort 。如果是这样,请将该端口分配给类的 _sendPort 属性,以便稍后可以使用它来发送消息。 接下来,检查消息的类型是否为 Map<String, dynamic> ,即解码后的 JSON 的预期类型。如果是这样,请使用特定于应用程序的逻辑处理该消息。在此示例中,将打印该消息。

void _handleResponsesFromIsolate(dynamic message) {
  if (message is SendPort) {
    _sendPort = message;
    _isolateReady.complete();
  } else if (message is Map<String, dynamic>) {
    print(message);
  }
}

第 5 步:添加完成程序以确保您的隔离已设置 要完成该类,请定义一个名为 parseJson 的公共方法,该方法负责向工作隔离隔离发送消息。它还需要确保可以在隔离完全设置之前发送消息。要处理此问题,请使用 Completer 。

首先,添加一个名为 Completer 的类级属性,并将其命名为 _isolateReady 。 接下来,如果消息是 SendPort ,则在 _handleResponsesFromIsolate 方法(在步骤 4 中创建)的完成器上添加对 complete() 的调用。 最后,在 parseJson 方法中,在添加 _sendPort.send 之前添加 await _isolateReady.future 。这确保了在工作隔离区产生并将其 SendPort 发送回主隔离区之前,不会向其发送任何消息。

Future<void> parseJson(String message) async {
  await _isolateReady.future;
  _sendPort.send(message);
}

完整示例

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = Worker();
  await worker.spawn();
  await worker.parseJson('{"key":"value"}');
}

class Worker {
  late SendPort _sendPort;
  final Completer<void> _isolateReady = Completer.sync();

  Future<void> spawn() async {
    final receivePort = ReceivePort();
    receivePort.listen(_handleResponsesFromIsolate);
    await Isolate.spawn(_startRemoteIsolate, receivePort.sendPort);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    if (message is SendPort) {
      _sendPort = message;
      _isolateReady.complete();
    } else if (message is Map<String, dynamic>) {
      print(message);
    }
  }

  static void _startRemoteIsolate(SendPort port) {
    final receivePort = ReceivePort();
    port.send(receivePort.sendPort);

    receivePort.listen((dynamic message) async {
      if (message is String) {
        final transformed = jsonDecode(message);
        port.send(transformed);
      }
    });
  }

  Future<void> parseJson(String message) async {
    await _isolateReady.future;
    _sendPort.send(message);
  }
}

强大的端口示例

前面的示例解释了建立具有双向通信的长期隔离所需的基本构建块。如前所述,该示例缺少一些重要功能,例如错误处理、不再使用端口时关闭端口的能力,以及某些情况下消息排序的不一致。

此示例通过创建具有这些附加功能和更多功能的长期工作隔离区来扩展第一个示例中的信息,并遵循更好的设计模式。尽管此代码与第一个示例有相似之处,但它并不是该示例的扩展。

此示例假设您已经熟悉在带有 Isolate.spawn 的隔离区和端口之间建立通信,这在前面的示例中已介绍过。

步骤1:定义工人阶级 首先,为您的后台工作者隔离创建一个类。此类包含您需要的所有功能:

  • 产生一个隔离体。
  • 向该隔离区发送消息。
  • 让隔离解码一些 JSON。
  • 将解码后的 JSON 发送回主隔离。

该类公开了三种公共方法:一种用于创建工作隔离,一种用于处理向该工作隔离发送消息,一种可以在端口不再使用时将其关闭。

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  Future<Object?> parseJson(String message) async {
    // TODO: Ensure the port is still open.
    _commands.send(message);
  }

  static Future<Worker> spawn() async {
    // TODO: Add functionality to create a new Worker object with a
    //  connection to a spawned isolate.
    throw UnimplementedError();
  }

  Worker._(this._commands, this._responses) {
    // TODO: Initialize main isolate receive port listener.
  }

  void _handleResponsesFromIsolate(dynamic message) {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _handleCommandsToIsolate(ReceivePort rp, SendPort sp) async {
    // TODO: Handle messages sent back from the worker isolate.
  }

  static void _startRemoteIsolate(SendPort sp) {
    // TODO: Initialize worker isolate's ports.
  }
}

在此示例中, SendPort 和 ReceivePort 实例遵循最佳实践命名约定,其中它们是根据主隔离来命名的。通过 SendPort 从主隔离发送到工作隔离的消息称为命令,发送回主隔离的消息称为响应。

步骤2:在 Worker.spawn 方法中创建 RawReceivePort 在生成隔离之前,您需要创建一个 RawReceivePort ,它是一个较低级别的 ReceivePort 。使用 RawReceivePort 是首选模式,因为它允许您将隔离启动逻辑与处理隔离上传递的消息的逻辑分开。

在 Worker.spawn 方法中: 首先,创建 RawReceivePort 。这个 ReceivePort 仅负责接收来自工作隔离的初始消息,该消息将是 SendPort 。 接下来,创建一个 Completer 来指示隔离何时准备好接收消息。完成后,它将返回一条带有 ReceivePort 和 SendPort 的记录。 接下来,定义 RawReceivePort.handler 属性。此属性是一个 Function? ,其行为类似于 ReceivePort.listener 。当该端口收到消息时调用该函数。 在处理函数中,调用 connection.complete() 。此方法需要一个带有 ReceivePort 和 SendPort 作为参数的记录。 SendPort 是从工作进程isolate发送的初始消息,它将在下一步中分配给名为 _commands 的类级别 SendPort 。 然后,使用 ReceivePort.fromRawReceivePort 构造函数创建一个新的 ReceivePort ,并传入 initPort 。

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler.
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
// ···
  }

通过首先创建 RawReceivePort ,然后创建 ReceivePort ,您稍后将能够向 ReceivePort.listen 添加新的回调。相反,如果您要立即创建一个 ReceivePort ,则只能添加一个 listener ,因为 ReceivePort 实现了 Stream ,而不是 BroadcastStream 。

实际上,这允许您将隔离启动逻辑与在设置通信完成后处理接收消息的逻辑分开。随着其他方法中逻辑的增长,这种好处将变得更加明显。

第 3 步:用 Isolate.spawn 生成一个工人隔离体 这一步继续填写 Worker.spawn 方法。您将添加生成隔离所需的代码,并从此类返回 Worker 的实例。在此示例中,对 Isolate.spawn 的调用包装在 try / catch 块中,这确保了如果隔离启动失败, initPort 将被关闭,并且 Worker 对象将不会被创建。

首先,尝试在 try / catch 块中生成工作隔离。如果生成工作隔离失败,请关闭在上一步中创建的接收端口。传递给 Isolate.spawn 的方法将在后面的步骤中介绍。 接下来,等待 connection.future ,并从它返回的记录中解构发送端口和接收端口。 最后,通过调用其私有构造函数并传入来自该完成器的端口来返回 Worker 的实例。

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };
    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(sendPort, receivePort);
  }

请注意,在此示例中(与前面的示例相比), Worker.spawn 充当此类的异步静态构造函数,并且是创建 Worker 实例的唯一方法。这简化了 API,使创建 Worker 实例的代码更加清晰。

第 4 步:完成隔离设置过程 在此步骤中,您将完成基本的隔离设置过程。这几乎与前面的例子完全相关,并且没有新的概念。有一个细微的变化,代码被分解为更多的方法,这是一种设计实践,可以帮助您通过本示例的其余部分添加更多功能。有关设置隔离的基本过程的深入演练,请参阅基本端口示例。

首先,创建从 Worker.spawn 方法返回的私有构造函数。在构造函数主体中,将侦听器添加到主隔离使用的接收端口,并将一个尚未定义的方法传递给该侦听器,称为 _handleResponsesFromIsolate 。

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
// ···
  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

接下来,将代码添加到 _startRemoteIsolate 中,负责初始化工作隔离上的端口。回想一下,该方法在 Worker.spawn 方法中被传递给 Isolate.spawn ,并且它将作为参数传递给主隔离的 SendPort 。

创建一个新的 ReceivePort 。 将该端口的 SendPort 发送回主隔离。 调用一个名为 _handleCommandsToIsolate 的新方法,并将主隔离中的新 ReceivePort 和 SendPort 作为参数传递。

static void _startRemoteIsolate(SendPort sendPort) {
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  _handleCommandsToIsolate(receivePort, sendPort);
}

接下来,添加 _handleCommandsToIsolate 方法,该方法负责从主isolate接收消息,在workerisolate上解码json,并将解码后的json作为响应发送回来。

首先,在工作隔离的 ReceivePort 上声明一个侦听器。 在添加到侦听器的回调中,尝试解码从 try / catch 块内的主隔离传递的 JSON。如果解码成功,则将解码后的 JSON 发送回主隔离。 如果出现错误,则发回 RemoteError 。

static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    try {
      final jsonData = jsonDecode(message as String);
      sendPort.send(jsonData);
    } catch (e) {
      sendPort.send(RemoteError(e.toString(), ''));
    }
  });
}

接下来,添加 _handleResponsesFromIsolate 方法的代码。

首先,检查消息是否为 RemoteError ,在这种情况下,您应该 throw 该错误。 否则,打印该消息。在后续步骤中,您将更新此代码以返回消息而不是打印消息。

void _handleResponsesFromIsolate(dynamic message) {
  if (message is RemoteError) {
    throw message;
  } else {
    print(message);
  }
}

最后,添加 parseJson 方法,这是一个公共方法,允许外部代码将 JSON 发送到工作隔离进行解码。

Future<Object?> parseJson(String message) async {
  _commands.send(message);
}

您将在下一步中更新此方法。

步骤5:同时处理多条消息 目前,如果您快速向工作隔离隔离发送消息,隔离隔离将按照消息完成的顺序(而不是发送的顺序)发送解码后的 json 响应。您无法确定哪个响应对应于哪个消息。

在此步骤中,您将通过为每条消息提供一个 ID 来解决此问题,并使用 Completer 对象来确保当外部代码调用 parseJson 时,返回给该调用者的响应是正确的反应。

首先,向 Worker 添加两个类级属性:

  • Map<int, Completer<Object?>> _activeRequests
  • int _idCounter
lass Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;

_activeRequests 映射将发送到工作隔离的消息与 Completer 相关联。 _activeRequests 中使用的密钥取自 _idCounter ,随着发送的消息数量的增加,密钥也会增加。

接下来,更新 parseJson 方法以在将消息发送到工作隔离之前创建完成程序。

首先创建一个 Completer 。 接下来,递增 _idCounter ,以便每个 Completer 与唯一的数字关联。 向 _activeRequests 映射添加一个条目,其中键是 _idCounter 的当前编号,完成符是值。 将消息连同 ID 一起发送到工作隔离区。由于您只能通过 SendPort 发送一个值,因此请将 id 和消息包装在一条记录中。 最后,返回完成者的未来,其中最终将包含来自工作隔离的响应。

Future<Object?> parseJson(String message) async {
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

您还需要更新 _handleResponsesFromIsolate 和 _handleCommandsToIsolate 来处理此系统。

在 _handleCommandsToIsolate 中,您需要考虑 message 是具有两个值的记录,而不仅仅是 json 文本。通过解构 message 中的值来实现此目的。

然后,在解码 json 后,更新对 sendPort.send 的调用,以将 id 和解码后的 json 传递回主隔离,再次使用记录。

static void _handleCommandsToIsolate(
    ReceivePort receivePort, SendPort sendPort) {
  receivePort.listen((message) {
    final (int id, String jsonText) = message as (int, String); // New
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData)); // Updated
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最后,更新 _handleResponsesFromIsolate 。

首先,再次解构 id 和消息参数的响应。 然后,从 _activeRequests 映射中删除与此请求对应的完成符。 最后,不要抛出错误或打印解码后的 json,而是完成完成器,传入响应。完成后,响应将返回到在主隔离上调用 parseJson 的代码。

void _handleResponsesFromIsolate(dynamic message) {
  final (int id, Object? response) = message as (int, Object?); // New
  final completer = _activeRequests.remove(id)!; // New

  if (response is RemoteError) {
    completer.completeError(response); // Updated
  } else {
    completer.complete(response); // Updated
  }
}

第 6 步:添加关闭端口的功能 当代码不再使用隔离时,您应该关闭主隔离和工作隔离上的端口。

首先,添加一个类级布尔值来跟踪端口是否关闭。 然后,添加 Worker.close 方法。在此方法中: 将 _closed 更新为 true。 向工作隔离发送最终消息。此消息是一个 String ,内容为“shutdown”,但它可以是您想要的任何对象。您将在下一个代码片段中使用它。 最后,检查 _activeRequests 是否为空。如果是,关闭主隔离区的 ReceivePort 名为 _responses 。

class Worker {
  bool _closed = false;
// ···
  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }

接下来,您需要处理工作隔离中的“关闭”消息。将以下代码添加到 _handleCommandsToIsolate 方法中。此代码将检查消息是否为显示“shutdown”的 String 。如果是,它将关闭工作隔离的 ReceivePort ,然后返回。

static void _handleCommandsToIsolate(
  ReceivePort receivePort,
  SendPort sendPort,
) {
  receivePort.listen((message) {
    // New if-block.
    if (message == 'shutdown') {
      receivePort.close();
      return;
    }
    final (int id, String jsonText) = message as (int, String);
    try {
      final jsonData = jsonDecode(jsonText);
      sendPort.send((id, jsonData));
    } catch (e) {
      sendPort.send((id, RemoteError(e.toString(), '')));
    }
  });
}

最后,您应该添加代码以在尝试发送消息之前检查端口是否已关闭。在 Worker.parseJson 方法中添加一行。

Future<Object?> parseJson(String message) async {
  if (_closed) throw StateError('Closed'); // New
  final completer = Completer<Object?>.sync();
  final id = _idCounter++;
  _activeRequests[id] = completer;
  _commands.send((id, message));
  return await completer.future;
}

完整示例 在此处展开​​以查看完整示例

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';

void main() async {
  final worker = await Worker.spawn();
  print(await worker.parseJson('{"key":"value"}'));
  print(await worker.parseJson('"banana"'));
  print(await worker.parseJson('[true, false, null, 1, "string"]'));
  print(
      await Future.wait([worker.parseJson('"yes"'), worker.parseJson('"no"')]));
  worker.close();
}

class Worker {
  final SendPort _commands;
  final ReceivePort _responses;
  final Map<int, Completer<Object?>> _activeRequests = {};
  int _idCounter = 0;
  bool _closed = false;

  Future<Object?> parseJson(String message) async {
    if (_closed) throw StateError('Closed');
    final completer = Completer<Object?>.sync();
    final id = _idCounter++;
    _activeRequests[id] = completer;
    _commands.send((id, message));
    return await completer.future;
  }

  static Future<Worker> spawn() async {
    // Create a receive port and add its initial message handler
    final initPort = RawReceivePort();
    final connection = Completer<(ReceivePort, SendPort)>.sync();
    initPort.handler = (initialMessage) {
      final commandPort = initialMessage as SendPort;
      connection.complete((
        ReceivePort.fromRawReceivePort(initPort),
        commandPort,
      ));
    };

    // Spawn the isolate.
    try {
      await Isolate.spawn(_startRemoteIsolate, (initPort.sendPort));
    } on Object {
      initPort.close();
      rethrow;
    }

    final (ReceivePort receivePort, SendPort sendPort) =
        await connection.future;

    return Worker._(receivePort, sendPort);
  }

  Worker._(this._responses, this._commands) {
    _responses.listen(_handleResponsesFromIsolate);
  }

  void _handleResponsesFromIsolate(dynamic message) {
    final (int id, Object? response) = message as (int, Object?);
    final completer = _activeRequests.remove(id)!;

    if (response is RemoteError) {
      completer.completeError(response);
    } else {
      completer.complete(response);
    }

    if (_closed && _activeRequests.isEmpty) _responses.close();
  }

  static void _handleCommandsToIsolate(
    ReceivePort receivePort,
    SendPort sendPort,
  ) {
    receivePort.listen((message) {
      if (message == 'shutdown') {
        receivePort.close();
        return;
      }
      final (int id, String jsonText) = message as (int, String);
      try {
        final jsonData = jsonDecode(jsonText);
        sendPort.send((id, jsonData));
      } catch (e) {
        sendPort.send((id, RemoteError(e.toString(), '')));
      }
    });
  }

  static void _startRemoteIsolate(SendPort sendPort) {
    final receivePort = ReceivePort();
    sendPort.send(receivePort.sendPort);
    _handleCommandsToIsolate(receivePort, sendPort);
  }

  void close() {
    if (!_closed) {
      _closed = true;
      _commands.send('shutdown');
      if (_activeRequests.isEmpty) _responses.close();
      print('--- port closed --- ');
    }
  }
}

Comments

Make a comment