2.5 Stream与响应式编程

基础知识

在 Dart 中,Stream(流)是处理一系列异步事件的强大机制。它类似于 Future,但 Future 只处理单个异步结果,而 Stream 可以处理零个、一个或多个异步事件序列。Stream 是实现响应式编程(Reactive Programming)的关键,它允许我们以声明式的方式处理随时间变化的事件流。

1. Stream 的概念

Stream 可以看作是数据、错误或完成事件的“管道”。当有新的事件发生时,它会通过这个管道发送出去,而订阅者(Listeners)可以接收并处理这些事件。

  • 单订阅流 (Single-subscription Stream):只能有一个监听器。一旦监听器开始监听,流就会开始发送事件。如果监听器取消订阅,流可能会关闭。这种流适用于一次性事件,例如文件下载。
  • 广播流 (Broadcast Stream):可以有多个监听器。即使没有监听器,流也可以继续发送事件。这种流适用于持续性事件,例如用户点击事件或传感器数据。

2. 创建 Stream

  • Stream.fromIterable():从一个 Iterable 创建流。
    dart
    复制代码
    Stream<int> numbersStream = Stream.fromIterable([1, 2, 3, 4, 5]);
  • Stream.periodic():创建一个周期性发送事件的流。
    dart
    复制代码
    Stream<int> tickStream = Stream.periodic(Duration(seconds: 1), (count) => count);
  • StreamController:最常用的创建流的方式,它允许你手动添加事件、错误和完成信号。
    dart
    复制代码
    import 'dart:async';
    
    final StreamController<String> controller = StreamController<String>();
    
    void addData() {
      controller.sink.add("Hello");
      controller.sink.add("World");
      controller.sink.addError("Something went wrong!");
      controller.sink.close(); // 关闭流
    }
    
    void main() {
      controller.stream.listen(
        (data) => print("Data: $data"),
        onError: (error) => print("Error: $error"),
        onDone: () => print("Stream finished."),
      );
      addData();
    }
    // 预期输出:
    // Data: Hello
    // Data: World
    // Error: Something went wrong!
    // Stream finished.

3. 监听 Stream

使用 listen() 方法来订阅流并处理事件。listen() 方法返回一个 StreamSubscription 对象,可以用来暂停、恢复或取消订阅。

dart
复制代码
import 'dart:async';

void main() {
  Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5]);

  StreamSubscription<int> subscription = numbers.listen(
    (data) {
      print("Received: $data");
      if (data == 3) {
        subscription.pause(); // 暂停流
        print("Stream paused.");
        Future.delayed(Duration(seconds: 2), () {
          subscription.resume(); // 2秒后恢复流
          print("Stream resumed.");
        });
      }
    },
    onError: (error) => print("Error: $error"),
    onDone: () => print("Stream completed."),
    cancelOnError: true, // 发生错误时自动取消订阅
  );

  // 可以在任何时候取消订阅
  // Future.delayed(Duration(seconds: 5), () {
  //   subscription.cancel();
  //   print("Stream cancelled.");
  // });
}

4. async*yield

async*(异步生成器函数)和 yield 关键字提供了一种更简洁的方式来创建 Streamasync* 函数返回一个 Stream,而 yield 关键字用于向流中添加事件。

dart
复制代码
Stream<int> countStream(int max) async* {
  for (int i = 1; i <= max; i++) {
    await Future.delayed(Duration(milliseconds: 500)); // 模拟异步操作
    yield i; // 发送事件
  }
}

void main() {
  countStream(5).listen(
    (data) => print("Count: $data"),
    onDone: () => print("Counting finished."),
  );
}
// 预期输出:
// Count: 1 (0.5秒后)
// Count: 2 (0.5秒后)
// ...
// Counting finished.

5. Stream 转换操作符

Stream 提供了丰富的转换操作符,用于对流中的事件进行过滤、映射、合并等操作,这使得响应式编程变得非常强大。

  • map(): 将流中的每个事件转换为新的事件。
  • where(): 过滤流中的事件,只保留满足条件的事件。
  • take(): 只取流中的前 N 个事件。
  • skip(): 跳过流中的前 N 个事件。
  • distinct(): 过滤掉连续重复的事件。
  • reduce(): 将流中的所有事件聚合成一个单一的值。
  • fold(): 类似于 reduce,但可以提供一个初始值。
dart
复制代码
void main() {
  Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
      .where((number) => number % 2 == 0) // 过滤偶数
      .map((number) => "Even: $number") // 转换为字符串
      .listen(
        (data) => print(data),
        onDone: () => print("Done processing even numbers."),
      );
  // 预期输出:
  // Even: 2
  // Even: 4
  // Even: 6
  // Even: 8
  // Even: 10
  // Done processing even numbers.
}

官方文档链接

Flutter 开发中的应用案例

Stream 在 Flutter 开发中扮演着至关重要的角色,尤其是在处理持续性事件、构建响应式 UI 和实现复杂状态管理时。许多 Flutter 框架内部的机制,如 AnimationControllerFirebase 实时数据库更新、BLoCRxDart 状态管理库,都大量依赖于 Stream

案例:一个实时搜索功能

我们将创建一个简单的实时搜索功能,用户在输入框中输入文本时,应用会根据输入内容实时过滤列表。这个案例将演示如何使用 StreamControllerStream 的转换操作符来实现响应式搜索。

dart
复制代码
import 'package:flutter/material.dart';
import 'dart:async';

class Product {
  final String name;
  final String category;

  Product({required this.name, required this.category});
}

class RealtimeSearchScreen extends StatefulWidget {
  const RealtimeSearchScreen({super.key});

  @override
  State<RealtimeSearchScreen> createState() => _RealtimeSearchScreenState();
}

class _RealtimeSearchScreenState extends State<RealtimeSearchScreen> {
  final List<Product> _allProducts = [
    Product(name: '苹果', category: '水果'),
    Product(name: '香蕉', category: '水果'),
    Product(name: '牛奶', category: '乳制品'),
    Product(name: '面包', category: '烘焙'),
    Product(name: '酸奶', category: '乳制品'),
    Product(name: '蛋糕', category: '烘焙'),
    Product(name: '橙子', category: '水果'),
    Product(name: '奶酪', category: '乳制品'),
  ];

  // StreamController 用于管理搜索文本的流
  final StreamController<String> _searchQueryController = StreamController<String>();
  List<Product> _filteredProducts = [];

  @override
  void initState() {
    super.initState();
    _filteredProducts = _allProducts; // 初始显示所有产品

    // 监听搜索查询流,并进行转换操作
    _searchQueryController.stream
        .debounce(const Duration(milliseconds: 300)) // 防抖:等待用户停止输入一段时间
        .distinct() // 去重:只处理不同的查询
        .listen((query) {
      _filterProducts(query); // 过滤产品
    });
  }

  void _filterProducts(String query) {
    setState(() {
      if (query.isEmpty) {
        _filteredProducts = _allProducts;
      } else {
        _filteredProducts = _allProducts
            .where((product) =>
                product.name.toLowerCase().contains(query.toLowerCase()) ||
                product.category.toLowerCase().contains(query.toLowerCase()))
            .toList();
      }
    });
  }

  @override
  void dispose() {
    _searchQueryController.close(); // 关闭 StreamController
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: const Text('实时搜索'),
      ),
      body: Column(
        children: <Widget>[
          Padding(
            padding: const EdgeInsets.all(8.0),
            child: TextField(
              onChanged: (value) {
                _searchQueryController.sink.add(value); // 将输入文本添加到流中
              },
              decoration: const InputDecoration(
                labelText: '搜索商品或分类',
                border: OutlineInputBorder(),
                prefixIcon: Icon(Icons.search),
              ),
            ),
          ),
          Expanded(
            child: ListView.builder(
              itemCount: _filteredProducts.length,
              itemBuilder: (context, index) {
                final product = _filteredProducts[index];
                return ListTile(
                  title: Text(product.name),
                  subtitle: Text(product.category),
                );
              },
            ),
          ),
        ],
      ),
    );
  }
}

// 扩展方法,用于 Stream 的 debounce 操作
extension DebounceExtension<T> on Stream<T> {
  Stream<T> debounce(Duration duration) {
    StreamController<T> controller = StreamController<T>();
    Timer? _debounceTimer;

    listen(
      (data) {
        if (_debounceTimer?.isActive ?? false) {
          _debounceTimer!.cancel();
        }
        _debounceTimer = Timer(duration, () {
          controller.add(data);
        });
      },
      onError: controller.addError,
      onDone: controller.close,
    );

    return controller.stream;
  }
}

void main() {
  runApp(const MaterialApp(home: RealtimeSearchScreen()));
}

案例分析:

  • StreamController<String> _searchQueryController:我们创建了一个 StreamController 来管理用户输入的搜索查询字符串。当用户在 TextField 中输入文本时,onChanged 回调会将文本通过 _searchQueryController.sink.add(value) 添加到流中。
  • _searchQueryController.stream.debounce(...):这里使用了我们自定义的 debounce 扩展方法。debounce 操作符会等待一段时间(例如 300 毫秒),如果在这段时间内没有新的事件到来,它才会将最新的事件发送出去。这对于实时搜索非常重要,可以避免在用户每次输入字符时都立即触发搜索,从而减少不必要的计算和网络请求。
  • .distinct()distinct 操作符会过滤掉连续重复的事件。如果用户连续输入相同的搜索词,只有第一次会触发搜索。
  • .listen((query) { _filterProducts(query); }):最终,经过 debouncedistinct 处理后的搜索查询会通过 listen 方法被订阅。在 listen 的回调中,我们调用 _filterProducts 方法来更新 UI。
  • _filterProducts(String query):这个方法根据传入的查询字符串过滤 _allProducts 列表,并使用 setState 更新 _filteredProducts,从而触发 UI 刷新。
  • DebounceExtension<T> on Stream<T>:我们自定义了一个 debounce 扩展方法,它作用于任何 Stream 类型。这个扩展方法内部使用了 StreamControllerTimer 来实现防抖逻辑。这展示了如何为 Stream 添加自定义的转换操作符。
  • _searchQueryController.close():在 dispose 方法中,我们关闭了 StreamController。这是非常重要的,可以释放资源并防止内存泄漏。

这个案例清晰地展示了 Dart 的 Stream 和响应式编程在 Flutter 应用中的强大应用。通过 StreamController 和各种转换操作符,我们可以优雅地处理异步事件流,实现实时搜索、数据流处理等复杂功能,从而构建出响应迅速、用户体验良好的应用程序。理解 Stream 是掌握 Flutter 复杂状态管理和数据流处理的关键一步。