在 Dart 中,Stream(流)是处理一系列异步事件的强大机制。它类似于 Future,但 Future 只处理单个异步结果,而 Stream 可以处理零个、一个或多个异步事件序列。Stream 是实现响应式编程(Reactive Programming)的关键,它允许我们以声明式的方式处理随时间变化的事件流。
1. Stream 的概念
Stream 可以看作是数据、错误或完成事件的“管道”。当有新的事件发生时,它会通过这个管道发送出去,而订阅者(Listeners)可以接收并处理这些事件。
2. 创建 Stream
Stream.fromIterable():从一个 Iterable 创建流。
Stream<int> numbersStream = Stream.fromIterable([1, 2, 3, 4, 5]);
Stream.periodic():创建一个周期性发送事件的流。
Stream<int> tickStream = Stream.periodic(Duration(seconds: 1), (count) => count);
StreamController:最常用的创建流的方式,它允许你手动添加事件、错误和完成信号。
import 'dart:async';
final StreamController<String> controller = StreamController<String>();
void addData() {
controller.sink.add("Hello");
controller.sink.add("World");
controller.sink.addError("Something went wrong!");
controller.sink.close(); // 关闭流
}
void main() {
controller.stream.listen(
(data) => print("Data: $data"),
onError: (error) => print("Error: $error"),
onDone: () => print("Stream finished."),
);
addData();
}
// 预期输出:
// Data: Hello
// Data: World
// Error: Something went wrong!
// Stream finished.
3. 监听 Stream
使用 listen() 方法来订阅流并处理事件。listen() 方法返回一个 StreamSubscription 对象,可以用来暂停、恢复或取消订阅。
import 'dart:async';
void main() {
Stream<int> numbers = Stream.fromIterable([1, 2, 3, 4, 5]);
StreamSubscription<int> subscription = numbers.listen(
(data) {
print("Received: $data");
if (data == 3) {
subscription.pause(); // 暂停流
print("Stream paused.");
Future.delayed(Duration(seconds: 2), () {
subscription.resume(); // 2秒后恢复流
print("Stream resumed.");
});
}
},
onError: (error) => print("Error: $error"),
onDone: () => print("Stream completed."),
cancelOnError: true, // 发生错误时自动取消订阅
);
// 可以在任何时候取消订阅
// Future.delayed(Duration(seconds: 5), () {
// subscription.cancel();
// print("Stream cancelled.");
// });
}
4. async* 和 yield
async*(异步生成器函数)和 yield 关键字提供了一种更简洁的方式来创建 Stream。async* 函数返回一个 Stream,而 yield 关键字用于向流中添加事件。
Stream<int> countStream(int max) async* {
for (int i = 1; i <= max; i++) {
await Future.delayed(Duration(milliseconds: 500)); // 模拟异步操作
yield i; // 发送事件
}
}
void main() {
countStream(5).listen(
(data) => print("Count: $data"),
onDone: () => print("Counting finished."),
);
}
// 预期输出:
// Count: 1 (0.5秒后)
// Count: 2 (0.5秒后)
// ...
// Counting finished.
5. Stream 转换操作符
Stream 提供了丰富的转换操作符,用于对流中的事件进行过滤、映射、合并等操作,这使得响应式编程变得非常强大。
map(): 将流中的每个事件转换为新的事件。where(): 过滤流中的事件,只保留满足条件的事件。take(): 只取流中的前 N 个事件。skip(): 跳过流中的前 N 个事件。distinct(): 过滤掉连续重复的事件。reduce(): 将流中的所有事件聚合成一个单一的值。fold(): 类似于 reduce,但可以提供一个初始值。void main() {
Stream.fromIterable([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
.where((number) => number % 2 == 0) // 过滤偶数
.map((number) => "Even: $number") // 转换为字符串
.listen(
(data) => print(data),
onDone: () => print("Done processing even numbers."),
);
// 预期输出:
// Even: 2
// Even: 4
// Even: 6
// Even: 8
// Even: 10
// Done processing even numbers.
}
Stream 在 Flutter 开发中扮演着至关重要的角色,尤其是在处理持续性事件、构建响应式 UI 和实现复杂状态管理时。许多 Flutter 框架内部的机制,如 AnimationController、Firebase 实时数据库更新、BLoC 或 RxDart 状态管理库,都大量依赖于 Stream。
案例:一个实时搜索功能
我们将创建一个简单的实时搜索功能,用户在输入框中输入文本时,应用会根据输入内容实时过滤列表。这个案例将演示如何使用 StreamController 和 Stream 的转换操作符来实现响应式搜索。
import 'package:flutter/material.dart';
import 'dart:async';
class Product {
final String name;
final String category;
Product({required this.name, required this.category});
}
class RealtimeSearchScreen extends StatefulWidget {
const RealtimeSearchScreen({super.key});
@override
State<RealtimeSearchScreen> createState() => _RealtimeSearchScreenState();
}
class _RealtimeSearchScreenState extends State<RealtimeSearchScreen> {
final List<Product> _allProducts = [
Product(name: '苹果', category: '水果'),
Product(name: '香蕉', category: '水果'),
Product(name: '牛奶', category: '乳制品'),
Product(name: '面包', category: '烘焙'),
Product(name: '酸奶', category: '乳制品'),
Product(name: '蛋糕', category: '烘焙'),
Product(name: '橙子', category: '水果'),
Product(name: '奶酪', category: '乳制品'),
];
// StreamController 用于管理搜索文本的流
final StreamController<String> _searchQueryController = StreamController<String>();
List<Product> _filteredProducts = [];
@override
void initState() {
super.initState();
_filteredProducts = _allProducts; // 初始显示所有产品
// 监听搜索查询流,并进行转换操作
_searchQueryController.stream
.debounce(const Duration(milliseconds: 300)) // 防抖:等待用户停止输入一段时间
.distinct() // 去重:只处理不同的查询
.listen((query) {
_filterProducts(query); // 过滤产品
});
}
void _filterProducts(String query) {
setState(() {
if (query.isEmpty) {
_filteredProducts = _allProducts;
} else {
_filteredProducts = _allProducts
.where((product) =>
product.name.toLowerCase().contains(query.toLowerCase()) ||
product.category.toLowerCase().contains(query.toLowerCase()))
.toList();
}
});
}
@override
void dispose() {
_searchQueryController.close(); // 关闭 StreamController
super.dispose();
}
@override
Widget build(BuildContext context) {
return Scaffold(
appBar: AppBar(
title: const Text('实时搜索'),
),
body: Column(
children: <Widget>[
Padding(
padding: const EdgeInsets.all(8.0),
child: TextField(
onChanged: (value) {
_searchQueryController.sink.add(value); // 将输入文本添加到流中
},
decoration: const InputDecoration(
labelText: '搜索商品或分类',
border: OutlineInputBorder(),
prefixIcon: Icon(Icons.search),
),
),
),
Expanded(
child: ListView.builder(
itemCount: _filteredProducts.length,
itemBuilder: (context, index) {
final product = _filteredProducts[index];
return ListTile(
title: Text(product.name),
subtitle: Text(product.category),
);
},
),
),
],
),
);
}
}
// 扩展方法,用于 Stream 的 debounce 操作
extension DebounceExtension<T> on Stream<T> {
Stream<T> debounce(Duration duration) {
StreamController<T> controller = StreamController<T>();
Timer? _debounceTimer;
listen(
(data) {
if (_debounceTimer?.isActive ?? false) {
_debounceTimer!.cancel();
}
_debounceTimer = Timer(duration, () {
controller.add(data);
});
},
onError: controller.addError,
onDone: controller.close,
);
return controller.stream;
}
}
void main() {
runApp(const MaterialApp(home: RealtimeSearchScreen()));
}
案例分析:
StreamController<String> _searchQueryController:我们创建了一个 StreamController 来管理用户输入的搜索查询字符串。当用户在 TextField 中输入文本时,onChanged 回调会将文本通过 _searchQueryController.sink.add(value) 添加到流中。_searchQueryController.stream.debounce(...):这里使用了我们自定义的 debounce 扩展方法。debounce 操作符会等待一段时间(例如 300 毫秒),如果在这段时间内没有新的事件到来,它才会将最新的事件发送出去。这对于实时搜索非常重要,可以避免在用户每次输入字符时都立即触发搜索,从而减少不必要的计算和网络请求。.distinct():distinct 操作符会过滤掉连续重复的事件。如果用户连续输入相同的搜索词,只有第一次会触发搜索。.listen((query) { _filterProducts(query); }):最终,经过 debounce 和 distinct 处理后的搜索查询会通过 listen 方法被订阅。在 listen 的回调中,我们调用 _filterProducts 方法来更新 UI。_filterProducts(String query):这个方法根据传入的查询字符串过滤 _allProducts 列表,并使用 setState 更新 _filteredProducts,从而触发 UI 刷新。DebounceExtension<T> on Stream<T>:我们自定义了一个 debounce 扩展方法,它作用于任何 Stream 类型。这个扩展方法内部使用了 StreamController 和 Timer 来实现防抖逻辑。这展示了如何为 Stream 添加自定义的转换操作符。_searchQueryController.close():在 dispose 方法中,我们关闭了 StreamController。这是非常重要的,可以释放资源并防止内存泄漏。这个案例清晰地展示了 Dart 的 Stream 和响应式编程在 Flutter 应用中的强大应用。通过 StreamController 和各种转换操作符,我们可以优雅地处理异步事件流,实现实时搜索、数据流处理等复杂功能,从而构建出响应迅速、用户体验良好的应用程序。理解 Stream 是掌握 Flutter 复杂状态管理和数据流处理的关键一步。