summaryrefslogtreecommitdiffstats
path: root/vendor/futures
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/futures')
-rw-r--r--vendor/futures/.cargo-checksum.json2
-rw-r--r--vendor/futures/Cargo.toml108
-rw-r--r--vendor/futures/README.md61
-rw-r--r--vendor/futures/src/lib.rs74
-rw-r--r--vendor/futures/tests/async_await_macros.rs12
-rw-r--r--vendor/futures/tests/auto_traits.rs8
-rw-r--r--vendor/futures/tests/bilock.rs (renamed from vendor/futures/tests_disabled/bilock.rs)68
-rw-r--r--vendor/futures/tests/compat.rs1
-rw-r--r--vendor/futures/tests/eventual.rs22
-rw-r--r--vendor/futures/tests/future_join.rs32
-rw-r--r--vendor/futures/tests/future_join_all.rs25
-rw-r--r--vendor/futures/tests/future_shared.rs78
-rw-r--r--vendor/futures/tests/future_try_join_all.rs24
-rw-r--r--vendor/futures/tests/lock_mutex.rs49
-rw-r--r--vendor/futures/tests/ready_queue.rs2
-rw-r--r--vendor/futures/tests/sink.rs2
-rw-r--r--vendor/futures/tests/stream.rs388
-rw-r--r--vendor/futures/tests/stream_futures_ordered.rs88
-rw-r--r--vendor/futures/tests/stream_futures_unordered.rs16
-rw-r--r--vendor/futures/tests/stream_try_stream.rs98
-rw-r--r--vendor/futures/tests_disabled/stream.rs1
21 files changed, 1035 insertions, 124 deletions
diff --git a/vendor/futures/.cargo-checksum.json b/vendor/futures/.cargo-checksum.json
index 04b9549c1..3254c7dbc 100644
--- a/vendor/futures/.cargo-checksum.json
+++ b/vendor/futures/.cargo-checksum.json
@@ -1 +1 @@
-{"files":{"Cargo.toml":"d5f1e8cd3101583132847daf46805ef6c0576cc5fcf954db0bb06948da552ce2","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","src/lib.rs":"64dde6f72fcb20b91783219bb78cedfeb1558696de0b7aaee92598cf82ab6144","tests/_require_features.rs":"5ad24019430b498addfc1fd853e955c7b646d78d0727a8ca29f586c9aab45cff","tests/async_await_macros.rs":"e171f9f02e7b7b0d9c254ad9b0f777b0282a6742b7c72a0080d9f6e4a6a44a4d","tests/auto_traits.rs":"afd108f67ce7a1549071c0f2316af63be3cb9ef864ff9856c9b2d4e47987c349","tests/compat.rs":"1ab5af07f13fad9b8fbf29c0df89102687b6abe855ce92bac153d5f916b28689","tests/eager_drop.rs":"dc25d067207c06bbe094752d70bf161e206f00e162ffa3219583c8b4eb0816a1","tests/eventual.rs":"4e3db25ac3f5ebb191caf538c460234eb95413b17441372cc3234d2cbecdc551","tests/future_abortable.rs":"4c81607472a85c5d87a5fe8a510a24cf1e8793fedf7f6cd6741ba1efd66615cd","tests/future_basic_combinators.rs":"4508c1250b85a4f749b7261bbd0ba728d3970e7ba277e84a006e76cf068fb54f","tests/future_fuse.rs":"bb63141f1486e755d0cdea1d93e302ad864a2186aa5287f909a0b3a922e82065","tests/future_inspect.rs":"9c03ceb770ce04fe9fd88a3489362642a0e34ae86a7b4958703e89e8b7a1ecf4","tests/future_join_all.rs":"4c7ab90afc4a0ae721e16f92615cd990a7a608de50b88ba06e6f931478ea04cd","tests/future_obj.rs":"a6aae88a194dc7d3bb961c20db78f180a01796cf7ea4bf106da98c40d89ed36d","tests/future_select_all.rs":"4cefc84d6b7ae2cf0007912cd0325fff6b926a4c26310e7b14a21868de61616f","tests/future_select_ok.rs":"1cabd03268641e1ac42b880344528bad73e3aeb6d6a8a141e652f339dd40184b","tests/future_shared.rs":"778e8763dea8df205581ec8dd9bf1453ca9f17065b496cecb6728147a148efeb","tests/future_try_flatten_stream.rs":"aa4542b5d88f62522b736fac4567613081df45ad3eb54b0b659cdadc9409c4db","tests/future_try_join_all.rs":"2bdd2e7d7f6d8b9c28b05e374906e10a914c2ff36762a0fd81ca4d892fad1341","tests/io_buf_reader.rs":"1d60479224d5aa9378d4aed6246362b08a823ee7c9977f6a5e44fce7c40116be","tests/io_buf_writer.rs":"8f7a78ab2955d2beb69d0881321d4191235540aef6448e875e7f76a2ffc55b89","tests/io_cursor.rs":"cba5a7b968b9f816ac33316ce1e4da67cb320aa5a21332c0f9a45694fa445dd7","tests/io_line_writer.rs":"5b1140de776a721a677911496daa4e7956cc52cc08838d593ab300a93e0d7984","tests/io_lines.rs":"72a310c885591793ed724d0aa2158ac2c9d1af22de417044d96b714f78317586","tests/io_read.rs":"e0a8fa9b27e042f03c9fe14e8f0f329a67e24afad1ce40b906a1ab4d2abef23a","tests/io_read_exact.rs":"42049cd67589992dc09764ffb3836c475115b26dee441fd4cc7e847b2d166667","tests/io_read_line.rs":"f360c30c32fc8c73b371281e86c3f1095da7ef23b702debb30d335046dc77dac","tests/io_read_to_end.rs":"ea3e961e39a0b92930bded05e8ba26e4902461ab53818843d40fae8065b1a803","tests/io_read_to_string.rs":"824921601ac49f15b9a0b349c900f9cc9081cf2646e6a86f443166f841f1320e","tests/io_read_until.rs":"36d9a98149b2410894121ccba49e5134e3209826b2225acfc787016cea2bc92a","tests/io_window.rs":"0d18334b1eb35f5e93099e19c0cab22abe5971d8531176b81345fc89d07692a8","tests/io_write.rs":"701032ff3d5a6e6a3d8cb4e373d1c93e4708f2e5ee0a6742fa626f27b6094b4d","tests/lock_mutex.rs":"055ec0365e7ccd3698aa4b02336fd4dd801017aeb2c19345c58b43415d40fa06","tests/macro_comma_support.rs":"627024ccadfe95194469d5bae2cc29b897b0118a664d7222408a2e234a10e939","tests/object_safety.rs":"9d047190387ed8334113687003c23407c80c858411f5ec7d5c505500f9639dfc","tests/oneshot.rs":"2109a8b3b524f4b36be9fb100f9b8c0d38bbd38d51716adcafdb65994b4a81d6","tests/ready_queue.rs":"cf7047cefab12ff0e2e0ca1ff2123ae87b85a2464fa4c2b6a0e2fc8ee5f25aa1","tests/recurse.rs":"b01b3d73b69ad90a767d297f974dac435817c39e12556fa6a3e6c725dd84f706","tests/sink.rs":"a96700307d6b2bea87c5567a93e0ac81d9ebc7ed354a35fa1b893b39ac8b3759","tests/sink_fanout.rs":"67ab58422040308353955311f75222e55378e4cc34557c7b34140bd20c259132","tests/stream.rs":"78be652d49845b2562e275293398686079b512d88e12661ea644e0881c97be27","tests/stream_abortable.rs":"60052b83b5eeb2395b77bc213f35098d2d5880529f0d83884582a8bbff78b139","tests/stream_buffer_unordered.rs":"143ee19056b9ee9e480903cf4a1b00da7d4e528c5804569bf8c40869e6ac6eed","tests/stream_catch_unwind.rs":"5cdaaf70436c49d3a7107bdc5547ddb8757c3d2057635aded70e485d0cb9cbfc","tests/stream_futures_ordered.rs":"b6f8beafd37e44e82c1f6de322ecba752f9d833d5520ed3ea63c303ea1979644","tests/stream_futures_unordered.rs":"12a361ac0d3694908127372de8b710acc5ff08a7ad5e493ca795bdcfb9601c86","tests/stream_into_async_read.rs":"00ecb18289ebc8f46ea0cf43e0dce0631d7698bd1303a7bcd84d0addc9d8b645","tests/stream_peekable.rs":"c0addb0c510e13183ba3d6102633b75a9223651ae80a64542e913c712fe69a30","tests/stream_select_all.rs":"3a9045754939da5b30305e78f0571d79a03aaa77030c6ccf82225f076e9843c9","tests/stream_select_next_some.rs":"871edcee3ffc16c697251b29c9ba500aa4e3e503aa738748d7392e3462c82dce","tests/stream_split.rs":"074e9c9b51b6f7ea83d77347b5a0c8d414ca32b90445fec9b85f7f4cd2a6049f","tests/stream_try_stream.rs":"cf9af07a31697a43ab0071d958f71fba6d84b2f3031301fd309821a72f3de5f7","tests/stream_unfold.rs":"7c6fbd10c782828793cbe1eb347ec776d99b185dad498e886f7161da76f76880","tests/task_arc_wake.rs":"5a49d074d1d5d9d5ec383dcd9a3868f636c1d7e34662e2573e467948db126206","tests/task_atomic_waker.rs":"8e85b4bc1360788646a52633dfe896d852773d6b482f81626cf534b97b7d937a","tests/test_macro.rs":"a46a946169c342c576936b60909165a50b94350501280ed9bba89d365af69287","tests/try_join.rs":"65f282f8351bd9a74642f2465c7aaf72ee7097002920989f156d60271652549e","tests_disabled/all.rs":"ddcd8fefb0d4a4a91a78328e7e652c35f93dc3669639d76fa0f56452b51abc23","tests_disabled/bilock.rs":"74e598568403df45460085166b7b90012d40dae8670b1c8dec126322a4ce171f","tests_disabled/stream.rs":"10e701f0eb83bcc6ec74d96529ad7dad5ad38bf5826574049501aeb07c5b76fa"},"package":"28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4"} \ No newline at end of file
+{"files":{"Cargo.toml":"f6608fc3b8f2c4ceff1d038056f9ae359c2c77d72f83158867abcf53a4c957b0","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"842d0b8a539ab13ba2b9863cd8fb27da4fc7e9def1aefeb21db5aa04269b1e34","src/lib.rs":"bb07f533ba89a36c0385b57de17d51bf23ccab9e13e00ec812a74f376df15930","tests/_require_features.rs":"5ad24019430b498addfc1fd853e955c7b646d78d0727a8ca29f586c9aab45cff","tests/async_await_macros.rs":"87863f5b73217d727a4789d69229ab5dd85252b8e76a1aca0220feb98a0922af","tests/auto_traits.rs":"d5e0e5ed4b6b93103a5d5725cca87b3c65a07c3f891f8c8f52e7d66e3a991833","tests/bilock.rs":"bd0bf617352528f686b3fbb1847f4da9f6fe351e456e0bdce888bc738311fa83","tests/compat.rs":"1449926cc046d2ae9f86a263efd9353ca8e174ea546c083b360136c5a2aef1d1","tests/eager_drop.rs":"dc25d067207c06bbe094752d70bf161e206f00e162ffa3219583c8b4eb0816a1","tests/eventual.rs":"9050809e5196d0870a3ee2a268a5b4b398739b01617e1e317a673ac0660974cf","tests/future_abortable.rs":"4c81607472a85c5d87a5fe8a510a24cf1e8793fedf7f6cd6741ba1efd66615cd","tests/future_basic_combinators.rs":"4508c1250b85a4f749b7261bbd0ba728d3970e7ba277e84a006e76cf068fb54f","tests/future_fuse.rs":"bb63141f1486e755d0cdea1d93e302ad864a2186aa5287f909a0b3a922e82065","tests/future_inspect.rs":"9c03ceb770ce04fe9fd88a3489362642a0e34ae86a7b4958703e89e8b7a1ecf4","tests/future_join.rs":"f59d7b948df7019e52f902ca7aef17f89ad26582bd1902d520ba99f6f61ba508","tests/future_join_all.rs":"6adacfca4d33a769dbe72fd04c54b49580ecd7a9994a185cfe97dd7a2b55c298","tests/future_obj.rs":"a6aae88a194dc7d3bb961c20db78f180a01796cf7ea4bf106da98c40d89ed36d","tests/future_select_all.rs":"4cefc84d6b7ae2cf0007912cd0325fff6b926a4c26310e7b14a21868de61616f","tests/future_select_ok.rs":"1cabd03268641e1ac42b880344528bad73e3aeb6d6a8a141e652f339dd40184b","tests/future_shared.rs":"4f2cba1e74dacc4fc6b92eef04700df832533efe4fe6a392e3fd0f655b5b8450","tests/future_try_flatten_stream.rs":"aa4542b5d88f62522b736fac4567613081df45ad3eb54b0b659cdadc9409c4db","tests/future_try_join_all.rs":"cca2c5a3b42fe4bf9705301cd1450b30a3822736c5c09793eee06b28ce686a19","tests/io_buf_reader.rs":"1d60479224d5aa9378d4aed6246362b08a823ee7c9977f6a5e44fce7c40116be","tests/io_buf_writer.rs":"8f7a78ab2955d2beb69d0881321d4191235540aef6448e875e7f76a2ffc55b89","tests/io_cursor.rs":"cba5a7b968b9f816ac33316ce1e4da67cb320aa5a21332c0f9a45694fa445dd7","tests/io_line_writer.rs":"5b1140de776a721a677911496daa4e7956cc52cc08838d593ab300a93e0d7984","tests/io_lines.rs":"72a310c885591793ed724d0aa2158ac2c9d1af22de417044d96b714f78317586","tests/io_read.rs":"e0a8fa9b27e042f03c9fe14e8f0f329a67e24afad1ce40b906a1ab4d2abef23a","tests/io_read_exact.rs":"42049cd67589992dc09764ffb3836c475115b26dee441fd4cc7e847b2d166667","tests/io_read_line.rs":"f360c30c32fc8c73b371281e86c3f1095da7ef23b702debb30d335046dc77dac","tests/io_read_to_end.rs":"ea3e961e39a0b92930bded05e8ba26e4902461ab53818843d40fae8065b1a803","tests/io_read_to_string.rs":"824921601ac49f15b9a0b349c900f9cc9081cf2646e6a86f443166f841f1320e","tests/io_read_until.rs":"36d9a98149b2410894121ccba49e5134e3209826b2225acfc787016cea2bc92a","tests/io_window.rs":"0d18334b1eb35f5e93099e19c0cab22abe5971d8531176b81345fc89d07692a8","tests/io_write.rs":"701032ff3d5a6e6a3d8cb4e373d1c93e4708f2e5ee0a6742fa626f27b6094b4d","tests/lock_mutex.rs":"eb47121b842096353165b1444bf679a2df0103b181f811b40042f5c3f1d00c73","tests/macro_comma_support.rs":"627024ccadfe95194469d5bae2cc29b897b0118a664d7222408a2e234a10e939","tests/object_safety.rs":"9d047190387ed8334113687003c23407c80c858411f5ec7d5c505500f9639dfc","tests/oneshot.rs":"2109a8b3b524f4b36be9fb100f9b8c0d38bbd38d51716adcafdb65994b4a81d6","tests/ready_queue.rs":"6380025061025c27cb3b521df9520f169c7aa8e1802b881d539023bb4651744a","tests/recurse.rs":"b01b3d73b69ad90a767d297f974dac435817c39e12556fa6a3e6c725dd84f706","tests/sink.rs":"d9b2ddcbbb6af9e36d057db97dbba233547be645a7e4901b2842a4671f9f0212","tests/sink_fanout.rs":"67ab58422040308353955311f75222e55378e4cc34557c7b34140bd20c259132","tests/stream.rs":"049762ea6dad747cc9e7609f63487e25065a4c0032488e276f65fd522a07867c","tests/stream_abortable.rs":"60052b83b5eeb2395b77bc213f35098d2d5880529f0d83884582a8bbff78b139","tests/stream_buffer_unordered.rs":"143ee19056b9ee9e480903cf4a1b00da7d4e528c5804569bf8c40869e6ac6eed","tests/stream_catch_unwind.rs":"5cdaaf70436c49d3a7107bdc5547ddb8757c3d2057635aded70e485d0cb9cbfc","tests/stream_futures_ordered.rs":"f9083bd8cfa86620c51abffc390564432022b5c8d15a7cba15dd5cb53ae99dd6","tests/stream_futures_unordered.rs":"c888112d760db856e4d9191a2a6a3aa4a757d65e47a12fcd16fc5be7bf0b3e78","tests/stream_into_async_read.rs":"00ecb18289ebc8f46ea0cf43e0dce0631d7698bd1303a7bcd84d0addc9d8b645","tests/stream_peekable.rs":"c0addb0c510e13183ba3d6102633b75a9223651ae80a64542e913c712fe69a30","tests/stream_select_all.rs":"3a9045754939da5b30305e78f0571d79a03aaa77030c6ccf82225f076e9843c9","tests/stream_select_next_some.rs":"871edcee3ffc16c697251b29c9ba500aa4e3e503aa738748d7392e3462c82dce","tests/stream_split.rs":"074e9c9b51b6f7ea83d77347b5a0c8d414ca32b90445fec9b85f7f4cd2a6049f","tests/stream_try_stream.rs":"eba57cdda77f2aeee3a6059d5771c9a100f99a7eeb4460e7f7819803759fba86","tests/stream_unfold.rs":"7c6fbd10c782828793cbe1eb347ec776d99b185dad498e886f7161da76f76880","tests/task_arc_wake.rs":"5a49d074d1d5d9d5ec383dcd9a3868f636c1d7e34662e2573e467948db126206","tests/task_atomic_waker.rs":"8e85b4bc1360788646a52633dfe896d852773d6b482f81626cf534b97b7d937a","tests/test_macro.rs":"a46a946169c342c576936b60909165a50b94350501280ed9bba89d365af69287","tests/try_join.rs":"65f282f8351bd9a74642f2465c7aaf72ee7097002920989f156d60271652549e","tests_disabled/all.rs":"ddcd8fefb0d4a4a91a78328e7e652c35f93dc3669639d76fa0f56452b51abc23","tests_disabled/stream.rs":"8a615a472a35053d12b269d40fe244dfb3ba66fb78d217323aa2be177d5a111e"},"package":"23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40"} \ No newline at end of file
diff --git a/vendor/futures/Cargo.toml b/vendor/futures/Cargo.toml
index ac96b0383..07828eb79 100644
--- a/vendor/futures/Cargo.toml
+++ b/vendor/futures/Cargo.toml
@@ -11,57 +11,77 @@
[package]
edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
name = "futures"
-version = "0.3.19"
-description = "An implementation of futures and streams featuring zero allocations,\ncomposability, and iterator-like interfaces.\n"
+version = "0.3.28"
+description = """
+An implementation of futures and streams featuring zero allocations,
+composability, and iterator-like interfaces.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
-readme = "../README.md"
-keywords = ["futures", "async", "future"]
+readme = "README.md"
+keywords = [
+ "futures",
+ "async",
+ "future",
+]
categories = ["asynchronous"]
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
[package.metadata.playground]
-features = ["std", "async-await", "compat", "io-compat", "executor", "thread-pool"]
+features = [
+ "std",
+ "async-await",
+ "compat",
+ "io-compat",
+ "executor",
+ "thread-pool",
+]
+
[dependencies.futures-channel]
-version = "0.3.19"
+version = "0.3.28"
features = ["sink"]
default-features = false
[dependencies.futures-core]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures-executor]
-version = "0.3.19"
+version = "0.3.28"
optional = true
default-features = false
[dependencies.futures-io]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures-sink]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures-task]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures-util]
-version = "0.3.19"
+version = "0.3.28"
features = ["sink"]
default-features = false
+
[dev-dependencies.assert_matches]
version = "1.3.0"
[dev-dependencies.pin-project]
-version = "1.0.1"
+version = "1.0.11"
[dev-dependencies.pin-utils]
version = "0.1.0"
@@ -73,15 +93,55 @@ version = "1"
version = "0.1.11"
[features]
-alloc = ["futures-core/alloc", "futures-task/alloc", "futures-sink/alloc", "futures-channel/alloc", "futures-util/alloc"]
-async-await = ["futures-util/async-await", "futures-util/async-await-macro"]
+alloc = [
+ "futures-core/alloc",
+ "futures-task/alloc",
+ "futures-sink/alloc",
+ "futures-channel/alloc",
+ "futures-util/alloc",
+]
+async-await = [
+ "futures-util/async-await",
+ "futures-util/async-await-macro",
+]
bilock = ["futures-util/bilock"]
cfg-target-has-atomic = []
-compat = ["std", "futures-util/compat"]
-default = ["std", "async-await", "executor"]
-executor = ["std", "futures-executor/std"]
-io-compat = ["compat", "futures-util/io-compat"]
-std = ["alloc", "futures-core/std", "futures-task/std", "futures-io/std", "futures-sink/std", "futures-util/std", "futures-util/io", "futures-util/channel"]
-thread-pool = ["executor", "futures-executor/thread-pool"]
-unstable = ["futures-core/unstable", "futures-task/unstable", "futures-channel/unstable", "futures-io/unstable", "futures-util/unstable"]
+compat = [
+ "std",
+ "futures-util/compat",
+]
+default = [
+ "std",
+ "async-await",
+ "executor",
+]
+executor = [
+ "std",
+ "futures-executor/std",
+]
+io-compat = [
+ "compat",
+ "futures-util/io-compat",
+]
+std = [
+ "alloc",
+ "futures-core/std",
+ "futures-task/std",
+ "futures-io/std",
+ "futures-sink/std",
+ "futures-util/std",
+ "futures-util/io",
+ "futures-util/channel",
+]
+thread-pool = [
+ "executor",
+ "futures-executor/thread-pool",
+]
+unstable = [
+ "futures-core/unstable",
+ "futures-task/unstable",
+ "futures-channel/unstable",
+ "futures-io/unstable",
+ "futures-util/unstable",
+]
write-all-vectored = ["futures-util/write-all-vectored"]
diff --git a/vendor/futures/README.md b/vendor/futures/README.md
new file mode 100644
index 000000000..355d6078e
--- /dev/null
+++ b/vendor/futures/README.md
@@ -0,0 +1,61 @@
+<p align="center">
+ <img alt="futures-rs" src="https://raw.githubusercontent.com/rust-lang/futures-rs/gh-pages/assets/images/futures-rs-logo.svg?sanitize=true" width="400">
+</p>
+
+<p align="center">
+ Zero-cost asynchronous programming in Rust
+</p>
+
+<p align="center">
+ <a href="https://github.com/rust-lang/futures-rs/actions?query=branch%3Amaster">
+ <img alt="Build Status" src="https://img.shields.io/github/actions/workflow/status/rust-lang/futures-rs/ci.yml?branch=master">
+ </a>
+
+ <a href="https://crates.io/crates/futures">
+ <img alt="crates.io" src="https://img.shields.io/crates/v/futures.svg">
+ </a>
+</p>
+
+<p align="center">
+ <a href="https://docs.rs/futures">
+ Documentation
+ </a> | <a href="https://rust-lang.github.io/futures-rs/">
+ Website
+ </a>
+</p>
+
+`futures-rs` is a library providing the foundations for asynchronous programming in Rust.
+It includes key trait definitions like `Stream`, as well as utilities like `join!`,
+`select!`, and various futures combinator methods which enable expressive asynchronous
+control flow.
+
+## Usage
+
+Add this to your `Cargo.toml`:
+
+```toml
+[dependencies]
+futures = "0.3"
+```
+
+The current `futures` requires Rust 1.56 or later.
+
+### Feature `std`
+
+Futures-rs works without the standard library, such as in bare metal environments.
+However, it has a significantly reduced API surface. To use futures-rs in
+a `#[no_std]` environment, use:
+
+```toml
+[dependencies]
+futures = { version = "0.3", default-features = false }
+```
+
+## License
+
+Licensed under either of [Apache License, Version 2.0](LICENSE-APACHE) or
+[MIT license](LICENSE-MIT) at your option.
+
+Unless you explicitly state otherwise, any contribution intentionally submitted
+for inclusion in the work by you, as defined in the Apache-2.0 license, shall
+be dual licensed as above, without any additional terms or conditions.
diff --git a/vendor/futures/src/lib.rs b/vendor/futures/src/lib.rs
index 8e21c8ebe..b972f5175 100644
--- a/vendor/futures/src/lib.rs
+++ b/vendor/futures/src/lib.rs
@@ -31,6 +31,7 @@
//! # use futures::StreamExt;
//! #
//! fn main() {
+//! # {
//! let pool = ThreadPool::new().expect("Failed to build pool");
//! let (tx, rx) = mpsc::unbounded::<i32>();
//!
@@ -72,6 +73,8 @@
//! let values: Vec<i32> = executor::block_on(fut_values);
//!
//! println!("Values={:?}", values);
+//! # }
+//! # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
//! }
//! ```
//!
@@ -150,13 +153,73 @@ pub use futures_util::io;
#[cfg(feature = "executor")]
#[cfg_attr(docsrs, doc(cfg(feature = "executor")))]
-#[doc(inline)]
-pub use futures_executor as executor;
+pub mod executor {
+ //! Built-in executors and related tools.
+ //!
+ //! All asynchronous computation occurs within an executor, which is
+ //! capable of spawning futures as tasks. This module provides several
+ //! built-in executors, as well as tools for building your own.
+ //!
+ //!
+ //! This module is only available when the `executor` feature of this
+ //! library is activated.
+ //!
+ //! # Using a thread pool (M:N task scheduling)
+ //!
+ //! Most of the time tasks should be executed on a [thread pool](ThreadPool).
+ //! A small set of worker threads can handle a very large set of spawned tasks
+ //! (which are much lighter weight than threads). Tasks spawned onto the pool
+ //! with the [`spawn_ok`](ThreadPool::spawn_ok) function will run ambiently on
+ //! the created threads.
+ //!
+ //! # Spawning additional tasks
+ //!
+ //! Tasks can be spawned onto a spawner by calling its [`spawn_obj`] method
+ //! directly. In the case of `!Send` futures, [`spawn_local_obj`] can be used
+ //! instead.
+ //!
+ //! # Single-threaded execution
+ //!
+ //! In addition to thread pools, it's possible to run a task (and the tasks
+ //! it spawns) entirely within a single thread via the [`LocalPool`] executor.
+ //! Aside from cutting down on synchronization costs, this executor also makes
+ //! it possible to spawn non-`Send` tasks, via [`spawn_local_obj`]. The
+ //! [`LocalPool`] is best suited for running I/O-bound tasks that do relatively
+ //! little work between I/O operations.
+ //!
+ //! There is also a convenience function [`block_on`] for simply running a
+ //! future to completion on the current thread.
+ //!
+ //! [`spawn_obj`]: https://docs.rs/futures/0.3/futures/task/trait.Spawn.html#tymethod.spawn_obj
+ //! [`spawn_local_obj`]: https://docs.rs/futures/0.3/futures/task/trait.LocalSpawn.html#tymethod.spawn_local_obj
+
+ pub use futures_executor::{
+ block_on, block_on_stream, enter, BlockingStream, Enter, EnterError, LocalPool,
+ LocalSpawner,
+ };
+
+ #[cfg(feature = "thread-pool")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "thread-pool")))]
+ pub use futures_executor::{ThreadPool, ThreadPoolBuilder};
+}
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
-#[doc(inline)]
-pub use futures_util::compat;
+pub mod compat {
+ //! Interop between `futures` 0.1 and 0.3.
+ //!
+ //! This module is only available when the `compat` feature of this
+ //! library is activated.
+
+ pub use futures_util::compat::{
+ Compat, Compat01As03, Compat01As03Sink, CompatSink, Executor01As03, Executor01CompatExt,
+ Executor01Future, Future01CompatExt, Sink01CompatExt, Stream01CompatExt,
+ };
+
+ #[cfg(feature = "io-compat")]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
+ pub use futures_util::compat::{AsyncRead01CompatExt, AsyncWrite01CompatExt};
+}
pub mod prelude {
//! A "prelude" for crates using the `futures` crate.
@@ -177,10 +240,12 @@ pub mod prelude {
pub use crate::stream::{self, Stream, TryStream};
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::future::{FutureExt as _, TryFutureExt as _};
#[doc(no_inline)]
pub use crate::sink::SinkExt as _;
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::stream::{StreamExt as _, TryStreamExt as _};
#[cfg(feature = "std")]
@@ -188,6 +253,7 @@ pub mod prelude {
#[cfg(feature = "std")]
#[doc(no_inline)]
+ #[allow(unreachable_pub)]
pub use crate::io::{
AsyncBufReadExt as _, AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _,
};
diff --git a/vendor/futures/tests/async_await_macros.rs b/vendor/futures/tests/async_await_macros.rs
index ce1f3a337..82a617f2c 100644
--- a/vendor/futures/tests/async_await_macros.rs
+++ b/vendor/futures/tests/async_await_macros.rs
@@ -346,43 +346,47 @@ fn stream_select() {
});
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn join_size() {
let fut = async {
let ready = future::ready(0i32);
join!(ready)
};
- assert_eq!(mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 24);
let fut = async {
let ready1 = future::ready(0i32);
let ready2 = future::ready(0i32);
join!(ready1, ready2)
};
- assert_eq!(mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 40);
}
+#[cfg_attr(not(target_pointer_width = "64"), ignore)]
#[test]
fn try_join_size() {
let fut = async {
let ready = future::ready(Ok::<i32, i32>(0));
try_join!(ready)
};
- assert_eq!(mem::size_of_val(&fut), 16);
+ assert_eq!(mem::size_of_val(&fut), 24);
let fut = async {
let ready1 = future::ready(Ok::<i32, i32>(0));
let ready2 = future::ready(Ok::<i32, i32>(0));
try_join!(ready1, ready2)
};
- assert_eq!(mem::size_of_val(&fut), 28);
+ assert_eq!(mem::size_of_val(&fut), 48);
}
+#[allow(clippy::let_underscore_future)]
#[test]
fn join_doesnt_require_unpin() {
let _ = async { join!(async {}, async {}) };
}
+#[allow(clippy::let_underscore_future)]
#[test]
fn try_join_doesnt_require_unpin() {
let _ = async { try_join!(async { Ok::<(), ()>(()) }, async { Ok::<(), ()>(()) },) };
diff --git a/vendor/futures/tests/auto_traits.rs b/vendor/futures/tests/auto_traits.rs
index b3d8b0077..5fc0f7d67 100644
--- a/vendor/futures/tests/auto_traits.rs
+++ b/vendor/futures/tests/auto_traits.rs
@@ -576,10 +576,10 @@ pub mod future {
// TryJoin3, TryJoin4, TryJoin5 are the same as TryJoin
- assert_impl!(TryJoinAll<SendTryFuture<()>>: Send);
+ assert_impl!(TryJoinAll<SendTryFuture<(), ()>>: Send);
assert_not_impl!(TryJoinAll<LocalTryFuture>: Send);
assert_not_impl!(TryJoinAll<SendTryFuture>: Send);
- assert_impl!(TryJoinAll<SyncTryFuture<()>>: Sync);
+ assert_impl!(TryJoinAll<SyncTryFuture<(), ()>>: Sync);
assert_not_impl!(TryJoinAll<LocalTryFuture>: Sync);
assert_not_impl!(TryJoinAll<SyncTryFuture>: Sync);
assert_impl!(TryJoinAll<PinnedTryFuture>: Unpin);
@@ -1480,10 +1480,10 @@ pub mod stream {
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);
assert_impl!(ReadyChunks<SendStream<()>>: Send);
- assert_not_impl!(ReadyChunks<SendStream>: Send);
+ assert_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
assert_impl!(ReadyChunks<SyncStream<()>>: Sync);
- assert_not_impl!(ReadyChunks<SyncStream>: Sync);
+ assert_impl!(ReadyChunks<SyncStream>: Sync);
assert_not_impl!(ReadyChunks<LocalStream>: Sync);
assert_impl!(ReadyChunks<UnpinStream>: Unpin);
assert_not_impl!(ReadyChunks<PinnedStream>: Unpin);
diff --git a/vendor/futures/tests_disabled/bilock.rs b/vendor/futures/tests/bilock.rs
index 0166ca48b..b10348784 100644
--- a/vendor/futures/tests_disabled/bilock.rs
+++ b/vendor/futures/tests/bilock.rs
@@ -1,34 +1,38 @@
+#![cfg(feature = "bilock")]
+
+use futures::executor::block_on;
use futures::future;
use futures::stream;
-use futures::task;
+use futures::task::{Context, Poll};
+use futures::Future;
+use futures::StreamExt;
+use futures_test::task::noop_context;
use futures_util::lock::BiLock;
+use std::pin::Pin;
use std::thread;
-// mod support;
-// use support::*;
-
#[test]
fn smoke() {
- let future = future::lazy(|_| {
+ let future = future::lazy(|cx| {
let (a, b) = BiLock::new(1);
{
- let mut lock = match a.poll_lock() {
+ let mut lock = match a.poll_lock(cx) {
Poll::Ready(l) => l,
Poll::Pending => panic!("poll not ready"),
};
assert_eq!(*lock, 1);
*lock = 2;
- assert!(b.poll_lock().is_pending());
- assert!(a.poll_lock().is_pending());
+ assert!(b.poll_lock(cx).is_pending());
+ assert!(a.poll_lock(cx).is_pending());
}
- assert!(b.poll_lock().is_ready());
- assert!(a.poll_lock().is_ready());
+ assert!(b.poll_lock(cx).is_ready());
+ assert!(a.poll_lock(cx).is_ready());
{
- let lock = match b.poll_lock() {
+ let lock = match b.poll_lock(cx) {
Poll::Ready(l) => l,
Poll::Pending => panic!("poll not ready"),
};
@@ -40,34 +44,32 @@ fn smoke() {
Ok::<(), ()>(())
});
- assert!(task::spawn(future)
- .poll_future_notify(&notify_noop(), 0)
- .expect("failure in poll")
- .is_ready());
+ assert_eq!(block_on(future), Ok(()));
}
#[test]
fn concurrent() {
const N: usize = 10000;
+ let mut cx = noop_context();
let (a, b) = BiLock::new(0);
let a = Increment { a: Some(a), remaining: N };
- let b = stream::iter_ok(0..N).fold(b, |b, _n| {
- b.lock().map(|mut b| {
- *b += 1;
- b.unlock()
- })
+ let b = stream::iter(0..N).fold(b, |b, _n| async {
+ let mut g = b.lock().await;
+ *g += 1;
+ drop(g);
+ b
});
- let t1 = thread::spawn(move || a.wait());
- let b = b.wait().expect("b error");
- let a = t1.join().unwrap().expect("a error");
+ let t1 = thread::spawn(move || block_on(a));
+ let b = block_on(b);
+ let a = t1.join().unwrap();
- match a.poll_lock() {
+ match a.poll_lock(&mut cx) {
Poll::Ready(l) => assert_eq!(*l, 2 * N),
Poll::Pending => panic!("poll not ready"),
}
- match b.poll_lock() {
+ match b.poll_lock(&mut cx) {
Poll::Ready(l) => assert_eq!(*l, 2 * N),
Poll::Pending => panic!("poll not ready"),
}
@@ -80,22 +82,22 @@ fn concurrent() {
}
impl Future for Increment {
- type Item = BiLock<usize>;
- type Error = ();
+ type Output = BiLock<usize>;
- fn poll(&mut self) -> Poll<BiLock<usize>, ()> {
+ fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
loop {
if self.remaining == 0 {
- return Ok(self.a.take().unwrap().into());
+ return self.a.take().unwrap().into();
}
- let a = self.a.as_ref().unwrap();
- let mut a = match a.poll_lock() {
+ let a = self.a.as_mut().unwrap();
+ let mut a = match a.poll_lock(cx) {
Poll::Ready(l) => l,
- Poll::Pending => return Ok(Poll::Pending),
+ Poll::Pending => return Poll::Pending,
};
- self.remaining -= 1;
*a += 1;
+ drop(a);
+ self.remaining -= 1;
}
}
}
diff --git a/vendor/futures/tests/compat.rs b/vendor/futures/tests/compat.rs
index c4125d895..ac04a95ea 100644
--- a/vendor/futures/tests/compat.rs
+++ b/vendor/futures/tests/compat.rs
@@ -1,4 +1,5 @@
#![cfg(feature = "compat")]
+#![cfg(not(miri))] // Miri does not support epoll
use futures::compat::Future01CompatExt;
use futures::prelude::*;
diff --git a/vendor/futures/tests/eventual.rs b/vendor/futures/tests/eventual.rs
index bff000dd0..57a49b241 100644
--- a/vendor/futures/tests/eventual.rs
+++ b/vendor/futures/tests/eventual.rs
@@ -16,6 +16,8 @@ fn join1() {
run(future::try_join(ok::<i32, i32>(1), ok(2)).map_ok(move |v| tx.send(v).unwrap()));
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -30,6 +32,8 @@ fn join2() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok((1, 2)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -43,6 +47,8 @@ fn join3() {
assert_eq!(rx.recv(), Ok(1));
assert!(rx.recv().is_err());
drop(c2);
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -56,6 +62,8 @@ fn join4() {
assert!(rx.recv().is_ok());
drop(c2);
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -73,6 +81,8 @@ fn join5() {
c3.send(3).unwrap();
assert_eq!(rx.recv(), Ok(((1, 2), 3)));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -92,6 +102,8 @@ fn select1() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -111,6 +123,8 @@ fn select2() {
c2.send(2).unwrap();
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
@@ -130,10 +144,14 @@ fn select3() {
drop(c2);
assert_eq!(rx.recv(), Ok(2));
assert!(rx.recv().is_err());
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
#[test]
fn select4() {
+ const N: usize = if cfg!(miri) { 100 } else { 10000 };
+
let (tx, rx) = mpsc::channel::<oneshot::Sender<i32>>();
let t = thread::spawn(move || {
@@ -143,7 +161,7 @@ fn select4() {
});
let (tx2, rx2) = mpsc::channel();
- for _ in 0..10000 {
+ for _ in 0..N {
let (c1, p1) = oneshot::channel::<i32>();
let (c2, p2) = oneshot::channel::<i32>();
@@ -156,4 +174,6 @@ fn select4() {
drop(tx);
t.join().unwrap();
+
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
diff --git a/vendor/futures/tests/future_join.rs b/vendor/futures/tests/future_join.rs
new file mode 100644
index 000000000..f5df9d777
--- /dev/null
+++ b/vendor/futures/tests/future_join.rs
@@ -0,0 +1,32 @@
+use futures::executor::block_on;
+use futures::future::Future;
+use std::task::Poll;
+
+/// This tests verifies (through miri) that self-referencing
+/// futures are not invalidated when joining them.
+#[test]
+fn futures_join_macro_self_referential() {
+ block_on(async { futures::join!(yield_now(), trouble()) });
+}
+
+async fn trouble() {
+ let lucky_number = 42;
+ let problematic_variable = &lucky_number;
+
+ yield_now().await;
+
+ // problematic dereference
+ let _ = { *problematic_variable };
+}
+
+fn yield_now() -> impl Future<Output = ()> {
+ let mut yielded = false;
+ std::future::poll_fn(move |cx| {
+ if core::mem::replace(&mut yielded, true) {
+ Poll::Ready(())
+ } else {
+ cx.waker().wake_by_ref();
+ Poll::Pending
+ }
+ })
+}
diff --git a/vendor/futures/tests/future_join_all.rs b/vendor/futures/tests/future_join_all.rs
index ae05a21b7..44486e1ca 100644
--- a/vendor/futures/tests/future_join_all.rs
+++ b/vendor/futures/tests/future_join_all.rs
@@ -1,22 +1,24 @@
use futures::executor::block_on;
use futures::future::{join_all, ready, Future, JoinAll};
+use futures::pin_mut;
use std::fmt::Debug;
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
where
T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
{
- let output = block_on(actual_fut());
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
assert_eq!(output, expected);
}
#[test]
fn collect_collects() {
- assert_done(|| Box::new(join_all(vec![ready(1), ready(2)])), vec![1, 2]);
- assert_done(|| Box::new(join_all(vec![ready(1)])), vec![1]);
+ assert_done(join_all(vec![ready(1), ready(2)]), vec![1, 2]);
+ assert_done(join_all(vec![ready(1)]), vec![1]);
// REVIEW: should this be implemented?
- // assert_done(|| Box::new(join_all(Vec::<i32>::new())), vec![]);
+ // assert_done(join_all(Vec::<i32>::new()), vec![]);
// TODO: needs more tests
}
@@ -25,18 +27,15 @@ fn collect_collects() {
fn join_all_iter_lifetime() {
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `JoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Vec<usize>> + Unpin> {
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Vec<usize>> {
let iter = bufs.into_iter().map(|b| ready::<usize>(b.len()));
- Box::new(join_all(iter))
+ join_all(iter)
}
- assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), vec![3_usize, 0, 1]);
}
#[test]
fn join_all_from_iter() {
- assert_done(
- || Box::new(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>()),
- vec![1, 2],
- )
+ assert_done(vec![ready(1), ready(2)].into_iter().collect::<JoinAll<_>>(), vec![1, 2])
}
diff --git a/vendor/futures/tests/future_shared.rs b/vendor/futures/tests/future_shared.rs
index 718d6c41b..bd69c1d7c 100644
--- a/vendor/futures/tests/future_shared.rs
+++ b/vendor/futures/tests/future_shared.rs
@@ -3,6 +3,7 @@ use futures::executor::{block_on, LocalPool};
use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
use futures::task::LocalSpawn;
use std::cell::{Cell, RefCell};
+use std::panic::AssertUnwindSafe;
use std::rc::Rc;
use std::task::Poll;
use std::thread;
@@ -151,6 +152,52 @@ fn downgrade() {
}
#[test]
+fn ptr_eq() {
+ use future::FusedFuture;
+ use std::collections::hash_map::DefaultHasher;
+ use std::hash::Hasher;
+
+ let (tx, rx) = oneshot::channel::<i32>();
+ let shared = rx.shared();
+ let mut shared2 = shared.clone();
+ let mut hasher = DefaultHasher::new();
+ let mut hasher2 = DefaultHasher::new();
+
+ // Because these two futures share the same underlying future,
+ // `ptr_eq` should return true.
+ assert!(shared.ptr_eq(&shared2));
+ // Equivalence relations are symmetric
+ assert!(shared2.ptr_eq(&shared));
+
+ // If `ptr_eq` returns true, they should hash to the same value.
+ shared.ptr_hash(&mut hasher);
+ shared2.ptr_hash(&mut hasher2);
+ assert_eq!(hasher.finish(), hasher2.finish());
+
+ tx.send(42).unwrap();
+ assert_eq!(block_on(&mut shared2).unwrap(), 42);
+
+ // Now that `shared2` has completed, `ptr_eq` should return false.
+ assert!(shared2.is_terminated());
+ assert!(!shared.ptr_eq(&shared2));
+
+ // `ptr_eq` should continue to work for the other `Shared`.
+ let shared3 = shared.clone();
+ let mut hasher3 = DefaultHasher::new();
+ assert!(shared.ptr_eq(&shared3));
+
+ shared3.ptr_hash(&mut hasher3);
+ assert_eq!(hasher.finish(), hasher3.finish());
+
+ let (_tx, rx) = oneshot::channel::<i32>();
+ let shared4 = rx.shared();
+
+ // And `ptr_eq` should return false for two futures that don't share
+ // the underlying future.
+ assert!(!shared.ptr_eq(&shared4));
+}
+
+#[test]
fn dont_clone_in_single_owner_shared_future() {
let counter = CountClone(Rc::new(Cell::new(0)));
let (tx, rx) = oneshot::channel();
@@ -193,3 +240,34 @@ fn shared_future_that_wakes_itself_until_pending_is_returned() {
// has returned pending
assert_eq!(block_on(futures::future::join(fut, async { proceed.set(true) })), ((), ()));
}
+
+#[test]
+#[should_panic(expected = "inner future panicked during poll")]
+fn panic_while_poll() {
+ let fut = futures::future::poll_fn::<i8, _>(|_cx| panic!("test")).shared();
+
+ let fut_captured = fut.clone();
+ std::panic::catch_unwind(AssertUnwindSafe(|| {
+ block_on(fut_captured);
+ }))
+ .unwrap_err();
+
+ block_on(fut);
+}
+
+#[test]
+#[should_panic(expected = "test_marker")]
+fn poll_while_panic() {
+ struct S;
+
+ impl Drop for S {
+ fn drop(&mut self) {
+ let fut = futures::future::ready(1).shared();
+ assert_eq!(block_on(fut.clone()), 1);
+ assert_eq!(block_on(fut), 1);
+ }
+ }
+
+ let _s = S {};
+ panic!("test_marker");
+}
diff --git a/vendor/futures/tests/future_try_join_all.rs b/vendor/futures/tests/future_try_join_all.rs
index a4b3bb76a..9a824872f 100644
--- a/vendor/futures/tests/future_try_join_all.rs
+++ b/vendor/futures/tests/future_try_join_all.rs
@@ -1,24 +1,26 @@
use futures::executor::block_on;
+use futures::pin_mut;
use futures_util::future::{err, ok, try_join_all, TryJoinAll};
use std::fmt::Debug;
use std::future::Future;
-fn assert_done<T, F>(actual_fut: F, expected: T)
+#[track_caller]
+fn assert_done<T>(actual_fut: impl Future<Output = T>, expected: T)
where
T: PartialEq + Debug,
- F: FnOnce() -> Box<dyn Future<Output = T> + Unpin>,
{
- let output = block_on(actual_fut());
+ pin_mut!(actual_fut);
+ let output = block_on(actual_fut);
assert_eq!(output, expected);
}
#[test]
fn collect_collects() {
- assert_done(|| Box::new(try_join_all(vec![ok(1), ok(2)])), Ok::<_, usize>(vec![1, 2]));
- assert_done(|| Box::new(try_join_all(vec![ok(1), err(2)])), Err(2));
- assert_done(|| Box::new(try_join_all(vec![ok(1)])), Ok::<_, usize>(vec![1]));
+ assert_done(try_join_all(vec![ok(1), ok(2)]), Ok::<_, usize>(vec![1, 2]));
+ assert_done(try_join_all(vec![ok(1), err(2)]), Err(2));
+ assert_done(try_join_all(vec![ok(1)]), Ok::<_, usize>(vec![1]));
// REVIEW: should this be implemented?
- // assert_done(|| Box::new(try_join_all(Vec::<i32>::new())), Ok(vec![]));
+ // assert_done(try_join_all(Vec::<i32>::new()), Ok(vec![]));
// TODO: needs more tests
}
@@ -27,18 +29,18 @@ fn collect_collects() {
fn try_join_all_iter_lifetime() {
// In futures-rs version 0.1, this function would fail to typecheck due to an overly
// conservative type parameterization of `TryJoinAll`.
- fn sizes(bufs: Vec<&[u8]>) -> Box<dyn Future<Output = Result<Vec<usize>, ()>> + Unpin> {
+ fn sizes(bufs: Vec<&[u8]>) -> impl Future<Output = Result<Vec<usize>, ()>> {
let iter = bufs.into_iter().map(|b| ok::<usize, ()>(b.len()));
- Box::new(try_join_all(iter))
+ try_join_all(iter)
}
- assert_done(|| sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
+ assert_done(sizes(vec![&[1, 2, 3], &[], &[0]]), Ok(vec![3_usize, 0, 1]));
}
#[test]
fn try_join_all_from_iter() {
assert_done(
- || Box::new(vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>()),
+ vec![ok(1), ok(2)].into_iter().collect::<TryJoinAll<_>>(),
Ok::<_, usize>(vec![1, 2]),
)
}
diff --git a/vendor/futures/tests/lock_mutex.rs b/vendor/futures/tests/lock_mutex.rs
index 7c33864c7..c15e76bd8 100644
--- a/vendor/futures/tests/lock_mutex.rs
+++ b/vendor/futures/tests/lock_mutex.rs
@@ -36,31 +36,34 @@ fn mutex_wakes_waiters() {
#[test]
fn mutex_contested() {
- let (tx, mut rx) = mpsc::unbounded();
- let pool = ThreadPool::builder().pool_size(16).create().unwrap();
+ {
+ let (tx, mut rx) = mpsc::unbounded();
+ let pool = ThreadPool::builder().pool_size(16).create().unwrap();
- let tx = Arc::new(tx);
- let mutex = Arc::new(Mutex::new(0));
+ let tx = Arc::new(tx);
+ let mutex = Arc::new(Mutex::new(0));
- let num_tasks = 1000;
- for _ in 0..num_tasks {
- let tx = tx.clone();
- let mutex = mutex.clone();
- pool.spawn(async move {
- let mut lock = mutex.lock().await;
- ready(()).pending_once().await;
- *lock += 1;
- tx.unbounded_send(()).unwrap();
- drop(lock);
- })
- .unwrap();
- }
-
- block_on(async {
+ let num_tasks = 1000;
for _ in 0..num_tasks {
- rx.next().await.unwrap();
+ let tx = tx.clone();
+ let mutex = mutex.clone();
+ pool.spawn(async move {
+ let mut lock = mutex.lock().await;
+ ready(()).pending_once().await;
+ *lock += 1;
+ tx.unbounded_send(()).unwrap();
+ drop(lock);
+ })
+ .unwrap();
}
- let lock = mutex.lock().await;
- assert_eq!(num_tasks, *lock);
- })
+
+ block_on(async {
+ for _ in 0..num_tasks {
+ rx.next().await.unwrap();
+ }
+ let lock = mutex.lock().await;
+ assert_eq!(num_tasks, *lock);
+ });
+ }
+ std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
}
diff --git a/vendor/futures/tests/ready_queue.rs b/vendor/futures/tests/ready_queue.rs
index 82901327f..c19d62593 100644
--- a/vendor/futures/tests/ready_queue.rs
+++ b/vendor/futures/tests/ready_queue.rs
@@ -93,7 +93,7 @@ fn dropping_ready_queue() {
#[test]
fn stress() {
- const ITER: usize = 300;
+ const ITER: usize = if cfg!(miri) { 30 } else { 300 };
for i in 0..ITER {
let n = (i % 10) + 1;
diff --git a/vendor/futures/tests/sink.rs b/vendor/futures/tests/sink.rs
index f3cf11b93..5b691e74c 100644
--- a/vendor/futures/tests/sink.rs
+++ b/vendor/futures/tests/sink.rs
@@ -138,7 +138,7 @@ impl<T: Unpin> ManualFlush<T> {
for task in self.waiting_tasks.drain(..) {
task.wake()
}
- mem::replace(&mut self.data, Vec::new())
+ mem::take(&mut self.data)
}
}
diff --git a/vendor/futures/tests/stream.rs b/vendor/futures/tests/stream.rs
index 0d453d175..79d8e233c 100644
--- a/vendor/futures/tests/stream.rs
+++ b/vendor/futures/tests/stream.rs
@@ -1,10 +1,20 @@
+use std::cell::Cell;
+use std::iter;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::sync::Arc;
+use std::task::Context;
+
use futures::channel::mpsc;
use futures::executor::block_on;
use futures::future::{self, Future};
+use futures::lock::Mutex;
use futures::sink::SinkExt;
use futures::stream::{self, StreamExt};
use futures::task::Poll;
-use futures::FutureExt;
+use futures::{ready, FutureExt};
+use futures_core::Stream;
+use futures_executor::ThreadPool;
use futures_test::task::noop_context;
#[test]
@@ -50,6 +60,345 @@ fn scan() {
}
#[test]
+fn flatten_unordered() {
+ use futures::executor::block_on;
+ use futures::stream::*;
+ use futures::task::*;
+ use std::convert::identity;
+ use std::pin::Pin;
+ use std::sync::atomic::{AtomicBool, Ordering};
+ use std::thread;
+ use std::time::Duration;
+
+ struct DataStream {
+ data: Vec<u8>,
+ polled: bool,
+ wake_immediately: bool,
+ }
+
+ impl Stream for DataStream {
+ type Item = u8;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time =
+ Duration::from_millis(*self.data.first().unwrap_or(&0) as u64 / 10);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ self.polled = true;
+ Poll::Pending
+ } else {
+ self.polled = false;
+ Poll::Ready(self.data.pop())
+ }
+ }
+ }
+
+ struct Interchanger {
+ polled: bool,
+ base: u8,
+ wake_immediately: bool,
+ }
+
+ impl Stream for Interchanger {
+ type Item = DataStream;
+
+ fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Option<Self::Item>> {
+ if !self.polled {
+ self.polled = true;
+ if !self.wake_immediately {
+ let waker = ctx.waker().clone();
+ let sleep_time = Duration::from_millis(self.base as u64);
+ thread::spawn(move || {
+ thread::sleep(sleep_time);
+ waker.wake_by_ref();
+ });
+ } else {
+ ctx.waker().wake_by_ref();
+ }
+ Poll::Pending
+ } else {
+ let data: Vec<_> = (0..6).rev().map(|v| v + self.base * 6).collect();
+ self.base += 1;
+ self.polled = false;
+ Poll::Ready(Some(DataStream {
+ polled: false,
+ data,
+ wake_immediately: self.wake_immediately && self.base % 2 == 0,
+ }))
+ }
+ }
+ }
+
+ // basic behaviour
+ {
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(10..=12),
+ ]);
+
+ let fl_unordered = st.flatten_unordered(3).collect::<Vec<_>>().await;
+
+ assert_eq!(fl_unordered, vec![0, 6, 10, 1, 7, 11, 2, 8, 12, 3, 9, 4, 10]);
+ });
+
+ block_on(async {
+ let st = stream::iter(vec![
+ stream::iter(0..=4u8),
+ stream::iter(6..=10),
+ stream::iter(0..=2),
+ ]);
+
+ let mut fm_unordered = st
+ .flat_map_unordered(1, |s| s.filter(|v| futures::future::ready(v % 2 == 0)))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, vec![0, 0, 2, 2, 4, 6, 8, 10]);
+ });
+ }
+
+ // wake up immediately
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // wake up after delay
+ {
+ block_on(async {
+ let mut fl_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ .await;
+
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fl_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let mut fm_unordered = Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>()
+ .await;
+
+ fm_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+
+ block_on(async {
+ let (mut fm_unordered, mut fl_unordered) = futures_util::join!(
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity))
+ .collect::<Vec<_>>(),
+ Interchanger { polled: false, base: 0, wake_immediately: false }
+ .take(10)
+ .map(|s| s.map(identity))
+ .flatten_unordered(10)
+ .collect::<Vec<_>>()
+ );
+
+ fm_unordered.sort_unstable();
+ fl_unordered.sort_unstable();
+
+ assert_eq!(fm_unordered, fl_unordered);
+ assert_eq!(fm_unordered, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // waker panics
+ {
+ let stream = Arc::new(Mutex::new(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .take(10)
+ .flat_map_unordered(10, |s| s.map(identity)),
+ ));
+
+ struct PanicWaker;
+
+ impl ArcWake for PanicWaker {
+ fn wake_by_ref(_arc_self: &Arc<Self>) {
+ panic!("WAKE UP");
+ }
+ }
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+
+ let panic_waker = waker(Arc::new(PanicWaker));
+ let mut panic_cx = Context::from_waker(&panic_waker);
+ let _ = ready!(lock.poll_next_unpin(&mut panic_cx));
+
+ Poll::Ready(Some(()))
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ // stream panics
+ {
+ let st = stream::iter(iter::once(
+ once(Box::pin(async { panic!("Polled") })).left_stream::<DataStream>(),
+ ))
+ .chain(
+ Interchanger { polled: false, base: 0, wake_immediately: true }
+ .map(|stream| stream.right_stream())
+ .take(10),
+ );
+
+ let stream = Arc::new(Mutex::new(st.flatten_unordered(10)));
+
+ std::thread::spawn({
+ let stream = stream.clone();
+ move || {
+ let mut st = poll_fn(|cx| {
+ let mut lock = ready!(stream.lock().poll_unpin(cx));
+ let data = ready!(lock.poll_next_unpin(cx));
+
+ Poll::Ready(data)
+ });
+
+ block_on(st.next())
+ }
+ })
+ .join()
+ .unwrap_err();
+
+ block_on(async move {
+ let mut values: Vec<_> = stream.lock().await.by_ref().collect().await;
+ values.sort_unstable();
+
+ assert_eq!(values, (0..60).collect::<Vec<u8>>());
+ });
+ }
+
+ fn timeout<I: Clone>(time: Duration, value: I) -> impl Future<Output = I> {
+ let ready = Arc::new(AtomicBool::new(false));
+ let mut spawned = false;
+
+ future::poll_fn(move |cx| {
+ if !spawned {
+ let waker = cx.waker().clone();
+ let ready = ready.clone();
+
+ std::thread::spawn(move || {
+ std::thread::sleep(time);
+ ready.store(true, Ordering::Release);
+
+ waker.wake_by_ref()
+ });
+ spawned = true;
+ }
+
+ if ready.load(Ordering::Acquire) {
+ Poll::Ready(value.clone())
+ } else {
+ Poll::Pending
+ }
+ })
+ }
+
+ fn build_nested_fu<S: Stream + Unpin>(st: S) -> impl Stream<Item = S::Item> + Unpin
+ where
+ S::Item: Clone,
+ {
+ let inner = st
+ .then(|item| timeout(Duration::from_millis(50), item))
+ .enumerate()
+ .map(|(idx, value)| {
+ stream::once(if idx % 2 == 0 {
+ future::ready(value).left_future()
+ } else {
+ timeout(Duration::from_millis(100), value).right_future()
+ })
+ })
+ .flatten_unordered(None);
+
+ stream::once(future::ready(inner)).flatten_unordered(None)
+ }
+
+ // nested `flatten_unordered`
+ let te = ThreadPool::new().unwrap();
+ let base_handle = te
+ .spawn_with_handle(async move {
+ let fu = build_nested_fu(stream::iter(1..=10));
+
+ assert_eq!(fu.count().await, 10);
+ })
+ .unwrap();
+
+ block_on(base_handle);
+
+ let empty_state_move_handle = te
+ .spawn_with_handle(async move {
+ let mut fu = build_nested_fu(stream::iter(1..10));
+ {
+ let mut cx = noop_context();
+ let _ = fu.poll_next_unpin(&mut cx);
+ let _ = fu.poll_next_unpin(&mut cx);
+ }
+
+ assert_eq!(fu.count().await, 9);
+ })
+ .unwrap();
+
+ block_on(empty_state_move_handle);
+}
+
+#[test]
fn take_until() {
fn make_stop_fut(stop_on: u32) -> impl Future<Output = ()> {
let mut i = 0;
@@ -149,3 +498,40 @@ fn ready_chunks() {
assert_eq!(s.next().await.unwrap(), vec![4]);
});
}
+
+struct SlowStream {
+ times_should_poll: usize,
+ times_polled: Rc<Cell<usize>>,
+}
+impl Stream for SlowStream {
+ type Item = usize;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ self.times_polled.set(self.times_polled.get() + 1);
+ if self.times_polled.get() % 2 == 0 {
+ cx.waker().wake_by_ref();
+ return Poll::Pending;
+ }
+ if self.times_polled.get() >= self.times_should_poll {
+ return Poll::Ready(None);
+ }
+ Poll::Ready(Some(self.times_polled.get()))
+ }
+}
+
+#[test]
+fn select_with_strategy_doesnt_terminate_early() {
+ for side in [stream::PollNext::Left, stream::PollNext::Right] {
+ let times_should_poll = 10;
+ let count = Rc::new(Cell::new(0));
+ let b = stream::iter([10, 20]);
+
+ let mut selected = stream::select_with_strategy(
+ SlowStream { times_should_poll, times_polled: count.clone() },
+ b,
+ |_: &mut ()| side,
+ );
+ block_on(async move { while selected.next().await.is_some() {} });
+ assert_eq!(count.get(), times_should_poll + 1);
+ }
+}
diff --git a/vendor/futures/tests/stream_futures_ordered.rs b/vendor/futures/tests/stream_futures_ordered.rs
index 7506c65a6..5a4a3e22e 100644
--- a/vendor/futures/tests/stream_futures_ordered.rs
+++ b/vendor/futures/tests/stream_futures_ordered.rs
@@ -2,6 +2,7 @@ use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future::{self, join, Future, FutureExt, TryFutureExt};
use futures::stream::{FuturesOrdered, StreamExt};
+use futures::task::Poll;
use futures_test::task::noop_context;
use std::any::Any;
@@ -46,6 +47,69 @@ fn works_2() {
}
#[test]
+fn test_push_front() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // 1 and 2 should be received in order
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_front(d_rx);
+ d_tx.send(4).unwrap();
+
+ // we pushed `d_rx` to the front and sent 4, so we should recieve 4 next
+ // and then 3 after it
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
+fn test_push_back() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+ let (d_tx, d_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_back(a_rx);
+ stream.push_back(b_rx);
+ stream.push_back(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // All results should be received in order
+
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+
+ stream.push_back(d_rx);
+ d_tx.send(4).unwrap();
+
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(4))), stream.poll_next_unpin(&mut cx));
+}
+
+#[test]
fn from_iterator() {
let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
.into_iter()
@@ -82,3 +146,27 @@ fn queue_never_unblocked() {
assert!(stream.poll_next_unpin(cx).is_pending());
assert!(stream.poll_next_unpin(cx).is_pending());
}
+
+#[test]
+fn test_push_front_negative() {
+ let (a_tx, a_rx) = oneshot::channel::<i32>();
+ let (b_tx, b_rx) = oneshot::channel::<i32>();
+ let (c_tx, c_rx) = oneshot::channel::<i32>();
+
+ let mut stream = FuturesOrdered::new();
+
+ let mut cx = noop_context();
+
+ stream.push_front(a_rx);
+ stream.push_front(b_rx);
+ stream.push_front(c_rx);
+
+ a_tx.send(1).unwrap();
+ b_tx.send(2).unwrap();
+ c_tx.send(3).unwrap();
+
+ // These should all be recieved in reverse order
+ assert_eq!(Poll::Ready(Some(Ok(3))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(2))), stream.poll_next_unpin(&mut cx));
+ assert_eq!(Poll::Ready(Some(Ok(1))), stream.poll_next_unpin(&mut cx));
+}
diff --git a/vendor/futures/tests/stream_futures_unordered.rs b/vendor/futures/tests/stream_futures_unordered.rs
index 439c809be..b56828047 100644
--- a/vendor/futures/tests/stream_futures_unordered.rs
+++ b/vendor/futures/tests/stream_futures_unordered.rs
@@ -260,6 +260,20 @@ fn into_iter_len() {
}
#[test]
+fn into_iter_partial() {
+ let stream = vec![future::ready(1), future::ready(2), future::ready(3), future::ready(4)]
+ .into_iter()
+ .collect::<FuturesUnordered<_>>();
+
+ let mut into_iter = stream.into_iter();
+ assert!(into_iter.next().is_some());
+ assert!(into_iter.next().is_some());
+ assert!(into_iter.next().is_some());
+ assert_eq!(into_iter.len(), 1);
+ // don't panic when iterator is dropped before completing
+}
+
+#[test]
fn futures_not_moved_after_poll() {
// Future that will be ready after being polled twice,
// asserting that it does not move.
@@ -340,7 +354,7 @@ fn polled_only_once_at_most_per_iteration() {
let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
- assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
+ assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
let mut tasks = FuturesUnordered::<F>::new();
assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
diff --git a/vendor/futures/tests/stream_try_stream.rs b/vendor/futures/tests/stream_try_stream.rs
index 194e74db7..b3d04b920 100644
--- a/vendor/futures/tests/stream_try_stream.rs
+++ b/vendor/futures/tests/stream_try_stream.rs
@@ -1,7 +1,12 @@
+use core::pin::Pin;
+
use futures::{
- stream::{self, StreamExt, TryStreamExt},
+ stream::{self, repeat, Repeat, StreamExt, TryStreamExt},
task::Poll,
+ Stream,
};
+use futures_executor::block_on;
+use futures_task::Context;
use futures_test::task::noop_context;
#[test]
@@ -36,3 +41,94 @@ fn try_take_while_after_err() {
.boxed();
assert_eq!(Poll::Ready(None), s.poll_next_unpin(cx));
}
+
+#[test]
+fn try_flatten_unordered() {
+ let test_st = stream::iter(1..7)
+ .map(|val: u32| {
+ if val % 2 == 0 {
+ Ok(stream::unfold((val, 1), |(val, pow)| async move {
+ Some((val.pow(pow), (val, pow + 1)))
+ })
+ .take(3)
+ .map(move |val| if val % 16 != 0 { Ok(val) } else { Err(val) }))
+ } else {
+ Err(val)
+ }
+ })
+ .map_ok(Box::pin)
+ .try_flatten_unordered(None);
+
+ block_on(async move {
+ assert_eq!(
+ // All numbers can be divided by 16 and odds must be `Err`
+ // For all basic evens we must have powers from 1 to 3
+ vec![
+ Err(1),
+ Err(3),
+ Err(5),
+ Ok(2),
+ Ok(4),
+ Ok(6),
+ Ok(4),
+ Err(16),
+ Ok(36),
+ Ok(8),
+ Err(64),
+ Ok(216)
+ ],
+ test_st.collect::<Vec<_>>().await
+ )
+ });
+
+ #[derive(Clone, Debug)]
+ struct ErrorStream {
+ error_after: usize,
+ polled: usize,
+ }
+
+ impl Stream for ErrorStream {
+ type Item = Result<Repeat<Result<(), ()>>, ()>;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
+ if self.polled > self.error_after {
+ panic!("Polled after error");
+ } else {
+ let out =
+ if self.polled == self.error_after { Err(()) } else { Ok(repeat(Ok(()))) };
+ self.polled += 1;
+ Poll::Ready(Some(out))
+ }
+ }
+ }
+
+ block_on(async move {
+ let mut st = ErrorStream { error_after: 3, polled: 0 }.try_flatten_unordered(None);
+ let mut ctr = 0;
+ while (st.try_next().await).is_ok() {
+ ctr += 1;
+ }
+ assert_eq!(ctr, 0);
+
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .try_flatten_unordered(None)
+ .inspect_ok(|_| panic!("Unexpected `Ok`"))
+ .try_collect::<Vec<_>>()
+ .await,
+ Err(())
+ );
+
+ let mut taken = 0;
+ assert_eq!(
+ ErrorStream { error_after: 10, polled: 0 }
+ .map_ok(|st| st.take(3))
+ .try_flatten_unordered(1)
+ .inspect(|_| taken += 1)
+ .try_fold((), |(), res| async move { Ok(res) })
+ .await,
+ Err(())
+ );
+ assert_eq!(taken, 31);
+ })
+}
diff --git a/vendor/futures/tests_disabled/stream.rs b/vendor/futures/tests_disabled/stream.rs
index 854dbad82..a4eec2c7a 100644
--- a/vendor/futures/tests_disabled/stream.rs
+++ b/vendor/futures/tests_disabled/stream.rs
@@ -318,7 +318,6 @@ fn forward() {
}
#[test]
-#[allow(deprecated)]
fn concat() {
let a = iter_ok::<_, ()>(vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]);
assert_done(move || a.concat(), Ok(vec![1, 2, 3, 4, 5, 6, 7, 8, 9]));