5.1 异步编程进阶:Event Loop, Microtasks, and Isolates

基础知识

在 Dart 中,异步编程是处理耗时操作(如网络请求、文件读写、复杂的计算)的关键,以确保应用程序的响应性和流畅性。Dart 是单线程的,但它通过事件循环(Event Loop)、微任务队列(Microtask Queue)和隔离区(Isolates)来实现并发。

1. 事件循环 (Event Loop)

Dart 的事件循环是其异步模型的核心。它不断地从两个队列中取出事件并执行:

  • 事件队列 (Event Queue):包含外部事件,如 I/O 操作(文件读写、网络请求)、定时器事件、用户交互事件(点击、滑动)等。当这些外部事件完成时,它们的回调函数会被放入事件队列。
  • 微任务队列 (Microtask Queue):包含内部的、优先级更高的短任务。例如,Future.then() 的回调、scheduleMicrotask() 调用的函数等。微任务队列会在事件队列中的下一个事件被处理之前清空。

事件循环的执行顺序

  1. 执行完当前同步代码。
  2. 清空微任务队列中的所有微任务。
  3. 从事件队列中取出一个事件并执行其回调。
  4. 重复步骤 2 和 3,直到两个队列都为空。

示例:事件循环的执行顺序

dart
复制代码
import 'dart:async';

void main() {
  print('1. main 函数开始');

  Future(() => print('4. Event Queue: Future 1')).then((_) {
    print('5. Microtask Queue: Future 1 的 then');
    scheduleMicrotask(() => print('6. Microtask Queue: scheduleMicrotask in Future 1'));
  });

  Future.sync(() => print('2. Microtask Queue: Future.sync')).then((_) {
    print('3. Microtask Queue: Future.sync 的 then');
  });

  scheduleMicrotask(() => print('7. Microtask Queue: scheduleMicrotask 1'));

  Future(() => print('8. Event Queue: Future 2'));

  print('9. main 函数结束');
}

// 预期输出:
// 1. main 函数开始
// 2. Microtask Queue: Future.sync
// 3. Microtask Queue: Future.sync 的 then
// 9. main 函数结束
// 7. Microtask Queue: scheduleMicrotask 1
// 4. Event Queue: Future 1
// 5. Microtask Queue: Future 1 的 then
// 6. Microtask Queue: scheduleMicrotask in Future 1
// 8. Event Queue: Future 2

分析

  • main 函数中的同步代码首先执行 (19)。
  • Future.sync 会立即将其回调放入微任务队列,所以它会在同步代码之后、其他 Future 之前执行 (2, 3)。
  • scheduleMicrotask 也会将回调放入微任务队列,因此它会在 Future 之前执行 (7)。
  • Future 的回调会被放入事件队列,按照它们被创建的顺序执行 (4, 8)。
  • Future.then() 的回调会作为微任务执行 (5)。
  • Future.then() 中再次调用的 scheduleMicrotask 也会被放入微任务队列,并在当前事件循环迭代中执行 (6)。

2. Future (未来)

Future 代表一个异步操作的最终结果或错误。当一个 Future 完成时,它会携带一个值(成功)或一个错误(失败)。

  • 创建 Future

    • Future.value(value):创建一个立即完成并带有指定值的 Future
    • Future.error(error):创建一个立即完成并带有指定错误的 Future
    • Future.delayed(Duration, [computation]):创建一个在指定延迟后完成的 Future
    • async 函数:返回 Future 的函数。
  • 处理 Future

    • then():注册一个回调函数,当 Future 成功完成时执行。
    • catchError():注册一个回调函数,当 Future 失败时执行。
    • whenComplete():注册一个回调函数,无论 Future 成功或失败都会执行。
    • awaitasync:更简洁地处理 Future 的语法糖。

示例:Future 的链式调用和错误处理

dart
复制代码
void main() async {
  print('开始执行异步操作');

  // 链式调用
  Future.delayed(const Duration(seconds: 1), () => '数据1')
      .then((data) {
        print('获取到: $data');
        return Future.delayed(const Duration(seconds: 1), () => '数据2');
      })
      .then((data) {
        print('获取到: $data');
        throw Exception('发生错误'); // 抛出异常
      })
      .catchError((error) {
        print('捕获到错误: $error');
      })
      .whenComplete(() {
        print('所有操作完成');
      });

  // 使用 async/await
  try {
    String result1 = await Future.delayed(const Duration(seconds: 1), () => '数据A');
    print('await 获取到: $result1');
    String result2 = await Future.delayed(const Duration(seconds: 1), () => '数据B');
    print('await 获取到: $result2');
    // 模拟一个错误
    // throw Exception('await 发生错误');
  } catch (e) {
    print('await 捕获到错误: $e');
  } finally {
    print('await 所有操作完成');
  }

  print('异步操作已调度');
}

3. Stream (流)

Stream 代表一系列异步事件。与 Future 只产生一个结果不同,Stream 可以产生零个、一个或多个事件。它适用于处理连续的数据流,如用户输入事件、网络数据包、传感器数据等。

  • 创建 Stream

    • Stream.fromIterable():从一个可迭代对象创建 Stream
    • Stream.periodic():创建一个周期性发出事件的 Stream
    • StreamController:用于手动控制 Stream 的事件。
    • async* 函数:返回 Stream 的函数。
  • 处理 Stream

    • listen():订阅 Stream 并注册回调函数来处理事件、错误和完成通知。
    • await for:在 async 函数中,使用 await for 循环来迭代 Stream 的事件。

示例:Stream 的使用

dart
复制代码
import 'dart:async';

void main() async {
  // 使用 Stream.periodic 创建一个每秒发出一个事件的流
  Stream<int> counterStream = Stream.periodic(const Duration(seconds: 1), (count) => count).take(5);

  print('开始监听 Stream (listen)');
  StreamSubscription<int> subscription = counterStream.listen(
    (data) {
      print('listen 接收到数据: $data');
    },
    onError: (error) {
      print('listen 接收到错误: $error');
    },
    onDone: () {
      print('listen Stream 完成');
    },
    cancelOnError: true, // 遇到错误时取消订阅
  );

  // 可以在需要时取消订阅
  // Future.delayed(const Duration(seconds: 3), () => subscription.cancel());

  print('开始监听 Stream (await for)');
  try {
    await for (int data in counterStream) {
      print('await for 接收到数据: $data');
    }
  } catch (e) {
    print('await for 接收到错误: $e');
  }
  print('await for Stream 完成');
}

4. Isolate (隔离区)

如前所述,Isolate 是 Dart 实现并发的机制,它允许在独立的内存空间中运行代码,从而避免了共享内存带来的复杂性(如锁和竞态条件)。Isolate 主要用于处理 CPU 密集型任务,以避免阻塞主 UI 线程。

  • Isolate.spawn():创建新的 Isolate
  • SendPortReceivePort:用于 Isolate 之间的消息传递。
  • compute 函数 (Flutter 特有):简化 Isolate 使用的便捷函数。

官方文档链接

Flutter 开发中的应用案例

异步编程是 Flutter 开发中无处不在的。从网络请求到数据库操作,再到复杂的动画和后台计算,理解并熟练运用 FutureStreamIsolate 是构建高性能、响应式 Flutter 应用的关键。

案例:使用 StreamBuilder 实时显示时间

我们将创建一个简单的应用,使用 StreamStreamBuilder 来实时更新显示当前时间,而不会阻塞 UI。

dart
复制代码
import 'package:flutter/material.dart';
import 'dart:async';

class RealtimeClockScreen extends StatefulWidget {
  const RealtimeClockScreen({super.key});

  @override
  State<RealtimeClockScreen> createState() => _RealtimeClockScreenState();
}

class _RealtimeClockScreenState extends State<RealtimeClockScreen> {
  late Stream<DateTime> _clockStream;

  @override
  void initState() {
    super.initState();
    // 创建一个每秒发出当前时间的 Stream
    _clockStream = Stream.periodic(const Duration(seconds: 1), (_) => DateTime.now());
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text('实时时钟示例'),
      ),
      body: Center(
        // 使用 StreamBuilder 监听 Stream 的变化并更新 UI
        child: StreamBuilder<DateTime>(
          stream: _clockStream,
          builder: (context, snapshot) {
            if (snapshot.connectionState == ConnectionState.waiting) {
              return const CircularProgressIndicator();
            } else if (snapshot.hasError) {
              return Text('错误: ${snapshot.error}');
            } else if (snapshot.hasData) {
              // 格式化时间显示
              final String formattedTime = '${snapshot.data!.hour.toString().padLeft(2, '0')}:' +
                                         '${snapshot.data!.minute.toString().padLeft(2, '0')}:' +
                                         '${snapshot.data!.second.toString().padLeft(2, '0')}';
              return Text(
                formattedTime,
                style: const TextStyle(fontSize: 48, fontWeight: FontWeight.bold),
              );
            } else {
              return const Text('等待时间数据...');
            }
          },
        ),
      ),
    );
  }
}

void main() {
  runApp(const MaterialApp(home: RealtimeClockScreen()));
}

案例分析:

  • Stream.periodic(const Duration(seconds: 1), (_) => DateTime.now()):这是一个非常简洁的 Stream 创建方式。它会每隔一秒发出一个事件,事件的值是当前的 DateTime 对象。take(5) 是可选的,用于限制事件的数量,这里为了演示实时性,我们不限制。
  • StreamBuilder<DateTime>:这是一个专门用于处理 Stream 的 Widget。它接收一个 stream 参数,并提供一个 builder 回调函数。当 stream 发出新事件时,builder 函数会被调用,并传入最新的 snapshot
  • snapshot.connectionStateStreamBuildersnapshot 对象包含了 Stream 的当前状态。ConnectionState.waiting 表示 Stream 正在等待第一个事件,ConnectionState.active 表示 Stream 正在发出事件,ConnectionState.done 表示 Stream 已完成。
  • snapshot.hasData:当 Stream 发出数据时,hasDatatrue,并且可以通过 snapshot.data 获取最新的数据。
  • snapshot.hasError:当 Stream 发生错误时,hasErrortrue,并且可以通过 snapshot.error 获取错误信息。

这个案例清晰地展示了如何使用 StreamStreamBuilder 来构建实时更新的 UI。StreamBuilder 是处理连续数据流的强大工具,在需要实时数据更新的场景(如聊天应用、股票行情、传感器数据)中非常有用。