summaryrefslogtreecommitdiffstats
path: root/vendor/futures-util
diff options
context:
space:
mode:
authorDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:39 +0000
committerDaniel Baumann <daniel.baumann@progress-linux.org>2024-04-17 12:20:39 +0000
commit1376c5a617be5c25655d0d7cb63e3beaa5a6e026 (patch)
tree3bb8d61aee02bc7a15eab3f36e3b921afc2075d0 /vendor/futures-util
parentReleasing progress-linux version 1.69.0+dfsg1-1~progress7.99u1. (diff)
downloadrustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.tar.xz
rustc-1376c5a617be5c25655d0d7cb63e3beaa5a6e026.zip
Merging upstream version 1.70.0+dfsg1.
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'vendor/futures-util')
-rw-r--r--vendor/futures-util/.cargo-checksum.json2
-rw-r--r--vendor/futures-util/Cargo.toml82
-rw-r--r--vendor/futures-util/README.md2
-rw-r--r--vendor/futures-util/benches/bilock.rs68
-rw-r--r--vendor/futures-util/benches/flatten_unordered.rs58
-rw-r--r--vendor/futures-util/benches/select.rs35
-rw-r--r--vendor/futures-util/benches_disabled/bilock.rs122
-rw-r--r--vendor/futures-util/build.rs21
-rw-r--r--vendor/futures-util/no_atomic_cas.rs6
-rw-r--r--vendor/futures-util/src/abortable.rs32
-rw-r--r--vendor/futures-util/src/compat/compat01as03.rs5
-rw-r--r--vendor/futures-util/src/compat/executor.rs1
-rw-r--r--vendor/futures-util/src/future/either.rs58
-rw-r--r--vendor/futures-util/src/future/future/fuse.rs12
-rw-r--r--vendor/futures-util/src/future/future/shared.rs64
-rw-r--r--vendor/futures-util/src/future/join_all.rs29
-rw-r--r--vendor/futures-util/src/future/pending.rs1
-rw-r--r--vendor/futures-util/src/future/select.rs30
-rw-r--r--vendor/futures-util/src/future/select_all.rs3
-rw-r--r--vendor/futures-util/src/future/select_ok.rs2
-rw-r--r--vendor/futures-util/src/future/try_future/mod.rs6
-rw-r--r--vendor/futures-util/src/future/try_join_all.rs137
-rw-r--r--vendor/futures-util/src/future/try_select.rs13
-rw-r--r--vendor/futures-util/src/io/copy_buf_abortable.rs124
-rw-r--r--vendor/futures-util/src/io/cursor.rs8
-rw-r--r--vendor/futures-util/src/io/lines.rs2
-rw-r--r--vendor/futures-util/src/io/mod.rs3
-rw-r--r--vendor/futures-util/src/io/read_exact.rs2
-rw-r--r--vendor/futures-util/src/io/read_line.rs2
-rw-r--r--vendor/futures-util/src/io/read_to_string.rs2
-rw-r--r--vendor/futures-util/src/io/write_all.rs2
-rw-r--r--vendor/futures-util/src/lock/bilock.rs49
-rw-r--r--vendor/futures-util/src/lock/mod.rs22
-rw-r--r--vendor/futures-util/src/lock/mutex.rs155
-rw-r--r--vendor/futures-util/src/sink/drain.rs6
-rw-r--r--vendor/futures-util/src/sink/unfold.rs5
-rw-r--r--vendor/futures-util/src/stream/futures_ordered.rs36
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/iter.rs4
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/mod.rs61
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs2
-rw-r--r--vendor/futures-util/src/stream/futures_unordered/task.rs11
-rw-r--r--vendor/futures-util/src/stream/mod.rs15
-rw-r--r--vendor/futures-util/src/stream/select_all.rs35
-rw-r--r--vendor/futures-util/src/stream/select_with_strategy.rs143
-rw-r--r--vendor/futures-util/src/stream/stream/buffer_unordered.rs6
-rw-r--r--vendor/futures-util/src/stream/stream/buffered.rs14
-rw-r--r--vendor/futures-util/src/stream/stream/chain.rs3
-rw-r--r--vendor/futures-util/src/stream/stream/chunks.rs11
-rw-r--r--vendor/futures-util/src/stream/stream/collect.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/filter.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/filter_map.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/flatten_unordered.rs536
-rw-r--r--vendor/futures-util/src/stream/stream/mod.rs138
-rw-r--r--vendor/futures-util/src/stream/stream/peek.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/ready_chunks.rs53
-rw-r--r--vendor/futures-util/src/stream/stream/skip_while.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/split.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/take.rs6
-rw-r--r--vendor/futures-util/src/stream/stream/take_while.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/then.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/unzip.rs2
-rw-r--r--vendor/futures-util/src/stream/stream/zip.rs4
-rw-r--r--vendor/futures-util/src/stream/try_stream/and_then.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/into_async_read.rs101
-rw-r--r--vendor/futures-util/src/stream/try_stream/mod.rs92
-rw-r--r--vendor/futures-util/src/stream/try_stream/or_else.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_buffered.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_chunks.rs11
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_collect.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_filter.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_filter_map.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs176
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_skip_while.rs2
-rw-r--r--vendor/futures-util/src/stream/try_stream/try_take_while.rs2
-rw-r--r--vendor/futures-util/src/task/spawn.rs6
75 files changed, 2103 insertions, 566 deletions
diff --git a/vendor/futures-util/.cargo-checksum.json b/vendor/futures-util/.cargo-checksum.json
index 2db230686..0d1186307 100644
--- a/vendor/futures-util/.cargo-checksum.json
+++ b/vendor/futures-util/.cargo-checksum.json
@@ -1 +1 @@
-{"files":{"Cargo.toml":"fb0e0a9cd20fff0e75bddf0f79a2c06200f5177b338a3bb2b3cd27f3b08eaff7","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"727f58ddc0ad88244d784b56a090410b39805e256e3415d4ae20cf2ba471d260","benches/futures_unordered.rs":"5eb8280be8d8fb7bd5fb103ce20db10f618f47e180a402105e0d5e9f8c9fe35a","benches_disabled/bilock.rs":"ab8b47fba5cfa5366477ef1034a042efde9b0aff6b77110d1f3a2715ab7508e8","build.rs":"f6e21c09f18cc405bd7048cb7a2958f92d5414b9ca6b301d137e120a84fa020a","no_atomic_cas.rs":"ff8be002b49a5cd9e4ca0db17b1c9e6b98e55f556319eb6b953dd6ff52c397a6","src/abortable.rs":"d88dd2501ed379b3540bd971d367b4629755d6d9f264e7b54ae59eea0ff83623","src/async_await/join_mod.rs":"8f83c0001df867f5eb47a4174bf4a0c0b548f8ff3be3b532e0c759ad981b87da","src/async_await/mod.rs":"3d25c343cc3e789d3f982cdacd6f8ed91511ba656c3923da310700f318f423a4","src/async_await/pending.rs":"7971ec1d5d89ad80390e2a0c51e396257b2e78f1436cce79ea2b55ac2f13b328","src/async_await/poll.rs":"440c19a89fd42b12da09ff48a69523b5a8a5baea0bcd2f860589a0ab996ed781","src/async_await/random.rs":"daf229cd01595d38ef0f6284865fe2f60ed3b8134f7a15c82564b97ff3a5be98","src/async_await/select_mod.rs":"414c7fb7923cfe21116d558bf3cd1a6ae5bef4ed01f9877f0e7cb3e42ee6c79d","src/async_await/stream_select_mod.rs":"9a51338914cbb1502619fed591dfe4fc676919499b9d041898e59f630fe5e7f0","src/compat/compat01as03.rs":"07c83b0820e1b6a5a792043e0b0eb5be70f365e5462c8a1fa5fa6b0f62f9e57d","src/compat/compat03as01.rs":"7cf29e57f8ee14b64123b3d2c16dceced25af5491a5ef81b655b2de2e9587fbe","src/compat/executor.rs":"a0edd7baa2192daa14a5a26bf53bd229defaeac538d2ec771c60992c6dfeb393","src/compat/mod.rs":"6cf3412f6a3f9ee8406118ea75de65468a83febc6ba61bdbad69261f0cfea02e","src/fns.rs":"f8e396128791169098a38a82c3c28aaa6dd5d40718635f7cc30b59b32f7110b8","src/future/abortable.rs":"373ce61c0c7c31718ff572113503bb88f55e3b49ed5d028a3dfafd69070f44c1","src/future/either.rs":"d7fb8728fac6fccbbc14afaa16ebd51c2633dd653277289c089b5c478742b37f","src/future/future/catch_unwind.rs":"08b0ac049cdee28325d378209aa5bb4d91b14a29ddd9c2b0e5c661b61f9cfcfe","src/future/future/flatten.rs":"5bf9846cef8dec5dcc38b992653e11146bc149a0d3efc09b1f8268bd29de0b2b","src/future/future/fuse.rs":"6531a95dc1917b2a5724b35e364faa741143811afc654a45c360111e9807864c","src/future/future/map.rs":"de607c2a4d80d2bddb590781c37328ddd294bb9d5064a9ecb99455244239b597","src/future/future/mod.rs":"ecfac09dcba801cede7c58acfaa76a9ab76d26a3f4c968d66c2a49caa57faefe","src/future/future/remote_handle.rs":"2ae17a409569b32c78e20026a8ecdf667352c2597a4a0a8deefa4761fafcb223","src/future/future/shared.rs":"ebf46b4bf428676bf553015e384f7f41da03558481aaa38deb1e8a896d212dae","src/future/join.rs":"38b55fc7cdbbdaaa525e51f8ce09783dbbcb65eabfd7de9f46610593e0bbef17","src/future/join_all.rs":"294a8e76862f447dea8759f78f7224d885a826c80193ceef2389345221e6e3c0","src/future/lazy.rs":"d161fc4108a97348c1becbbd5ba8fccb7225dcf1d81c097666f5c8b40718251d","src/future/maybe_done.rs":"559e41cb170f9fe7246d2a5b112527a9f9cbca63b8a5a872b3aa9c861f70f307","src/future/mod.rs":"51e018100362f20b071225268f1d81f25c8e9664e94730af199069c2692bf26a","src/future/option.rs":"73daca814800b91b707753dcfe074265372b0077fae2504ea6efddc713453579","src/future/pending.rs":"86598a5d5ade7c0416566deb280150bac34bd4703c997d2c7af572342d8d6d02","src/future/poll_fn.rs":"8e54bf57d60e01d496ae31df35e0b96868f4bda504c024a14f51ab723d67885f","src/future/poll_immediate.rs":"7e199fc102894c9095de17af602a7c8f05d427269aefce5d71cd5136d54659c0","src/future/ready.rs":"c9860ccd8ac529f44f66dee73ca9b9d7f1b1b3e5e9e4dc70c59640c752553d58","src/future/select.rs":"a582b1ed9c1e6cd8dcaa80b5f45e2176ed4a1740fe303f7143e29cab8e0dbc22","src/future/select_all.rs":"179b8168370e2c7105e348fdfbeb965eb746336d9660aa7fbc9185d681ae8c2d","src/future/select_ok.rs":"fed28e1fd368cdd465d297a84ea9436a00757eff4b34e592d94d7747b3bf4996","src/future/try_future/into_future.rs":"d966bde7b06a88443f0efd877e95f91541778c4e713f3f4b66e00ca5d3f352b6","src/future/try_future/mod.rs":"4733167d93e8f728e87f79b7d4dfe66de6afb306735ca76f9f843270286b3f6b","src/future/try_future/try_flatten.rs":"16c02e1780bd312b8b386e41c1d9dd4bcc4e8ef10f26007364f857b3adcc6e99","src/future/try_future/try_flatten_err.rs":"130f3fc3fd95a19f4e4a50e69301106fab02f77d0faf3aac9c473a92b826c2ca","src/future/try_join.rs":"1836931f8ba32da41c6810e6acc0ea2fee75b74b3153e760c4542cb12b220540","src/future/try_join_all.rs":"d4f262e80bb5347597c75b25de3c7784ffb4bd766227d6dc70cdeb77a38f4a5d","src/future/try_maybe_done.rs":"1cce46b2ee43ad51b7c5f9c02bc90a890af32bc549ce99098a2c8813508051e1","src/future/try_select.rs":"2e1b7e0b0cb7343f766fade87269093558db206f7fbe7dddfa3143885e17bac4","src/io/allow_std.rs":"a125959c255fd344399fb0be19218a8ee7d613ce2485d6df9cdbc2ed5d3987df","src/io/buf_reader.rs":"46a1e24046c5bc2ab8f266e3d904281bec3ab4ba6c13d4213a52599b57b8de66","src/io/buf_writer.rs":"d6666b8dde60eefbb7fa69da4a2eea2b34ea0e4a85e21e5ac6e83cc680ea9140","src/io/chain.rs":"12f508fc39c3234a71a0f886505245c5d659aed09c7d874b1bd8ca0a0d456cf3","src/io/close.rs":"9832210a870637198fa58642cdf2779afab71f2e31a9953e663fa6854bd73ac7","src/io/copy.rs":"cb2466dcd7ea8bb1f07d00c03e66ed55abf71fe4be6937adc9f533ef9d99fb2d","src/io/copy_buf.rs":"e9a5f6aac8375e298bddb332f23d8b626d056ce452b58f772a05df7e2cd326cf","src/io/cursor.rs":"612bdb8b4055d26816fb0e4c3e9859b06b3d08c99e4a27ed4946c95e219a29ab","src/io/empty.rs":"6ae40b4bc8fc41572abad2d013285d78d8df445868d41fac77bde508ec9bc1a5","src/io/fill_buf.rs":"4f217fed8eb3f66dbde2371c3fbcfa9420d38ba20da544a0658584e5778aa47d","src/io/flush.rs":"0c9b588dfd9da039dc123ba9448ac31ca21ee3da0a164a21f6c2c182183d43e2","src/io/into_sink.rs":"ab5bdb12bff62672175b69b8c9f5a4bbbea716b9cf89169ed6a723ab43da9df8","src/io/line_writer.rs":"16c151c68d89b7c2ab929c4a782539b1ad512b723eed9b544f50f1ff06f0b661","src/io/lines.rs":"ccfa24e212a610aad0c81042cfa64ada820c4305ba0e911a2c16221d7867468e","src/io/mod.rs":"599416e4d7dd5c6523a87bf778001ce0c3848ee760827af0b69c7b7aafd8a8a0","src/io/read.rs":"4ea675a83cec98a22c9c4731ff980209f0cf67f63c71871cd1deed53c1266345","src/io/read_exact.rs":"d27d5ec082ccb1b051d1292e029e398926a164c82e1f0c983ca9928410aa2abe","src/io/read_line.rs":"a3c62ca2034089a22ea9567e0b3cab0dfe09309782fcf151d92311a77223e37c","src/io/read_to_end.rs":"5e9e38dc087623dac5a3ae3ad329ed44ffe4f6205a78e546adadc3ffb76703fc","src/io/read_to_string.rs":"2c073d05f0361acda1f0172b24fd4c5da61840ac925a5bdfae9111c697759d1b","src/io/read_until.rs":"354507ce95242a735940f0aaa6ef11cc7d6d0505ae148f05277ce6e7537f168a","src/io/read_vectored.rs":"bd7f442c92f2cb320075d0983b0d08d51c23078898d72e6c2857cf6c7ad4cec7","src/io/repeat.rs":"53bc472e4bd7d286bf90765ce574f13b7aabc871c4f04f712da7cea160491390","src/io/seek.rs":"9863e9fb6495eb6e1f8c45c283c8a6993b9bdb1462f75a3e525e135c6840dec7","src/io/sink.rs":"30a503631d196e5da92c386d0afc1af9656a5f7682456cfa2489a2c30a05cac5","src/io/split.rs":"2aa567452b713497d5b85813980b69e888aee32be14492c92404d261fd50eb09","src/io/take.rs":"c53fec5b5e8c3742b7e60e6ebfa625cf2e566fbea193fb1eee2f0a8e561d63d5","src/io/window.rs":"295d7dc18ad101642003cd67687242e4bdba11552cfb7f18c521cbff369e6f71","src/io/write.rs":"60670eb00f999f2e2c43b099759a7fb030325b323744d88c9d20f75926ec30df","src/io/write_all.rs":"c88930fd23c88cc01fef2c6118d53d33996c011c4abf28778a27646fe1f7896a","src/io/write_all_vectored.rs":"53becf89c031bf4c3073f0903ce809eee7606b1b4fbeb518605875badba216d3","src/io/write_vectored.rs":"bc98ff4a709cb75cd9ffedefa8ef251089a49906b98e142d76447ddf4ac098bb","src/lib.rs":"384447fb9bfcd3b110656979cca71b53c3abe72690e970c30563c1baba27fd74","src/lock/bilock.rs":"f1b955cb2e10c906933e63bbfb8e953af634428ce15faf3696b07d11da0cc279","src/lock/mod.rs":"e964dd0d999ccf9d9d167d7ecbfeb7a66d180a80eeb6fd41ec3fa698c1067674","src/lock/mutex.rs":"782375724e4abbdaf3221eb422911c37fe13e794e6f30ea819acece7303c3368","src/never.rs":"2066481ab04921269cfa768cb8b778a035ab6aa49ec404d9ac0aeb07a4bf6094","src/sink/buffer.rs":"33a7380f8232225a8e9ac5ee138fd095979efa3a64f9fecf5fcaf2e78fcbc355","src/sink/close.rs":"f2f31c884f048163abebd4f5a877b7b4306f7d02beae428325636fd00ed42ca9","src/sink/drain.rs":"392d9487003fcd55a3373c3e2558f6549b9633b82fc08b5a665a573b137ae9f7","src/sink/err_into.rs":"ced2998b2b0b792d80f7543523c9e07e8f5d20a4336cae93084b995e46671b15","src/sink/fanout.rs":"66dcde056e0bbee4e0074d331838ed2743dc872ea1597f05d61970523dc34926","src/sink/feed.rs":"64b9d296d37aedde37e1421c459ebcd9a7e8814db905996996167850124f3b3f","src/sink/flush.rs":"fbba344f428ca7636541ba013f7db2ece480b404a9e0b421c5537552d61e2492","src/sink/map_err.rs":"0f68f444ef13fe7115164be855c3b7b1d269e1119e69fcdad1706988255641f1","src/sink/mod.rs":"37cf379170f3099992eb59f3181be4c4e4a5c2d3581dbe424d22ab360840d321","src/sink/send.rs":"56aaba9aa4a562e0af39473a5779206d91b0acb1fced4fc06cd8b959d1897524","src/sink/send_all.rs":"a8e4956604fe73e321b0a3896c2018bc5c27149f2862f8406112db140b3aa2dd","src/sink/unfold.rs":"428080b76213b504fcc981d2f05840f1a93c8db305301af1cf5852b6c47c4be5","src/sink/with.rs":"850cd3b96304df1f38360a0bc60b02d485535e399ef7642acdd9add7876867d8","src/sink/with_flat_map.rs":"5e0f527b33ee8f1cc6a6a46d45b6d74dad5c735d88b2cb24e1cb34fdc6ef501b","src/stream/abortable.rs":"935d79aa44d793f4abe87ca27a9e4a20891500488cf942693cd2756d65b3aab2","src/stream/empty.rs":"5000c856186408a17f68bbef432d4a1a3edb7fb5a07ed8699342fef04b10a181","src/stream/futures_ordered.rs":"46217ed3802d052724a4a3166370f74e6d5fcd248d6f983caea10bc3335a1f0e","src/stream/futures_unordered/abort.rs":"bdfece9f91accafd5122be36d628c37c5b219ac0eecec181267840fbb1e95a45","src/stream/futures_unordered/iter.rs":"e8862300ddb0504090c059b3dba2425af6335874cb6ef393fef26e87788b6d3e","src/stream/futures_unordered/mod.rs":"da8b7adb93b50c1c6507c1af4b3c1ec0de4ce33f68f64dc876749c07e231d642","src/stream/futures_unordered/ready_to_run_queue.rs":"6223c67519c1ae35cbc449dd5654fda422aaba61a2344cc0b190c73b4b1e9f80","src/stream/futures_unordered/task.rs":"ab2de99b2a42c1da70d56e4be43c0ef72e8d5a4504adc0f870f8d28afd332a37","src/stream/iter.rs":"609fa821a460e901a54ae51f8da58220881157cef02b8b7b8c9e4321c2d05a23","src/stream/mod.rs":"33873b13535443cce2d49fdb3f0b359286bfc74f3553419fe7174cf7c1840da0","src/stream/once.rs":"d7b70adabad1f10af711ac3dcef33fd4c287e9852fdb678406e7ff350ba8fd47","src/stream/pending.rs":"84aaa15c8bbb17a250da5b1b5f0c7f6717410915d63340a3fcbf098bebe19d6f","src/stream/poll_fn.rs":"35952ea514b8aade14a3934d7777006475f50bbf0c5b50141710e31637f980be","src/stream/poll_immediate.rs":"e7a53ff8275ebe89dab8f9b984cce2ee0fde0a828e540b77c5500ca017d5bb98","src/stream/repeat.rs":"e4e4a9b6f2fca72bcbf098c3ac0c4a41323a840741d4dce9d9416464b7e8bd0d","src/stream/repeat_with.rs":"525780d24f3f99152b879765ca6eab99bcc0c757dc6654b6635c099b93ea654d","src/stream/select.rs":"28eb422c0eca9fd02778a6003004471b3489db09746a70e617a506303ea8b81d","src/stream/select_all.rs":"4358fa26cfe8c1b56f19d077b841bbdfe22f7adc043034fd6313b004e94e310d","src/stream/select_with_strategy.rs":"7fe249fd92fc66ad2bfa5a2dec7148b2a0102b3a8d915b2103bfbcd1b8870447","src/stream/stream/all.rs":"43cfb69de0ea991497d26d0aeb02091f10eb241ef93758b54c5e7aced5b63b63","src/stream/stream/any.rs":"2582da02f9a1ce2bd0af87a64b65188fc93686c5e3dd9128e89e5f57c1d70e43","src/stream/stream/buffer_unordered.rs":"66c3f4bd2fabfbdf6a4033dfaed44dd0262b68e6533509029c984ae037e35392","src/stream/stream/buffered.rs":"eabd0c0e50eaaaf0a92a7b39fdb5b77e068bbfbbfd5e216a09c3a6e0c1fc102d","src/stream/stream/catch_unwind.rs":"b2e801ff744d5d9e17177ec1156b0ab67bdd56b94c618ed8590344ec8a0f35e7","src/stream/stream/chain.rs":"ba1a206b3ce0160186021f5c1e4c95a770d26b843e3640e52609a2facaf756ac","src/stream/stream/chunks.rs":"d3aaddc05779ef70e2f0e59570cf6d5a1d231ae4885c8b8b2e4813fc02832562","src/stream/stream/collect.rs":"977ed1970b46029517ecc45f4af924b8e585d3770f01b2a0d2df0e01519ca50f","src/stream/stream/concat.rs":"171ea941b45c0295ed978c3f318a449ea295e33cb4ea82c764f4e9e7c48ad5de","src/stream/stream/count.rs":"ff218aea3d2d2456c8163926ea0c357b2752e92578e5fd4bec6b789fe1246556","src/stream/stream/cycle.rs":"ed7e3d15e7b1adec5ad5789b0d3186b5995a3353cc974fb7f41a72f6d8ad4cbb","src/stream/stream/enumerate.rs":"fc7565d21d39565790859eeac9ae8dd74123a9d15b88258d3abe894f1876cc39","src/stream/stream/filter.rs":"3dd080914e6770f8790455fc9cbedf30c5f44a589ef99b19112344c75f9e9042","src/stream/stream/filter_map.rs":"3a8a3e06dfac48dd3f7b6b1a552a51a3e31ea943e012dd35729d461a1fcfad80","src/stream/stream/flatten.rs":"69493fc106a1447abe109fd54375bb30363f7bc419463a8f835e4c80d97f2186","src/stream/stream/fold.rs":"75d61d4321db1bcbbdd1a0102d9ad60206275777167c008fc8953e50cd978a09","src/stream/stream/for_each.rs":"07bca889821bad18ff083e54abe679fbeb8cd19c086581c2f2722cba6b42263f","src/stream/stream/for_each_concurrent.rs":"4e1e7eb3d4ccfae0e8000651b75834e2960a7f9c62ab92dba35a0bdbbf5bbb21","src/stream/stream/forward.rs":"cd024ba1a3d5098d3ff2d5178a12e068916cc4307284b00c18dbc54b554a5560","src/stream/stream/fuse.rs":"061c5385f12f80c7906cb15ddb8f455ced6ce21d1de9a97de9db2616407c0cac","src/stream/stream/into_future.rs":"b46ad45cc03ddd778a9ffaa0d603c8ee0b411f49333100160959942cde9588bd","src/stream/stream/map.rs":"b91bdd5b33821a50c9b5034261a14f89ff1a9d541ab99b9d9a6921b12a5d434e","src/stream/stream/mod.rs":"106daa368424cca9e35aab6ec1bc177570aeca1dfaec57b188682ae3aaee11b7","src/stream/stream/next.rs":"7b4d5a22b5e00aa191ea82346bb1f392121cc68692864a8230e462d59e622928","src/stream/stream/peek.rs":"1ef5f11b1f0cc11d01690bebe282d8953ff8e860597f4ce21208fc274be5e98e","src/stream/stream/ready_chunks.rs":"4e6deb3a6d453fd4e982bdba416188311a72daca1b218d4e9ef20819fc09b5b2","src/stream/stream/scan.rs":"54489c8efef60dbf3c35ee803afee5c5ea7c364fb9b68939a04956e46febb856","src/stream/stream/select_next_some.rs":"0094eccc96cfe78d9b6d0a9bdb82cada8fb7929770a3ac00ffcb5441d7dc4f51","src/stream/stream/skip.rs":"61f7ec7fe25663d2c87cffaad19ed27eda032842edb8af731b521025b244f120","src/stream/stream/skip_while.rs":"75ee580e0111200758d0c0fe154276007ff233db6b63a8223f0baeac1db18874","src/stream/stream/split.rs":"fa4adea18708dad384eb347260cfb965d30c40a677e15f9267f97aa382c6306c","src/stream/stream/take.rs":"505f83d341dc84eeab46f5e66adfa21a36207cb66f2394dd6a256576db665827","src/stream/stream/take_until.rs":"0f1fa7d158192a5dee32392dfdd062c15dab6d246b0ca267e91aae490d7d7fdb","src/stream/stream/take_while.rs":"51007dbde8434fd22c5ef2481a99463f11b3785e4bdeb73fa583a17f29f5c228","src/stream/stream/then.rs":"9dcfdc741d1d7dea0100aa9f1feadb932f8530f7a7d3071befc1e490a6cb50ed","src/stream/stream/unzip.rs":"9ad4db7522f66a9133e464c13d6c95682c797ae5a986e60d3aba358c65031fe8","src/stream/stream/zip.rs":"56f30f513e11754f59ead5ca4112014cba9278d02796eb8fe0937ae8bb4d44cd","src/stream/try_stream/and_then.rs":"22ca6e547d0db2e07b0a928c48118a532adaf28d85c60ab84b8366dbfeab9161","src/stream/try_stream/into_async_read.rs":"f584fd8dfdab90328fc89eac78306caa308d43f3035c1c5489e55384007e77ed","src/stream/try_stream/into_stream.rs":"4fee94e89956a42871fc4a0cdba7ae1b7d4265e884528799cd227c9dd851acce","src/stream/try_stream/mod.rs":"7a83406bfbefe4fe651b9535035fef80d52e995a44dcd0e16105bf274e0fef06","src/stream/try_stream/or_else.rs":"8cc7f602da1ffee21bf06c5203aa0427514a83b67941ae264459f1eff8dc8aec","src/stream/try_stream/try_buffer_unordered.rs":"64e698ea6aefbe7e32d48e737553b20b9cde5c258963bb20486b48b7d6899660","src/stream/try_stream/try_buffered.rs":"7546d396026bf700d3f37f55d5a4e39abe5fb05919e6a269feeb8be7af19256c","src/stream/try_stream/try_chunks.rs":"58b8c5af4914eae3698e528b0361532b391bf4b3463f4c790e43c8069cfe1bd7","src/stream/try_stream/try_collect.rs":"1132751055a51b936ed28b83c4eed7dee3f40a4be13ea374b30086e864e1ee09","src/stream/try_stream/try_concat.rs":"f2330ebeeab30273e9ac0e8600bfe2f405ce671f6386e688b3afb1d2fdd7c2c6","src/stream/try_stream/try_filter.rs":"7c2a09cdb1753ecb49a44d1d84742cb2050a999a8148448b31bb404eb2d17154","src/stream/try_stream/try_filter_map.rs":"5afc6ab35e2b425e37ed217a6bb038459c8828d6bcd6a3699883d6df071dc7e7","src/stream/try_stream/try_flatten.rs":"e05614d86a27ab8386476eea35fd424c07e5f7f99cf0401d63a6655eb7ca1247","src/stream/try_stream/try_fold.rs":"b96aa2fe1a16f625d5045028a86ff8684dcf5198ef8c7c072f52f39aeaa8b619","src/stream/try_stream/try_for_each.rs":"3f3901d618333b740d470eb02fcbb645df92483493872298bb7bd0382646028a","src/stream/try_stream/try_for_each_concurrent.rs":"78a94a77f329862c2a245ec3add97e49c534985f0d9da98f205b7fa3c7c08df3","src/stream/try_stream/try_next.rs":"6e29473153db1435906e79f7eaa13ce9da842d4528ba9eb1c0034665feacc565","src/stream/try_stream/try_skip_while.rs":"c0259ec70bdf4a81c1fa569275766e3e65db9d5715c81e93ada04817c1835add","src/stream/try_stream/try_take_while.rs":"54927dfa95ff58b542a1d7382f564eeae5e02e633c948b1a39ac09bc7e92f5f5","src/stream/try_stream/try_unfold.rs":"aaf0f4857a4ec8233ac842ae509f29e5a210827a0bb40cfc0dc3e858f153d2b4","src/stream/unfold.rs":"8b2feb00f979562b43064eb078d53a160cdb3c65deed17ec25a05938df2d370f","src/task/mod.rs":"074ce7f3869663d2e768bb08ea201ed1be176e13edd4150f201bc1ea362170d3","src/task/spawn.rs":"26bbcf1d65e1467de0ecdad2b56f464a510cda7c1933427d69a1b50459836489","src/unfold_state.rs":"ffe848071a99d6afcdbe8281a8a77a559a7dde434fc41f734c90e6b9b5d8a5af"},"package":"d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"} \ No newline at end of file
+{"files":{"Cargo.toml":"2889f27d32d20c79aeba0c92bbe5ff2066d96eb0fd1603bd2dece4090ac2eb29","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"4094b953bfd2bb2687df0c3c3deb05c307c14ac084e6a79878342b8ee56aa710","benches/bilock.rs":"6f59b71f9b9ca5751018a985eff0ea8d63d4cb6d18a17e672e17bc786b972c20","benches/flatten_unordered.rs":"79330465a5d8f2d6e450861e7ca2ed8ae7fe78a5fb221b6ab7121227810c1bcf","benches/futures_unordered.rs":"5eb8280be8d8fb7bd5fb103ce20db10f618f47e180a402105e0d5e9f8c9fe35a","benches/select.rs":"ca0a79bc3434f0fc025e0b0e37941ba1d592b40f36ce6544cdfede9f23e70581","build.rs":"5b263bd2bd587511a9c8daef580b05e0613c15a6c5f800b1e5bc145fa013d99e","no_atomic_cas.rs":"7ae747b83b08dd926c1696faf4ecab9399c652ae77d5179221258c73b8eecb6f","src/abortable.rs":"38bcb3d48361e4cfa89cc2e225c5f1dc97129837da834d28b69cff3bbf5200a6","src/async_await/join_mod.rs":"8f83c0001df867f5eb47a4174bf4a0c0b548f8ff3be3b532e0c759ad981b87da","src/async_await/mod.rs":"3d25c343cc3e789d3f982cdacd6f8ed91511ba656c3923da310700f318f423a4","src/async_await/pending.rs":"7971ec1d5d89ad80390e2a0c51e396257b2e78f1436cce79ea2b55ac2f13b328","src/async_await/poll.rs":"440c19a89fd42b12da09ff48a69523b5a8a5baea0bcd2f860589a0ab996ed781","src/async_await/random.rs":"daf229cd01595d38ef0f6284865fe2f60ed3b8134f7a15c82564b97ff3a5be98","src/async_await/select_mod.rs":"414c7fb7923cfe21116d558bf3cd1a6ae5bef4ed01f9877f0e7cb3e42ee6c79d","src/async_await/stream_select_mod.rs":"9a51338914cbb1502619fed591dfe4fc676919499b9d041898e59f630fe5e7f0","src/compat/compat01as03.rs":"6728ffd4f0a92d4e6aff8b7ff7916ad7ae20a317633d2739813a3b6ffc814204","src/compat/compat03as01.rs":"7cf29e57f8ee14b64123b3d2c16dceced25af5491a5ef81b655b2de2e9587fbe","src/compat/executor.rs":"3e40b4ccd905a99eab42c47fefc5502b530eef869158ce9ceaa28f8f1638436f","src/compat/mod.rs":"6cf3412f6a3f9ee8406118ea75de65468a83febc6ba61bdbad69261f0cfea02e","src/fns.rs":"f8e396128791169098a38a82c3c28aaa6dd5d40718635f7cc30b59b32f7110b8","src/future/abortable.rs":"373ce61c0c7c31718ff572113503bb88f55e3b49ed5d028a3dfafd69070f44c1","src/future/either.rs":"fb00002e68b5c46c8ded09e91efe0be7362c168a1ea00dc5906e1c8c7e38aaa4","src/future/future/catch_unwind.rs":"08b0ac049cdee28325d378209aa5bb4d91b14a29ddd9c2b0e5c661b61f9cfcfe","src/future/future/flatten.rs":"5bf9846cef8dec5dcc38b992653e11146bc149a0d3efc09b1f8268bd29de0b2b","src/future/future/fuse.rs":"65b80a1ba7556e2ef35ce8d23e47489a2a6eb6d1c3ef9ac4e080c63e69eaa07d","src/future/future/map.rs":"de607c2a4d80d2bddb590781c37328ddd294bb9d5064a9ecb99455244239b597","src/future/future/mod.rs":"ecfac09dcba801cede7c58acfaa76a9ab76d26a3f4c968d66c2a49caa57faefe","src/future/future/remote_handle.rs":"2ae17a409569b32c78e20026a8ecdf667352c2597a4a0a8deefa4761fafcb223","src/future/future/shared.rs":"d1973063327851931c75969ec627657e5e34e8cfa97f295d65a4be288377c446","src/future/join.rs":"38b55fc7cdbbdaaa525e51f8ce09783dbbcb65eabfd7de9f46610593e0bbef17","src/future/join_all.rs":"4813aba0e6ddf02310ba3d368eb6af44b30aaac227d1b799e977996db9e3cf36","src/future/lazy.rs":"d161fc4108a97348c1becbbd5ba8fccb7225dcf1d81c097666f5c8b40718251d","src/future/maybe_done.rs":"559e41cb170f9fe7246d2a5b112527a9f9cbca63b8a5a872b3aa9c861f70f307","src/future/mod.rs":"51e018100362f20b071225268f1d81f25c8e9664e94730af199069c2692bf26a","src/future/option.rs":"73daca814800b91b707753dcfe074265372b0077fae2504ea6efddc713453579","src/future/pending.rs":"3967984d2061e6b201c407f28ba8392a21fc9ef7c0b9201e2e244110af0782c5","src/future/poll_fn.rs":"8e54bf57d60e01d496ae31df35e0b96868f4bda504c024a14f51ab723d67885f","src/future/poll_immediate.rs":"7e199fc102894c9095de17af602a7c8f05d427269aefce5d71cd5136d54659c0","src/future/ready.rs":"c9860ccd8ac529f44f66dee73ca9b9d7f1b1b3e5e9e4dc70c59640c752553d58","src/future/select.rs":"0c358a5ae079858f31c61cf6ea835205fdb9092d07536778440b975995d2626c","src/future/select_all.rs":"5b304210c34cc2bd84f7b1819baa30a68eea2ee578b10b243f5dd884ee9a4791","src/future/select_ok.rs":"dc35027db70c0111399c6ab6f7c977e6e7362f069a3891e4a62006c52643528e","src/future/try_future/into_future.rs":"d966bde7b06a88443f0efd877e95f91541778c4e713f3f4b66e00ca5d3f352b6","src/future/try_future/mod.rs":"991edb3b52903ceb3bcb6599d04d898509023cd038c5974f4872eaafa9748f08","src/future/try_future/try_flatten.rs":"16c02e1780bd312b8b386e41c1d9dd4bcc4e8ef10f26007364f857b3adcc6e99","src/future/try_future/try_flatten_err.rs":"130f3fc3fd95a19f4e4a50e69301106fab02f77d0faf3aac9c473a92b826c2ca","src/future/try_join.rs":"1836931f8ba32da41c6810e6acc0ea2fee75b74b3153e760c4542cb12b220540","src/future/try_join_all.rs":"4d01395c74c7b82c581a578f2cb34087824b091f70075ebcd76a8d12b8476c1f","src/future/try_maybe_done.rs":"1cce46b2ee43ad51b7c5f9c02bc90a890af32bc549ce99098a2c8813508051e1","src/future/try_select.rs":"5d6187ace76b5f26e60c713a1fe9fcb9cbb0d161c5881c532ce9472a230b595d","src/io/allow_std.rs":"a125959c255fd344399fb0be19218a8ee7d613ce2485d6df9cdbc2ed5d3987df","src/io/buf_reader.rs":"46a1e24046c5bc2ab8f266e3d904281bec3ab4ba6c13d4213a52599b57b8de66","src/io/buf_writer.rs":"d6666b8dde60eefbb7fa69da4a2eea2b34ea0e4a85e21e5ac6e83cc680ea9140","src/io/chain.rs":"12f508fc39c3234a71a0f886505245c5d659aed09c7d874b1bd8ca0a0d456cf3","src/io/close.rs":"9832210a870637198fa58642cdf2779afab71f2e31a9953e663fa6854bd73ac7","src/io/copy.rs":"cb2466dcd7ea8bb1f07d00c03e66ed55abf71fe4be6937adc9f533ef9d99fb2d","src/io/copy_buf.rs":"e9a5f6aac8375e298bddb332f23d8b626d056ce452b58f772a05df7e2cd326cf","src/io/copy_buf_abortable.rs":"28ef452bc49423e0a6e8323b5956b37c57335941f99797867e5c5932f9366136","src/io/cursor.rs":"c12e9b82c6eff2108a5524b026d73fbb2c250072e8e3f673cc04d4da02a553b8","src/io/empty.rs":"6ae40b4bc8fc41572abad2d013285d78d8df445868d41fac77bde508ec9bc1a5","src/io/fill_buf.rs":"4f217fed8eb3f66dbde2371c3fbcfa9420d38ba20da544a0658584e5778aa47d","src/io/flush.rs":"0c9b588dfd9da039dc123ba9448ac31ca21ee3da0a164a21f6c2c182183d43e2","src/io/into_sink.rs":"ab5bdb12bff62672175b69b8c9f5a4bbbea716b9cf89169ed6a723ab43da9df8","src/io/line_writer.rs":"16c151c68d89b7c2ab929c4a782539b1ad512b723eed9b544f50f1ff06f0b661","src/io/lines.rs":"137279b6b899ce438fb1b0ee9e6a412976f9f9db54fb7b961d2bad8787a26b1e","src/io/mod.rs":"bead8faa1bd4c3733543e38cf64dc9b52d703440367f2efb460bda9f9baafa0b","src/io/read.rs":"4ea675a83cec98a22c9c4731ff980209f0cf67f63c71871cd1deed53c1266345","src/io/read_exact.rs":"ddebd58db9f6766efa3f50543fb51b138538533921e1ee1da4621fff9c64efe2","src/io/read_line.rs":"e22c853ddfd769c441b1a1dc59cbcda4f22a9c49e86f6a697f94193fce3bcdfb","src/io/read_to_end.rs":"5e9e38dc087623dac5a3ae3ad329ed44ffe4f6205a78e546adadc3ffb76703fc","src/io/read_to_string.rs":"bef4cc292dd95fa9c850d0438ad0cf49a8cc4caf40a0384f763f8c9512ad9e79","src/io/read_until.rs":"354507ce95242a735940f0aaa6ef11cc7d6d0505ae148f05277ce6e7537f168a","src/io/read_vectored.rs":"bd7f442c92f2cb320075d0983b0d08d51c23078898d72e6c2857cf6c7ad4cec7","src/io/repeat.rs":"53bc472e4bd7d286bf90765ce574f13b7aabc871c4f04f712da7cea160491390","src/io/seek.rs":"9863e9fb6495eb6e1f8c45c283c8a6993b9bdb1462f75a3e525e135c6840dec7","src/io/sink.rs":"30a503631d196e5da92c386d0afc1af9656a5f7682456cfa2489a2c30a05cac5","src/io/split.rs":"2aa567452b713497d5b85813980b69e888aee32be14492c92404d261fd50eb09","src/io/take.rs":"c53fec5b5e8c3742b7e60e6ebfa625cf2e566fbea193fb1eee2f0a8e561d63d5","src/io/window.rs":"295d7dc18ad101642003cd67687242e4bdba11552cfb7f18c521cbff369e6f71","src/io/write.rs":"60670eb00f999f2e2c43b099759a7fb030325b323744d88c9d20f75926ec30df","src/io/write_all.rs":"8fcd4ff233650b5abd20f7b987000cac095d8de23445572de588dccf710623c6","src/io/write_all_vectored.rs":"53becf89c031bf4c3073f0903ce809eee7606b1b4fbeb518605875badba216d3","src/io/write_vectored.rs":"bc98ff4a709cb75cd9ffedefa8ef251089a49906b98e142d76447ddf4ac098bb","src/lib.rs":"384447fb9bfcd3b110656979cca71b53c3abe72690e970c30563c1baba27fd74","src/lock/bilock.rs":"a294b016cfb39fb54406a6190438546a5fd7c8ef21667ab38a6cea9cb2d3ef7b","src/lock/mod.rs":"ed0f4ef97af382f6038730bd5932b449f32dc3a634e73e7ebb48a24bb7782d6f","src/lock/mutex.rs":"745c68e571f84a7456681cd683b2b8eed28ea8b6d3f9a38337efad105a65e0b6","src/never.rs":"2066481ab04921269cfa768cb8b778a035ab6aa49ec404d9ac0aeb07a4bf6094","src/sink/buffer.rs":"33a7380f8232225a8e9ac5ee138fd095979efa3a64f9fecf5fcaf2e78fcbc355","src/sink/close.rs":"f2f31c884f048163abebd4f5a877b7b4306f7d02beae428325636fd00ed42ca9","src/sink/drain.rs":"60262bf3ef48c09b4d52e52953f9437d536e20f63690b73e975388751405d239","src/sink/err_into.rs":"ced2998b2b0b792d80f7543523c9e07e8f5d20a4336cae93084b995e46671b15","src/sink/fanout.rs":"66dcde056e0bbee4e0074d331838ed2743dc872ea1597f05d61970523dc34926","src/sink/feed.rs":"64b9d296d37aedde37e1421c459ebcd9a7e8814db905996996167850124f3b3f","src/sink/flush.rs":"fbba344f428ca7636541ba013f7db2ece480b404a9e0b421c5537552d61e2492","src/sink/map_err.rs":"0f68f444ef13fe7115164be855c3b7b1d269e1119e69fcdad1706988255641f1","src/sink/mod.rs":"37cf379170f3099992eb59f3181be4c4e4a5c2d3581dbe424d22ab360840d321","src/sink/send.rs":"56aaba9aa4a562e0af39473a5779206d91b0acb1fced4fc06cd8b959d1897524","src/sink/send_all.rs":"a8e4956604fe73e321b0a3896c2018bc5c27149f2862f8406112db140b3aa2dd","src/sink/unfold.rs":"5febcfb9295a79fe1187284d0d45055c787e399b00d73c0e85a0446ae2246d18","src/sink/with.rs":"850cd3b96304df1f38360a0bc60b02d485535e399ef7642acdd9add7876867d8","src/sink/with_flat_map.rs":"5e0f527b33ee8f1cc6a6a46d45b6d74dad5c735d88b2cb24e1cb34fdc6ef501b","src/stream/abortable.rs":"935d79aa44d793f4abe87ca27a9e4a20891500488cf942693cd2756d65b3aab2","src/stream/empty.rs":"5000c856186408a17f68bbef432d4a1a3edb7fb5a07ed8699342fef04b10a181","src/stream/futures_ordered.rs":"c62010493e68e1c6317c189ce36af48770736407c2e0e60e6c677f3b20b4b12b","src/stream/futures_unordered/abort.rs":"bdfece9f91accafd5122be36d628c37c5b219ac0eecec181267840fbb1e95a45","src/stream/futures_unordered/iter.rs":"01f8aaa2ac7ea493bf727a945424cc6ae695c9a0c289ac57cbb26697abb05827","src/stream/futures_unordered/mod.rs":"460cdf03695f6b292d46bc58138952f0c3d84fe58974337bca6be5a1ff30e48a","src/stream/futures_unordered/ready_to_run_queue.rs":"3a9c08cb5df28e57f2bfe613b8174d0dfb420b8664dd7c46a053e2980a6d3482","src/stream/futures_unordered/task.rs":"2b780bcc97844bc0bdeced7bb4318066e86ba082c08c8628c9b2e92bfe36fb61","src/stream/iter.rs":"609fa821a460e901a54ae51f8da58220881157cef02b8b7b8c9e4321c2d05a23","src/stream/mod.rs":"8ec9b052297b82a1be6b9a2ad631cf686e8cc17e763794ebeeea3a39e3a72805","src/stream/once.rs":"d7b70adabad1f10af711ac3dcef33fd4c287e9852fdb678406e7ff350ba8fd47","src/stream/pending.rs":"84aaa15c8bbb17a250da5b1b5f0c7f6717410915d63340a3fcbf098bebe19d6f","src/stream/poll_fn.rs":"35952ea514b8aade14a3934d7777006475f50bbf0c5b50141710e31637f980be","src/stream/poll_immediate.rs":"e7a53ff8275ebe89dab8f9b984cce2ee0fde0a828e540b77c5500ca017d5bb98","src/stream/repeat.rs":"e4e4a9b6f2fca72bcbf098c3ac0c4a41323a840741d4dce9d9416464b7e8bd0d","src/stream/repeat_with.rs":"525780d24f3f99152b879765ca6eab99bcc0c757dc6654b6635c099b93ea654d","src/stream/select.rs":"28eb422c0eca9fd02778a6003004471b3489db09746a70e617a506303ea8b81d","src/stream/select_all.rs":"19ef94abcf63fa9e46a73b6ab783642d2d069a015c7fa57fea36eeac7b6f2a20","src/stream/select_with_strategy.rs":"caa0f5d1fd02824b48a1cd2be13a6f96b532039eb88cf47ea5d2becf58595073","src/stream/stream/all.rs":"43cfb69de0ea991497d26d0aeb02091f10eb241ef93758b54c5e7aced5b63b63","src/stream/stream/any.rs":"2582da02f9a1ce2bd0af87a64b65188fc93686c5e3dd9128e89e5f57c1d70e43","src/stream/stream/buffer_unordered.rs":"b0f7a1c72cee178e7bfd8990e6e426c1258eeba6d952b82c6be8e4cac0a054ea","src/stream/stream/buffered.rs":"e37d08d6a18090ba37079937575920cc8c7569f4183dba710d3f4b94c11da01b","src/stream/stream/catch_unwind.rs":"b2e801ff744d5d9e17177ec1156b0ab67bdd56b94c618ed8590344ec8a0f35e7","src/stream/stream/chain.rs":"809b6b5c8372f65341dc9810d39f60ae3bcf74a78f133b4ab8d289fb5f2a7cbb","src/stream/stream/chunks.rs":"9f872b473de14d2251584050f04d56eada9c3b1d8dc3e746bdd57c1f757bfc6f","src/stream/stream/collect.rs":"6e4d2d580189f7d3b6b294b6b17437e8e2570502f08c11786a71caac207f0309","src/stream/stream/concat.rs":"171ea941b45c0295ed978c3f318a449ea295e33cb4ea82c764f4e9e7c48ad5de","src/stream/stream/count.rs":"ff218aea3d2d2456c8163926ea0c357b2752e92578e5fd4bec6b789fe1246556","src/stream/stream/cycle.rs":"ed7e3d15e7b1adec5ad5789b0d3186b5995a3353cc974fb7f41a72f6d8ad4cbb","src/stream/stream/enumerate.rs":"fc7565d21d39565790859eeac9ae8dd74123a9d15b88258d3abe894f1876cc39","src/stream/stream/filter.rs":"5d871f416d41baff3733121f564229fe31bdf7dfaaeb78ab940fafba6ab4b7c6","src/stream/stream/filter_map.rs":"179045a5ab1295e77ab5cfea1964be69dc50984ef8ac9ee04034adf0a043514f","src/stream/stream/flatten.rs":"69493fc106a1447abe109fd54375bb30363f7bc419463a8f835e4c80d97f2186","src/stream/stream/flatten_unordered.rs":"dd5216fc0e34d09cc69ae6b3c4690efe8ff01404853756bf5aa6b92eb2e6750b","src/stream/stream/fold.rs":"75d61d4321db1bcbbdd1a0102d9ad60206275777167c008fc8953e50cd978a09","src/stream/stream/for_each.rs":"07bca889821bad18ff083e54abe679fbeb8cd19c086581c2f2722cba6b42263f","src/stream/stream/for_each_concurrent.rs":"4e1e7eb3d4ccfae0e8000651b75834e2960a7f9c62ab92dba35a0bdbbf5bbb21","src/stream/stream/forward.rs":"cd024ba1a3d5098d3ff2d5178a12e068916cc4307284b00c18dbc54b554a5560","src/stream/stream/fuse.rs":"061c5385f12f80c7906cb15ddb8f455ced6ce21d1de9a97de9db2616407c0cac","src/stream/stream/into_future.rs":"b46ad45cc03ddd778a9ffaa0d603c8ee0b411f49333100160959942cde9588bd","src/stream/stream/map.rs":"b91bdd5b33821a50c9b5034261a14f89ff1a9d541ab99b9d9a6921b12a5d434e","src/stream/stream/mod.rs":"ca8f514b5157373408c6b59a8892dca8a1441a4c82557c34e6570342990da487","src/stream/stream/next.rs":"7b4d5a22b5e00aa191ea82346bb1f392121cc68692864a8230e462d59e622928","src/stream/stream/peek.rs":"2e08e6990c31186c97edb21737f83fe8640a19561062879af83090935aef99cf","src/stream/stream/ready_chunks.rs":"7e17c49ff29c106c13a2ec13fb05f32ff048e482b47a157d3965bd03c38c01c2","src/stream/stream/scan.rs":"54489c8efef60dbf3c35ee803afee5c5ea7c364fb9b68939a04956e46febb856","src/stream/stream/select_next_some.rs":"0094eccc96cfe78d9b6d0a9bdb82cada8fb7929770a3ac00ffcb5441d7dc4f51","src/stream/stream/skip.rs":"61f7ec7fe25663d2c87cffaad19ed27eda032842edb8af731b521025b244f120","src/stream/stream/skip_while.rs":"6f114a3fa538bd479e4fa24d8aa0e0e0454613643a97c44242c5683ae7293b82","src/stream/stream/split.rs":"0552ddf8f7f3a9980dbc692d0c34b72503107c714f81e853445fb6c81fe328ff","src/stream/stream/take.rs":"57d381b482c3d584c4c26b0e15941bc2ea58e3f39a2e5c74391a2ee7b825cc8c","src/stream/stream/take_until.rs":"0f1fa7d158192a5dee32392dfdd062c15dab6d246b0ca267e91aae490d7d7fdb","src/stream/stream/take_while.rs":"2f57a6e5b903c045da642e9a40eb19dabbc612a80a6ce8098df1a1973555f108","src/stream/stream/then.rs":"c995c6b0d9151927b26b10fba70e135dfc41224b969d1367dc8c11697218c1e9","src/stream/stream/unzip.rs":"e7beedc2192604e0091ac3d0265b487127a37c780198838f6419c21ef1b38df0","src/stream/stream/zip.rs":"3890b40daea00341fac6ac977de0b534d1ec7cdaabece44af5df2ca56026fe62","src/stream/try_stream/and_then.rs":"6f92b333955f5ec30fddf8e087e3f60ebf53a054769fc72c80bbccdf13a9431e","src/stream/try_stream/into_async_read.rs":"5b200c76ccb95460d94286ca8e63f5454940eb62b5f15aae998da48aa06fbffd","src/stream/try_stream/into_stream.rs":"4fee94e89956a42871fc4a0cdba7ae1b7d4265e884528799cd227c9dd851acce","src/stream/try_stream/mod.rs":"e2460ce64e3b43c92860b9fd6dd6b36f9c6f6750e9d1e7bec8f766ad84889269","src/stream/try_stream/or_else.rs":"473ca77e0e81a1a0834d2d882076b8823a5a3027b2d7d78f887be2d5edfd0de3","src/stream/try_stream/try_buffer_unordered.rs":"64e698ea6aefbe7e32d48e737553b20b9cde5c258963bb20486b48b7d6899660","src/stream/try_stream/try_buffered.rs":"38f60d7290f44471a02084c6b394b754c224a84ee8d2ba01c08568168b48a21f","src/stream/try_stream/try_chunks.rs":"69c4d85a256250d73c6372d8047e6055da7eac918cb5d7ef4f3697898f4dcb4c","src/stream/try_stream/try_collect.rs":"979920e3034dad6c75961e3f6b4c0234691db7063eca1a05562cc5d41f2943c1","src/stream/try_stream/try_concat.rs":"f2330ebeeab30273e9ac0e8600bfe2f405ce671f6386e688b3afb1d2fdd7c2c6","src/stream/try_stream/try_filter.rs":"1344e9aea05e2d0078f30caff176a99e1ccb8fcdf0a287817abc82fbaf09c48b","src/stream/try_stream/try_filter_map.rs":"285e7ea875a3ea3e16942c1b1acae5a1cb26b9bac476dce3903547cb99306602","src/stream/try_stream/try_flatten.rs":"e05614d86a27ab8386476eea35fd424c07e5f7f99cf0401d63a6655eb7ca1247","src/stream/try_stream/try_flatten_unordered.rs":"1cc4c4a5ea0a8db3010958f34fb1886dcfbd2e1584082d2004030eb70b13cd6c","src/stream/try_stream/try_fold.rs":"b96aa2fe1a16f625d5045028a86ff8684dcf5198ef8c7c072f52f39aeaa8b619","src/stream/try_stream/try_for_each.rs":"3f3901d618333b740d470eb02fcbb645df92483493872298bb7bd0382646028a","src/stream/try_stream/try_for_each_concurrent.rs":"78a94a77f329862c2a245ec3add97e49c534985f0d9da98f205b7fa3c7c08df3","src/stream/try_stream/try_next.rs":"6e29473153db1435906e79f7eaa13ce9da842d4528ba9eb1c0034665feacc565","src/stream/try_stream/try_skip_while.rs":"7c2fa31fe8b0b4e59c5d7f2972c8d9f83e8f01a687b08f5cd631f92a14b402f1","src/stream/try_stream/try_take_while.rs":"2783664637aff0442f0c9204d35600139c941332310f70495cbc4dc345cae99d","src/stream/try_stream/try_unfold.rs":"aaf0f4857a4ec8233ac842ae509f29e5a210827a0bb40cfc0dc3e858f153d2b4","src/stream/unfold.rs":"8b2feb00f979562b43064eb078d53a160cdb3c65deed17ec25a05938df2d370f","src/task/mod.rs":"074ce7f3869663d2e768bb08ea201ed1be176e13edd4150f201bc1ea362170d3","src/task/spawn.rs":"8ff3a3652d8d2cb45717324b6ead9c3f111629e7eb0c0b33d3639a0e7c5bbf3e","src/unfold_state.rs":"ffe848071a99d6afcdbe8281a8a77a559a7dde434fc41f734c90e6b9b5d8a5af"},"package":"26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"} \ No newline at end of file
diff --git a/vendor/futures-util/Cargo.toml b/vendor/futures-util/Cargo.toml
index f18cfbcf9..a3b9cadf2 100644
--- a/vendor/futures-util/Cargo.toml
+++ b/vendor/futures-util/Cargo.toml
@@ -11,44 +11,52 @@
[package]
edition = "2018"
-rust-version = "1.45"
+rust-version = "1.56"
name = "futures-util"
-version = "0.3.19"
-description = "Common utilities and extension traits for the futures-rs library.\n"
+version = "0.3.28"
+description = """
+Common utilities and extension traits for the futures-rs library.
+"""
homepage = "https://rust-lang.github.io/futures-rs"
+readme = "README.md"
license = "MIT OR Apache-2.0"
repository = "https://github.com/rust-lang/futures-rs"
+
[package.metadata.docs.rs]
all-features = true
-rustdoc-args = ["--cfg", "docsrs"]
+rustdoc-args = [
+ "--cfg",
+ "docsrs",
+]
+
[dependencies.futures-channel]
-version = "0.3.19"
+version = "0.3.28"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-core]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures-io]
-version = "0.3.19"
+version = "0.3.28"
features = ["std"]
optional = true
default-features = false
[dependencies.futures-macro]
-version = "=0.3.19"
+version = "=0.3.28"
optional = true
default-features = false
[dependencies.futures-sink]
-version = "0.3.19"
+version = "0.3.28"
optional = true
default-features = false
[dependencies.futures-task]
-version = "0.3.19"
+version = "0.3.28"
default-features = false
[dependencies.futures_01]
@@ -61,7 +69,7 @@ version = "2.2"
optional = true
[dependencies.pin-project-lite]
-version = "0.2.4"
+version = "0.2.6"
[dependencies.pin-utils]
version = "0.1.0"
@@ -73,21 +81,55 @@ optional = true
[dependencies.tokio-io]
version = "0.1.9"
optional = true
+
[dev-dependencies.tokio]
version = "0.1.11"
[features]
-alloc = ["futures-core/alloc", "futures-task/alloc"]
+alloc = [
+ "futures-core/alloc",
+ "futures-task/alloc",
+]
async-await = []
-async-await-macro = ["async-await", "futures-macro"]
+async-await-macro = [
+ "async-await",
+ "futures-macro",
+]
bilock = []
cfg-target-has-atomic = []
-channel = ["std", "futures-channel"]
-compat = ["std", "futures_01"]
-default = ["std", "async-await", "async-await-macro"]
-io = ["std", "futures-io", "memchr"]
-io-compat = ["io", "compat", "tokio-io"]
+channel = [
+ "std",
+ "futures-channel",
+]
+compat = [
+ "std",
+ "futures_01",
+]
+default = [
+ "std",
+ "async-await",
+ "async-await-macro",
+]
+io = [
+ "std",
+ "futures-io",
+ "memchr",
+]
+io-compat = [
+ "io",
+ "compat",
+ "tokio-io",
+]
+portable-atomic = ["futures-core/portable-atomic"]
sink = ["futures-sink"]
-std = ["alloc", "futures-core/std", "futures-task/std", "slab"]
-unstable = ["futures-core/unstable", "futures-task/unstable"]
+std = [
+ "alloc",
+ "futures-core/std",
+ "futures-task/std",
+ "slab",
+]
+unstable = [
+ "futures-core/unstable",
+ "futures-task/unstable",
+]
write-all-vectored = ["io"]
diff --git a/vendor/futures-util/README.md b/vendor/futures-util/README.md
index 6e0aaed84..60e2c2109 100644
--- a/vendor/futures-util/README.md
+++ b/vendor/futures-util/README.md
@@ -11,7 +11,7 @@ Add this to your `Cargo.toml`:
futures-util = "0.3"
```
-The current `futures-util` requires Rust 1.45 or later.
+The current `futures-util` requires Rust 1.56 or later.
## License
diff --git a/vendor/futures-util/benches/bilock.rs b/vendor/futures-util/benches/bilock.rs
new file mode 100644
index 000000000..013f3351e
--- /dev/null
+++ b/vendor/futures-util/benches/bilock.rs
@@ -0,0 +1,68 @@
+#![feature(test)]
+#![cfg(feature = "bilock")]
+
+extern crate test;
+
+use futures::task::Poll;
+use futures_test::task::noop_context;
+use futures_util::lock::BiLock;
+
+use crate::test::Bencher;
+
+#[bench]
+fn contended(b: &mut Bencher) {
+ let mut context = noop_context();
+
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ // Try poll second lock while first lock still holds the lock
+ match y.poll_lock(&mut context) {
+ Poll::Pending => (),
+ _ => panic!(),
+ };
+
+ drop(x_guard);
+
+ let y_guard = match y.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ drop(y_guard);
+ }
+ (x, y)
+ });
+}
+
+#[bench]
+fn lock_unlock(b: &mut Bencher) {
+ let mut context = noop_context();
+
+ b.iter(|| {
+ let (x, y) = BiLock::new(1);
+
+ for _ in 0..1000 {
+ let x_guard = match x.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ drop(x_guard);
+
+ let y_guard = match y.poll_lock(&mut context) {
+ Poll::Ready(guard) => guard,
+ _ => panic!(),
+ };
+
+ drop(y_guard);
+ }
+ (x, y)
+ })
+}
diff --git a/vendor/futures-util/benches/flatten_unordered.rs b/vendor/futures-util/benches/flatten_unordered.rs
new file mode 100644
index 000000000..517b2816c
--- /dev/null
+++ b/vendor/futures-util/benches/flatten_unordered.rs
@@ -0,0 +1,58 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::channel::oneshot;
+use futures::executor::block_on;
+use futures::future;
+use futures::stream::{self, StreamExt};
+use futures::task::Poll;
+use futures_util::FutureExt;
+use std::collections::VecDeque;
+use std::thread;
+
+#[bench]
+fn oneshot_streams(b: &mut Bencher) {
+ const STREAM_COUNT: usize = 10_000;
+ const STREAM_ITEM_COUNT: usize = 1;
+
+ b.iter(|| {
+ let mut txs = VecDeque::with_capacity(STREAM_COUNT);
+ let mut rxs = Vec::new();
+
+ for _ in 0..STREAM_COUNT {
+ let (tx, rx) = oneshot::channel();
+ txs.push_back(tx);
+ rxs.push(rx);
+ }
+
+ thread::spawn(move || {
+ let mut last = 1;
+ while let Some(tx) = txs.pop_front() {
+ let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
+ last += STREAM_ITEM_COUNT;
+ }
+ });
+
+ let mut flatten = stream::iter(rxs)
+ .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten())
+ .flatten_unordered(None);
+
+ block_on(future::poll_fn(move |cx| {
+ let mut count = 0;
+ loop {
+ match flatten.poll_next_unpin(cx) {
+ Poll::Ready(None) => break,
+ Poll::Ready(Some(_)) => {
+ count += 1;
+ }
+ _ => {}
+ }
+ }
+ assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
+
+ Poll::Ready(())
+ }))
+ });
+}
diff --git a/vendor/futures-util/benches/select.rs b/vendor/futures-util/benches/select.rs
new file mode 100644
index 000000000..5410a9529
--- /dev/null
+++ b/vendor/futures-util/benches/select.rs
@@ -0,0 +1,35 @@
+#![feature(test)]
+
+extern crate test;
+use crate::test::Bencher;
+
+use futures::executor::block_on;
+use futures::stream::{repeat, select, StreamExt};
+
+#[bench]
+fn select_streams(b: &mut Bencher) {
+ const STREAM_COUNT: usize = 10_000;
+
+ b.iter(|| {
+ let stream1 = repeat(1).take(STREAM_COUNT);
+ let stream2 = repeat(2).take(STREAM_COUNT);
+ let stream3 = repeat(3).take(STREAM_COUNT);
+ let stream4 = repeat(4).take(STREAM_COUNT);
+ let stream5 = repeat(5).take(STREAM_COUNT);
+ let stream6 = repeat(6).take(STREAM_COUNT);
+ let stream7 = repeat(7).take(STREAM_COUNT);
+ let count = block_on(async {
+ let count = select(
+ stream1,
+ select(
+ stream2,
+ select(stream3, select(stream4, select(stream5, select(stream6, stream7)))),
+ ),
+ )
+ .count()
+ .await;
+ count
+ });
+ assert_eq!(count, STREAM_COUNT * 7);
+ });
+}
diff --git a/vendor/futures-util/benches_disabled/bilock.rs b/vendor/futures-util/benches_disabled/bilock.rs
deleted file mode 100644
index 417f75d31..000000000
--- a/vendor/futures-util/benches_disabled/bilock.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-#![feature(test)]
-
-#[cfg(feature = "bilock")]
-mod bench {
- use futures::executor::LocalPool;
- use futures::task::{Context, Waker};
- use futures_util::lock::BiLock;
- use futures_util::lock::BiLockAcquire;
- use futures_util::lock::BiLockAcquired;
- use futures_util::task::ArcWake;
-
- use std::sync::Arc;
- use test::Bencher;
-
- fn notify_noop() -> Waker {
- struct Noop;
-
- impl ArcWake for Noop {
- fn wake(_: &Arc<Self>) {}
- }
-
- ArcWake::into_waker(Arc::new(Noop))
- }
-
- /// Pseudo-stream which simply calls `lock.poll()` on `poll`
- struct LockStream {
- lock: BiLockAcquire<u32>,
- }
-
- impl LockStream {
- fn new(lock: BiLock<u32>) -> Self {
- Self { lock: lock.lock() }
- }
-
- /// Release a lock after it was acquired in `poll`,
- /// so `poll` could be called again.
- fn release_lock(&mut self, guard: BiLockAcquired<u32>) {
- self.lock = guard.unlock().lock()
- }
- }
-
- impl Stream for LockStream {
- type Item = BiLockAcquired<u32>;
- type Error = ();
-
- fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> {
- self.lock.poll(cx).map(|a| a.map(Some))
- }
- }
-
- #[bench]
- fn contended(b: &mut Bencher) {
- let pool = LocalPool::new();
- let mut exec = pool.executor();
- let waker = notify_noop();
- let mut map = task::LocalMap::new();
- let mut waker = task::Context::new(&mut map, &waker, &mut exec);
-
- b.iter(|| {
- let (x, y) = BiLock::new(1);
-
- let mut x = LockStream::new(x);
- let mut y = LockStream::new(y);
-
- for _ in 0..1000 {
- let x_guard = match x.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- // Try poll second lock while first lock still holds the lock
- match y.poll_next(&mut waker) {
- Ok(Poll::Pending) => (),
- _ => panic!(),
- };
-
- x.release_lock(x_guard);
-
- let y_guard = match y.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- y.release_lock(y_guard);
- }
- (x, y)
- });
- }
-
- #[bench]
- fn lock_unlock(b: &mut Bencher) {
- let pool = LocalPool::new();
- let mut exec = pool.executor();
- let waker = notify_noop();
- let mut map = task::LocalMap::new();
- let mut waker = task::Context::new(&mut map, &waker, &mut exec);
-
- b.iter(|| {
- let (x, y) = BiLock::new(1);
-
- let mut x = LockStream::new(x);
- let mut y = LockStream::new(y);
-
- for _ in 0..1000 {
- let x_guard = match x.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- x.release_lock(x_guard);
-
- let y_guard = match y.poll_next(&mut waker) {
- Ok(Poll::Ready(Some(guard))) => guard,
- _ => panic!(),
- };
-
- y.release_lock(y_guard);
- }
- (x, y)
- })
- }
-}
diff --git a/vendor/futures-util/build.rs b/vendor/futures-util/build.rs
index 07b50bd55..05e0496d9 100644
--- a/vendor/futures-util/build.rs
+++ b/vendor/futures-util/build.rs
@@ -1,9 +1,3 @@
-#![warn(rust_2018_idioms, single_use_lifetimes)]
-
-use std::env;
-
-include!("no_atomic_cas.rs");
-
// The rustc-cfg listed below are considered public API, but it is *unstable*
// and outside of the normal semver guarantees:
//
@@ -13,10 +7,15 @@ include!("no_atomic_cas.rs");
// need to enable it manually when building for custom targets or using
// non-cargo build systems that don't run the build script.
//
-// With the exceptions mentioned above, the rustc-cfg strings below are
-// *not* public API. Please let us know by opening a GitHub issue if your build
-// environment requires some way to enable these cfgs other than by executing
-// our build script.
+// With the exceptions mentioned above, the rustc-cfg emitted by the build
+// script are *not* public API.
+
+#![warn(rust_2018_idioms, single_use_lifetimes)]
+
+use std::env;
+
+include!("no_atomic_cas.rs");
+
fn main() {
let target = match env::var("TARGET") {
Ok(target) => target,
@@ -34,7 +33,7 @@ fn main() {
// `cfg(target_has_atomic = "ptr")` as true when the build script doesn't
// run. This is needed for compatibility with non-cargo build systems that
// don't run the build script.
- if NO_ATOMIC_CAS_TARGETS.contains(&&*target) {
+ if NO_ATOMIC_CAS.contains(&&*target) {
println!("cargo:rustc-cfg=futures_no_atomic_cas");
}
diff --git a/vendor/futures-util/no_atomic_cas.rs b/vendor/futures-util/no_atomic_cas.rs
index 4708bf853..16ec628cd 100644
--- a/vendor/futures-util/no_atomic_cas.rs
+++ b/vendor/futures-util/no_atomic_cas.rs
@@ -1,13 +1,17 @@
// This file is @generated by no_atomic_cas.sh.
// It is not intended for manual editing.
-const NO_ATOMIC_CAS_TARGETS: &[&str] = &[
+const NO_ATOMIC_CAS: &[&str] = &[
+ "armv4t-none-eabi",
+ "armv5te-none-eabi",
"avr-unknown-gnu-atmega328",
"bpfeb-unknown-none",
"bpfel-unknown-none",
"msp430-none-elf",
"riscv32i-unknown-none-elf",
+ "riscv32im-unknown-none-elf",
"riscv32imc-unknown-none-elf",
"thumbv4t-none-eabi",
+ "thumbv5te-none-eabi",
"thumbv6m-none-eabi",
];
diff --git a/vendor/futures-util/src/abortable.rs b/vendor/futures-util/src/abortable.rs
index bb82dd0db..9dbcfc2b5 100644
--- a/vendor/futures-util/src/abortable.rs
+++ b/vendor/futures-util/src/abortable.rs
@@ -75,7 +75,18 @@ impl<T> Abortable<T> {
/// in calls to `Abortable::new`.
#[derive(Debug)]
pub struct AbortRegistration {
- inner: Arc<AbortInner>,
+ pub(crate) inner: Arc<AbortInner>,
+}
+
+impl AbortRegistration {
+ /// Create an [`AbortHandle`] from the given [`AbortRegistration`].
+ ///
+ /// The created [`AbortHandle`] is functionally the same as any other
+ /// [`AbortHandle`]s that are associated with the same [`AbortRegistration`],
+ /// such as the one created by [`AbortHandle::new_pair`].
+ pub fn handle(&self) -> AbortHandle {
+ AbortHandle { inner: self.inner.clone() }
+ }
}
/// A handle to an `Abortable` task.
@@ -100,9 +111,9 @@ impl AbortHandle {
// Inner type storing the waker to awaken and a bool indicating that it
// should be aborted.
#[derive(Debug)]
-struct AbortInner {
- waker: AtomicWaker,
- aborted: AtomicBool,
+pub(crate) struct AbortInner {
+ pub(crate) waker: AtomicWaker,
+ pub(crate) aborted: AtomicBool,
}
/// Indicator that the `Abortable` task was aborted.
@@ -182,4 +193,17 @@ impl AbortHandle {
self.inner.aborted.store(true, Ordering::Relaxed);
self.inner.waker.wake();
}
+
+ /// Checks whether [`AbortHandle::abort`] was *called* on any associated
+ /// [`AbortHandle`]s, which includes all the [`AbortHandle`]s linked with
+ /// the same [`AbortRegistration`]. This means that it will return `true`
+ /// even if:
+ /// * `abort` was called after the task had completed.
+ /// * `abort` was called while the task was being polled - the task may still be running and
+ /// will not be stopped until `poll` returns.
+ ///
+ /// This operation has a Relaxed ordering.
+ pub fn is_aborted(&self) -> bool {
+ self.inner.aborted.load(Ordering::Relaxed)
+ }
}
diff --git a/vendor/futures-util/src/compat/compat01as03.rs b/vendor/futures-util/src/compat/compat01as03.rs
index 754e3d82a..36de1da98 100644
--- a/vendor/futures-util/src/compat/compat01as03.rs
+++ b/vendor/futures-util/src/compat/compat01as03.rs
@@ -64,6 +64,7 @@ pub trait Future01CompatExt: Future01 {
/// [`Future<Output = Result<T, E>>`](futures_core::future::Future).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo
/// # // feature issues
@@ -90,6 +91,7 @@ pub trait Stream01CompatExt: Stream01 {
/// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::stream::StreamExt;
/// use futures_util::compat::Stream01CompatExt;
@@ -119,6 +121,7 @@ pub trait Sink01CompatExt: Sink01 {
/// [`Sink<T, Error = E>`](futures_sink::Sink).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::{sink::SinkExt, stream::StreamExt};
/// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt};
@@ -362,6 +365,7 @@ mod io {
/// [`AsyncRead`](futures_io::AsyncRead).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::io::AsyncReadExt;
/// use futures_util::compat::AsyncRead01CompatExt;
@@ -391,6 +395,7 @@ mod io {
/// [`AsyncWrite`](futures_io::AsyncWrite).
///
/// ```
+ /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514
/// # futures::executor::block_on(async {
/// use futures::io::AsyncWriteExt;
/// use futures_util::compat::AsyncWrite01CompatExt;
diff --git a/vendor/futures-util/src/compat/executor.rs b/vendor/futures-util/src/compat/executor.rs
index e25705be1..ea0c67a0a 100644
--- a/vendor/futures-util/src/compat/executor.rs
+++ b/vendor/futures-util/src/compat/executor.rs
@@ -17,6 +17,7 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'st
/// futures 0.3 [`Spawn`](futures_task::Spawn).
///
/// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::task::SpawnExt;
/// use futures::future::{FutureExt, TryFutureExt};
/// use futures_util::compat::Executor01CompatExt;
diff --git a/vendor/futures-util/src/future/either.rs b/vendor/futures-util/src/future/either.rs
index 9602de7a4..27e5064df 100644
--- a/vendor/futures-util/src/future/either.rs
+++ b/vendor/futures-util/src/future/either.rs
@@ -33,11 +33,31 @@ pub enum Either<A, B> {
}
impl<A, B> Either<A, B> {
- fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> {
+ // SAFETY: We can use `new_unchecked` because the `inner` parts are
+ // guaranteed to be pinned, as they come from `self` which is pinned.
unsafe {
- match self.get_unchecked_mut() {
- Either::Left(a) => Either::Left(Pin::new_unchecked(a)),
- Either::Right(b) => Either::Right(Pin::new_unchecked(b)),
+ match *Pin::get_ref(self) {
+ Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)),
+ }
+ }
+ }
+
+ /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`,
+ /// pinned projections of the inner variants.
+ pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> {
+ // SAFETY: `get_unchecked_mut` is fine because we don't move anything.
+ // We can use `new_unchecked` because the `inner` parts are guaranteed
+ // to be pinned, as they come from `self` which is pinned, and we never
+ // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We
+ // also don't have an implementation of `Drop`, nor manual `Unpin`.
+ unsafe {
+ match *Pin::get_unchecked_mut(self) {
+ Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)),
+ Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)),
}
}
}
@@ -85,7 +105,7 @@ where
type Output = A::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll(cx),
Either::Right(x) => x.poll(cx),
}
@@ -113,7 +133,7 @@ where
type Item = A::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_next(cx),
Either::Right(x) => x.poll_next(cx),
}
@@ -149,28 +169,28 @@ where
type Error = A::Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_ready(cx),
Either::Right(x) => x.poll_ready(cx),
}
}
fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.start_send(item),
Either::Right(x) => x.start_send(item),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
@@ -198,7 +218,7 @@ mod if_std {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_read(cx, buf),
Either::Right(x) => x.poll_read(cx, buf),
}
@@ -209,7 +229,7 @@ mod if_std {
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_read_vectored(cx, bufs),
Either::Right(x) => x.poll_read_vectored(cx, bufs),
}
@@ -226,7 +246,7 @@ mod if_std {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_write(cx, buf),
Either::Right(x) => x.poll_write(cx, buf),
}
@@ -237,21 +257,21 @@ mod if_std {
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_write_vectored(cx, bufs),
Either::Right(x) => x.poll_write_vectored(cx, bufs),
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_flush(cx),
Either::Right(x) => x.poll_flush(cx),
}
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_close(cx),
Either::Right(x) => x.poll_close(cx),
}
@@ -268,7 +288,7 @@ mod if_std {
cx: &mut Context<'_>,
pos: SeekFrom,
) -> Poll<Result<u64>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_seek(cx, pos),
Either::Right(x) => x.poll_seek(cx, pos),
}
@@ -281,14 +301,14 @@ mod if_std {
B: AsyncBufRead,
{
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.poll_fill_buf(cx),
Either::Right(x) => x.poll_fill_buf(cx),
}
}
fn consume(self: Pin<&mut Self>, amt: usize) {
- match self.project() {
+ match self.as_pin_mut() {
Either::Left(x) => x.consume(amt),
Either::Right(x) => x.consume(amt),
}
diff --git a/vendor/futures-util/src/future/future/fuse.rs b/vendor/futures-util/src/future/future/fuse.rs
index 597aec1a4..225790672 100644
--- a/vendor/futures-util/src/future/future/fuse.rs
+++ b/vendor/futures-util/src/future/future/fuse.rs
@@ -1,6 +1,5 @@
use core::pin::Pin;
use futures_core::future::{FusedFuture, Future};
-use futures_core::ready;
use futures_core::task::{Context, Poll};
use pin_project_lite::pin_project;
@@ -81,13 +80,12 @@ impl<Fut: Future> Future for Fuse<Fut> {
type Output = Fut::Output;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> {
- Poll::Ready(match self.as_mut().project().inner.as_pin_mut() {
- Some(fut) => {
- let output = ready!(fut.poll(cx));
+ match self.as_mut().project().inner.as_pin_mut() {
+ Some(fut) => fut.poll(cx).map(|output| {
self.project().inner.set(None);
output
- }
- None => return Poll::Pending,
- })
+ }),
+ None => Poll::Pending,
+ }
}
}
diff --git a/vendor/futures-util/src/future/future/shared.rs b/vendor/futures-util/src/future/future/shared.rs
index 9b31932fe..ecd1b426d 100644
--- a/vendor/futures-util/src/future/future/shared.rs
+++ b/vendor/futures-util/src/future/future/shared.rs
@@ -4,7 +4,9 @@ use futures_core::task::{Context, Poll, Waker};
use slab::Slab;
use std::cell::UnsafeCell;
use std::fmt;
+use std::hash::Hasher;
use std::pin::Pin;
+use std::ptr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::{Acquire, SeqCst};
use std::sync::{Arc, Mutex, Weak};
@@ -103,7 +105,6 @@ impl<Fut: Future> Shared<Fut> {
impl<Fut> Shared<Fut>
where
Fut: Future,
- Fut::Output: Clone,
{
/// Returns [`Some`] containing a reference to this [`Shared`]'s output if
/// it has already been computed by a clone or [`None`] if it hasn't been
@@ -139,6 +140,7 @@ where
/// This method by itself is safe, but using it correctly requires extra care. Another thread
/// can change the strong count at any time, including potentially between calling this method
/// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
pub fn strong_count(&self) -> Option<usize> {
self.inner.as_ref().map(|arc| Arc::strong_count(arc))
}
@@ -152,15 +154,44 @@ where
/// This method by itself is safe, but using it correctly requires extra care. Another thread
/// can change the weak count at any time, including potentially between calling this method
/// and acting on the result.
+ #[allow(clippy::unnecessary_safety_doc)]
pub fn weak_count(&self) -> Option<usize> {
self.inner.as_ref().map(|arc| Arc::weak_count(arc))
}
+
+ /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`.
+ pub fn ptr_hash<H: Hasher>(&self, state: &mut H) {
+ match self.inner.as_ref() {
+ Some(arc) => {
+ state.write_u8(1);
+ ptr::hash(Arc::as_ptr(arc), state);
+ }
+ None => {
+ state.write_u8(0);
+ }
+ }
+ }
+
+ /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to
+ /// `Arc::ptr_eq`).
+ ///
+ /// Returns `false` if either `Shared` has terminated.
+ pub fn ptr_eq(&self, rhs: &Self) -> bool {
+ let lhs = match self.inner.as_ref() {
+ Some(lhs) => lhs,
+ None => return false,
+ };
+ let rhs = match rhs.inner.as_ref() {
+ Some(rhs) => rhs,
+ None => return false,
+ };
+ Arc::ptr_eq(lhs, rhs)
+ }
}
impl<Fut> Inner<Fut>
where
Fut: Future,
- Fut::Output: Clone,
{
/// Safety: callers must first ensure that `self.inner.state`
/// is `COMPLETE`
@@ -170,6 +201,13 @@ where
FutureOrOutput::Future(_) => unreachable!(),
}
}
+}
+
+impl<Fut> Inner<Fut>
+where
+ Fut: Future,
+ Fut::Output: Clone,
+{
/// Registers the current task to receive a wakeup when we are awoken.
fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) {
let mut wakers_guard = self.notifier.wakers.lock().unwrap();
@@ -262,19 +300,20 @@ where
let waker = waker_ref(&inner.notifier);
let mut cx = Context::from_waker(&waker);
- struct Reset<'a>(&'a AtomicUsize);
+ struct Reset<'a> {
+ state: &'a AtomicUsize,
+ did_not_panic: bool,
+ }
impl Drop for Reset<'_> {
fn drop(&mut self) {
- use std::thread;
-
- if thread::panicking() {
- self.0.store(POISONED, SeqCst);
+ if !self.did_not_panic {
+ self.state.store(POISONED, SeqCst);
}
}
}
- let _reset = Reset(&inner.notifier.state);
+ let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false };
let output = {
let future = unsafe {
@@ -284,12 +323,15 @@ where
}
};
- match future.poll(&mut cx) {
+ let poll_result = future.poll(&mut cx);
+ reset.did_not_panic = true;
+
+ match poll_result {
Poll::Pending => {
if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok()
{
// Success
- drop(_reset);
+ drop(reset);
this.inner = Some(inner);
return Poll::Pending;
} else {
@@ -313,7 +355,7 @@ where
waker.wake();
}
- drop(_reset); // Make borrow checker happy
+ drop(reset); // Make borrow checker happy
drop(wakers_guard);
// Safety: We're in the COMPLETE state
diff --git a/vendor/futures-util/src/future/join_all.rs b/vendor/futures-util/src/future/join_all.rs
index 2e52ac17f..7dc159ba0 100644
--- a/vendor/futures-util/src/future/join_all.rs
+++ b/vendor/futures-util/src/future/join_all.rs
@@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone};
#[cfg(not(futures_no_atomic_cas))]
use crate::stream::{Collect, FuturesOrdered, StreamExt};
-fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
+pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
// Safety: `std` _could_ make this unsound if it were to decide Pin's
// invariants aren't required to transmit through slices. Otherwise this has
// the same safety as a normal field pin projection.
@@ -32,9 +32,9 @@ where
}
#[cfg(not(futures_no_atomic_cas))]
-const SMALL: usize = 30;
+pub(crate) const SMALL: usize = 30;
-pub(crate) enum JoinAllKind<F>
+enum JoinAllKind<F>
where
F: Future,
{
@@ -104,26 +104,25 @@ where
I: IntoIterator,
I::Item: Future,
{
+ let iter = iter.into_iter();
+
#[cfg(futures_no_atomic_cas)]
{
- let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into();
- let kind = JoinAllKind::Small { elems };
+ let kind =
+ JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() };
+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
+
#[cfg(not(futures_no_atomic_cas))]
{
- let iter = iter.into_iter();
let kind = match iter.size_hint().1 {
- None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
- Some(max) => {
- if max <= SMALL {
- let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into();
- JoinAllKind::Small { elems }
- } else {
- JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }
- }
- }
+ Some(max) if max <= SMALL => JoinAllKind::Small {
+ elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() },
};
+
assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind })
}
}
diff --git a/vendor/futures-util/src/future/pending.rs b/vendor/futures-util/src/future/pending.rs
index 92c78d52b..b8e28686e 100644
--- a/vendor/futures-util/src/future/pending.rs
+++ b/vendor/futures-util/src/future/pending.rs
@@ -33,6 +33,7 @@ impl<T> FusedFuture for Pending<T> {
/// unreachable!();
/// # });
/// ```
+#[cfg_attr(docsrs, doc(alias = "never"))]
pub fn pending<T>() -> Pending<T> {
assert_future::<T, _>(Pending { _data: marker::PhantomData })
}
diff --git a/vendor/futures-util/src/future/select.rs b/vendor/futures-util/src/future/select.rs
index bd44f20f7..7e33d195f 100644
--- a/vendor/futures-util/src/future/select.rs
+++ b/vendor/futures-util/src/future/select.rs
@@ -99,17 +99,27 @@ where
type Output = Either<(A::Output, B), (B::Output, A)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
- match a.poll_unpin(cx) {
- Poll::Ready(x) => Poll::Ready(Either::Left((x, b))),
- Poll::Pending => match b.poll_unpin(cx) {
- Poll::Ready(x) => Poll::Ready(Either::Right((x, a))),
- Poll::Pending => {
- self.inner = Some((a, b));
- Poll::Pending
- }
- },
+ /// When compiled with `-C opt-level=z`, this function will help the compiler eliminate the `None` branch, where
+ /// `Option::unwrap` does not.
+ #[inline(always)]
+ fn unwrap_option<T>(value: Option<T>) -> T {
+ match value {
+ None => unreachable!(),
+ Some(value) => value,
+ }
}
+
+ let (a, b) = self.inner.as_mut().expect("cannot poll Select twice");
+
+ if let Poll::Ready(val) = a.poll_unpin(cx) {
+ return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1)));
+ }
+
+ if let Poll::Ready(val) = b.poll_unpin(cx) {
+ return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0)));
+ }
+
+ Poll::Pending
}
}
diff --git a/vendor/futures-util/src/future/select_all.rs b/vendor/futures-util/src/future/select_all.rs
index 106e50844..0a51d0da6 100644
--- a/vendor/futures-util/src/future/select_all.rs
+++ b/vendor/futures-util/src/future/select_all.rs
@@ -58,8 +58,9 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> {
});
match item {
Some((idx, res)) => {
+ #[allow(clippy::let_underscore_future)]
let _ = self.inner.swap_remove(idx);
- let rest = mem::replace(&mut self.inner, Vec::new());
+ let rest = mem::take(&mut self.inner);
Poll::Ready((res, idx, rest))
}
None => Poll::Pending,
diff --git a/vendor/futures-util/src/future/select_ok.rs b/vendor/futures-util/src/future/select_ok.rs
index 0ad83c6db..5d5579930 100644
--- a/vendor/futures-util/src/future/select_ok.rs
+++ b/vendor/futures-util/src/future/select_ok.rs
@@ -59,7 +59,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
drop(self.inner.remove(idx));
match res {
Ok(e) => {
- let rest = mem::replace(&mut self.inner, Vec::new());
+ let rest = mem::take(&mut self.inner);
return Poll::Ready(Ok((e, rest)));
}
Err(e) => {
diff --git a/vendor/futures-util/src/future/try_future/mod.rs b/vendor/futures-util/src/future/try_future/mod.rs
index fb3bdd8a0..e5bc70071 100644
--- a/vendor/futures-util/src/future/try_future/mod.rs
+++ b/vendor/futures-util/src/future/try_future/mod.rs
@@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture {
/// assert_eq!(future.await, Ok(1));
/// # });
/// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
fn map_err<E, F>(self, f: F) -> MapErr<Self, F>
where
F: FnOnce(Self::Error) -> E,
@@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture {
/// let future_err_i32 = future_err_u8.err_into::<i32>();
/// # });
/// ```
+ ///
+ /// [`join!`]: crate::join
+ /// [`select!`]: crate::select
fn err_into<E>(self) -> ErrInto<Self, E>
where
Self: Sized,
diff --git a/vendor/futures-util/src/future/try_join_all.rs b/vendor/futures-util/src/future/try_join_all.rs
index 29244af83..506f45065 100644
--- a/vendor/futures-util/src/future/try_join_all.rs
+++ b/vendor/futures-util/src/future/try_join_all.rs
@@ -10,14 +10,11 @@ use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
-use super::{assert_future, TryFuture, TryMaybeDone};
+use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone};
-fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> {
- // Safety: `std` _could_ make this unsound if it were to decide Pin's
- // invariants aren't required to transmit through slices. Otherwise this has
- // the same safety as a normal field pin projection.
- unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) })
-}
+#[cfg(not(futures_no_atomic_cas))]
+use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt};
+use crate::TryFutureExt;
enum FinalState<E = ()> {
Pending,
@@ -31,7 +28,20 @@ pub struct TryJoinAll<F>
where
F: TryFuture,
{
- elems: Pin<Box<[TryMaybeDone<F>]>>,
+ kind: TryJoinAllKind<F>,
+}
+
+enum TryJoinAllKind<F>
+where
+ F: TryFuture,
+{
+ Small {
+ elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>,
+ },
+ #[cfg(not(futures_no_atomic_cas))]
+ Big {
+ fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>,
+ },
}
impl<F> fmt::Debug for TryJoinAll<F>
@@ -39,9 +49,16 @@ where
F: TryFuture + fmt::Debug,
F::Ok: fmt::Debug,
F::Error: fmt::Debug,
+ F::Output: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("TryJoinAll").field("elems", &self.elems).finish()
+ match self.kind {
+ TryJoinAllKind::Small { ref elems } => {
+ f.debug_struct("TryJoinAll").field("elems", elems).finish()
+ }
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f),
+ }
}
}
@@ -60,6 +77,20 @@ where
/// This function is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
///
+/// # See Also
+///
+/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance
+/// reasons if the number of futures is large. You may want to look into using it or
+/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly.
+///
+/// Some examples for additional functionality provided by these are:
+///
+/// * Adding new futures to the set even after it has been started.
+///
+/// * Only polling the specific futures that have been woken. In cases where
+/// you have a lot of futures this will result in much more efficient polling.
+///
+///
/// # Examples
///
/// ```
@@ -83,15 +114,37 @@ where
/// assert_eq!(try_join_all(futures).await, Err(2));
/// # });
/// ```
-pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item>
+pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item>
where
I: IntoIterator,
I::Item: TryFuture,
{
- let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect();
- assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
- TryJoinAll { elems: elems.into() },
- )
+ let iter = iter.into_iter().map(TryFutureExt::into_future);
+
+ #[cfg(futures_no_atomic_cas)]
+ {
+ let kind = TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
+
+ #[cfg(not(futures_no_atomic_cas))]
+ {
+ let kind = match iter.size_hint().1 {
+ Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small {
+ elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(),
+ },
+ _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() },
+ };
+
+ assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>(
+ TryJoinAll { kind },
+ )
+ }
}
impl<F> Future for TryJoinAll<F>
@@ -101,36 +154,46 @@ where
type Output = Result<Vec<F::Ok>, F::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
- let mut state = FinalState::AllDone;
-
- for elem in iter_pin_mut(self.elems.as_mut()) {
- match elem.try_poll(cx) {
- Poll::Pending => state = FinalState::Pending,
- Poll::Ready(Ok(())) => {}
- Poll::Ready(Err(e)) => {
- state = FinalState::Error(e);
- break;
+ match &mut self.kind {
+ TryJoinAllKind::Small { elems } => {
+ let mut state = FinalState::AllDone;
+
+ for elem in join_all::iter_pin_mut(elems.as_mut()) {
+ match elem.try_poll(cx) {
+ Poll::Pending => state = FinalState::Pending,
+ Poll::Ready(Ok(())) => {}
+ Poll::Ready(Err(e)) => {
+ state = FinalState::Error(e);
+ break;
+ }
+ }
}
- }
- }
- match state {
- FinalState::Pending => Poll::Pending,
- FinalState::AllDone => {
- let mut elems = mem::replace(&mut self.elems, Box::pin([]));
- let results =
- iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect();
- Poll::Ready(Ok(results))
- }
- FinalState::Error(e) => {
- let _ = mem::replace(&mut self.elems, Box::pin([]));
- Poll::Ready(Err(e))
+ match state {
+ FinalState::Pending => Poll::Pending,
+ FinalState::AllDone => {
+ let mut elems = mem::replace(elems, Box::pin([]));
+ let results = join_all::iter_pin_mut(elems.as_mut())
+ .map(|e| e.take_output().unwrap())
+ .collect();
+ Poll::Ready(Ok(results))
+ }
+ FinalState::Error(e) => {
+ let _ = mem::replace(elems, Box::pin([]));
+ Poll::Ready(Err(e))
+ }
+ }
}
+ #[cfg(not(futures_no_atomic_cas))]
+ TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx),
}
}
}
-impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> {
+impl<F> FromIterator<F> for TryJoinAll<F>
+where
+ F: TryFuture,
+{
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
try_join_all(iter)
}
diff --git a/vendor/futures-util/src/future/try_select.rs b/vendor/futures-util/src/future/try_select.rs
index 4d0b7ff13..bc282f7db 100644
--- a/vendor/futures-util/src/future/try_select.rs
+++ b/vendor/futures-util/src/future/try_select.rs
@@ -12,6 +12,9 @@ pub struct TrySelect<A, B> {
impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
+type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>;
+type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>;
+
/// Waits for either one of two differently-typed futures to complete.
///
/// This function will return a new future which awaits for either one of both
@@ -52,10 +55,9 @@ where
A: TryFuture + Unpin,
B: TryFuture + Unpin,
{
- super::assert_future::<
- Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>,
- _,
- >(TrySelect { inner: Some((future1, future2)) })
+ super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
+ inner: Some((future1, future2)),
+ })
}
impl<A: Unpin, B: Unpin> Future for TrySelect<A, B>
@@ -63,8 +65,7 @@ where
A: TryFuture,
B: TryFuture,
{
- #[allow(clippy::type_complexity)]
- type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>;
+ type Output = Result<EitherOk<A, B>, EitherErr<A, B>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice");
diff --git a/vendor/futures-util/src/io/copy_buf_abortable.rs b/vendor/futures-util/src/io/copy_buf_abortable.rs
new file mode 100644
index 000000000..fdbc4a5f0
--- /dev/null
+++ b/vendor/futures-util/src/io/copy_buf_abortable.rs
@@ -0,0 +1,124 @@
+use crate::abortable::{AbortHandle, AbortInner, Aborted};
+use futures_core::future::Future;
+use futures_core::task::{Context, Poll};
+use futures_io::{AsyncBufRead, AsyncWrite};
+use pin_project_lite::pin_project;
+use std::io;
+use std::pin::Pin;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`.
+///
+/// The returned future will copy all the bytes read from this `AsyncBufRead` into the
+/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit
+/// EOF and all bytes have been written to and flushed from the `writer`
+/// provided.
+///
+/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned.
+///
+/// # Examples
+///
+/// ```
+/// # futures::executor::block_on(async {
+/// use futures::io::{self, AsyncWriteExt, Cursor};
+/// use futures::future::Aborted;
+///
+/// let reader = Cursor::new([1, 2, 3, 4]);
+/// let mut writer = Cursor::new(vec![0u8; 5]);
+///
+/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer);
+/// let bytes = fut.await;
+/// abort_handle.abort();
+/// writer.close().await.unwrap();
+/// match bytes {
+/// Ok(Ok(n)) => {
+/// assert_eq!(n, 4);
+/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
+/// Ok(n)
+/// },
+/// Ok(Err(a)) => {
+/// Err::<u64, Aborted>(a)
+/// }
+/// Err(e) => panic!("{}", e)
+/// }
+/// # }).unwrap();
+/// ```
+pub fn copy_buf_abortable<R, W>(
+ reader: R,
+ writer: &mut W,
+) -> (CopyBufAbortable<'_, R, W>, AbortHandle)
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + ?Sized,
+{
+ let (handle, reg) = AbortHandle::new_pair();
+ (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle)
+}
+
+pin_project! {
+ /// Future for the [`copy_buf()`] function.
+ #[derive(Debug)]
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ pub struct CopyBufAbortable<'a, R, W: ?Sized> {
+ #[pin]
+ reader: R,
+ writer: &'a mut W,
+ amt: u64,
+ inner: Arc<AbortInner>
+ }
+}
+
+macro_rules! ready_or_break {
+ ($e:expr $(,)?) => {
+ match $e {
+ $crate::task::Poll::Ready(t) => t,
+ $crate::task::Poll::Pending => break,
+ }
+ };
+}
+
+impl<R, W> Future for CopyBufAbortable<'_, R, W>
+where
+ R: AsyncBufRead,
+ W: AsyncWrite + Unpin + Sized,
+{
+ type Output = Result<Result<u64, Aborted>, io::Error>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut this = self.project();
+ loop {
+ // Check if the task has been aborted
+ if this.inner.aborted.load(Ordering::Relaxed) {
+ return Poll::Ready(Ok(Err(Aborted)));
+ }
+
+ // Read some bytes from the reader, and if we have reached EOF, return total bytes read
+ let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?;
+ if buffer.is_empty() {
+ ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?;
+ return Poll::Ready(Ok(Ok(*this.amt)));
+ }
+
+ // Pass the buffer to the writer, and update the amount written
+ let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?;
+ if i == 0 {
+ return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
+ }
+ *this.amt += i as u64;
+ this.reader.as_mut().consume(i);
+ }
+ // Schedule the task to be woken up again.
+ // Never called unless Poll::Pending is returned from io objects.
+ this.inner.waker.register(cx.waker());
+
+ // Check to see if the task was aborted between the first check and
+ // registration.
+ // Checking with `Relaxed` is sufficient because
+ // `register` introduces an `AcqRel` barrier.
+ if this.inner.aborted.load(Ordering::Relaxed) {
+ return Poll::Ready(Ok(Err(Aborted)));
+ }
+ Poll::Pending
+ }
+}
diff --git a/vendor/futures-util/src/io/cursor.rs b/vendor/futures-util/src/io/cursor.rs
index b6fb3724c..c6e2aeea2 100644
--- a/vendor/futures-util/src/io/cursor.rs
+++ b/vendor/futures-util/src/io/cursor.rs
@@ -1,6 +1,4 @@
use futures_core::task::{Context, Poll};
-#[cfg(feature = "read_initializer")]
-use futures_io::Initializer;
use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom};
use std::io;
use std::pin::Pin;
@@ -159,12 +157,6 @@ where
}
impl<T: AsRef<[u8]> + Unpin> AsyncRead for Cursor<T> {
- #[cfg(feature = "read_initializer")]
- #[inline]
- unsafe fn initializer(&self) -> Initializer {
- io::Read::initializer(&self.inner)
- }
-
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
diff --git a/vendor/futures-util/src/io/lines.rs b/vendor/futures-util/src/io/lines.rs
index 13e70df23..b5561bfa7 100644
--- a/vendor/futures-util/src/io/lines.rs
+++ b/vendor/futures-util/src/io/lines.rs
@@ -42,6 +42,6 @@ impl<R: AsyncBufRead> Stream for Lines<R> {
this.buf.pop();
}
}
- Poll::Ready(Some(Ok(mem::replace(this.buf, String::new()))))
+ Poll::Ready(Some(Ok(mem::take(this.buf))))
}
}
diff --git a/vendor/futures-util/src/io/mod.rs b/vendor/futures-util/src/io/mod.rs
index 4dd2e029b..8ce3ad644 100644
--- a/vendor/futures-util/src/io/mod.rs
+++ b/vendor/futures-util/src/io/mod.rs
@@ -66,6 +66,9 @@ pub use self::copy::{copy, Copy};
mod copy_buf;
pub use self::copy_buf::{copy_buf, CopyBuf};
+mod copy_buf_abortable;
+pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable};
+
mod cursor;
pub use self::cursor::Cursor;
diff --git a/vendor/futures-util/src/io/read_exact.rs b/vendor/futures-util/src/io/read_exact.rs
index 02e38c35b..cd0b20e59 100644
--- a/vendor/futures-util/src/io/read_exact.rs
+++ b/vendor/futures-util/src/io/read_exact.rs
@@ -30,7 +30,7 @@ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> {
while !this.buf.is_empty() {
let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?;
{
- let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n);
+ let (_, rest) = mem::take(&mut this.buf).split_at_mut(n);
this.buf = rest;
}
if n == 0 {
diff --git a/vendor/futures-util/src/io/read_line.rs b/vendor/futures-util/src/io/read_line.rs
index c75af9471..e1b8fc945 100644
--- a/vendor/futures-util/src/io/read_line.rs
+++ b/vendor/futures-util/src/io/read_line.rs
@@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}
impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
- Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 }
+ Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 }
}
}
diff --git a/vendor/futures-util/src/io/read_to_string.rs b/vendor/futures-util/src/io/read_to_string.rs
index 457af59e4..c175396d8 100644
--- a/vendor/futures-util/src/io/read_to_string.rs
+++ b/vendor/futures-util/src/io/read_to_string.rs
@@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {}
impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
let start_len = buf.len();
- Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len }
+ Self { reader, bytes: mem::take(buf).into_bytes(), buf, start_len }
}
}
diff --git a/vendor/futures-util/src/io/write_all.rs b/vendor/futures-util/src/io/write_all.rs
index b134bf1b2..08c025f94 100644
--- a/vendor/futures-util/src/io/write_all.rs
+++ b/vendor/futures-util/src/io/write_all.rs
@@ -30,7 +30,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> {
while !this.buf.is_empty() {
let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?;
{
- let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n);
+ let (_, rest) = mem::take(&mut this.buf).split_at(n);
this.buf = rest;
}
if n == 0 {
diff --git a/vendor/futures-util/src/lock/bilock.rs b/vendor/futures-util/src/lock/bilock.rs
index 2f51ae7c9..7ddc66ad2 100644
--- a/vendor/futures-util/src/lock/bilock.rs
+++ b/vendor/futures-util/src/lock/bilock.rs
@@ -3,11 +3,11 @@
use alloc::boxed::Box;
use alloc::sync::Arc;
use core::cell::UnsafeCell;
-use core::fmt;
use core::ops::{Deref, DerefMut};
use core::pin::Pin;
-use core::sync::atomic::AtomicUsize;
+use core::sync::atomic::AtomicPtr;
use core::sync::atomic::Ordering::SeqCst;
+use core::{fmt, ptr};
#[cfg(feature = "bilock")]
use futures_core::future::Future;
use futures_core::task::{Context, Poll, Waker};
@@ -41,7 +41,7 @@ pub struct BiLock<T> {
#[derive(Debug)]
struct Inner<T> {
- state: AtomicUsize,
+ state: AtomicPtr<Waker>,
value: Option<UnsafeCell<T>>,
}
@@ -61,7 +61,10 @@ impl<T> BiLock<T> {
/// Similarly, reuniting the lock and extracting the inner value is only
/// possible when `T` is `Unpin`.
pub fn new(t: T) -> (Self, Self) {
- let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) });
+ let arc = Arc::new(Inner {
+ state: AtomicPtr::new(ptr::null_mut()),
+ value: Some(UnsafeCell::new(t)),
+ });
(Self { arc: arc.clone() }, Self { arc })
}
@@ -87,7 +90,8 @@ impl<T> BiLock<T> {
pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> {
let mut waker = None;
loop {
- match self.arc.state.swap(1, SeqCst) {
+ let n = self.arc.state.swap(invalid_ptr(1), SeqCst);
+ match n as usize {
// Woohoo, we grabbed the lock!
0 => return Poll::Ready(BiLockGuard { bilock: self }),
@@ -96,8 +100,8 @@ impl<T> BiLock<T> {
// A task was previously blocked on this lock, likely our task,
// so we need to update that task.
- n => unsafe {
- let mut prev = Box::from_raw(n as *mut Waker);
+ _ => unsafe {
+ let mut prev = Box::from_raw(n);
*prev = cx.waker().clone();
waker = Some(prev);
},
@@ -105,9 +109,9 @@ impl<T> BiLock<T> {
// type ascription for safety's sake!
let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone()));
- let me = Box::into_raw(me) as usize;
+ let me = Box::into_raw(me);
- match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) {
+ match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) {
// The lock is still locked, but we've now parked ourselves, so
// just report that we're scheduled to receive a notification.
Ok(_) => return Poll::Pending,
@@ -115,8 +119,8 @@ impl<T> BiLock<T> {
// Oops, looks like the lock was unlocked after our swap above
// and before the compare_exchange. Deallocate what we just
// allocated and go through the loop again.
- Err(0) => unsafe {
- waker = Some(Box::from_raw(me as *mut Waker));
+ Err(n) if n.is_null() => unsafe {
+ waker = Some(Box::from_raw(me));
},
// The top of this loop set the previous state to 1, so if we
@@ -125,7 +129,7 @@ impl<T> BiLock<T> {
// but we're trying to acquire the lock and there's only one
// other reference of the lock, so it should be impossible for
// that task to ever block itself.
- Err(n) => panic!("invalid state: {}", n),
+ Err(n) => panic!("invalid state: {}", n as usize),
}
}
}
@@ -164,7 +168,8 @@ impl<T> BiLock<T> {
}
fn unlock(&self) {
- match self.arc.state.swap(0, SeqCst) {
+ let n = self.arc.state.swap(ptr::null_mut(), SeqCst);
+ match n as usize {
// we've locked the lock, shouldn't be possible for us to see an
// unlocked lock.
0 => panic!("invalid unlocked state"),
@@ -174,8 +179,8 @@ impl<T> BiLock<T> {
// Another task has parked themselves on this lock, let's wake them
// up as its now their turn.
- n => unsafe {
- Box::from_raw(n as *mut Waker).wake();
+ _ => unsafe {
+ Box::from_raw(n).wake();
},
}
}
@@ -189,7 +194,7 @@ impl<T: Unpin> Inner<T> {
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
- assert_eq!(self.state.load(SeqCst), 0);
+ assert!(self.state.load(SeqCst).is_null());
}
}
@@ -224,6 +229,9 @@ pub struct BiLockGuard<'a, T> {
bilock: &'a BiLock<T>,
}
+// We allow parallel access to T via Deref, so Sync bound is also needed here.
+unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {}
+
impl<T> Deref for BiLockGuard<'_, T> {
type Target = T;
fn deref(&self) -> &T {
@@ -274,3 +282,12 @@ impl<'a, T> Future for BiLockAcquire<'a, T> {
self.bilock.poll_lock(cx)
}
}
+
+// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible.
+#[allow(clippy::useless_transmute)]
+#[inline]
+fn invalid_ptr<T>(addr: usize) -> *mut T {
+ // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that
+ // pointer).
+ unsafe { core::mem::transmute(addr) }
+}
diff --git a/vendor/futures-util/src/lock/mod.rs b/vendor/futures-util/src/lock/mod.rs
index cf374c016..0be72717c 100644
--- a/vendor/futures-util/src/lock/mod.rs
+++ b/vendor/futures-util/src/lock/mod.rs
@@ -4,11 +4,18 @@
//! library is activated, and it is activated by default.
#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "std")]
-mod mutex;
+#[cfg(any(feature = "sink", feature = "io"))]
+#[cfg(not(feature = "bilock"))]
+pub(crate) use self::bilock::BiLock;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "bilock")]
+#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
+pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "std")]
-pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
+pub use self::mutex::{
+ MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture,
+};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(any(feature = "bilock", feature = "sink", feature = "io"))]
@@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture};
#[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))]
mod bilock;
#[cfg(not(futures_no_atomic_cas))]
-#[cfg(any(feature = "sink", feature = "io"))]
-#[cfg(not(feature = "bilock"))]
-pub(crate) use self::bilock::BiLock;
-#[cfg(not(futures_no_atomic_cas))]
-#[cfg(feature = "bilock")]
-#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))]
-pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError};
+#[cfg(feature = "std")]
+mod mutex;
diff --git a/vendor/futures-util/src/lock/mutex.rs b/vendor/futures-util/src/lock/mutex.rs
index 85dcb1537..335ad1427 100644
--- a/vendor/futures-util/src/lock/mutex.rs
+++ b/vendor/futures-util/src/lock/mutex.rs
@@ -1,14 +1,16 @@
-use futures_core::future::{FusedFuture, Future};
-use futures_core::task::{Context, Poll, Waker};
-use slab::Slab;
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Mutex as StdMutex;
+use std::sync::{Arc, Mutex as StdMutex};
use std::{fmt, mem};
+use slab::Slab;
+
+use futures_core::future::{FusedFuture, Future};
+use futures_core::task::{Context, Poll, Waker};
+
/// A futures-aware mutex.
///
/// # Fairness
@@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> {
}
}
+ /// Attempt to acquire the lock immediately.
+ ///
+ /// If the lock is currently held, this will return `None`.
+ pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> {
+ let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire);
+ if (old_state & IS_LOCKED) == 0 {
+ Some(OwnedMutexGuard { mutex: self.clone() })
+ } else {
+ None
+ }
+ }
+
/// Acquire the lock asynchronously.
///
/// This method returns a future that will resolve once the lock has been
@@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> {
MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
}
+ /// Acquire the lock asynchronously.
+ ///
+ /// This method returns a future that will resolve once the lock has been
+ /// successfully acquired.
+ pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> {
+ OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE }
+ }
+
/// Returns a mutable reference to the underlying data.
///
/// Since this call borrows the `Mutex` mutably, no actual locking needs to
@@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> {
}
// Sentinel for when no slot in the `Slab` has been dedicated to this object.
-const WAIT_KEY_NONE: usize = usize::max_value();
+const WAIT_KEY_NONE: usize = usize::MAX;
+
+/// A future which resolves when the target mutex has been successfully acquired, owned version.
+pub struct OwnedMutexLockFuture<T: ?Sized> {
+ // `None` indicates that the mutex was successfully acquired.
+ mutex: Option<Arc<Mutex<T>>>,
+ wait_key: usize,
+}
+
+impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("OwnedMutexLockFuture")
+ .field("was_acquired", &self.mutex.is_none())
+ .field("mutex", &self.mutex)
+ .field(
+ "wait_key",
+ &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }),
+ )
+ .finish()
+ }
+}
+
+impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> {
+ fn is_terminated(&self) -> bool {
+ self.mutex.is_none()
+ }
+}
+
+impl<T: ?Sized> Future for OwnedMutexLockFuture<T> {
+ type Output = OwnedMutexGuard<T>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let this = self.get_mut();
+
+ let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion");
+
+ if let Some(lock) = mutex.try_lock_owned() {
+ mutex.remove_waker(this.wait_key, false);
+ this.mutex = None;
+ return Poll::Ready(lock);
+ }
+
+ {
+ let mut waiters = mutex.waiters.lock().unwrap();
+ if this.wait_key == WAIT_KEY_NONE {
+ this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone()));
+ if waiters.len() == 1 {
+ mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock
+ }
+ } else {
+ waiters[this.wait_key].register(cx.waker());
+ }
+ }
+
+ // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by
+ // attempting to acquire the lock again.
+ if let Some(lock) = mutex.try_lock_owned() {
+ mutex.remove_waker(this.wait_key, false);
+ this.mutex = None;
+ return Poll::Ready(lock);
+ }
+
+ Poll::Pending
+ }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> {
+ fn drop(&mut self) {
+ if let Some(mutex) = self.mutex.as_ref() {
+ // This future was dropped before it acquired the mutex.
+ //
+ // Remove ourselves from the map, waking up another waiter if we
+ // had been awoken to acquire the lock.
+ mutex.remove_waker(self.wait_key, true);
+ }
+ }
+}
+
+/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods.
+/// When this structure is dropped (falls out of scope), the lock will be
+/// unlocked.
+pub struct OwnedMutexGuard<T: ?Sized> {
+ mutex: Arc<Mutex<T>>,
+}
+
+impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("OwnedMutexGuard")
+ .field("value", &&**self)
+ .field("mutex", &self.mutex)
+ .finish()
+ }
+}
+
+impl<T: ?Sized> Drop for OwnedMutexGuard<T> {
+ fn drop(&mut self) {
+ self.mutex.unlock()
+ }
+}
+
+impl<T: ?Sized> Deref for OwnedMutexGuard<T> {
+ type Target = T;
+ fn deref(&self) -> &T {
+ unsafe { &*self.mutex.value.get() }
+ }
+}
+
+impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> {
+ fn deref_mut(&mut self) -> &mut T {
+ unsafe { &mut *self.mutex.value.get() }
+ }
+}
/// A future which resolves when the target mutex has been successfully acquired.
pub struct MutexLockFuture<'a, T: ?Sized> {
@@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}
// It's safe to switch which thread the acquire is being attempted on so long as
// `T` can be accessed on that thread.
unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {}
+
// doesn't have any interesting `&self` methods (only Debug)
unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {}
+// It's safe to switch which thread the acquire is being attempted on so long as
+// `T` can be accessed on that thread.
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {}
+
+// doesn't have any interesting `&self` methods (only Debug)
+unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {}
+
// Safe to send since we don't track any thread-specific details-- the inner
// lock is essentially spinlock-equivalent (attempt to flip an atomic bool)
unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {}
unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}
+
+unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {}
+unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {}
+
unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {}
unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {}
diff --git a/vendor/futures-util/src/sink/drain.rs b/vendor/futures-util/src/sink/drain.rs
index 5295115b6..1a5480c0d 100644
--- a/vendor/futures-util/src/sink/drain.rs
+++ b/vendor/futures-util/src/sink/drain.rs
@@ -32,6 +32,12 @@ pub fn drain<T>() -> Drain<T> {
impl<T> Unpin for Drain<T> {}
+impl<T> Clone for Drain<T> {
+ fn clone(&self) -> Self {
+ drain()
+ }
+}
+
impl<T> Sink<T> for Drain<T> {
type Error = Never;
diff --git a/vendor/futures-util/src/sink/unfold.rs b/vendor/futures-util/src/sink/unfold.rs
index 330a068c3..dea1307b6 100644
--- a/vendor/futures-util/src/sink/unfold.rs
+++ b/vendor/futures-util/src/sink/unfold.rs
@@ -73,7 +73,10 @@ where
this.state.set(UnfoldState::Value { value: state });
Ok(())
}
- Err(err) => Err(err),
+ Err(err) => {
+ this.state.set(UnfoldState::Empty);
+ Err(err)
+ }
}
} else {
Ok(())
diff --git a/vendor/futures-util/src/stream/futures_ordered.rs b/vendor/futures-util/src/stream/futures_ordered.rs
index f596b3b0e..618bf1b7b 100644
--- a/vendor/futures-util/src/stream/futures_ordered.rs
+++ b/vendor/futures-util/src/stream/futures_ordered.rs
@@ -19,7 +19,7 @@ pin_project! {
struct OrderWrapper<T> {
#[pin]
data: T, // A future or a future's output
- index: usize,
+ index: isize,
}
}
@@ -58,7 +58,7 @@ where
/// An unbounded queue of futures.
///
-/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
+/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order
/// on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
@@ -95,8 +95,8 @@ where
pub struct FuturesOrdered<T: Future> {
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
- next_incoming_index: usize,
- next_outgoing_index: usize,
+ next_incoming_index: isize,
+ next_outgoing_index: isize,
}
impl<T: Future> Unpin for FuturesOrdered<T> {}
@@ -135,11 +135,35 @@ impl<Fut: Future> FuturesOrdered<Fut> {
/// This function will not call `poll` on the submitted future. The caller
/// must ensure that `FuturesOrdered::poll` is called in order to receive
/// task notifications.
+ #[deprecated(note = "use `push_back` instead")]
pub fn push(&mut self, future: Fut) {
+ self.push_back(future);
+ }
+
+ /// Pushes a future to the back of the queue.
+ ///
+ /// This function submits the given future to the internal set for managing.
+ /// This function will not call `poll` on the submitted future. The caller
+ /// must ensure that `FuturesOrdered::poll` is called in order to receive
+ /// task notifications.
+ pub fn push_back(&mut self, future: Fut) {
let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
self.next_incoming_index += 1;
self.in_progress_queue.push(wrapped);
}
+
+ /// Pushes a future to the front of the queue.
+ ///
+ /// This function submits the given future to the internal set for managing.
+ /// This function will not call `poll` on the submitted future. The caller
+ /// must ensure that `FuturesOrdered::poll` is called in order to receive
+ /// task notifications. This future will be the next future to be returned
+ /// complete.
+ pub fn push_front(&mut self, future: Fut) {
+ let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 };
+ self.next_outgoing_index -= 1;
+ self.in_progress_queue.push(wrapped);
+ }
}
impl<Fut: Future> Default for FuturesOrdered<Fut> {
@@ -196,7 +220,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
{
let acc = Self::new();
iter.into_iter().fold(acc, |mut acc, item| {
- acc.push(item);
+ acc.push_back(item);
acc
})
}
@@ -214,7 +238,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
I: IntoIterator<Item = Fut>,
{
for item in iter {
- self.push(item);
+ self.push_back(item);
}
}
}
diff --git a/vendor/futures-util/src/stream/futures_unordered/iter.rs b/vendor/futures-util/src/stream/futures_unordered/iter.rs
index 04db5ee75..20248c70f 100644
--- a/vendor/futures-util/src/stream/futures_unordered/iter.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/iter.rs
@@ -2,6 +2,7 @@ use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use core::pin::Pin;
+use core::ptr;
use core::sync::atomic::Ordering::Relaxed;
/// Mutable iterator over all futures in the unordered set.
@@ -58,6 +59,9 @@ impl<Fut: Unpin> Iterator for IntoIter<Fut> {
// valid `next_all` checks can be skipped.
let next = (**task).next_all.load(Relaxed);
*task = next;
+ if !task.is_null() {
+ *(**task).prev_all.get() = ptr::null_mut();
+ }
self.len -= 1;
Some(future)
}
diff --git a/vendor/futures-util/src/stream/futures_unordered/mod.rs b/vendor/futures-util/src/stream/futures_unordered/mod.rs
index aab2bb446..6b5804dc4 100644
--- a/vendor/futures-util/src/stream/futures_unordered/mod.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/mod.rs
@@ -6,7 +6,6 @@
use crate::task::AtomicWaker;
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
-use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
@@ -23,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError};
mod abort;
mod iter;
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352
pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef};
mod task;
@@ -31,35 +31,11 @@ use self::task::Task;
mod ready_to_run_queue;
use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue};
-/// Constant used for a `FuturesUnordered` to determine how many times it is
-/// allowed to poll underlying futures without yielding.
-///
-/// A single call to `poll_next` may potentially do a lot of work before
-/// yielding. This happens in particular if the underlying futures are awoken
-/// frequently but continue to return `Pending`. This is problematic if other
-/// tasks are waiting on the executor, since they do not get to run. This value
-/// caps the number of calls to `poll` on underlying futures a single call to
-/// `poll_next` is allowed to make.
-///
-/// The value itself is chosen somewhat arbitrarily. It needs to be high enough
-/// that amortize wakeup and scheduling costs, but low enough that we do not
-/// starve other tasks for long.
-///
-/// See also https://github.com/rust-lang/futures-rs/issues/2047.
-///
-/// Note that using the length of the `FuturesUnordered` instead of this value
-/// may cause problems if the number of futures is large.
-/// See also https://github.com/rust-lang/futures-rs/pull/2527.
-///
-/// Additionally, polling the same future twice per iteration may cause another
-/// problem. So, when using this value, it is necessary to limit the max value
-/// based on the length of the `FuturesUnordered`.
-/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`)
-/// See also https://github.com/rust-lang/futures-rs/pull/2333.
-const YIELD_EVERY: usize = 32;
-
/// A set of futures which may complete in any order.
///
+/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this
+/// type that preserves a FIFO order.
+///
/// This structure is optimized to manage a large number of futures.
/// Futures managed by [`FuturesUnordered`] will only be polled when they
/// generate wake-up notifications. This reduces the required amount of work
@@ -149,8 +125,9 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Weak::new(),
+ woken: AtomicBool::new(false),
});
- let stub_ptr = &*stub as *const Task<Fut>;
+ let stub_ptr = Arc::as_ptr(&stub);
let ready_to_run_queue = Arc::new(ReadyToRunQueue {
waker: AtomicWaker::new(),
head: AtomicPtr::new(stub_ptr as *mut _),
@@ -195,6 +172,7 @@ impl<Fut> FuturesUnordered<Fut> {
next_ready_to_run: AtomicPtr::new(ptr::null_mut()),
queued: AtomicBool::new(true),
ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue),
+ woken: AtomicBool::new(false),
});
// Reset the `is_terminated` flag if we've previously marked ourselves
@@ -403,7 +381,7 @@ impl<Fut> FuturesUnordered<Fut> {
// The `ReadyToRunQueue` stub is never inserted into the `head_all`
// list, and its pointer value will remain valid for the lifetime of
// this `FuturesUnordered`, so we can make use of its value here.
- &*self.ready_to_run_queue.stub as *const _ as *mut _
+ Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _
}
}
@@ -411,12 +389,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- // See YIELD_EVERY docs for more.
- let yield_every = cmp::min(self.len(), YIELD_EVERY);
+ let len = self.len();
// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
+ let mut yielded = 0;
// Ensure `parent` is correctly set.
self.ready_to_run_queue.waker.register(cx.waker());
@@ -527,7 +505,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
// the internal allocation, appropriately accessing fields and
// deallocating the task if need be.
let res = {
- let waker = Task::waker_ref(bomb.task.as_ref().unwrap());
+ let task = bomb.task.as_ref().unwrap();
+ // We are only interested in whether the future is awoken before it
+ // finishes polling, so reset the flag here.
+ task.woken.store(false, Relaxed);
+ let waker = Task::waker_ref(task);
let mut cx = Context::from_waker(&waker);
// Safety: We won't move the future ever again
@@ -540,12 +522,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
match res {
Poll::Pending => {
let task = bomb.task.take().unwrap();
+ // If the future was awoken during polling, we assume
+ // the future wanted to explicitly yield.
+ yielded += task.woken.load(Relaxed) as usize;
bomb.queue.link(task);
- if polled == yield_every {
- // We have polled a large number of futures in a row without yielding.
- // To ensure we do not starve other tasks waiting on the executor,
- // we yield here, but immediately wake ourselves up to continue.
+ // If a future yields, we respect it and yield here.
+ // If all futures have been polled, we also yield here to
+ // avoid starving other tasks waiting on the executor.
+ // (polling the same future twice per iteration may cause
+ // the problem: https://github.com/rust-lang/futures-rs/pull/2333)
+ if yielded >= 2 || polled == len {
cx.waker().wake_by_ref();
return Poll::Pending;
}
diff --git a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
index 5ef6cde83..451870532 100644
--- a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs
@@ -83,7 +83,7 @@ impl<Fut> ReadyToRunQueue<Fut> {
}
pub(super) fn stub(&self) -> *const Task<Fut> {
- &*self.stub
+ Arc::as_ptr(&self.stub)
}
// Clear the queue of tasks.
diff --git a/vendor/futures-util/src/stream/futures_unordered/task.rs b/vendor/futures-util/src/stream/futures_unordered/task.rs
index da2cd67d9..ec2114eff 100644
--- a/vendor/futures-util/src/stream/futures_unordered/task.rs
+++ b/vendor/futures-util/src/stream/futures_unordered/task.rs
@@ -1,6 +1,6 @@
use alloc::sync::{Arc, Weak};
use core::cell::UnsafeCell;
-use core::sync::atomic::Ordering::{self, SeqCst};
+use core::sync::atomic::Ordering::{self, Relaxed, SeqCst};
use core::sync::atomic::{AtomicBool, AtomicPtr};
use super::abort::abort;
@@ -31,6 +31,11 @@ pub(super) struct Task<Fut> {
// Whether or not this task is currently in the ready to run queue
pub(super) queued: AtomicBool,
+
+ // Whether the future was awoken during polling
+ // It is possible for this flag to be set to true after the polling,
+ // but it will be ignored.
+ pub(super) woken: AtomicBool,
}
// `Task` can be sent across threads safely because it ensures that
@@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> {
None => return,
};
+ arc_self.woken.store(true, Relaxed);
+
// It's our job to enqueue this task it into the ready to run queue. To
// do this we set the `queued` flag, and if successful we then do the
// actual queueing operation, ensuring that we're only queued once.
@@ -62,7 +69,7 @@ impl<Fut> ArcWake for Task<Fut> {
// still.
let prev = arc_self.queued.swap(true, SeqCst);
if !prev {
- inner.enqueue(&**arc_self);
+ inner.enqueue(Arc::as_ptr(arc_self));
inner.waker.wake();
}
}
diff --git a/vendor/futures-util/src/stream/mod.rs b/vendor/futures-util/src/stream/mod.rs
index ec685b984..bf9506147 100644
--- a/vendor/futures-util/src/stream/mod.rs
+++ b/vendor/futures-util/src/stream/mod.rs
@@ -18,9 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
#[allow(clippy::module_inception)]
mod stream;
pub use self::stream::{
- Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
- Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome,
- Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip,
+ All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
+ Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan,
+ SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then,
+ Unzip, Zip,
};
#[cfg(feature = "std")]
@@ -38,7 +39,9 @@ pub use self::stream::Forward;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
-pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent};
+pub use self::stream::{
+ BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent,
+};
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "sink")]
@@ -60,7 +63,9 @@ pub use self::try_stream::IntoAsyncRead;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
-pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent};
+pub use self::try_stream::{
+ TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent,
+};
#[cfg(feature = "alloc")]
pub use self::try_stream::{TryChunks, TryChunksError};
diff --git a/vendor/futures-util/src/stream/select_all.rs b/vendor/futures-util/src/stream/select_all.rs
index 3474331ad..121b6a0e5 100644
--- a/vendor/futures-util/src/stream/select_all.rs
+++ b/vendor/futures-util/src/stream/select_all.rs
@@ -8,29 +8,24 @@ use futures_core::ready;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
-use pin_project_lite::pin_project;
-
use super::assert_stream;
use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture};
-pin_project! {
- /// An unbounded set of streams
- ///
- /// This "combinator" provides the ability to maintain a set of streams
- /// and drive them all to completion.
- ///
- /// Streams are pushed into this set and their realized values are
- /// yielded as they become ready. Streams will only be polled when they
- /// generate notifications. This allows to coordinate a large number of streams.
- ///
- /// Note that you can create a ready-made `SelectAll` via the
- /// `select_all` function in the `stream` module, or you can start with an
- /// empty set with the `SelectAll::new` constructor.
- #[must_use = "streams do nothing unless polled"]
- pub struct SelectAll<St> {
- #[pin]
- inner: FuturesUnordered<StreamFuture<St>>,
- }
+/// An unbounded set of streams
+///
+/// This "combinator" provides the ability to maintain a set of streams
+/// and drive them all to completion.
+///
+/// Streams are pushed into this set and their realized values are
+/// yielded as they become ready. Streams will only be polled when they
+/// generate notifications. This allows to coordinate a large number of streams.
+///
+/// Note that you can create a ready-made `SelectAll` via the
+/// `select_all` function in the `stream` module, or you can start with an
+/// empty set with the `SelectAll::new` constructor.
+#[must_use = "streams do nothing unless polled"]
+pub struct SelectAll<St> {
+ inner: FuturesUnordered<StreamFuture<St>>,
}
impl<St: Debug> Debug for SelectAll<St> {
diff --git a/vendor/futures-util/src/stream/select_with_strategy.rs b/vendor/futures-util/src/stream/select_with_strategy.rs
index bd86990cd..224d5f821 100644
--- a/vendor/futures-util/src/stream/select_with_strategy.rs
+++ b/vendor/futures-util/src/stream/select_with_strategy.rs
@@ -1,5 +1,4 @@
use super::assert_stream;
-use crate::stream::{Fuse, StreamExt};
use core::{fmt, pin::Pin};
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
@@ -18,13 +17,15 @@ impl PollNext {
/// Toggle the value and return the old one.
pub fn toggle(&mut self) -> Self {
let old = *self;
+ *self = self.other();
+ old
+ }
+ fn other(&self) -> PollNext {
match self {
- PollNext::Left => *self = PollNext::Right,
- PollNext::Right => *self = PollNext::Left,
+ PollNext::Left => PollNext::Right,
+ PollNext::Right => PollNext::Left,
}
-
- old
}
}
@@ -34,14 +35,41 @@ impl Default for PollNext {
}
}
+enum InternalState {
+ Start,
+ LeftFinished,
+ RightFinished,
+ BothFinished,
+}
+
+impl InternalState {
+ fn finish(&mut self, ps: PollNext) {
+ match (&self, ps) {
+ (InternalState::Start, PollNext::Left) => {
+ *self = InternalState::LeftFinished;
+ }
+ (InternalState::Start, PollNext::Right) => {
+ *self = InternalState::RightFinished;
+ }
+ (InternalState::LeftFinished, PollNext::Right)
+ | (InternalState::RightFinished, PollNext::Left) => {
+ *self = InternalState::BothFinished;
+ }
+ _ => {}
+ }
+ }
+}
+
pin_project! {
/// Stream for the [`select_with_strategy()`] function. See function docs for details.
#[must_use = "streams do nothing unless polled"]
+ #[project = SelectWithStrategyProj]
pub struct SelectWithStrategy<St1, St2, Clos, State> {
#[pin]
- stream1: Fuse<St1>,
+ stream1: St1,
#[pin]
- stream2: Fuse<St2>,
+ stream2: St2,
+ internal_state: InternalState,
state: State,
clos: Clos,
}
@@ -120,9 +148,10 @@ where
State: Default,
{
assert_stream::<St1::Item, _>(SelectWithStrategy {
- stream1: stream1.fuse(),
- stream2: stream2.fuse(),
+ stream1,
+ stream2,
state: Default::default(),
+ internal_state: InternalState::Start,
clos: which,
})
}
@@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Acquires a reference to the underlying streams that this combinator is
/// pulling from.
pub fn get_ref(&self) -> (&St1, &St2) {
- (self.stream1.get_ref(), self.stream2.get_ref())
+ (&self.stream1, &self.stream2)
}
/// Acquires a mutable reference to the underlying streams that this
@@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> (&mut St1, &mut St2) {
- (self.stream1.get_mut(), self.stream2.get_mut())
+ (&mut self.stream1, &mut self.stream2)
}
/// Acquires a pinned mutable reference to the underlying streams that this
@@ -150,7 +179,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// stream which may otherwise confuse this combinator.
pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) {
let this = self.project();
- (this.stream1.get_pin_mut(), this.stream2.get_pin_mut())
+ (this.stream1, this.stream2)
}
/// Consumes this combinator, returning the underlying streams.
@@ -158,7 +187,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> {
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> (St1, St2) {
- (self.stream1.into_inner(), self.stream2.into_inner())
+ (self.stream1, self.stream2)
}
}
@@ -169,47 +198,93 @@ where
Clos: FnMut(&mut State) -> PollNext,
{
fn is_terminated(&self) -> bool {
- self.stream1.is_terminated() && self.stream2.is_terminated()
+ match self.internal_state {
+ InternalState::BothFinished => true,
+ _ => false,
+ }
}
}
-impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
+#[inline]
+fn poll_side<St1, St2, Clos, State>(
+ select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
+ side: PollNext,
+ cx: &mut Context<'_>,
+) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
- Clos: FnMut(&mut State) -> PollNext,
{
- type Item = St1::Item;
-
- fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
- let this = self.project();
-
- match (this.clos)(this.state) {
- PollNext::Left => poll_inner(this.stream1, this.stream2, cx),
- PollNext::Right => poll_inner(this.stream2, this.stream1, cx),
- }
+ match side {
+ PollNext::Left => select.stream1.as_mut().poll_next(cx),
+ PollNext::Right => select.stream2.as_mut().poll_next(cx),
}
}
-fn poll_inner<St1, St2>(
- a: Pin<&mut St1>,
- b: Pin<&mut St2>,
+#[inline]
+fn poll_inner<St1, St2, Clos, State>(
+ select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>,
+ side: PollNext,
cx: &mut Context<'_>,
) -> Poll<Option<St1::Item>>
where
St1: Stream,
St2: Stream<Item = St1::Item>,
{
- let a_done = match a.poll_next(cx) {
+ let first_done = match poll_side(select, side, cx) {
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
- Poll::Ready(None) => true,
+ Poll::Ready(None) => {
+ select.internal_state.finish(side);
+ true
+ }
Poll::Pending => false,
};
+ let other = side.other();
+ match poll_side(select, other, cx) {
+ Poll::Ready(None) => {
+ select.internal_state.finish(other);
+ if first_done {
+ Poll::Ready(None)
+ } else {
+ Poll::Pending
+ }
+ }
+ a => a,
+ }
+}
+
+impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State>
+where
+ St1: Stream,
+ St2: Stream<Item = St1::Item>,
+ Clos: FnMut(&mut State) -> PollNext,
+{
+ type Item = St1::Item;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> {
+ let mut this = self.project();
- match b.poll_next(cx) {
- Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
- Poll::Ready(None) if a_done => Poll::Ready(None),
- Poll::Ready(None) | Poll::Pending => Poll::Pending,
+ match this.internal_state {
+ InternalState::Start => {
+ let next_side = (this.clos)(this.state);
+ poll_inner(&mut this, next_side, cx)
+ }
+ InternalState::LeftFinished => match this.stream2.poll_next(cx) {
+ Poll::Ready(None) => {
+ *this.internal_state = InternalState::BothFinished;
+ Poll::Ready(None)
+ }
+ a => a,
+ },
+ InternalState::RightFinished => match this.stream1.poll_next(cx) {
+ Poll::Ready(None) => {
+ *this.internal_state = InternalState::BothFinished;
+ Poll::Ready(None)
+ }
+ a => a,
+ },
+ InternalState::BothFinished => Poll::Ready(None),
+ }
}
}
diff --git a/vendor/futures-util/src/stream/stream/buffer_unordered.rs b/vendor/futures-util/src/stream/stream/buffer_unordered.rs
index d64c142b4..91b0f6bcc 100644
--- a/vendor/futures-util/src/stream/stream/buffer_unordered.rs
+++ b/vendor/futures-util/src/stream/stream/buffer_unordered.rs
@@ -41,11 +41,7 @@ where
St: Stream,
St::Item: Future,
{
- pub(super) fn new(stream: St, n: usize) -> Self
- where
- St: Stream,
- St::Item: Future,
- {
+ pub(super) fn new(stream: St, n: usize) -> Self {
Self {
stream: super::Fuse::new(stream),
in_progress_queue: FuturesUnordered::new(),
diff --git a/vendor/futures-util/src/stream/stream/buffered.rs b/vendor/futures-util/src/stream/stream/buffered.rs
index 6052a737b..5854eb7ea 100644
--- a/vendor/futures-util/src/stream/stream/buffered.rs
+++ b/vendor/futures-util/src/stream/stream/buffered.rs
@@ -1,4 +1,4 @@
-use crate::stream::{Fuse, FuturesOrdered, StreamExt};
+use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt};
use core::fmt;
use core::pin::Pin;
use futures_core::future::Future;
@@ -64,7 +64,7 @@ where
// our queue of futures.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx) {
- Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut),
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut),
Poll::Ready(None) | Poll::Pending => break,
}
}
@@ -95,6 +95,16 @@ where
}
}
+impl<St> FusedStream for Buffered<St>
+where
+ St: Stream,
+ St::Item: Future,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_done() && self.in_progress_queue.is_terminated()
+ }
+}
+
// Forwarding impl of Sink from the underlying stream
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Buffered<S>
diff --git a/vendor/futures-util/src/stream/stream/chain.rs b/vendor/futures-util/src/stream/stream/chain.rs
index c5da35e25..36ff1e533 100644
--- a/vendor/futures-util/src/stream/stream/chain.rs
+++ b/vendor/futures-util/src/stream/stream/chain.rs
@@ -50,8 +50,9 @@ where
if let Some(item) = ready!(first.poll_next(cx)) {
return Poll::Ready(Some(item));
}
+
+ this.first.set(None);
}
- this.first.set(None);
this.second.poll_next(cx)
}
diff --git a/vendor/futures-util/src/stream/stream/chunks.rs b/vendor/futures-util/src/stream/stream/chunks.rs
index 845786999..2a71ebc6c 100644
--- a/vendor/futures-util/src/stream/stream/chunks.rs
+++ b/vendor/futures-util/src/stream/stream/chunks.rs
@@ -21,10 +21,7 @@ pin_project! {
}
}
-impl<St: Stream> Chunks<St>
-where
- St: Stream,
-{
+impl<St: Stream> Chunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
@@ -66,7 +63,7 @@ impl<St: Stream> Stream for Chunks<St> {
let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(this.items, Vec::new());
+ let full_buf = mem::take(this.items);
Some(full_buf)
};
@@ -77,9 +74,9 @@ impl<St: Stream> Stream for Chunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
+ let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
diff --git a/vendor/futures-util/src/stream/stream/collect.rs b/vendor/futures-util/src/stream/stream/collect.rs
index b0e81b9ce..970ac26db 100644
--- a/vendor/futures-util/src/stream/stream/collect.rs
+++ b/vendor/futures-util/src/stream/stream/collect.rs
@@ -19,7 +19,7 @@ pin_project! {
impl<St: Stream, C: Default> Collect<St, C> {
fn finish(self: Pin<&mut Self>) -> C {
- mem::replace(self.project().collection, Default::default())
+ mem::take(self.project().collection)
}
pub(super) fn new(stream: St) -> Self {
diff --git a/vendor/futures-util/src/stream/stream/filter.rs b/vendor/futures-util/src/stream/stream/filter.rs
index ccf1a5122..997fe9977 100644
--- a/vendor/futures-util/src/stream/stream/filter.rs
+++ b/vendor/futures-util/src/stream/stream/filter.rs
@@ -93,7 +93,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/filter_map.rs b/vendor/futures-util/src/stream/stream/filter_map.rs
index 02a0a4386..6b7d0070d 100644
--- a/vendor/futures-util/src/stream/stream/filter_map.rs
+++ b/vendor/futures-util/src/stream/stream/filter_map.rs
@@ -87,7 +87,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/flatten_unordered.rs b/vendor/futures-util/src/stream/stream/flatten_unordered.rs
new file mode 100644
index 000000000..44c6ace2f
--- /dev/null
+++ b/vendor/futures-util/src/stream/stream/flatten_unordered.rs
@@ -0,0 +1,536 @@
+use alloc::sync::Arc;
+use core::{
+ cell::UnsafeCell,
+ convert::identity,
+ fmt,
+ marker::PhantomData,
+ num::NonZeroUsize,
+ pin::Pin,
+ sync::atomic::{AtomicU8, Ordering},
+};
+
+use pin_project_lite::pin_project;
+
+use futures_core::{
+ future::Future,
+ ready,
+ stream::{FusedStream, Stream},
+ task::{Context, Poll, Waker},
+};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+use futures_task::{waker, ArcWake};
+
+use crate::stream::FuturesUnordered;
+
+/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+/// method.
+pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>;
+
+/// There is nothing to poll and stream isn't being polled/waking/woken at the moment.
+const NONE: u8 = 0;
+
+/// Inner streams need to be polled.
+const NEED_TO_POLL_INNER_STREAMS: u8 = 1;
+
+/// The base stream needs to be polled.
+const NEED_TO_POLL_STREAM: u8 = 0b10;
+
+/// Both base stream and inner streams need to be polled.
+const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM;
+
+/// The current stream is being polled at the moment.
+const POLLING: u8 = 0b100;
+
+/// Stream is being woken at the moment.
+const WAKING: u8 = 0b1000;
+
+/// The stream was waked and will be polled.
+const WOKEN: u8 = 0b10000;
+
+/// Internal polling state of the stream.
+#[derive(Clone, Debug)]
+struct SharedPollState {
+ state: Arc<AtomicU8>,
+}
+
+impl SharedPollState {
+ /// Constructs new `SharedPollState` with the given state.
+ fn new(value: u8) -> SharedPollState {
+ SharedPollState { state: Arc::new(AtomicU8::new(value)) }
+ }
+
+ /// Attempts to start polling, returning stored state in case of success.
+ /// Returns `None` if either waker is waking at the moment.
+ fn start_polling(
+ &self,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ if value & WAKING == NONE {
+ Some(POLLING)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+ let bomb = PollStateBomb::new(self, SharedPollState::reset);
+
+ Some((value, bomb))
+ }
+
+ /// Attempts to start the waking process and performs bitwise or with the given value.
+ ///
+ /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however
+ /// state will be disjuncted with the given value.
+ fn start_waking(
+ &self,
+ to_poll: u8,
+ ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ let mut next_value = value | to_poll;
+ if value & (WOKEN | POLLING) == NONE {
+ next_value |= WAKING;
+ }
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .ok()?;
+
+ // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already
+ if value & (WOKEN | POLLING | WAKING) == NONE {
+ let bomb = PollStateBomb::new(self, SharedPollState::stop_waking);
+
+ Some((value, bomb))
+ } else {
+ None
+ }
+ }
+
+ /// Sets current state to
+ /// - `!POLLING` allowing to use wakers
+ /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called,
+ /// or `will_be_woken` flag supplied
+ /// - `!WAKING` as
+ /// * Wakers called during the `POLLING` phase won't propagate their calls
+ /// * `POLLING` phase can't start if some of the wakers are active
+ /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again.
+ fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 {
+ self.state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| {
+ let mut next_value = to_poll;
+
+ value &= NEED_TO_POLL_ALL;
+ if value != NONE || will_be_woken {
+ next_value |= WOKEN;
+ }
+ next_value |= value;
+
+ Some(next_value & !POLLING & !WAKING)
+ })
+ .unwrap()
+ }
+
+ /// Toggles state to non-waking, allowing to start polling.
+ fn stop_waking(&self) -> u8 {
+ let value = self
+ .state
+ .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
+ let next_value = value & !WAKING | WOKEN;
+
+ if next_value != value {
+ Some(next_value)
+ } else {
+ None
+ }
+ })
+ .unwrap_or_else(identity);
+
+ debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING);
+ value
+ }
+
+ /// Resets current state allowing to poll the stream and wake up wakers.
+ fn reset(&self) -> u8 {
+ self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst)
+ }
+}
+
+/// Used to execute some function on the given state when dropped.
+struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> {
+ state: &'a SharedPollState,
+ drop: Option<F>,
+}
+
+impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> {
+ /// Constructs new bomb with the given state.
+ fn new(state: &'a SharedPollState, drop: F) -> Self {
+ Self { state, drop: Some(drop) }
+ }
+
+ /// Deactivates bomb, forces it to not call provided function when dropped.
+ fn deactivate(mut self) {
+ self.drop.take();
+ }
+}
+
+impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> {
+ fn drop(&mut self) {
+ if let Some(drop) = self.drop.take() {
+ (drop)(self.state);
+ }
+ }
+}
+
+/// Will update state with the provided value on `wake_by_ref` call
+/// and then, if there is a need, call `inner_waker`.
+struct WrappedWaker {
+ inner_waker: UnsafeCell<Option<Waker>>,
+ poll_state: SharedPollState,
+ need_to_poll: u8,
+}
+
+unsafe impl Send for WrappedWaker {}
+unsafe impl Sync for WrappedWaker {}
+
+impl WrappedWaker {
+ /// Replaces given waker's inner_waker for polling stream/futures which will
+ /// update poll state on `wake_by_ref` call. Use only if you need several
+ /// contexts.
+ ///
+ /// ## Safety
+ ///
+ /// This function will modify waker's `inner_waker` via `UnsafeCell`, so
+ /// it should be used only during `POLLING` phase by one thread at the time.
+ unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) {
+ *self_arc.inner_waker.get() = cx.waker().clone().into();
+ }
+
+ /// Attempts to start the waking process for the waker with the given value.
+ /// If succeeded, then the stream isn't yet woken and not being polled at the moment.
+ fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> {
+ self.poll_state.start_waking(self.need_to_poll)
+ }
+}
+
+impl ArcWake for WrappedWaker {
+ fn wake_by_ref(self_arc: &Arc<Self>) {
+ if let Some((_, state_bomb)) = self_arc.start_waking() {
+ // Safety: now state is not `POLLING`
+ let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() };
+
+ if let Some(inner_waker) = waker_opt.clone() {
+ // Stop waking to allow polling stream
+ drop(state_bomb);
+
+ // Wake up inner waker
+ inner_waker.wake();
+ }
+ }
+ }
+}
+
+pin_project! {
+ /// Future which polls optional inner stream.
+ ///
+ /// If it's `Some`, it will attempt to call `poll_next` on it,
+ /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))`
+ /// or `None` in case of `Poll::Ready(None)`.
+ ///
+ /// If `poll_next` will return `Poll::Pending`, it will be forwarded to
+ /// the future and current task will be notified by waker.
+ #[must_use = "futures do nothing unless you `.await` or poll them"]
+ struct PollStreamFut<St> {
+ #[pin]
+ stream: Option<St>,
+ }
+}
+
+impl<St> PollStreamFut<St> {
+ /// Constructs new `PollStreamFut` using given `stream`.
+ fn new(stream: impl Into<Option<St>>) -> Self {
+ Self { stream: stream.into() }
+ }
+}
+
+impl<St: Stream + Unpin> Future for PollStreamFut<St> {
+ type Output = Option<(St::Item, PollStreamFut<St>)>;
+
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+ let mut stream = self.project().stream;
+
+ let item = if let Some(stream) = stream.as_mut().as_pin_mut() {
+ ready!(stream.poll_next(cx))
+ } else {
+ None
+ };
+ let next_item_fut = PollStreamFut::new(stream.get_mut().take());
+ let out = item.map(|item| (item, next_item_fut));
+
+ Poll::Ready(out)
+ }
+}
+
+pin_project! {
+ /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered)
+ /// method with ability to specify flow controller.
+ #[project = FlattenUnorderedWithFlowControllerProj]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream {
+ #[pin]
+ inner_streams: FuturesUnordered<PollStreamFut<St::Item>>,
+ #[pin]
+ stream: St,
+ poll_state: SharedPollState,
+ limit: Option<NonZeroUsize>,
+ is_stream_done: bool,
+ inner_streams_waker: Arc<WrappedWaker>,
+ stream_waker: Arc<WrappedWaker>,
+ flow_controller: PhantomData<Fc>
+ }
+}
+
+impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream + fmt::Debug,
+ St::Item: Stream + fmt::Debug,
+{
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("FlattenUnorderedWithFlowController")
+ .field("poll_state", &self.poll_state)
+ .field("inner_streams", &self.inner_streams)
+ .field("limit", &self.limit)
+ .field("stream", &self.stream)
+ .field("is_stream_done", &self.is_stream_done)
+ .field("flow_controller", &self.flow_controller)
+ .finish()
+ }
+}
+
+impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
+ St::Item: Stream + Unpin,
+{
+ pub(crate) fn new(
+ stream: St,
+ limit: Option<usize>,
+ ) -> FlattenUnorderedWithFlowController<St, Fc> {
+ let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM);
+
+ FlattenUnorderedWithFlowController {
+ inner_streams: FuturesUnordered::new(),
+ stream,
+ is_stream_done: false,
+ limit: limit.and_then(NonZeroUsize::new),
+ inner_streams_waker: Arc::new(WrappedWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_INNER_STREAMS,
+ }),
+ stream_waker: Arc::new(WrappedWaker {
+ inner_waker: UnsafeCell::new(None),
+ poll_state: poll_state.clone(),
+ need_to_poll: NEED_TO_POLL_STREAM,
+ }),
+ poll_state,
+ flow_controller: PhantomData,
+ }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+/// Returns the next flow step based on the received item.
+pub trait FlowController<I, O> {
+ /// Handles an item producing `FlowStep` describing the next flow step.
+ fn next_step(item: I) -> FlowStep<I, O>;
+}
+
+impl<I, O> FlowController<I, O> for () {
+ fn next_step(item: I) -> FlowStep<I, O> {
+ FlowStep::Continue(item)
+ }
+}
+
+/// Describes the next flow step.
+#[derive(Debug, Clone)]
+pub enum FlowStep<C, R> {
+ /// Just yields an item and continues standard flow.
+ Continue(C),
+ /// Immediately returns an underlying item from the function.
+ Return(R),
+}
+
+impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc>
+where
+ St: Stream,
+{
+ /// Checks if current `inner_streams` bucket size is greater than optional limit.
+ fn is_exceeded_limit(&self) -> bool {
+ self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get())
+ }
+}
+
+impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: FusedStream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
+ St::Item: Stream + Unpin,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated() && self.inner_streams.is_empty()
+ }
+}
+
+impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream,
+ Fc: FlowController<St::Item, <St::Item as Stream>::Item>,
+ St::Item: Stream + Unpin,
+{
+ type Item = <St::Item as Stream>::Item;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let mut next_item = None;
+ let mut need_to_poll_next = NONE;
+
+ let mut this = self.as_mut().project();
+
+ // Attempt to start polling, in case some waker is holding the lock, wait in loop
+ let (mut poll_state_value, state_bomb) = loop {
+ if let Some(value) = this.poll_state.start_polling() {
+ break value;
+ }
+ };
+
+ // Safety: now state is `POLLING`.
+ unsafe {
+ WrappedWaker::replace_waker(this.stream_waker, cx);
+ WrappedWaker::replace_waker(this.inner_streams_waker, cx)
+ };
+
+ if poll_state_value & NEED_TO_POLL_STREAM != NONE {
+ let mut stream_waker = None;
+
+ // Here we need to poll the base stream.
+ //
+ // To improve performance, we will attempt to place as many items as we can
+ // to the `FuturesUnordered` bucket before polling inner streams
+ loop {
+ if this.is_exceeded_limit() || *this.is_stream_done {
+ // We either exceeded the limit or the stream is exhausted
+ if !*this.is_stream_done {
+ // The stream needs to be polled in the next iteration
+ need_to_poll_next |= NEED_TO_POLL_STREAM;
+ }
+
+ break;
+ } else {
+ let mut cx = Context::from_waker(
+ stream_waker.get_or_insert_with(|| waker(this.stream_waker.clone())),
+ );
+
+ match this.stream.as_mut().poll_next(&mut cx) {
+ Poll::Ready(Some(item)) => {
+ let next_item_fut = match Fc::next_step(item) {
+ // Propagates an item immediately (the main use-case is for errors)
+ FlowStep::Return(item) => {
+ need_to_poll_next |= NEED_TO_POLL_STREAM
+ | (poll_state_value & NEED_TO_POLL_INNER_STREAMS);
+ poll_state_value &= !NEED_TO_POLL_INNER_STREAMS;
+
+ next_item = Some(item);
+
+ break;
+ }
+ // Yields an item and continues processing (normal case)
+ FlowStep::Continue(inner_stream) => {
+ PollStreamFut::new(inner_stream)
+ }
+ };
+ // Add new stream to the inner streams bucket
+ this.inner_streams.as_mut().push(next_item_fut);
+ // Inner streams must be polled afterward
+ poll_state_value |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(None) => {
+ // Mark the base stream as done
+ *this.is_stream_done = true;
+ }
+ Poll::Pending => {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE {
+ let inner_streams_waker = waker(this.inner_streams_waker.clone());
+ let mut cx = Context::from_waker(&inner_streams_waker);
+
+ match this.inner_streams.as_mut().poll_next(&mut cx) {
+ Poll::Ready(Some(Some((item, next_item_fut)))) => {
+ // Push next inner stream item future to the list of inner streams futures
+ this.inner_streams.as_mut().push(next_item_fut);
+ // Take the received item
+ next_item = Some(item);
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ Poll::Ready(Some(None)) => {
+ // On the next iteration, inner streams must be polled again
+ need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS;
+ }
+ _ => {}
+ }
+ }
+
+ // We didn't have any `poll_next` panic, so it's time to deactivate the bomb
+ state_bomb.deactivate();
+
+ // Call the waker at the end of polling if
+ let mut force_wake =
+ // we need to poll the stream and didn't reach the limit yet
+ need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit()
+ // or we need to poll the inner streams again
+ || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE;
+
+ // Stop polling and swap the latest state
+ poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake);
+ // If state was changed during `POLLING` phase, we also need to manually call a waker
+ force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE;
+
+ let is_done = *this.is_stream_done && this.inner_streams.is_empty();
+
+ if next_item.is_some() || is_done {
+ Poll::Ready(next_item)
+ } else {
+ if force_wake {
+ cx.waker().wake_by_ref();
+ }
+
+ Poll::Pending
+ }
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc>
+where
+ St: Stream + Sink<Item>,
+{
+ type Error = St::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/vendor/futures-util/src/stream/stream/mod.rs b/vendor/futures-util/src/stream/stream/mod.rs
index 9cfcc09ba..558dc22bd 100644
--- a/vendor/futures-util/src/stream/stream/mod.rs
+++ b/vendor/futures-util/src/stream/stream/mod.rs
@@ -199,6 +199,25 @@ pub use self::buffered::Buffered;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
+pub(crate) mod flatten_unordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)]
+pub use self::flatten_unordered::FlattenUnordered;
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+delegate_all!(
+ /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method.
+ FlatMapUnordered<St, U, F>(
+ FlattenUnordered<Map<St, F>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)]
+ where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U
+);
+
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
mod for_each_concurrent;
#[cfg(not(futures_no_atomic_cas))]
#[cfg(feature = "alloc")]
@@ -390,9 +409,9 @@ pub trait StreamExt: Stream {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter(|x| future::ready(x % 2 == 0));
+ /// let events = stream.filter(|x| future::ready(x % 2 == 0));
///
- /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F>
@@ -422,11 +441,11 @@ pub trait StreamExt: Stream {
/// use futures::stream::{self, StreamExt};
///
/// let stream = stream::iter(1..=10);
- /// let evens = stream.filter_map(|x| async move {
+ /// let events = stream.filter_map(|x| async move {
/// if x % 2 == 0 { Some(x + 1) } else { None }
/// });
///
- /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await);
+ /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await);
/// # });
/// ```
fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F>
@@ -754,13 +773,64 @@ pub trait StreamExt: Stream {
assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self))
}
+ /// Flattens a stream of streams into just one continuous stream. Polls
+ /// inner streams produced by the base stream concurrently.
+ ///
+ /// The only argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::StreamExt;
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(1).unwrap();
+ /// tx1.unbounded_send(2).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(3).unwrap();
+ /// tx2.unbounded_send(4).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(rx1).unwrap();
+ /// tx3.unbounded_send(rx2).unwrap();
+ /// });
+ ///
+ /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await;
+ /// output.sort();
+ ///
+ /// assert_eq!(output, vec![1, 2, 3, 4]);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self>
+ where
+ Self::Item: Stream + Unpin,
+ Self: Sized,
+ {
+ assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into()))
+ }
+
/// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s.
///
/// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead,
/// you would have to chain combinators like `.map(f).flatten()` while this
/// combinator provides ability to write `.flat_map(f)` instead of chaining.
///
- /// The provided closure which produce inner streams is executed over all elements
+ /// The provided closure which produces inner streams is executed over all elements
/// of stream as last inner stream is terminated and next stream item is available.
///
/// Note that this function consumes the stream passed into it and returns a
@@ -788,6 +858,59 @@ pub trait StreamExt: Stream {
assert_stream::<U::Item, _>(FlatMap::new(self, f))
}
+ /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s
+ /// and polls them concurrently, yielding items in any order, as they made
+ /// available.
+ ///
+ /// [`StreamExt::map`] is very useful, but if it produces `Stream`s
+ /// instead, and you need to poll all of them concurrently, you would
+ /// have to use something like `for_each_concurrent` and merge values
+ /// by hand. This combinator provides ability to collect all values
+ /// from concurrently polled streams into one stream.
+ ///
+ /// The first argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// The provided closure which produces inner streams is executed over
+ /// all elements of stream as next stream item is available and limit
+ /// of concurrently processed streams isn't exceeded.
+ ///
+ /// Note that this function consumes the stream passed into it and
+ /// returns a wrapped version of it.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::stream::{self, StreamExt};
+ ///
+ /// let stream = stream::iter(1..5);
+ /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x]));
+ /// let mut values = stream.collect::<Vec<_>>().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn flat_map_unordered<U, F>(
+ self,
+ limit: impl Into<Option<usize>>,
+ f: F,
+ ) -> FlatMapUnordered<Self, U, F>
+ where
+ U: Stream + Unpin,
+ F: FnMut(Self::Item) -> U,
+ Self: Sized,
+ {
+ assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f))
+ }
+
/// Combinator similar to [`StreamExt::fold`] that holds internal state
/// and produces a new stream.
///
@@ -1397,8 +1520,7 @@ pub trait StreamExt: Stream {
/// be immediately returned.
///
/// If the underlying stream ended and only a partial vector was created,
- /// it'll be returned. Additionally if an error happens from the underlying
- /// stream then the currently buffered items will be yielded.
+ /// it will be returned.
///
/// This method is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
@@ -1558,6 +1680,8 @@ pub trait StreamExt: Stream {
/// assert_eq!(total, 6);
/// # });
/// ```
+ ///
+ /// [`select!`]: crate::select
fn select_next_some(&mut self) -> SelectNextSome<'_, Self>
where
Self: Unpin + FusedStream,
diff --git a/vendor/futures-util/src/stream/stream/peek.rs b/vendor/futures-util/src/stream/stream/peek.rs
index c72dfc366..ea3d6243f 100644
--- a/vendor/futures-util/src/stream/stream/peek.rs
+++ b/vendor/futures-util/src/stream/stream/peek.rs
@@ -204,7 +204,7 @@ impl<S: Stream> Stream for Peekable<S> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let peek_len = if self.peeked.is_some() { 1 } else { 0 };
+ let peek_len = usize::from(self.peeked.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(peek_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/stream/ready_chunks.rs b/vendor/futures-util/src/stream/stream/ready_chunks.rs
index 5ebc9582d..192054c4a 100644
--- a/vendor/futures-util/src/stream/stream/ready_chunks.rs
+++ b/vendor/futures-util/src/stream/stream/ready_chunks.rs
@@ -1,6 +1,5 @@
-use crate::stream::Fuse;
+use crate::stream::{Fuse, StreamExt};
use alloc::vec::Vec;
-use core::mem;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
@@ -15,23 +14,15 @@ pin_project! {
pub struct ReadyChunks<St: Stream> {
#[pin]
stream: Fuse<St>,
- items: Vec<St::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}
-impl<St: Stream> ReadyChunks<St>
-where
- St: Stream,
-{
+impl<St: Stream> ReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);
- Self {
- stream: super::Fuse::new(stream),
- items: Vec::with_capacity(capacity),
- cap: capacity,
- }
+ Self { stream: stream.fuse(), cap: capacity }
}
delegate_access_inner!(stream, St, (.));
@@ -43,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
+ let mut items: Vec<St::Item> = Vec::new();
+
loop {
match this.stream.as_mut().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
- return if this.items.is_empty() {
- Poll::Pending
- } else {
- Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
- }
+ return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) }
}
// Push the ready item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
- this.items.push(item);
- if this.items.len() >= *this.cap {
- return Poll::Ready(Some(mem::replace(
- this.items,
- Vec::with_capacity(*this.cap),
- )));
+ if items.is_empty() {
+ items.reserve(*this.cap);
+ }
+ items.push(item);
+ if items.len() >= *this.cap {
+ return Poll::Ready(Some(items));
}
}
// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
- let last = if this.items.is_empty() {
- None
- } else {
- let full_buf = mem::replace(this.items, Vec::new());
- Some(full_buf)
- };
+ let last = if items.is_empty() { None } else { Some(items) };
return Poll::Ready(last);
}
@@ -85,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
- let upper = match upper {
- Some(x) => x.checked_add(chunk_len),
- None => None,
- };
+ let lower = lower / self.cap;
(lower, upper)
}
}
-impl<St: FusedStream> FusedStream for ReadyChunks<St> {
+impl<St: Stream> FusedStream for ReadyChunks<St> {
fn is_terminated(&self) -> bool {
- self.stream.is_terminated() && self.items.is_empty()
+ self.stream.is_terminated()
}
}
diff --git a/vendor/futures-util/src/stream/stream/skip_while.rs b/vendor/futures-util/src/stream/stream/skip_while.rs
index 50a21a21a..dabd5eefa 100644
--- a/vendor/futures-util/src/stream/stream/skip_while.rs
+++ b/vendor/futures-util/src/stream/stream/skip_while.rs
@@ -99,7 +99,7 @@ where
if self.done_skipping {
self.stream.size_hint()
} else {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/split.rs b/vendor/futures-util/src/stream/stream/split.rs
index 3a72fee30..e2034e0c2 100644
--- a/vendor/futures-util/src/stream/stream/split.rs
+++ b/vendor/futures-util/src/stream/stream/split.rs
@@ -35,7 +35,7 @@ impl<S: Stream> Stream for SplitStream<S> {
}
}
-#[allow(bad_style)]
+#[allow(non_snake_case)]
fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> {
SplitSink { lock, slot: None }
}
diff --git a/vendor/futures-util/src/stream/stream/take.rs b/vendor/futures-util/src/stream/stream/take.rs
index b1c728e33..29d6c39ee 100644
--- a/vendor/futures-util/src/stream/stream/take.rs
+++ b/vendor/futures-util/src/stream/stream/take.rs
@@ -54,11 +54,11 @@ where
let (lower, upper) = self.stream.size_hint();
- let lower = cmp::min(lower, self.remaining as usize);
+ let lower = cmp::min(lower, self.remaining);
let upper = match upper {
- Some(x) if x < self.remaining as usize => Some(x),
- _ => Some(self.remaining as usize),
+ Some(x) if x < self.remaining => Some(x),
+ _ => Some(self.remaining),
};
(lower, upper)
diff --git a/vendor/futures-util/src/stream/stream/take_while.rs b/vendor/futures-util/src/stream/stream/take_while.rs
index 01b27654b..925694301 100644
--- a/vendor/futures-util/src/stream/stream/take_while.rs
+++ b/vendor/futures-util/src/stream/stream/take_while.rs
@@ -91,7 +91,7 @@ where
return (0, Some(0));
}
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/stream/then.rs b/vendor/futures-util/src/stream/stream/then.rs
index d4531d4b9..9192c0b0c 100644
--- a/vendor/futures-util/src/stream/stream/then.rs
+++ b/vendor/futures-util/src/stream/stream/then.rs
@@ -78,7 +78,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/stream/unzip.rs b/vendor/futures-util/src/stream/stream/unzip.rs
index 15f22e80b..a88cf0326 100644
--- a/vendor/futures-util/src/stream/stream/unzip.rs
+++ b/vendor/futures-util/src/stream/stream/unzip.rs
@@ -21,7 +21,7 @@ pin_project! {
impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> {
fn finish(self: Pin<&mut Self>) -> (FromA, FromB) {
let this = self.project();
- (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default()))
+ (mem::take(this.left), mem::take(this.right))
}
pub(super) fn new(stream: St) -> Self {
diff --git a/vendor/futures-util/src/stream/stream/zip.rs b/vendor/futures-util/src/stream/stream/zip.rs
index 360a8b63b..25a47e96b 100644
--- a/vendor/futures-util/src/stream/stream/zip.rs
+++ b/vendor/futures-util/src/stream/stream/zip.rs
@@ -102,8 +102,8 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let queued1_len = if self.queued1.is_some() { 1 } else { 0 };
- let queued2_len = if self.queued2.is_some() { 1 } else { 0 };
+ let queued1_len = usize::from(self.queued1.is_some());
+ let queued2_len = usize::from(self.queued2.is_some());
let (stream1_lower, stream1_upper) = self.stream1.size_hint();
let (stream2_lower, stream2_upper) = self.stream2.size_hint();
diff --git a/vendor/futures-util/src/stream/try_stream/and_then.rs b/vendor/futures-util/src/stream/try_stream/and_then.rs
index a7b50db0b..2f8b6f258 100644
--- a/vendor/futures-util/src/stream/try_stream/and_then.rs
+++ b/vendor/futures-util/src/stream/try_stream/and_then.rs
@@ -71,7 +71,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/try_stream/into_async_read.rs b/vendor/futures-util/src/stream/try_stream/into_async_read.rs
index 914b277a0..ffbfc7eae 100644
--- a/vendor/futures-util/src/stream/try_stream/into_async_read.rs
+++ b/vendor/futures-util/src/stream/try_stream/into_async_read.rs
@@ -1,30 +1,26 @@
-use crate::stream::TryStreamExt;
use core::pin::Pin;
use futures_core::ready;
use futures_core::stream::TryStream;
use futures_core::task::{Context, Poll};
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
+use pin_project_lite::pin_project;
use std::cmp;
use std::io::{Error, Result};
-/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
-#[derive(Debug)]
-#[must_use = "readers do nothing unless polled"]
-#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
-pub struct IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
- stream: St,
- state: ReadState<St::Ok>,
-}
-
-impl<St> Unpin for IntoAsyncRead<St>
-where
- St: TryStream<Error = Error> + Unpin,
- St::Ok: AsRef<[u8]>,
-{
+pin_project! {
+ /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method.
+ #[derive(Debug)]
+ #[must_use = "readers do nothing unless polled"]
+ #[cfg_attr(docsrs, doc(cfg(feature = "io")))]
+ pub struct IntoAsyncRead<St>
+ where
+ St: TryStream<Error = Error>,
+ St::Ok: AsRef<[u8]>,
+ {
+ #[pin]
+ stream: St,
+ state: ReadState<St::Ok>,
+ }
}
#[derive(Debug)]
@@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> {
impl<St> IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
pub(super) fn new(stream: St) -> Self {
@@ -46,16 +42,18 @@ where
impl<St> AsyncRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
fn poll_read(
- mut self: Pin<&mut Self>,
+ self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<Result<usize>> {
+ let mut this = self.project();
+
loop {
- match &mut self.state {
+ match this.state {
ReadState::Ready { chunk, chunk_start } => {
let chunk = chunk.as_ref();
let len = cmp::min(buf.len(), chunk.len() - *chunk_start);
@@ -64,23 +62,23 @@ where
*chunk_start += len;
if chunk.len() == *chunk_start {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
return Poll::Ready(Ok(len));
}
- ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) {
+ ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(0));
}
},
@@ -94,51 +92,52 @@ where
impl<St> AsyncWrite for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + AsyncWrite + Unpin,
+ St: TryStream<Error = Error> + AsyncWrite,
St::Ok: AsRef<[u8]>,
{
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8],
- ) -> Poll<Result<usize>> {
- Pin::new(&mut self.stream).poll_write(cx, buf)
+ fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
+ let this = self.project();
+ this.stream.poll_write(cx, buf)
}
- fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_flush(cx)
+ fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_flush(cx)
}
- fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
- Pin::new(&mut self.stream).poll_close(cx)
+ fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
+ let this = self.project();
+ this.stream.poll_close(cx)
}
}
impl<St> AsyncBufRead for IntoAsyncRead<St>
where
- St: TryStream<Error = Error> + Unpin,
+ St: TryStream<Error = Error>,
St::Ok: AsRef<[u8]>,
{
- fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
- while let ReadState::PendingChunk = self.state {
- match ready!(self.stream.try_poll_next_unpin(cx)) {
+ fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
+ let mut this = self.project();
+
+ while let ReadState::PendingChunk = this.state {
+ match ready!(this.stream.as_mut().try_poll_next(cx)) {
Some(Ok(chunk)) => {
if !chunk.as_ref().is_empty() {
- self.state = ReadState::Ready { chunk, chunk_start: 0 };
+ *this.state = ReadState::Ready { chunk, chunk_start: 0 };
}
}
Some(Err(err)) => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Err(err));
}
None => {
- self.state = ReadState::Eof;
+ *this.state = ReadState::Eof;
return Poll::Ready(Ok(&[]));
}
}
}
- if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state {
+ if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state {
let chunk = chunk.as_ref();
return Poll::Ready(Ok(&chunk[chunk_start..]));
}
@@ -147,16 +146,18 @@ where
Poll::Ready(Ok(&[]))
}
- fn consume(mut self: Pin<&mut Self>, amount: usize) {
+ fn consume(self: Pin<&mut Self>, amount: usize) {
+ let this = self.project();
+
// https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295
if amount == 0 {
return;
}
- if let ReadState::Ready { chunk, chunk_start } = &mut self.state {
+ if let ReadState::Ready { chunk, chunk_start } = this.state {
*chunk_start += amount;
debug_assert!(*chunk_start <= chunk.as_ref().len());
if *chunk_start >= chunk.as_ref().len() {
- self.state = ReadState::PendingChunk;
+ *this.state = ReadState::PendingChunk;
}
} else {
debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk");
diff --git a/vendor/futures-util/src/stream/try_stream/mod.rs b/vendor/futures-util/src/stream/try_stream/mod.rs
index 455ddca3f..414a40dbe 100644
--- a/vendor/futures-util/src/stream/try_stream/mod.rs
+++ b/vendor/futures-util/src/stream/try_stream/mod.rs
@@ -15,6 +15,7 @@ use crate::stream::{Inspect, Map};
#[cfg(feature = "alloc")]
use alloc::vec::Vec;
use core::pin::Pin;
+
use futures_core::{
future::{Future, TryFuture},
stream::TryStream,
@@ -88,6 +89,14 @@ mod try_flatten;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_flatten::TryFlatten;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+mod try_flatten_unordered;
+#[cfg(not(futures_no_atomic_cas))]
+#[cfg(feature = "alloc")]
+#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
+pub use self::try_flatten_unordered::TryFlattenUnordered;
+
mod try_collect;
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
pub use self::try_collect::TryCollect;
@@ -711,6 +720,63 @@ pub trait TryStreamExt: TryStream {
assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f))
}
+ /// Flattens a stream of streams into just one continuous stream. Produced streams
+ /// will be polled concurrently and any errors will be passed through without looking at them.
+ /// If the underlying base stream returns an error, it will be **immediately** propagated.
+ ///
+ /// The only argument is an optional limit on the number of concurrently
+ /// polled streams. If this limit is not `None`, no more than `limit` streams
+ /// will be polled at the same time. The `limit` argument is of type
+ /// `Into<Option<usize>>`, and so can be provided as either `None`,
+ /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as
+ /// no limit at all, and will have the same result as passing in `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// # futures::executor::block_on(async {
+ /// use futures::channel::mpsc;
+ /// use futures::stream::{StreamExt, TryStreamExt};
+ /// use std::thread;
+ ///
+ /// let (tx1, rx1) = mpsc::unbounded();
+ /// let (tx2, rx2) = mpsc::unbounded();
+ /// let (tx3, rx3) = mpsc::unbounded();
+ ///
+ /// thread::spawn(move || {
+ /// tx1.unbounded_send(Ok(1)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx2.unbounded_send(Ok(2)).unwrap();
+ /// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
+ /// });
+ /// thread::spawn(move || {
+ /// tx3.unbounded_send(Ok(rx1)).unwrap();
+ /// tx3.unbounded_send(Ok(rx2)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
+ /// });
+ ///
+ /// let stream = rx3.try_flatten_unordered(None);
+ /// let mut values: Vec<_> = stream.collect().await;
+ /// values.sort();
+ ///
+ /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]);
+ /// # });
+ /// ```
+ #[cfg(not(futures_no_atomic_cas))]
+ #[cfg(feature = "alloc")]
+ fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self>
+ where
+ Self::Ok: TryStream + Unpin,
+ <Self::Ok as TryStream>::Error: From<Self::Error>,
+ Self: Sized,
+ {
+ assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>(
+ TryFlattenUnordered::new(self, limit),
+ )
+ }
+
/// Flattens a stream of streams into just one continuous stream.
///
/// If this stream's elements are themselves streams then this combinator
@@ -736,17 +802,21 @@ pub trait TryStreamExt: TryStream {
/// thread::spawn(move || {
/// tx2.unbounded_send(Ok(2)).unwrap();
/// tx2.unbounded_send(Err(3)).unwrap();
+ /// tx2.unbounded_send(Ok(4)).unwrap();
/// });
/// thread::spawn(move || {
/// tx3.unbounded_send(Ok(rx1)).unwrap();
/// tx3.unbounded_send(Ok(rx2)).unwrap();
- /// tx3.unbounded_send(Err(4)).unwrap();
+ /// tx3.unbounded_send(Err(5)).unwrap();
/// });
///
/// let mut stream = rx3.try_flatten();
/// assert_eq!(stream.next().await, Some(Ok(1)));
/// assert_eq!(stream.next().await, Some(Ok(2)));
/// assert_eq!(stream.next().await, Some(Err(3)));
+ /// assert_eq!(stream.next().await, Some(Ok(4)));
+ /// assert_eq!(stream.next().await, Some(Err(5)));
+ /// assert_eq!(stream.next().await, None);
/// # });
/// ```
fn try_flatten(self) -> TryFlatten<Self>
@@ -914,7 +984,7 @@ pub trait TryStreamExt: TryStream {
/// that matches the stream's `Error` type.
///
/// This adaptor will buffer up to `n` futures and then return their
- /// outputs in the order. If the underlying stream returns an error, it will
+ /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will
/// be immediately propagated.
///
/// The returned stream will be a stream of results, each containing either
@@ -1001,6 +1071,7 @@ pub trait TryStreamExt: TryStream {
/// Wraps a [`TryStream`] into a stream compatible with libraries using
/// futures 0.1 `Stream`. Requires the `compat` feature to be enabled.
/// ```
+ /// # if cfg!(miri) { return; } // Miri does not support epoll
/// use futures::future::{FutureExt, TryFutureExt};
/// # let (tx, rx) = futures::channel::oneshot::channel();
///
@@ -1026,12 +1097,7 @@ pub trait TryStreamExt: TryStream {
Compat::new(self)
}
- /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead).
- ///
- /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be
- /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll
- /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`]
- /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate.
+ /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead).
///
/// This method is only available when the `std` feature of this
/// library is activated, and it is activated by default.
@@ -1043,12 +1109,12 @@ pub trait TryStreamExt: TryStream {
/// use futures::stream::{self, TryStreamExt};
/// use futures::io::AsyncReadExt;
///
- /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]);
+ /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]);
/// let mut reader = stream.into_async_read();
- /// let mut buf = Vec::new();
///
- /// assert!(reader.read_to_end(&mut buf).await.is_ok());
- /// assert_eq!(buf, &[1, 2, 3, 4, 5]);
+ /// let mut buf = Vec::new();
+ /// reader.read_to_end(&mut buf).await.unwrap();
+ /// assert_eq!(buf, [1, 2, 3, 4, 5]);
/// # })
/// ```
#[cfg(feature = "io")]
@@ -1056,7 +1122,7 @@ pub trait TryStreamExt: TryStream {
#[cfg(feature = "std")]
fn into_async_read(self) -> IntoAsyncRead<Self>
where
- Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin,
+ Self: Sized + TryStreamExt<Error = std::io::Error>,
Self::Ok: AsRef<[u8]>,
{
crate::io::assert_read(IntoAsyncRead::new(self))
diff --git a/vendor/futures-util/src/stream/try_stream/or_else.rs b/vendor/futures-util/src/stream/try_stream/or_else.rs
index cb69e8132..53aceb8e6 100644
--- a/vendor/futures-util/src/stream/try_stream/or_else.rs
+++ b/vendor/futures-util/src/stream/try_stream/or_else.rs
@@ -75,7 +75,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let future_len = if self.future.is_some() { 1 } else { 0 };
+ let future_len = usize::from(self.future.is_some());
let (lower, upper) = self.stream.size_hint();
let lower = lower.saturating_add(future_len);
let upper = match upper {
diff --git a/vendor/futures-util/src/stream/try_stream/try_buffered.rs b/vendor/futures-util/src/stream/try_stream/try_buffered.rs
index 45bd3f8c7..9f48e5c0a 100644
--- a/vendor/futures-util/src/stream/try_stream/try_buffered.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_buffered.rs
@@ -54,7 +54,7 @@ where
// our queue of futures. Propagate errors from the stream immediately.
while this.in_progress_queue.len() < *this.max {
match this.stream.as_mut().poll_next(cx)? {
- Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()),
+ Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()),
Poll::Ready(None) | Poll::Pending => break,
}
}
diff --git a/vendor/futures-util/src/stream/try_stream/try_chunks.rs b/vendor/futures-util/src/stream/try_stream/try_chunks.rs
index 07d4425a8..ec53f4bd1 100644
--- a/vendor/futures-util/src/stream/try_stream/try_chunks.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_chunks.rs
@@ -41,9 +41,10 @@ impl<St: TryStream> TryChunks<St> {
delegate_access_inner!(stream, St, (. .));
}
+type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>;
+
impl<St: TryStream> Stream for TryChunks<St> {
- #[allow(clippy::type_complexity)]
- type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>;
+ type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
@@ -70,7 +71,7 @@ impl<St: TryStream> Stream for TryChunks<St> {
let last = if this.items.is_empty() {
None
} else {
- let full_buf = mem::replace(this.items, Vec::new());
+ let full_buf = mem::take(this.items);
Some(full_buf)
};
@@ -81,9 +82,9 @@ impl<St: TryStream> Stream for TryChunks<St> {
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let chunk_len = if self.items.is_empty() { 0 } else { 1 };
+ let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
- let lower = lower.saturating_add(chunk_len);
+ let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
diff --git a/vendor/futures-util/src/stream/try_stream/try_collect.rs b/vendor/futures-util/src/stream/try_stream/try_collect.rs
index 5d3b3d766..3e5963f03 100644
--- a/vendor/futures-util/src/stream/try_stream/try_collect.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_collect.rs
@@ -45,7 +45,7 @@ where
Poll::Ready(Ok(loop {
match ready!(this.stream.as_mut().try_poll_next(cx)?) {
Some(x) => this.items.extend(Some(x)),
- None => break mem::replace(this.items, Default::default()),
+ None => break mem::take(this.items),
}
}))
}
diff --git a/vendor/futures-util/src/stream/try_stream/try_filter.rs b/vendor/futures-util/src/stream/try_stream/try_filter.rs
index 61e6105c3..11d58243f 100644
--- a/vendor/futures-util/src/stream/try_stream/try_filter.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_filter.rs
@@ -90,7 +90,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_fut.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_fut.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs
index bb1b5b9db..ed1201732 100644
--- a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs
@@ -84,7 +84,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs
new file mode 100644
index 000000000..a74dfc451
--- /dev/null
+++ b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs
@@ -0,0 +1,176 @@
+use core::marker::PhantomData;
+use core::pin::Pin;
+
+use futures_core::ready;
+use futures_core::stream::{FusedStream, Stream, TryStream};
+use futures_core::task::{Context, Poll};
+#[cfg(feature = "sink")]
+use futures_sink::Sink;
+
+use pin_project_lite::pin_project;
+
+use crate::future::Either;
+use crate::stream::stream::flatten_unordered::{
+ FlattenUnorderedWithFlowController, FlowController, FlowStep,
+};
+use crate::stream::IntoStream;
+use crate::TryStreamExt;
+
+delegate_all!(
+ /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method.
+ TryFlattenUnordered<St>(
+ FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>>
+ ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)]
+ + New[
+ |stream: St, limit: impl Into<Option<usize>>|
+ FlattenUnorderedWithFlowController::new(
+ NestedTryStreamIntoEitherTryStream::new(stream),
+ limit.into()
+ )
+ ]
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+);
+
+pin_project! {
+ /// Emits either successful streams or single-item streams containing the underlying errors.
+ /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`.
+ #[derive(Debug)]
+ #[must_use = "streams do nothing unless polled"]
+ pub struct NestedTryStreamIntoEitherTryStream<St>
+ where
+ St: TryStream,
+ St::Ok: TryStream,
+ St::Ok: Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>
+ {
+ #[pin]
+ stream: St
+ }
+}
+
+impl<St> NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn new(stream: St) -> Self {
+ Self { stream }
+ }
+
+ delegate_access_inner!(stream, St, ());
+}
+
+/// Emits a single item immediately, then stream will be terminated.
+#[derive(Debug, Clone)]
+pub struct Single<T>(Option<T>);
+
+impl<T> Single<T> {
+ /// Constructs new `Single` with the given value.
+ fn new(val: T) -> Self {
+ Self(Some(val))
+ }
+
+ /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated.
+ fn next_immediate(&mut self) -> Option<T> {
+ self.0.take()
+ }
+}
+
+impl<T> Unpin for Single<T> {}
+
+impl<T> Stream for Single<T> {
+ type Item = T;
+
+ fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ Poll::Ready(self.0.take())
+ }
+
+ fn size_hint(&self) -> (usize, Option<usize>) {
+ self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1)))
+ }
+}
+
+/// Immediately propagates errors occurred in the base stream.
+#[derive(Debug, Clone, Copy)]
+pub struct PropagateBaseStreamError<St>(PhantomData<St>);
+
+type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item;
+type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item;
+
+impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> {
+ match item {
+ // A new successful inner stream received
+ st @ Either::Left(_) => FlowStep::Continue(st),
+ // An error encountered
+ Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()),
+ }
+ }
+}
+
+type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>;
+
+impl<St> Stream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ // Item is either an inner stream or a stream containing a single error.
+ // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s.
+ type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>;
+
+ fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let item = ready!(self.project().stream.try_poll_next(cx));
+
+ let out = match item {
+ Some(res) => match res {
+ // Emit successful inner stream as is
+ Ok(stream) => Either::Left(stream.into_stream()),
+ // Wrap an error into a stream containing a single item
+ err @ Err(_) => {
+ let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into);
+
+ Either::Right(Single::new(res))
+ }
+ },
+ None => return Poll::Ready(None),
+ };
+
+ Poll::Ready(Some(out))
+ }
+}
+
+impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + FusedStream,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<St::Error>,
+{
+ fn is_terminated(&self) -> bool {
+ self.stream.is_terminated()
+ }
+}
+
+// Forwarding impl of Sink from the underlying stream
+#[cfg(feature = "sink")]
+impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St>
+where
+ St: TryStream + Sink<Item>,
+ St::Ok: TryStream + Unpin,
+ <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>,
+{
+ type Error = <St as Sink<Item>>::Error;
+
+ delegate_sink!(stream, Item);
+}
diff --git a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs
index a424b6c5b..52aa2d478 100644
--- a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs
@@ -87,7 +87,7 @@ where
}
fn size_hint(&self) -> (usize, Option<usize>) {
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/stream/try_stream/try_take_while.rs b/vendor/futures-util/src/stream/try_stream/try_take_while.rs
index 3375960ef..4b5ff1ad3 100644
--- a/vendor/futures-util/src/stream/try_stream/try_take_while.rs
+++ b/vendor/futures-util/src/stream/try_stream/try_take_while.rs
@@ -96,7 +96,7 @@ where
return (0, Some(0));
}
- let pending_len = if self.pending_item.is_some() { 1 } else { 0 };
+ let pending_len = usize::from(self.pending_item.is_some());
let (_, upper) = self.stream.size_hint();
let upper = match upper {
Some(x) => x.checked_add(pending_len),
diff --git a/vendor/futures-util/src/task/spawn.rs b/vendor/futures-util/src/task/spawn.rs
index f8779230e..d9e998530 100644
--- a/vendor/futures-util/src/task/spawn.rs
+++ b/vendor/futures-util/src/task/spawn.rs
@@ -34,6 +34,7 @@ pub trait SpawnExt: Spawn {
/// today. Feel free to use this method in the meantime.
///
/// ```
+ /// # {
/// use futures::executor::ThreadPool;
/// use futures::task::SpawnExt;
///
@@ -41,6 +42,8 @@ pub trait SpawnExt: Spawn {
///
/// let future = async { /* ... */ };
/// executor.spawn(future).unwrap();
+ /// # }
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
#[cfg(feature = "alloc")]
fn spawn<Fut>(&self, future: Fut) -> Result<(), SpawnError>
@@ -58,6 +61,7 @@ pub trait SpawnExt: Spawn {
/// resolves to the output of the spawned future.
///
/// ```
+ /// # {
/// use futures::executor::{block_on, ThreadPool};
/// use futures::future;
/// use futures::task::SpawnExt;
@@ -67,6 +71,8 @@ pub trait SpawnExt: Spawn {
/// let future = future::ready(1);
/// let join_handle_fut = executor.spawn_with_handle(future).unwrap();
/// assert_eq!(block_on(join_handle_fut), 1);
+ /// # }
+ /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
/// ```
#[cfg(feature = "channel")]
#[cfg_attr(docsrs, doc(cfg(feature = "channel")))]