diff options
Diffstat (limited to 'vendor/futures-util')
75 files changed, 2103 insertions, 566 deletions
diff --git a/vendor/futures-util/.cargo-checksum.json b/vendor/futures-util/.cargo-checksum.json index 2db230686..0d1186307 100644 --- a/vendor/futures-util/.cargo-checksum.json +++ b/vendor/futures-util/.cargo-checksum.json @@ -1 +1 @@ -{"files":{"Cargo.toml":"fb0e0a9cd20fff0e75bddf0f79a2c06200f5177b338a3bb2b3cd27f3b08eaff7","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"727f58ddc0ad88244d784b56a090410b39805e256e3415d4ae20cf2ba471d260","benches/futures_unordered.rs":"5eb8280be8d8fb7bd5fb103ce20db10f618f47e180a402105e0d5e9f8c9fe35a","benches_disabled/bilock.rs":"ab8b47fba5cfa5366477ef1034a042efde9b0aff6b77110d1f3a2715ab7508e8","build.rs":"f6e21c09f18cc405bd7048cb7a2958f92d5414b9ca6b301d137e120a84fa020a","no_atomic_cas.rs":"ff8be002b49a5cd9e4ca0db17b1c9e6b98e55f556319eb6b953dd6ff52c397a6","src/abortable.rs":"d88dd2501ed379b3540bd971d367b4629755d6d9f264e7b54ae59eea0ff83623","src/async_await/join_mod.rs":"8f83c0001df867f5eb47a4174bf4a0c0b548f8ff3be3b532e0c759ad981b87da","src/async_await/mod.rs":"3d25c343cc3e789d3f982cdacd6f8ed91511ba656c3923da310700f318f423a4","src/async_await/pending.rs":"7971ec1d5d89ad80390e2a0c51e396257b2e78f1436cce79ea2b55ac2f13b328","src/async_await/poll.rs":"440c19a89fd42b12da09ff48a69523b5a8a5baea0bcd2f860589a0ab996ed781","src/async_await/random.rs":"daf229cd01595d38ef0f6284865fe2f60ed3b8134f7a15c82564b97ff3a5be98","src/async_await/select_mod.rs":"414c7fb7923cfe21116d558bf3cd1a6ae5bef4ed01f9877f0e7cb3e42ee6c79d","src/async_await/stream_select_mod.rs":"9a51338914cbb1502619fed591dfe4fc676919499b9d041898e59f630fe5e7f0","src/compat/compat01as03.rs":"07c83b0820e1b6a5a792043e0b0eb5be70f365e5462c8a1fa5fa6b0f62f9e57d","src/compat/compat03as01.rs":"7cf29e57f8ee14b64123b3d2c16dceced25af5491a5ef81b655b2de2e9587fbe","src/compat/executor.rs":"a0edd7baa2192daa14a5a26bf53bd229defaeac538d2ec771c60992c6dfeb393","src/compat/mod.rs":"6cf3412f6a3f9ee8406118ea75de65468a83febc6ba61bdbad69261f0cfea02e","src/fns.rs":"f8e396128791169098a38a82c3c28aaa6dd5d40718635f7cc30b59b32f7110b8","src/future/abortable.rs":"373ce61c0c7c31718ff572113503bb88f55e3b49ed5d028a3dfafd69070f44c1","src/future/either.rs":"d7fb8728fac6fccbbc14afaa16ebd51c2633dd653277289c089b5c478742b37f","src/future/future/catch_unwind.rs":"08b0ac049cdee28325d378209aa5bb4d91b14a29ddd9c2b0e5c661b61f9cfcfe","src/future/future/flatten.rs":"5bf9846cef8dec5dcc38b992653e11146bc149a0d3efc09b1f8268bd29de0b2b","src/future/future/fuse.rs":"6531a95dc1917b2a5724b35e364faa741143811afc654a45c360111e9807864c","src/future/future/map.rs":"de607c2a4d80d2bddb590781c37328ddd294bb9d5064a9ecb99455244239b597","src/future/future/mod.rs":"ecfac09dcba801cede7c58acfaa76a9ab76d26a3f4c968d66c2a49caa57faefe","src/future/future/remote_handle.rs":"2ae17a409569b32c78e20026a8ecdf667352c2597a4a0a8deefa4761fafcb223","src/future/future/shared.rs":"ebf46b4bf428676bf553015e384f7f41da03558481aaa38deb1e8a896d212dae","src/future/join.rs":"38b55fc7cdbbdaaa525e51f8ce09783dbbcb65eabfd7de9f46610593e0bbef17","src/future/join_all.rs":"294a8e76862f447dea8759f78f7224d885a826c80193ceef2389345221e6e3c0","src/future/lazy.rs":"d161fc4108a97348c1becbbd5ba8fccb7225dcf1d81c097666f5c8b40718251d","src/future/maybe_done.rs":"559e41cb170f9fe7246d2a5b112527a9f9cbca63b8a5a872b3aa9c861f70f307","src/future/mod.rs":"51e018100362f20b071225268f1d81f25c8e9664e94730af199069c2692bf26a","src/future/option.rs":"73daca814800b91b707753dcfe074265372b0077fae2504ea6efddc713453579","src/future/pending.rs":"86598a5d5ade7c0416566deb280150bac34bd4703c997d2c7af572342d8d6d02","src/future/poll_fn.rs":"8e54bf57d60e01d496ae31df35e0b96868f4bda504c024a14f51ab723d67885f","src/future/poll_immediate.rs":"7e199fc102894c9095de17af602a7c8f05d427269aefce5d71cd5136d54659c0","src/future/ready.rs":"c9860ccd8ac529f44f66dee73ca9b9d7f1b1b3e5e9e4dc70c59640c752553d58","src/future/select.rs":"a582b1ed9c1e6cd8dcaa80b5f45e2176ed4a1740fe303f7143e29cab8e0dbc22","src/future/select_all.rs":"179b8168370e2c7105e348fdfbeb965eb746336d9660aa7fbc9185d681ae8c2d","src/future/select_ok.rs":"fed28e1fd368cdd465d297a84ea9436a00757eff4b34e592d94d7747b3bf4996","src/future/try_future/into_future.rs":"d966bde7b06a88443f0efd877e95f91541778c4e713f3f4b66e00ca5d3f352b6","src/future/try_future/mod.rs":"4733167d93e8f728e87f79b7d4dfe66de6afb306735ca76f9f843270286b3f6b","src/future/try_future/try_flatten.rs":"16c02e1780bd312b8b386e41c1d9dd4bcc4e8ef10f26007364f857b3adcc6e99","src/future/try_future/try_flatten_err.rs":"130f3fc3fd95a19f4e4a50e69301106fab02f77d0faf3aac9c473a92b826c2ca","src/future/try_join.rs":"1836931f8ba32da41c6810e6acc0ea2fee75b74b3153e760c4542cb12b220540","src/future/try_join_all.rs":"d4f262e80bb5347597c75b25de3c7784ffb4bd766227d6dc70cdeb77a38f4a5d","src/future/try_maybe_done.rs":"1cce46b2ee43ad51b7c5f9c02bc90a890af32bc549ce99098a2c8813508051e1","src/future/try_select.rs":"2e1b7e0b0cb7343f766fade87269093558db206f7fbe7dddfa3143885e17bac4","src/io/allow_std.rs":"a125959c255fd344399fb0be19218a8ee7d613ce2485d6df9cdbc2ed5d3987df","src/io/buf_reader.rs":"46a1e24046c5bc2ab8f266e3d904281bec3ab4ba6c13d4213a52599b57b8de66","src/io/buf_writer.rs":"d6666b8dde60eefbb7fa69da4a2eea2b34ea0e4a85e21e5ac6e83cc680ea9140","src/io/chain.rs":"12f508fc39c3234a71a0f886505245c5d659aed09c7d874b1bd8ca0a0d456cf3","src/io/close.rs":"9832210a870637198fa58642cdf2779afab71f2e31a9953e663fa6854bd73ac7","src/io/copy.rs":"cb2466dcd7ea8bb1f07d00c03e66ed55abf71fe4be6937adc9f533ef9d99fb2d","src/io/copy_buf.rs":"e9a5f6aac8375e298bddb332f23d8b626d056ce452b58f772a05df7e2cd326cf","src/io/cursor.rs":"612bdb8b4055d26816fb0e4c3e9859b06b3d08c99e4a27ed4946c95e219a29ab","src/io/empty.rs":"6ae40b4bc8fc41572abad2d013285d78d8df445868d41fac77bde508ec9bc1a5","src/io/fill_buf.rs":"4f217fed8eb3f66dbde2371c3fbcfa9420d38ba20da544a0658584e5778aa47d","src/io/flush.rs":"0c9b588dfd9da039dc123ba9448ac31ca21ee3da0a164a21f6c2c182183d43e2","src/io/into_sink.rs":"ab5bdb12bff62672175b69b8c9f5a4bbbea716b9cf89169ed6a723ab43da9df8","src/io/line_writer.rs":"16c151c68d89b7c2ab929c4a782539b1ad512b723eed9b544f50f1ff06f0b661","src/io/lines.rs":"ccfa24e212a610aad0c81042cfa64ada820c4305ba0e911a2c16221d7867468e","src/io/mod.rs":"599416e4d7dd5c6523a87bf778001ce0c3848ee760827af0b69c7b7aafd8a8a0","src/io/read.rs":"4ea675a83cec98a22c9c4731ff980209f0cf67f63c71871cd1deed53c1266345","src/io/read_exact.rs":"d27d5ec082ccb1b051d1292e029e398926a164c82e1f0c983ca9928410aa2abe","src/io/read_line.rs":"a3c62ca2034089a22ea9567e0b3cab0dfe09309782fcf151d92311a77223e37c","src/io/read_to_end.rs":"5e9e38dc087623dac5a3ae3ad329ed44ffe4f6205a78e546adadc3ffb76703fc","src/io/read_to_string.rs":"2c073d05f0361acda1f0172b24fd4c5da61840ac925a5bdfae9111c697759d1b","src/io/read_until.rs":"354507ce95242a735940f0aaa6ef11cc7d6d0505ae148f05277ce6e7537f168a","src/io/read_vectored.rs":"bd7f442c92f2cb320075d0983b0d08d51c23078898d72e6c2857cf6c7ad4cec7","src/io/repeat.rs":"53bc472e4bd7d286bf90765ce574f13b7aabc871c4f04f712da7cea160491390","src/io/seek.rs":"9863e9fb6495eb6e1f8c45c283c8a6993b9bdb1462f75a3e525e135c6840dec7","src/io/sink.rs":"30a503631d196e5da92c386d0afc1af9656a5f7682456cfa2489a2c30a05cac5","src/io/split.rs":"2aa567452b713497d5b85813980b69e888aee32be14492c92404d261fd50eb09","src/io/take.rs":"c53fec5b5e8c3742b7e60e6ebfa625cf2e566fbea193fb1eee2f0a8e561d63d5","src/io/window.rs":"295d7dc18ad101642003cd67687242e4bdba11552cfb7f18c521cbff369e6f71","src/io/write.rs":"60670eb00f999f2e2c43b099759a7fb030325b323744d88c9d20f75926ec30df","src/io/write_all.rs":"c88930fd23c88cc01fef2c6118d53d33996c011c4abf28778a27646fe1f7896a","src/io/write_all_vectored.rs":"53becf89c031bf4c3073f0903ce809eee7606b1b4fbeb518605875badba216d3","src/io/write_vectored.rs":"bc98ff4a709cb75cd9ffedefa8ef251089a49906b98e142d76447ddf4ac098bb","src/lib.rs":"384447fb9bfcd3b110656979cca71b53c3abe72690e970c30563c1baba27fd74","src/lock/bilock.rs":"f1b955cb2e10c906933e63bbfb8e953af634428ce15faf3696b07d11da0cc279","src/lock/mod.rs":"e964dd0d999ccf9d9d167d7ecbfeb7a66d180a80eeb6fd41ec3fa698c1067674","src/lock/mutex.rs":"782375724e4abbdaf3221eb422911c37fe13e794e6f30ea819acece7303c3368","src/never.rs":"2066481ab04921269cfa768cb8b778a035ab6aa49ec404d9ac0aeb07a4bf6094","src/sink/buffer.rs":"33a7380f8232225a8e9ac5ee138fd095979efa3a64f9fecf5fcaf2e78fcbc355","src/sink/close.rs":"f2f31c884f048163abebd4f5a877b7b4306f7d02beae428325636fd00ed42ca9","src/sink/drain.rs":"392d9487003fcd55a3373c3e2558f6549b9633b82fc08b5a665a573b137ae9f7","src/sink/err_into.rs":"ced2998b2b0b792d80f7543523c9e07e8f5d20a4336cae93084b995e46671b15","src/sink/fanout.rs":"66dcde056e0bbee4e0074d331838ed2743dc872ea1597f05d61970523dc34926","src/sink/feed.rs":"64b9d296d37aedde37e1421c459ebcd9a7e8814db905996996167850124f3b3f","src/sink/flush.rs":"fbba344f428ca7636541ba013f7db2ece480b404a9e0b421c5537552d61e2492","src/sink/map_err.rs":"0f68f444ef13fe7115164be855c3b7b1d269e1119e69fcdad1706988255641f1","src/sink/mod.rs":"37cf379170f3099992eb59f3181be4c4e4a5c2d3581dbe424d22ab360840d321","src/sink/send.rs":"56aaba9aa4a562e0af39473a5779206d91b0acb1fced4fc06cd8b959d1897524","src/sink/send_all.rs":"a8e4956604fe73e321b0a3896c2018bc5c27149f2862f8406112db140b3aa2dd","src/sink/unfold.rs":"428080b76213b504fcc981d2f05840f1a93c8db305301af1cf5852b6c47c4be5","src/sink/with.rs":"850cd3b96304df1f38360a0bc60b02d485535e399ef7642acdd9add7876867d8","src/sink/with_flat_map.rs":"5e0f527b33ee8f1cc6a6a46d45b6d74dad5c735d88b2cb24e1cb34fdc6ef501b","src/stream/abortable.rs":"935d79aa44d793f4abe87ca27a9e4a20891500488cf942693cd2756d65b3aab2","src/stream/empty.rs":"5000c856186408a17f68bbef432d4a1a3edb7fb5a07ed8699342fef04b10a181","src/stream/futures_ordered.rs":"46217ed3802d052724a4a3166370f74e6d5fcd248d6f983caea10bc3335a1f0e","src/stream/futures_unordered/abort.rs":"bdfece9f91accafd5122be36d628c37c5b219ac0eecec181267840fbb1e95a45","src/stream/futures_unordered/iter.rs":"e8862300ddb0504090c059b3dba2425af6335874cb6ef393fef26e87788b6d3e","src/stream/futures_unordered/mod.rs":"da8b7adb93b50c1c6507c1af4b3c1ec0de4ce33f68f64dc876749c07e231d642","src/stream/futures_unordered/ready_to_run_queue.rs":"6223c67519c1ae35cbc449dd5654fda422aaba61a2344cc0b190c73b4b1e9f80","src/stream/futures_unordered/task.rs":"ab2de99b2a42c1da70d56e4be43c0ef72e8d5a4504adc0f870f8d28afd332a37","src/stream/iter.rs":"609fa821a460e901a54ae51f8da58220881157cef02b8b7b8c9e4321c2d05a23","src/stream/mod.rs":"33873b13535443cce2d49fdb3f0b359286bfc74f3553419fe7174cf7c1840da0","src/stream/once.rs":"d7b70adabad1f10af711ac3dcef33fd4c287e9852fdb678406e7ff350ba8fd47","src/stream/pending.rs":"84aaa15c8bbb17a250da5b1b5f0c7f6717410915d63340a3fcbf098bebe19d6f","src/stream/poll_fn.rs":"35952ea514b8aade14a3934d7777006475f50bbf0c5b50141710e31637f980be","src/stream/poll_immediate.rs":"e7a53ff8275ebe89dab8f9b984cce2ee0fde0a828e540b77c5500ca017d5bb98","src/stream/repeat.rs":"e4e4a9b6f2fca72bcbf098c3ac0c4a41323a840741d4dce9d9416464b7e8bd0d","src/stream/repeat_with.rs":"525780d24f3f99152b879765ca6eab99bcc0c757dc6654b6635c099b93ea654d","src/stream/select.rs":"28eb422c0eca9fd02778a6003004471b3489db09746a70e617a506303ea8b81d","src/stream/select_all.rs":"4358fa26cfe8c1b56f19d077b841bbdfe22f7adc043034fd6313b004e94e310d","src/stream/select_with_strategy.rs":"7fe249fd92fc66ad2bfa5a2dec7148b2a0102b3a8d915b2103bfbcd1b8870447","src/stream/stream/all.rs":"43cfb69de0ea991497d26d0aeb02091f10eb241ef93758b54c5e7aced5b63b63","src/stream/stream/any.rs":"2582da02f9a1ce2bd0af87a64b65188fc93686c5e3dd9128e89e5f57c1d70e43","src/stream/stream/buffer_unordered.rs":"66c3f4bd2fabfbdf6a4033dfaed44dd0262b68e6533509029c984ae037e35392","src/stream/stream/buffered.rs":"eabd0c0e50eaaaf0a92a7b39fdb5b77e068bbfbbfd5e216a09c3a6e0c1fc102d","src/stream/stream/catch_unwind.rs":"b2e801ff744d5d9e17177ec1156b0ab67bdd56b94c618ed8590344ec8a0f35e7","src/stream/stream/chain.rs":"ba1a206b3ce0160186021f5c1e4c95a770d26b843e3640e52609a2facaf756ac","src/stream/stream/chunks.rs":"d3aaddc05779ef70e2f0e59570cf6d5a1d231ae4885c8b8b2e4813fc02832562","src/stream/stream/collect.rs":"977ed1970b46029517ecc45f4af924b8e585d3770f01b2a0d2df0e01519ca50f","src/stream/stream/concat.rs":"171ea941b45c0295ed978c3f318a449ea295e33cb4ea82c764f4e9e7c48ad5de","src/stream/stream/count.rs":"ff218aea3d2d2456c8163926ea0c357b2752e92578e5fd4bec6b789fe1246556","src/stream/stream/cycle.rs":"ed7e3d15e7b1adec5ad5789b0d3186b5995a3353cc974fb7f41a72f6d8ad4cbb","src/stream/stream/enumerate.rs":"fc7565d21d39565790859eeac9ae8dd74123a9d15b88258d3abe894f1876cc39","src/stream/stream/filter.rs":"3dd080914e6770f8790455fc9cbedf30c5f44a589ef99b19112344c75f9e9042","src/stream/stream/filter_map.rs":"3a8a3e06dfac48dd3f7b6b1a552a51a3e31ea943e012dd35729d461a1fcfad80","src/stream/stream/flatten.rs":"69493fc106a1447abe109fd54375bb30363f7bc419463a8f835e4c80d97f2186","src/stream/stream/fold.rs":"75d61d4321db1bcbbdd1a0102d9ad60206275777167c008fc8953e50cd978a09","src/stream/stream/for_each.rs":"07bca889821bad18ff083e54abe679fbeb8cd19c086581c2f2722cba6b42263f","src/stream/stream/for_each_concurrent.rs":"4e1e7eb3d4ccfae0e8000651b75834e2960a7f9c62ab92dba35a0bdbbf5bbb21","src/stream/stream/forward.rs":"cd024ba1a3d5098d3ff2d5178a12e068916cc4307284b00c18dbc54b554a5560","src/stream/stream/fuse.rs":"061c5385f12f80c7906cb15ddb8f455ced6ce21d1de9a97de9db2616407c0cac","src/stream/stream/into_future.rs":"b46ad45cc03ddd778a9ffaa0d603c8ee0b411f49333100160959942cde9588bd","src/stream/stream/map.rs":"b91bdd5b33821a50c9b5034261a14f89ff1a9d541ab99b9d9a6921b12a5d434e","src/stream/stream/mod.rs":"106daa368424cca9e35aab6ec1bc177570aeca1dfaec57b188682ae3aaee11b7","src/stream/stream/next.rs":"7b4d5a22b5e00aa191ea82346bb1f392121cc68692864a8230e462d59e622928","src/stream/stream/peek.rs":"1ef5f11b1f0cc11d01690bebe282d8953ff8e860597f4ce21208fc274be5e98e","src/stream/stream/ready_chunks.rs":"4e6deb3a6d453fd4e982bdba416188311a72daca1b218d4e9ef20819fc09b5b2","src/stream/stream/scan.rs":"54489c8efef60dbf3c35ee803afee5c5ea7c364fb9b68939a04956e46febb856","src/stream/stream/select_next_some.rs":"0094eccc96cfe78d9b6d0a9bdb82cada8fb7929770a3ac00ffcb5441d7dc4f51","src/stream/stream/skip.rs":"61f7ec7fe25663d2c87cffaad19ed27eda032842edb8af731b521025b244f120","src/stream/stream/skip_while.rs":"75ee580e0111200758d0c0fe154276007ff233db6b63a8223f0baeac1db18874","src/stream/stream/split.rs":"fa4adea18708dad384eb347260cfb965d30c40a677e15f9267f97aa382c6306c","src/stream/stream/take.rs":"505f83d341dc84eeab46f5e66adfa21a36207cb66f2394dd6a256576db665827","src/stream/stream/take_until.rs":"0f1fa7d158192a5dee32392dfdd062c15dab6d246b0ca267e91aae490d7d7fdb","src/stream/stream/take_while.rs":"51007dbde8434fd22c5ef2481a99463f11b3785e4bdeb73fa583a17f29f5c228","src/stream/stream/then.rs":"9dcfdc741d1d7dea0100aa9f1feadb932f8530f7a7d3071befc1e490a6cb50ed","src/stream/stream/unzip.rs":"9ad4db7522f66a9133e464c13d6c95682c797ae5a986e60d3aba358c65031fe8","src/stream/stream/zip.rs":"56f30f513e11754f59ead5ca4112014cba9278d02796eb8fe0937ae8bb4d44cd","src/stream/try_stream/and_then.rs":"22ca6e547d0db2e07b0a928c48118a532adaf28d85c60ab84b8366dbfeab9161","src/stream/try_stream/into_async_read.rs":"f584fd8dfdab90328fc89eac78306caa308d43f3035c1c5489e55384007e77ed","src/stream/try_stream/into_stream.rs":"4fee94e89956a42871fc4a0cdba7ae1b7d4265e884528799cd227c9dd851acce","src/stream/try_stream/mod.rs":"7a83406bfbefe4fe651b9535035fef80d52e995a44dcd0e16105bf274e0fef06","src/stream/try_stream/or_else.rs":"8cc7f602da1ffee21bf06c5203aa0427514a83b67941ae264459f1eff8dc8aec","src/stream/try_stream/try_buffer_unordered.rs":"64e698ea6aefbe7e32d48e737553b20b9cde5c258963bb20486b48b7d6899660","src/stream/try_stream/try_buffered.rs":"7546d396026bf700d3f37f55d5a4e39abe5fb05919e6a269feeb8be7af19256c","src/stream/try_stream/try_chunks.rs":"58b8c5af4914eae3698e528b0361532b391bf4b3463f4c790e43c8069cfe1bd7","src/stream/try_stream/try_collect.rs":"1132751055a51b936ed28b83c4eed7dee3f40a4be13ea374b30086e864e1ee09","src/stream/try_stream/try_concat.rs":"f2330ebeeab30273e9ac0e8600bfe2f405ce671f6386e688b3afb1d2fdd7c2c6","src/stream/try_stream/try_filter.rs":"7c2a09cdb1753ecb49a44d1d84742cb2050a999a8148448b31bb404eb2d17154","src/stream/try_stream/try_filter_map.rs":"5afc6ab35e2b425e37ed217a6bb038459c8828d6bcd6a3699883d6df071dc7e7","src/stream/try_stream/try_flatten.rs":"e05614d86a27ab8386476eea35fd424c07e5f7f99cf0401d63a6655eb7ca1247","src/stream/try_stream/try_fold.rs":"b96aa2fe1a16f625d5045028a86ff8684dcf5198ef8c7c072f52f39aeaa8b619","src/stream/try_stream/try_for_each.rs":"3f3901d618333b740d470eb02fcbb645df92483493872298bb7bd0382646028a","src/stream/try_stream/try_for_each_concurrent.rs":"78a94a77f329862c2a245ec3add97e49c534985f0d9da98f205b7fa3c7c08df3","src/stream/try_stream/try_next.rs":"6e29473153db1435906e79f7eaa13ce9da842d4528ba9eb1c0034665feacc565","src/stream/try_stream/try_skip_while.rs":"c0259ec70bdf4a81c1fa569275766e3e65db9d5715c81e93ada04817c1835add","src/stream/try_stream/try_take_while.rs":"54927dfa95ff58b542a1d7382f564eeae5e02e633c948b1a39ac09bc7e92f5f5","src/stream/try_stream/try_unfold.rs":"aaf0f4857a4ec8233ac842ae509f29e5a210827a0bb40cfc0dc3e858f153d2b4","src/stream/unfold.rs":"8b2feb00f979562b43064eb078d53a160cdb3c65deed17ec25a05938df2d370f","src/task/mod.rs":"074ce7f3869663d2e768bb08ea201ed1be176e13edd4150f201bc1ea362170d3","src/task/spawn.rs":"26bbcf1d65e1467de0ecdad2b56f464a510cda7c1933427d69a1b50459836489","src/unfold_state.rs":"ffe848071a99d6afcdbe8281a8a77a559a7dde434fc41f734c90e6b9b5d8a5af"},"package":"d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164"}
\ No newline at end of file +{"files":{"Cargo.toml":"2889f27d32d20c79aeba0c92bbe5ff2066d96eb0fd1603bd2dece4090ac2eb29","LICENSE-APACHE":"275c491d6d1160553c32fd6127061d7f9606c3ea25abfad6ca3f6ed088785427","LICENSE-MIT":"6652c868f35dfe5e8ef636810a4e576b9d663f3a17fb0f5613ad73583e1b88fd","README.md":"4094b953bfd2bb2687df0c3c3deb05c307c14ac084e6a79878342b8ee56aa710","benches/bilock.rs":"6f59b71f9b9ca5751018a985eff0ea8d63d4cb6d18a17e672e17bc786b972c20","benches/flatten_unordered.rs":"79330465a5d8f2d6e450861e7ca2ed8ae7fe78a5fb221b6ab7121227810c1bcf","benches/futures_unordered.rs":"5eb8280be8d8fb7bd5fb103ce20db10f618f47e180a402105e0d5e9f8c9fe35a","benches/select.rs":"ca0a79bc3434f0fc025e0b0e37941ba1d592b40f36ce6544cdfede9f23e70581","build.rs":"5b263bd2bd587511a9c8daef580b05e0613c15a6c5f800b1e5bc145fa013d99e","no_atomic_cas.rs":"7ae747b83b08dd926c1696faf4ecab9399c652ae77d5179221258c73b8eecb6f","src/abortable.rs":"38bcb3d48361e4cfa89cc2e225c5f1dc97129837da834d28b69cff3bbf5200a6","src/async_await/join_mod.rs":"8f83c0001df867f5eb47a4174bf4a0c0b548f8ff3be3b532e0c759ad981b87da","src/async_await/mod.rs":"3d25c343cc3e789d3f982cdacd6f8ed91511ba656c3923da310700f318f423a4","src/async_await/pending.rs":"7971ec1d5d89ad80390e2a0c51e396257b2e78f1436cce79ea2b55ac2f13b328","src/async_await/poll.rs":"440c19a89fd42b12da09ff48a69523b5a8a5baea0bcd2f860589a0ab996ed781","src/async_await/random.rs":"daf229cd01595d38ef0f6284865fe2f60ed3b8134f7a15c82564b97ff3a5be98","src/async_await/select_mod.rs":"414c7fb7923cfe21116d558bf3cd1a6ae5bef4ed01f9877f0e7cb3e42ee6c79d","src/async_await/stream_select_mod.rs":"9a51338914cbb1502619fed591dfe4fc676919499b9d041898e59f630fe5e7f0","src/compat/compat01as03.rs":"6728ffd4f0a92d4e6aff8b7ff7916ad7ae20a317633d2739813a3b6ffc814204","src/compat/compat03as01.rs":"7cf29e57f8ee14b64123b3d2c16dceced25af5491a5ef81b655b2de2e9587fbe","src/compat/executor.rs":"3e40b4ccd905a99eab42c47fefc5502b530eef869158ce9ceaa28f8f1638436f","src/compat/mod.rs":"6cf3412f6a3f9ee8406118ea75de65468a83febc6ba61bdbad69261f0cfea02e","src/fns.rs":"f8e396128791169098a38a82c3c28aaa6dd5d40718635f7cc30b59b32f7110b8","src/future/abortable.rs":"373ce61c0c7c31718ff572113503bb88f55e3b49ed5d028a3dfafd69070f44c1","src/future/either.rs":"fb00002e68b5c46c8ded09e91efe0be7362c168a1ea00dc5906e1c8c7e38aaa4","src/future/future/catch_unwind.rs":"08b0ac049cdee28325d378209aa5bb4d91b14a29ddd9c2b0e5c661b61f9cfcfe","src/future/future/flatten.rs":"5bf9846cef8dec5dcc38b992653e11146bc149a0d3efc09b1f8268bd29de0b2b","src/future/future/fuse.rs":"65b80a1ba7556e2ef35ce8d23e47489a2a6eb6d1c3ef9ac4e080c63e69eaa07d","src/future/future/map.rs":"de607c2a4d80d2bddb590781c37328ddd294bb9d5064a9ecb99455244239b597","src/future/future/mod.rs":"ecfac09dcba801cede7c58acfaa76a9ab76d26a3f4c968d66c2a49caa57faefe","src/future/future/remote_handle.rs":"2ae17a409569b32c78e20026a8ecdf667352c2597a4a0a8deefa4761fafcb223","src/future/future/shared.rs":"d1973063327851931c75969ec627657e5e34e8cfa97f295d65a4be288377c446","src/future/join.rs":"38b55fc7cdbbdaaa525e51f8ce09783dbbcb65eabfd7de9f46610593e0bbef17","src/future/join_all.rs":"4813aba0e6ddf02310ba3d368eb6af44b30aaac227d1b799e977996db9e3cf36","src/future/lazy.rs":"d161fc4108a97348c1becbbd5ba8fccb7225dcf1d81c097666f5c8b40718251d","src/future/maybe_done.rs":"559e41cb170f9fe7246d2a5b112527a9f9cbca63b8a5a872b3aa9c861f70f307","src/future/mod.rs":"51e018100362f20b071225268f1d81f25c8e9664e94730af199069c2692bf26a","src/future/option.rs":"73daca814800b91b707753dcfe074265372b0077fae2504ea6efddc713453579","src/future/pending.rs":"3967984d2061e6b201c407f28ba8392a21fc9ef7c0b9201e2e244110af0782c5","src/future/poll_fn.rs":"8e54bf57d60e01d496ae31df35e0b96868f4bda504c024a14f51ab723d67885f","src/future/poll_immediate.rs":"7e199fc102894c9095de17af602a7c8f05d427269aefce5d71cd5136d54659c0","src/future/ready.rs":"c9860ccd8ac529f44f66dee73ca9b9d7f1b1b3e5e9e4dc70c59640c752553d58","src/future/select.rs":"0c358a5ae079858f31c61cf6ea835205fdb9092d07536778440b975995d2626c","src/future/select_all.rs":"5b304210c34cc2bd84f7b1819baa30a68eea2ee578b10b243f5dd884ee9a4791","src/future/select_ok.rs":"dc35027db70c0111399c6ab6f7c977e6e7362f069a3891e4a62006c52643528e","src/future/try_future/into_future.rs":"d966bde7b06a88443f0efd877e95f91541778c4e713f3f4b66e00ca5d3f352b6","src/future/try_future/mod.rs":"991edb3b52903ceb3bcb6599d04d898509023cd038c5974f4872eaafa9748f08","src/future/try_future/try_flatten.rs":"16c02e1780bd312b8b386e41c1d9dd4bcc4e8ef10f26007364f857b3adcc6e99","src/future/try_future/try_flatten_err.rs":"130f3fc3fd95a19f4e4a50e69301106fab02f77d0faf3aac9c473a92b826c2ca","src/future/try_join.rs":"1836931f8ba32da41c6810e6acc0ea2fee75b74b3153e760c4542cb12b220540","src/future/try_join_all.rs":"4d01395c74c7b82c581a578f2cb34087824b091f70075ebcd76a8d12b8476c1f","src/future/try_maybe_done.rs":"1cce46b2ee43ad51b7c5f9c02bc90a890af32bc549ce99098a2c8813508051e1","src/future/try_select.rs":"5d6187ace76b5f26e60c713a1fe9fcb9cbb0d161c5881c532ce9472a230b595d","src/io/allow_std.rs":"a125959c255fd344399fb0be19218a8ee7d613ce2485d6df9cdbc2ed5d3987df","src/io/buf_reader.rs":"46a1e24046c5bc2ab8f266e3d904281bec3ab4ba6c13d4213a52599b57b8de66","src/io/buf_writer.rs":"d6666b8dde60eefbb7fa69da4a2eea2b34ea0e4a85e21e5ac6e83cc680ea9140","src/io/chain.rs":"12f508fc39c3234a71a0f886505245c5d659aed09c7d874b1bd8ca0a0d456cf3","src/io/close.rs":"9832210a870637198fa58642cdf2779afab71f2e31a9953e663fa6854bd73ac7","src/io/copy.rs":"cb2466dcd7ea8bb1f07d00c03e66ed55abf71fe4be6937adc9f533ef9d99fb2d","src/io/copy_buf.rs":"e9a5f6aac8375e298bddb332f23d8b626d056ce452b58f772a05df7e2cd326cf","src/io/copy_buf_abortable.rs":"28ef452bc49423e0a6e8323b5956b37c57335941f99797867e5c5932f9366136","src/io/cursor.rs":"c12e9b82c6eff2108a5524b026d73fbb2c250072e8e3f673cc04d4da02a553b8","src/io/empty.rs":"6ae40b4bc8fc41572abad2d013285d78d8df445868d41fac77bde508ec9bc1a5","src/io/fill_buf.rs":"4f217fed8eb3f66dbde2371c3fbcfa9420d38ba20da544a0658584e5778aa47d","src/io/flush.rs":"0c9b588dfd9da039dc123ba9448ac31ca21ee3da0a164a21f6c2c182183d43e2","src/io/into_sink.rs":"ab5bdb12bff62672175b69b8c9f5a4bbbea716b9cf89169ed6a723ab43da9df8","src/io/line_writer.rs":"16c151c68d89b7c2ab929c4a782539b1ad512b723eed9b544f50f1ff06f0b661","src/io/lines.rs":"137279b6b899ce438fb1b0ee9e6a412976f9f9db54fb7b961d2bad8787a26b1e","src/io/mod.rs":"bead8faa1bd4c3733543e38cf64dc9b52d703440367f2efb460bda9f9baafa0b","src/io/read.rs":"4ea675a83cec98a22c9c4731ff980209f0cf67f63c71871cd1deed53c1266345","src/io/read_exact.rs":"ddebd58db9f6766efa3f50543fb51b138538533921e1ee1da4621fff9c64efe2","src/io/read_line.rs":"e22c853ddfd769c441b1a1dc59cbcda4f22a9c49e86f6a697f94193fce3bcdfb","src/io/read_to_end.rs":"5e9e38dc087623dac5a3ae3ad329ed44ffe4f6205a78e546adadc3ffb76703fc","src/io/read_to_string.rs":"bef4cc292dd95fa9c850d0438ad0cf49a8cc4caf40a0384f763f8c9512ad9e79","src/io/read_until.rs":"354507ce95242a735940f0aaa6ef11cc7d6d0505ae148f05277ce6e7537f168a","src/io/read_vectored.rs":"bd7f442c92f2cb320075d0983b0d08d51c23078898d72e6c2857cf6c7ad4cec7","src/io/repeat.rs":"53bc472e4bd7d286bf90765ce574f13b7aabc871c4f04f712da7cea160491390","src/io/seek.rs":"9863e9fb6495eb6e1f8c45c283c8a6993b9bdb1462f75a3e525e135c6840dec7","src/io/sink.rs":"30a503631d196e5da92c386d0afc1af9656a5f7682456cfa2489a2c30a05cac5","src/io/split.rs":"2aa567452b713497d5b85813980b69e888aee32be14492c92404d261fd50eb09","src/io/take.rs":"c53fec5b5e8c3742b7e60e6ebfa625cf2e566fbea193fb1eee2f0a8e561d63d5","src/io/window.rs":"295d7dc18ad101642003cd67687242e4bdba11552cfb7f18c521cbff369e6f71","src/io/write.rs":"60670eb00f999f2e2c43b099759a7fb030325b323744d88c9d20f75926ec30df","src/io/write_all.rs":"8fcd4ff233650b5abd20f7b987000cac095d8de23445572de588dccf710623c6","src/io/write_all_vectored.rs":"53becf89c031bf4c3073f0903ce809eee7606b1b4fbeb518605875badba216d3","src/io/write_vectored.rs":"bc98ff4a709cb75cd9ffedefa8ef251089a49906b98e142d76447ddf4ac098bb","src/lib.rs":"384447fb9bfcd3b110656979cca71b53c3abe72690e970c30563c1baba27fd74","src/lock/bilock.rs":"a294b016cfb39fb54406a6190438546a5fd7c8ef21667ab38a6cea9cb2d3ef7b","src/lock/mod.rs":"ed0f4ef97af382f6038730bd5932b449f32dc3a634e73e7ebb48a24bb7782d6f","src/lock/mutex.rs":"745c68e571f84a7456681cd683b2b8eed28ea8b6d3f9a38337efad105a65e0b6","src/never.rs":"2066481ab04921269cfa768cb8b778a035ab6aa49ec404d9ac0aeb07a4bf6094","src/sink/buffer.rs":"33a7380f8232225a8e9ac5ee138fd095979efa3a64f9fecf5fcaf2e78fcbc355","src/sink/close.rs":"f2f31c884f048163abebd4f5a877b7b4306f7d02beae428325636fd00ed42ca9","src/sink/drain.rs":"60262bf3ef48c09b4d52e52953f9437d536e20f63690b73e975388751405d239","src/sink/err_into.rs":"ced2998b2b0b792d80f7543523c9e07e8f5d20a4336cae93084b995e46671b15","src/sink/fanout.rs":"66dcde056e0bbee4e0074d331838ed2743dc872ea1597f05d61970523dc34926","src/sink/feed.rs":"64b9d296d37aedde37e1421c459ebcd9a7e8814db905996996167850124f3b3f","src/sink/flush.rs":"fbba344f428ca7636541ba013f7db2ece480b404a9e0b421c5537552d61e2492","src/sink/map_err.rs":"0f68f444ef13fe7115164be855c3b7b1d269e1119e69fcdad1706988255641f1","src/sink/mod.rs":"37cf379170f3099992eb59f3181be4c4e4a5c2d3581dbe424d22ab360840d321","src/sink/send.rs":"56aaba9aa4a562e0af39473a5779206d91b0acb1fced4fc06cd8b959d1897524","src/sink/send_all.rs":"a8e4956604fe73e321b0a3896c2018bc5c27149f2862f8406112db140b3aa2dd","src/sink/unfold.rs":"5febcfb9295a79fe1187284d0d45055c787e399b00d73c0e85a0446ae2246d18","src/sink/with.rs":"850cd3b96304df1f38360a0bc60b02d485535e399ef7642acdd9add7876867d8","src/sink/with_flat_map.rs":"5e0f527b33ee8f1cc6a6a46d45b6d74dad5c735d88b2cb24e1cb34fdc6ef501b","src/stream/abortable.rs":"935d79aa44d793f4abe87ca27a9e4a20891500488cf942693cd2756d65b3aab2","src/stream/empty.rs":"5000c856186408a17f68bbef432d4a1a3edb7fb5a07ed8699342fef04b10a181","src/stream/futures_ordered.rs":"c62010493e68e1c6317c189ce36af48770736407c2e0e60e6c677f3b20b4b12b","src/stream/futures_unordered/abort.rs":"bdfece9f91accafd5122be36d628c37c5b219ac0eecec181267840fbb1e95a45","src/stream/futures_unordered/iter.rs":"01f8aaa2ac7ea493bf727a945424cc6ae695c9a0c289ac57cbb26697abb05827","src/stream/futures_unordered/mod.rs":"460cdf03695f6b292d46bc58138952f0c3d84fe58974337bca6be5a1ff30e48a","src/stream/futures_unordered/ready_to_run_queue.rs":"3a9c08cb5df28e57f2bfe613b8174d0dfb420b8664dd7c46a053e2980a6d3482","src/stream/futures_unordered/task.rs":"2b780bcc97844bc0bdeced7bb4318066e86ba082c08c8628c9b2e92bfe36fb61","src/stream/iter.rs":"609fa821a460e901a54ae51f8da58220881157cef02b8b7b8c9e4321c2d05a23","src/stream/mod.rs":"8ec9b052297b82a1be6b9a2ad631cf686e8cc17e763794ebeeea3a39e3a72805","src/stream/once.rs":"d7b70adabad1f10af711ac3dcef33fd4c287e9852fdb678406e7ff350ba8fd47","src/stream/pending.rs":"84aaa15c8bbb17a250da5b1b5f0c7f6717410915d63340a3fcbf098bebe19d6f","src/stream/poll_fn.rs":"35952ea514b8aade14a3934d7777006475f50bbf0c5b50141710e31637f980be","src/stream/poll_immediate.rs":"e7a53ff8275ebe89dab8f9b984cce2ee0fde0a828e540b77c5500ca017d5bb98","src/stream/repeat.rs":"e4e4a9b6f2fca72bcbf098c3ac0c4a41323a840741d4dce9d9416464b7e8bd0d","src/stream/repeat_with.rs":"525780d24f3f99152b879765ca6eab99bcc0c757dc6654b6635c099b93ea654d","src/stream/select.rs":"28eb422c0eca9fd02778a6003004471b3489db09746a70e617a506303ea8b81d","src/stream/select_all.rs":"19ef94abcf63fa9e46a73b6ab783642d2d069a015c7fa57fea36eeac7b6f2a20","src/stream/select_with_strategy.rs":"caa0f5d1fd02824b48a1cd2be13a6f96b532039eb88cf47ea5d2becf58595073","src/stream/stream/all.rs":"43cfb69de0ea991497d26d0aeb02091f10eb241ef93758b54c5e7aced5b63b63","src/stream/stream/any.rs":"2582da02f9a1ce2bd0af87a64b65188fc93686c5e3dd9128e89e5f57c1d70e43","src/stream/stream/buffer_unordered.rs":"b0f7a1c72cee178e7bfd8990e6e426c1258eeba6d952b82c6be8e4cac0a054ea","src/stream/stream/buffered.rs":"e37d08d6a18090ba37079937575920cc8c7569f4183dba710d3f4b94c11da01b","src/stream/stream/catch_unwind.rs":"b2e801ff744d5d9e17177ec1156b0ab67bdd56b94c618ed8590344ec8a0f35e7","src/stream/stream/chain.rs":"809b6b5c8372f65341dc9810d39f60ae3bcf74a78f133b4ab8d289fb5f2a7cbb","src/stream/stream/chunks.rs":"9f872b473de14d2251584050f04d56eada9c3b1d8dc3e746bdd57c1f757bfc6f","src/stream/stream/collect.rs":"6e4d2d580189f7d3b6b294b6b17437e8e2570502f08c11786a71caac207f0309","src/stream/stream/concat.rs":"171ea941b45c0295ed978c3f318a449ea295e33cb4ea82c764f4e9e7c48ad5de","src/stream/stream/count.rs":"ff218aea3d2d2456c8163926ea0c357b2752e92578e5fd4bec6b789fe1246556","src/stream/stream/cycle.rs":"ed7e3d15e7b1adec5ad5789b0d3186b5995a3353cc974fb7f41a72f6d8ad4cbb","src/stream/stream/enumerate.rs":"fc7565d21d39565790859eeac9ae8dd74123a9d15b88258d3abe894f1876cc39","src/stream/stream/filter.rs":"5d871f416d41baff3733121f564229fe31bdf7dfaaeb78ab940fafba6ab4b7c6","src/stream/stream/filter_map.rs":"179045a5ab1295e77ab5cfea1964be69dc50984ef8ac9ee04034adf0a043514f","src/stream/stream/flatten.rs":"69493fc106a1447abe109fd54375bb30363f7bc419463a8f835e4c80d97f2186","src/stream/stream/flatten_unordered.rs":"dd5216fc0e34d09cc69ae6b3c4690efe8ff01404853756bf5aa6b92eb2e6750b","src/stream/stream/fold.rs":"75d61d4321db1bcbbdd1a0102d9ad60206275777167c008fc8953e50cd978a09","src/stream/stream/for_each.rs":"07bca889821bad18ff083e54abe679fbeb8cd19c086581c2f2722cba6b42263f","src/stream/stream/for_each_concurrent.rs":"4e1e7eb3d4ccfae0e8000651b75834e2960a7f9c62ab92dba35a0bdbbf5bbb21","src/stream/stream/forward.rs":"cd024ba1a3d5098d3ff2d5178a12e068916cc4307284b00c18dbc54b554a5560","src/stream/stream/fuse.rs":"061c5385f12f80c7906cb15ddb8f455ced6ce21d1de9a97de9db2616407c0cac","src/stream/stream/into_future.rs":"b46ad45cc03ddd778a9ffaa0d603c8ee0b411f49333100160959942cde9588bd","src/stream/stream/map.rs":"b91bdd5b33821a50c9b5034261a14f89ff1a9d541ab99b9d9a6921b12a5d434e","src/stream/stream/mod.rs":"ca8f514b5157373408c6b59a8892dca8a1441a4c82557c34e6570342990da487","src/stream/stream/next.rs":"7b4d5a22b5e00aa191ea82346bb1f392121cc68692864a8230e462d59e622928","src/stream/stream/peek.rs":"2e08e6990c31186c97edb21737f83fe8640a19561062879af83090935aef99cf","src/stream/stream/ready_chunks.rs":"7e17c49ff29c106c13a2ec13fb05f32ff048e482b47a157d3965bd03c38c01c2","src/stream/stream/scan.rs":"54489c8efef60dbf3c35ee803afee5c5ea7c364fb9b68939a04956e46febb856","src/stream/stream/select_next_some.rs":"0094eccc96cfe78d9b6d0a9bdb82cada8fb7929770a3ac00ffcb5441d7dc4f51","src/stream/stream/skip.rs":"61f7ec7fe25663d2c87cffaad19ed27eda032842edb8af731b521025b244f120","src/stream/stream/skip_while.rs":"6f114a3fa538bd479e4fa24d8aa0e0e0454613643a97c44242c5683ae7293b82","src/stream/stream/split.rs":"0552ddf8f7f3a9980dbc692d0c34b72503107c714f81e853445fb6c81fe328ff","src/stream/stream/take.rs":"57d381b482c3d584c4c26b0e15941bc2ea58e3f39a2e5c74391a2ee7b825cc8c","src/stream/stream/take_until.rs":"0f1fa7d158192a5dee32392dfdd062c15dab6d246b0ca267e91aae490d7d7fdb","src/stream/stream/take_while.rs":"2f57a6e5b903c045da642e9a40eb19dabbc612a80a6ce8098df1a1973555f108","src/stream/stream/then.rs":"c995c6b0d9151927b26b10fba70e135dfc41224b969d1367dc8c11697218c1e9","src/stream/stream/unzip.rs":"e7beedc2192604e0091ac3d0265b487127a37c780198838f6419c21ef1b38df0","src/stream/stream/zip.rs":"3890b40daea00341fac6ac977de0b534d1ec7cdaabece44af5df2ca56026fe62","src/stream/try_stream/and_then.rs":"6f92b333955f5ec30fddf8e087e3f60ebf53a054769fc72c80bbccdf13a9431e","src/stream/try_stream/into_async_read.rs":"5b200c76ccb95460d94286ca8e63f5454940eb62b5f15aae998da48aa06fbffd","src/stream/try_stream/into_stream.rs":"4fee94e89956a42871fc4a0cdba7ae1b7d4265e884528799cd227c9dd851acce","src/stream/try_stream/mod.rs":"e2460ce64e3b43c92860b9fd6dd6b36f9c6f6750e9d1e7bec8f766ad84889269","src/stream/try_stream/or_else.rs":"473ca77e0e81a1a0834d2d882076b8823a5a3027b2d7d78f887be2d5edfd0de3","src/stream/try_stream/try_buffer_unordered.rs":"64e698ea6aefbe7e32d48e737553b20b9cde5c258963bb20486b48b7d6899660","src/stream/try_stream/try_buffered.rs":"38f60d7290f44471a02084c6b394b754c224a84ee8d2ba01c08568168b48a21f","src/stream/try_stream/try_chunks.rs":"69c4d85a256250d73c6372d8047e6055da7eac918cb5d7ef4f3697898f4dcb4c","src/stream/try_stream/try_collect.rs":"979920e3034dad6c75961e3f6b4c0234691db7063eca1a05562cc5d41f2943c1","src/stream/try_stream/try_concat.rs":"f2330ebeeab30273e9ac0e8600bfe2f405ce671f6386e688b3afb1d2fdd7c2c6","src/stream/try_stream/try_filter.rs":"1344e9aea05e2d0078f30caff176a99e1ccb8fcdf0a287817abc82fbaf09c48b","src/stream/try_stream/try_filter_map.rs":"285e7ea875a3ea3e16942c1b1acae5a1cb26b9bac476dce3903547cb99306602","src/stream/try_stream/try_flatten.rs":"e05614d86a27ab8386476eea35fd424c07e5f7f99cf0401d63a6655eb7ca1247","src/stream/try_stream/try_flatten_unordered.rs":"1cc4c4a5ea0a8db3010958f34fb1886dcfbd2e1584082d2004030eb70b13cd6c","src/stream/try_stream/try_fold.rs":"b96aa2fe1a16f625d5045028a86ff8684dcf5198ef8c7c072f52f39aeaa8b619","src/stream/try_stream/try_for_each.rs":"3f3901d618333b740d470eb02fcbb645df92483493872298bb7bd0382646028a","src/stream/try_stream/try_for_each_concurrent.rs":"78a94a77f329862c2a245ec3add97e49c534985f0d9da98f205b7fa3c7c08df3","src/stream/try_stream/try_next.rs":"6e29473153db1435906e79f7eaa13ce9da842d4528ba9eb1c0034665feacc565","src/stream/try_stream/try_skip_while.rs":"7c2fa31fe8b0b4e59c5d7f2972c8d9f83e8f01a687b08f5cd631f92a14b402f1","src/stream/try_stream/try_take_while.rs":"2783664637aff0442f0c9204d35600139c941332310f70495cbc4dc345cae99d","src/stream/try_stream/try_unfold.rs":"aaf0f4857a4ec8233ac842ae509f29e5a210827a0bb40cfc0dc3e858f153d2b4","src/stream/unfold.rs":"8b2feb00f979562b43064eb078d53a160cdb3c65deed17ec25a05938df2d370f","src/task/mod.rs":"074ce7f3869663d2e768bb08ea201ed1be176e13edd4150f201bc1ea362170d3","src/task/spawn.rs":"8ff3a3652d8d2cb45717324b6ead9c3f111629e7eb0c0b33d3639a0e7c5bbf3e","src/unfold_state.rs":"ffe848071a99d6afcdbe8281a8a77a559a7dde434fc41f734c90e6b9b5d8a5af"},"package":"26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533"}
\ No newline at end of file diff --git a/vendor/futures-util/Cargo.toml b/vendor/futures-util/Cargo.toml index f18cfbcf9..a3b9cadf2 100644 --- a/vendor/futures-util/Cargo.toml +++ b/vendor/futures-util/Cargo.toml @@ -11,44 +11,52 @@ [package] edition = "2018" -rust-version = "1.45" +rust-version = "1.56" name = "futures-util" -version = "0.3.19" -description = "Common utilities and extension traits for the futures-rs library.\n" +version = "0.3.28" +description = """ +Common utilities and extension traits for the futures-rs library. +""" homepage = "https://rust-lang.github.io/futures-rs" +readme = "README.md" license = "MIT OR Apache-2.0" repository = "https://github.com/rust-lang/futures-rs" + [package.metadata.docs.rs] all-features = true -rustdoc-args = ["--cfg", "docsrs"] +rustdoc-args = [ + "--cfg", + "docsrs", +] + [dependencies.futures-channel] -version = "0.3.19" +version = "0.3.28" features = ["std"] optional = true default-features = false [dependencies.futures-core] -version = "0.3.19" +version = "0.3.28" default-features = false [dependencies.futures-io] -version = "0.3.19" +version = "0.3.28" features = ["std"] optional = true default-features = false [dependencies.futures-macro] -version = "=0.3.19" +version = "=0.3.28" optional = true default-features = false [dependencies.futures-sink] -version = "0.3.19" +version = "0.3.28" optional = true default-features = false [dependencies.futures-task] -version = "0.3.19" +version = "0.3.28" default-features = false [dependencies.futures_01] @@ -61,7 +69,7 @@ version = "2.2" optional = true [dependencies.pin-project-lite] -version = "0.2.4" +version = "0.2.6" [dependencies.pin-utils] version = "0.1.0" @@ -73,21 +81,55 @@ optional = true [dependencies.tokio-io] version = "0.1.9" optional = true + [dev-dependencies.tokio] version = "0.1.11" [features] -alloc = ["futures-core/alloc", "futures-task/alloc"] +alloc = [ + "futures-core/alloc", + "futures-task/alloc", +] async-await = [] -async-await-macro = ["async-await", "futures-macro"] +async-await-macro = [ + "async-await", + "futures-macro", +] bilock = [] cfg-target-has-atomic = [] -channel = ["std", "futures-channel"] -compat = ["std", "futures_01"] -default = ["std", "async-await", "async-await-macro"] -io = ["std", "futures-io", "memchr"] -io-compat = ["io", "compat", "tokio-io"] +channel = [ + "std", + "futures-channel", +] +compat = [ + "std", + "futures_01", +] +default = [ + "std", + "async-await", + "async-await-macro", +] +io = [ + "std", + "futures-io", + "memchr", +] +io-compat = [ + "io", + "compat", + "tokio-io", +] +portable-atomic = ["futures-core/portable-atomic"] sink = ["futures-sink"] -std = ["alloc", "futures-core/std", "futures-task/std", "slab"] -unstable = ["futures-core/unstable", "futures-task/unstable"] +std = [ + "alloc", + "futures-core/std", + "futures-task/std", + "slab", +] +unstable = [ + "futures-core/unstable", + "futures-task/unstable", +] write-all-vectored = ["io"] diff --git a/vendor/futures-util/README.md b/vendor/futures-util/README.md index 6e0aaed84..60e2c2109 100644 --- a/vendor/futures-util/README.md +++ b/vendor/futures-util/README.md @@ -11,7 +11,7 @@ Add this to your `Cargo.toml`: futures-util = "0.3" ``` -The current `futures-util` requires Rust 1.45 or later. +The current `futures-util` requires Rust 1.56 or later. ## License diff --git a/vendor/futures-util/benches/bilock.rs b/vendor/futures-util/benches/bilock.rs new file mode 100644 index 000000000..013f3351e --- /dev/null +++ b/vendor/futures-util/benches/bilock.rs @@ -0,0 +1,68 @@ +#![feature(test)] +#![cfg(feature = "bilock")] + +extern crate test; + +use futures::task::Poll; +use futures_test::task::noop_context; +use futures_util::lock::BiLock; + +use crate::test::Bencher; + +#[bench] +fn contended(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + // Try poll second lock while first lock still holds the lock + match y.poll_lock(&mut context) { + Poll::Pending => (), + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }); +} + +#[bench] +fn lock_unlock(b: &mut Bencher) { + let mut context = noop_context(); + + b.iter(|| { + let (x, y) = BiLock::new(1); + + for _ in 0..1000 { + let x_guard = match x.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(x_guard); + + let y_guard = match y.poll_lock(&mut context) { + Poll::Ready(guard) => guard, + _ => panic!(), + }; + + drop(y_guard); + } + (x, y) + }) +} diff --git a/vendor/futures-util/benches/flatten_unordered.rs b/vendor/futures-util/benches/flatten_unordered.rs new file mode 100644 index 000000000..517b2816c --- /dev/null +++ b/vendor/futures-util/benches/flatten_unordered.rs @@ -0,0 +1,58 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::channel::oneshot; +use futures::executor::block_on; +use futures::future; +use futures::stream::{self, StreamExt}; +use futures::task::Poll; +use futures_util::FutureExt; +use std::collections::VecDeque; +use std::thread; + +#[bench] +fn oneshot_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + const STREAM_ITEM_COUNT: usize = 1; + + b.iter(|| { + let mut txs = VecDeque::with_capacity(STREAM_COUNT); + let mut rxs = Vec::new(); + + for _ in 0..STREAM_COUNT { + let (tx, rx) = oneshot::channel(); + txs.push_back(tx); + rxs.push(rx); + } + + thread::spawn(move || { + let mut last = 1; + while let Some(tx) = txs.pop_front() { + let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT)); + last += STREAM_ITEM_COUNT; + } + }); + + let mut flatten = stream::iter(rxs) + .map(|recv| recv.into_stream().map(|val| val.unwrap()).flatten()) + .flatten_unordered(None); + + block_on(future::poll_fn(move |cx| { + let mut count = 0; + loop { + match flatten.poll_next_unpin(cx) { + Poll::Ready(None) => break, + Poll::Ready(Some(_)) => { + count += 1; + } + _ => {} + } + } + assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT); + + Poll::Ready(()) + })) + }); +} diff --git a/vendor/futures-util/benches/select.rs b/vendor/futures-util/benches/select.rs new file mode 100644 index 000000000..5410a9529 --- /dev/null +++ b/vendor/futures-util/benches/select.rs @@ -0,0 +1,35 @@ +#![feature(test)] + +extern crate test; +use crate::test::Bencher; + +use futures::executor::block_on; +use futures::stream::{repeat, select, StreamExt}; + +#[bench] +fn select_streams(b: &mut Bencher) { + const STREAM_COUNT: usize = 10_000; + + b.iter(|| { + let stream1 = repeat(1).take(STREAM_COUNT); + let stream2 = repeat(2).take(STREAM_COUNT); + let stream3 = repeat(3).take(STREAM_COUNT); + let stream4 = repeat(4).take(STREAM_COUNT); + let stream5 = repeat(5).take(STREAM_COUNT); + let stream6 = repeat(6).take(STREAM_COUNT); + let stream7 = repeat(7).take(STREAM_COUNT); + let count = block_on(async { + let count = select( + stream1, + select( + stream2, + select(stream3, select(stream4, select(stream5, select(stream6, stream7)))), + ), + ) + .count() + .await; + count + }); + assert_eq!(count, STREAM_COUNT * 7); + }); +} diff --git a/vendor/futures-util/benches_disabled/bilock.rs b/vendor/futures-util/benches_disabled/bilock.rs deleted file mode 100644 index 417f75d31..000000000 --- a/vendor/futures-util/benches_disabled/bilock.rs +++ /dev/null @@ -1,122 +0,0 @@ -#![feature(test)] - -#[cfg(feature = "bilock")] -mod bench { - use futures::executor::LocalPool; - use futures::task::{Context, Waker}; - use futures_util::lock::BiLock; - use futures_util::lock::BiLockAcquire; - use futures_util::lock::BiLockAcquired; - use futures_util::task::ArcWake; - - use std::sync::Arc; - use test::Bencher; - - fn notify_noop() -> Waker { - struct Noop; - - impl ArcWake for Noop { - fn wake(_: &Arc<Self>) {} - } - - ArcWake::into_waker(Arc::new(Noop)) - } - - /// Pseudo-stream which simply calls `lock.poll()` on `poll` - struct LockStream { - lock: BiLockAcquire<u32>, - } - - impl LockStream { - fn new(lock: BiLock<u32>) -> Self { - Self { lock: lock.lock() } - } - - /// Release a lock after it was acquired in `poll`, - /// so `poll` could be called again. - fn release_lock(&mut self, guard: BiLockAcquired<u32>) { - self.lock = guard.unlock().lock() - } - } - - impl Stream for LockStream { - type Item = BiLockAcquired<u32>; - type Error = (); - - fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>, Self::Error> { - self.lock.poll(cx).map(|a| a.map(Some)) - } - } - - #[bench] - fn contended(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - // Try poll second lock while first lock still holds the lock - match y.poll_next(&mut waker) { - Ok(Poll::Pending) => (), - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }); - } - - #[bench] - fn lock_unlock(b: &mut Bencher) { - let pool = LocalPool::new(); - let mut exec = pool.executor(); - let waker = notify_noop(); - let mut map = task::LocalMap::new(); - let mut waker = task::Context::new(&mut map, &waker, &mut exec); - - b.iter(|| { - let (x, y) = BiLock::new(1); - - let mut x = LockStream::new(x); - let mut y = LockStream::new(y); - - for _ in 0..1000 { - let x_guard = match x.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - x.release_lock(x_guard); - - let y_guard = match y.poll_next(&mut waker) { - Ok(Poll::Ready(Some(guard))) => guard, - _ => panic!(), - }; - - y.release_lock(y_guard); - } - (x, y) - }) - } -} diff --git a/vendor/futures-util/build.rs b/vendor/futures-util/build.rs index 07b50bd55..05e0496d9 100644 --- a/vendor/futures-util/build.rs +++ b/vendor/futures-util/build.rs @@ -1,9 +1,3 @@ -#![warn(rust_2018_idioms, single_use_lifetimes)] - -use std::env; - -include!("no_atomic_cas.rs"); - // The rustc-cfg listed below are considered public API, but it is *unstable* // and outside of the normal semver guarantees: // @@ -13,10 +7,15 @@ include!("no_atomic_cas.rs"); // need to enable it manually when building for custom targets or using // non-cargo build systems that don't run the build script. // -// With the exceptions mentioned above, the rustc-cfg strings below are -// *not* public API. Please let us know by opening a GitHub issue if your build -// environment requires some way to enable these cfgs other than by executing -// our build script. +// With the exceptions mentioned above, the rustc-cfg emitted by the build +// script are *not* public API. + +#![warn(rust_2018_idioms, single_use_lifetimes)] + +use std::env; + +include!("no_atomic_cas.rs"); + fn main() { let target = match env::var("TARGET") { Ok(target) => target, @@ -34,7 +33,7 @@ fn main() { // `cfg(target_has_atomic = "ptr")` as true when the build script doesn't // run. This is needed for compatibility with non-cargo build systems that // don't run the build script. - if NO_ATOMIC_CAS_TARGETS.contains(&&*target) { + if NO_ATOMIC_CAS.contains(&&*target) { println!("cargo:rustc-cfg=futures_no_atomic_cas"); } diff --git a/vendor/futures-util/no_atomic_cas.rs b/vendor/futures-util/no_atomic_cas.rs index 4708bf853..16ec628cd 100644 --- a/vendor/futures-util/no_atomic_cas.rs +++ b/vendor/futures-util/no_atomic_cas.rs @@ -1,13 +1,17 @@ // This file is @generated by no_atomic_cas.sh. // It is not intended for manual editing. -const NO_ATOMIC_CAS_TARGETS: &[&str] = &[ +const NO_ATOMIC_CAS: &[&str] = &[ + "armv4t-none-eabi", + "armv5te-none-eabi", "avr-unknown-gnu-atmega328", "bpfeb-unknown-none", "bpfel-unknown-none", "msp430-none-elf", "riscv32i-unknown-none-elf", + "riscv32im-unknown-none-elf", "riscv32imc-unknown-none-elf", "thumbv4t-none-eabi", + "thumbv5te-none-eabi", "thumbv6m-none-eabi", ]; diff --git a/vendor/futures-util/src/abortable.rs b/vendor/futures-util/src/abortable.rs index bb82dd0db..9dbcfc2b5 100644 --- a/vendor/futures-util/src/abortable.rs +++ b/vendor/futures-util/src/abortable.rs @@ -75,7 +75,18 @@ impl<T> Abortable<T> { /// in calls to `Abortable::new`. #[derive(Debug)] pub struct AbortRegistration { - inner: Arc<AbortInner>, + pub(crate) inner: Arc<AbortInner>, +} + +impl AbortRegistration { + /// Create an [`AbortHandle`] from the given [`AbortRegistration`]. + /// + /// The created [`AbortHandle`] is functionally the same as any other + /// [`AbortHandle`]s that are associated with the same [`AbortRegistration`], + /// such as the one created by [`AbortHandle::new_pair`]. + pub fn handle(&self) -> AbortHandle { + AbortHandle { inner: self.inner.clone() } + } } /// A handle to an `Abortable` task. @@ -100,9 +111,9 @@ impl AbortHandle { // Inner type storing the waker to awaken and a bool indicating that it // should be aborted. #[derive(Debug)] -struct AbortInner { - waker: AtomicWaker, - aborted: AtomicBool, +pub(crate) struct AbortInner { + pub(crate) waker: AtomicWaker, + pub(crate) aborted: AtomicBool, } /// Indicator that the `Abortable` task was aborted. @@ -182,4 +193,17 @@ impl AbortHandle { self.inner.aborted.store(true, Ordering::Relaxed); self.inner.waker.wake(); } + + /// Checks whether [`AbortHandle::abort`] was *called* on any associated + /// [`AbortHandle`]s, which includes all the [`AbortHandle`]s linked with + /// the same [`AbortRegistration`]. This means that it will return `true` + /// even if: + /// * `abort` was called after the task had completed. + /// * `abort` was called while the task was being polled - the task may still be running and + /// will not be stopped until `poll` returns. + /// + /// This operation has a Relaxed ordering. + pub fn is_aborted(&self) -> bool { + self.inner.aborted.load(Ordering::Relaxed) + } } diff --git a/vendor/futures-util/src/compat/compat01as03.rs b/vendor/futures-util/src/compat/compat01as03.rs index 754e3d82a..36de1da98 100644 --- a/vendor/futures-util/src/compat/compat01as03.rs +++ b/vendor/futures-util/src/compat/compat01as03.rs @@ -64,6 +64,7 @@ pub trait Future01CompatExt: Future01 { /// [`Future<Output = Result<T, E>>`](futures_core::future::Future). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// # // TODO: These should be all using `futures::compat`, but that runs up against Cargo /// # // feature issues @@ -90,6 +91,7 @@ pub trait Stream01CompatExt: Stream01 { /// [`Stream<Item = Result<T, E>>`](futures_core::stream::Stream). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::stream::StreamExt; /// use futures_util::compat::Stream01CompatExt; @@ -119,6 +121,7 @@ pub trait Sink01CompatExt: Sink01 { /// [`Sink<T, Error = E>`](futures_sink::Sink). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::{sink::SinkExt, stream::StreamExt}; /// use futures_util::compat::{Stream01CompatExt, Sink01CompatExt}; @@ -362,6 +365,7 @@ mod io { /// [`AsyncRead`](futures_io::AsyncRead). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::io::AsyncReadExt; /// use futures_util::compat::AsyncRead01CompatExt; @@ -391,6 +395,7 @@ mod io { /// [`AsyncWrite`](futures_io::AsyncWrite). /// /// ``` + /// # if cfg!(miri) { return; } // https://github.com/rust-lang/futures-rs/issues/2514 /// # futures::executor::block_on(async { /// use futures::io::AsyncWriteExt; /// use futures_util::compat::AsyncWrite01CompatExt; diff --git a/vendor/futures-util/src/compat/executor.rs b/vendor/futures-util/src/compat/executor.rs index e25705be1..ea0c67a0a 100644 --- a/vendor/futures-util/src/compat/executor.rs +++ b/vendor/futures-util/src/compat/executor.rs @@ -17,6 +17,7 @@ pub trait Executor01CompatExt: Executor01<Executor01Future> + Clone + Send + 'st /// futures 0.3 [`Spawn`](futures_task::Spawn). /// /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll /// use futures::task::SpawnExt; /// use futures::future::{FutureExt, TryFutureExt}; /// use futures_util::compat::Executor01CompatExt; diff --git a/vendor/futures-util/src/future/either.rs b/vendor/futures-util/src/future/either.rs index 9602de7a4..27e5064df 100644 --- a/vendor/futures-util/src/future/either.rs +++ b/vendor/futures-util/src/future/either.rs @@ -33,11 +33,31 @@ pub enum Either<A, B> { } impl<A, B> Either<A, B> { - fn project(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + /// Convert `Pin<&Either<A, B>>` to `Either<Pin<&A>, Pin<&B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_ref(self: Pin<&Self>) -> Either<Pin<&A>, Pin<&B>> { + // SAFETY: We can use `new_unchecked` because the `inner` parts are + // guaranteed to be pinned, as they come from `self` which is pinned. unsafe { - match self.get_unchecked_mut() { - Either::Left(a) => Either::Left(Pin::new_unchecked(a)), - Either::Right(b) => Either::Right(Pin::new_unchecked(b)), + match *Pin::get_ref(self) { + Either::Left(ref inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref inner) => Either::Right(Pin::new_unchecked(inner)), + } + } + } + + /// Convert `Pin<&mut Either<A, B>>` to `Either<Pin<&mut A>, Pin<&mut B>>`, + /// pinned projections of the inner variants. + pub fn as_pin_mut(self: Pin<&mut Self>) -> Either<Pin<&mut A>, Pin<&mut B>> { + // SAFETY: `get_unchecked_mut` is fine because we don't move anything. + // We can use `new_unchecked` because the `inner` parts are guaranteed + // to be pinned, as they come from `self` which is pinned, and we never + // offer an unpinned `&mut A` or `&mut B` through `Pin<&mut Self>`. We + // also don't have an implementation of `Drop`, nor manual `Unpin`. + unsafe { + match *Pin::get_unchecked_mut(self) { + Either::Left(ref mut inner) => Either::Left(Pin::new_unchecked(inner)), + Either::Right(ref mut inner) => Either::Right(Pin::new_unchecked(inner)), } } } @@ -85,7 +105,7 @@ where type Output = A::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll(cx), Either::Right(x) => x.poll(cx), } @@ -113,7 +133,7 @@ where type Item = A::Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_next(cx), Either::Right(x) => x.poll_next(cx), } @@ -149,28 +169,28 @@ where type Error = A::Error; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_ready(cx), Either::Right(x) => x.poll_ready(cx), } } fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.start_send(item), Either::Right(x) => x.start_send(item), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_flush(cx), Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_close(cx), Either::Right(x) => x.poll_close(cx), } @@ -198,7 +218,7 @@ mod if_std { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_read(cx, buf), Either::Right(x) => x.poll_read(cx, buf), } @@ -209,7 +229,7 @@ mod if_std { cx: &mut Context<'_>, bufs: &mut [IoSliceMut<'_>], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_read_vectored(cx, bufs), Either::Right(x) => x.poll_read_vectored(cx, bufs), } @@ -226,7 +246,7 @@ mod if_std { cx: &mut Context<'_>, buf: &[u8], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_write(cx, buf), Either::Right(x) => x.poll_write(cx, buf), } @@ -237,21 +257,21 @@ mod if_std { cx: &mut Context<'_>, bufs: &[IoSlice<'_>], ) -> Poll<Result<usize>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_write_vectored(cx, bufs), Either::Right(x) => x.poll_write_vectored(cx, bufs), } } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_flush(cx), Either::Right(x) => x.poll_flush(cx), } } fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_close(cx), Either::Right(x) => x.poll_close(cx), } @@ -268,7 +288,7 @@ mod if_std { cx: &mut Context<'_>, pos: SeekFrom, ) -> Poll<Result<u64>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_seek(cx, pos), Either::Right(x) => x.poll_seek(cx, pos), } @@ -281,14 +301,14 @@ mod if_std { B: AsyncBufRead, { fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.poll_fill_buf(cx), Either::Right(x) => x.poll_fill_buf(cx), } } fn consume(self: Pin<&mut Self>, amt: usize) { - match self.project() { + match self.as_pin_mut() { Either::Left(x) => x.consume(amt), Either::Right(x) => x.consume(amt), } diff --git a/vendor/futures-util/src/future/future/fuse.rs b/vendor/futures-util/src/future/future/fuse.rs index 597aec1a4..225790672 100644 --- a/vendor/futures-util/src/future/future/fuse.rs +++ b/vendor/futures-util/src/future/future/fuse.rs @@ -1,6 +1,5 @@ use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; -use futures_core::ready; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; @@ -81,13 +80,12 @@ impl<Fut: Future> Future for Fuse<Fut> { type Output = Fut::Output; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Fut::Output> { - Poll::Ready(match self.as_mut().project().inner.as_pin_mut() { - Some(fut) => { - let output = ready!(fut.poll(cx)); + match self.as_mut().project().inner.as_pin_mut() { + Some(fut) => fut.poll(cx).map(|output| { self.project().inner.set(None); output - } - None => return Poll::Pending, - }) + }), + None => Poll::Pending, + } } } diff --git a/vendor/futures-util/src/future/future/shared.rs b/vendor/futures-util/src/future/future/shared.rs index 9b31932fe..ecd1b426d 100644 --- a/vendor/futures-util/src/future/future/shared.rs +++ b/vendor/futures-util/src/future/future/shared.rs @@ -4,7 +4,9 @@ use futures_core::task::{Context, Poll, Waker}; use slab::Slab; use std::cell::UnsafeCell; use std::fmt; +use std::hash::Hasher; use std::pin::Pin; +use std::ptr; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, SeqCst}; use std::sync::{Arc, Mutex, Weak}; @@ -103,7 +105,6 @@ impl<Fut: Future> Shared<Fut> { impl<Fut> Shared<Fut> where Fut: Future, - Fut::Output: Clone, { /// Returns [`Some`] containing a reference to this [`Shared`]'s output if /// it has already been computed by a clone or [`None`] if it hasn't been @@ -139,6 +140,7 @@ where /// This method by itself is safe, but using it correctly requires extra care. Another thread /// can change the strong count at any time, including potentially between calling this method /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] pub fn strong_count(&self) -> Option<usize> { self.inner.as_ref().map(|arc| Arc::strong_count(arc)) } @@ -152,15 +154,44 @@ where /// This method by itself is safe, but using it correctly requires extra care. Another thread /// can change the weak count at any time, including potentially between calling this method /// and acting on the result. + #[allow(clippy::unnecessary_safety_doc)] pub fn weak_count(&self) -> Option<usize> { self.inner.as_ref().map(|arc| Arc::weak_count(arc)) } + + /// Hashes the internal state of this `Shared` in a way that's compatible with `ptr_eq`. + pub fn ptr_hash<H: Hasher>(&self, state: &mut H) { + match self.inner.as_ref() { + Some(arc) => { + state.write_u8(1); + ptr::hash(Arc::as_ptr(arc), state); + } + None => { + state.write_u8(0); + } + } + } + + /// Returns `true` if the two `Shared`s point to the same future (in a vein similar to + /// `Arc::ptr_eq`). + /// + /// Returns `false` if either `Shared` has terminated. + pub fn ptr_eq(&self, rhs: &Self) -> bool { + let lhs = match self.inner.as_ref() { + Some(lhs) => lhs, + None => return false, + }; + let rhs = match rhs.inner.as_ref() { + Some(rhs) => rhs, + None => return false, + }; + Arc::ptr_eq(lhs, rhs) + } } impl<Fut> Inner<Fut> where Fut: Future, - Fut::Output: Clone, { /// Safety: callers must first ensure that `self.inner.state` /// is `COMPLETE` @@ -170,6 +201,13 @@ where FutureOrOutput::Future(_) => unreachable!(), } } +} + +impl<Fut> Inner<Fut> +where + Fut: Future, + Fut::Output: Clone, +{ /// Registers the current task to receive a wakeup when we are awoken. fn record_waker(&self, waker_key: &mut usize, cx: &mut Context<'_>) { let mut wakers_guard = self.notifier.wakers.lock().unwrap(); @@ -262,19 +300,20 @@ where let waker = waker_ref(&inner.notifier); let mut cx = Context::from_waker(&waker); - struct Reset<'a>(&'a AtomicUsize); + struct Reset<'a> { + state: &'a AtomicUsize, + did_not_panic: bool, + } impl Drop for Reset<'_> { fn drop(&mut self) { - use std::thread; - - if thread::panicking() { - self.0.store(POISONED, SeqCst); + if !self.did_not_panic { + self.state.store(POISONED, SeqCst); } } } - let _reset = Reset(&inner.notifier.state); + let mut reset = Reset { state: &inner.notifier.state, did_not_panic: false }; let output = { let future = unsafe { @@ -284,12 +323,15 @@ where } }; - match future.poll(&mut cx) { + let poll_result = future.poll(&mut cx); + reset.did_not_panic = true; + + match poll_result { Poll::Pending => { if inner.notifier.state.compare_exchange(POLLING, IDLE, SeqCst, SeqCst).is_ok() { // Success - drop(_reset); + drop(reset); this.inner = Some(inner); return Poll::Pending; } else { @@ -313,7 +355,7 @@ where waker.wake(); } - drop(_reset); // Make borrow checker happy + drop(reset); // Make borrow checker happy drop(wakers_guard); // Safety: We're in the COMPLETE state diff --git a/vendor/futures-util/src/future/join_all.rs b/vendor/futures-util/src/future/join_all.rs index 2e52ac17f..7dc159ba0 100644 --- a/vendor/futures-util/src/future/join_all.rs +++ b/vendor/futures-util/src/future/join_all.rs @@ -15,7 +15,7 @@ use super::{assert_future, MaybeDone}; #[cfg(not(futures_no_atomic_cas))] use crate::stream::{Collect, FuturesOrdered, StreamExt}; -fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { +pub(crate) fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { // Safety: `std` _could_ make this unsound if it were to decide Pin's // invariants aren't required to transmit through slices. Otherwise this has // the same safety as a normal field pin projection. @@ -32,9 +32,9 @@ where } #[cfg(not(futures_no_atomic_cas))] -const SMALL: usize = 30; +pub(crate) const SMALL: usize = 30; -pub(crate) enum JoinAllKind<F> +enum JoinAllKind<F> where F: Future, { @@ -104,26 +104,25 @@ where I: IntoIterator, I::Item: Future, { + let iter = iter.into_iter(); + #[cfg(futures_no_atomic_cas)] { - let elems = iter.into_iter().map(MaybeDone::Future).collect::<Box<[_]>>().into(); - let kind = JoinAllKind::Small { elems }; + let kind = + JoinAllKind::Small { elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into() }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) } + #[cfg(not(futures_no_atomic_cas))] { - let iter = iter.into_iter(); let kind = match iter.size_hint().1 { - None => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, - Some(max) => { - if max <= SMALL { - let elems = iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(); - JoinAllKind::Small { elems } - } else { - JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() } - } - } + Some(max) if max <= SMALL => JoinAllKind::Small { + elems: iter.map(MaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => JoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().collect() }, }; + assert_future::<Vec<<I::Item as Future>::Output>, _>(JoinAll { kind }) } } diff --git a/vendor/futures-util/src/future/pending.rs b/vendor/futures-util/src/future/pending.rs index 92c78d52b..b8e28686e 100644 --- a/vendor/futures-util/src/future/pending.rs +++ b/vendor/futures-util/src/future/pending.rs @@ -33,6 +33,7 @@ impl<T> FusedFuture for Pending<T> { /// unreachable!(); /// # }); /// ``` +#[cfg_attr(docsrs, doc(alias = "never"))] pub fn pending<T>() -> Pending<T> { assert_future::<T, _>(Pending { _data: marker::PhantomData }) } diff --git a/vendor/futures-util/src/future/select.rs b/vendor/futures-util/src/future/select.rs index bd44f20f7..7e33d195f 100644 --- a/vendor/futures-util/src/future/select.rs +++ b/vendor/futures-util/src/future/select.rs @@ -99,17 +99,27 @@ where type Output = Either<(A::Output, B), (B::Output, A)>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), - Poll::Pending => match b.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending - } - }, + /// When compiled with `-C opt-level=z`, this function will help the compiler eliminate the `None` branch, where + /// `Option::unwrap` does not. + #[inline(always)] + fn unwrap_option<T>(value: Option<T>) -> T { + match value { + None => unreachable!(), + Some(value) => value, + } } + + let (a, b) = self.inner.as_mut().expect("cannot poll Select twice"); + + if let Poll::Ready(val) = a.poll_unpin(cx) { + return Poll::Ready(Either::Left((val, unwrap_option(self.inner.take()).1))); + } + + if let Poll::Ready(val) = b.poll_unpin(cx) { + return Poll::Ready(Either::Right((val, unwrap_option(self.inner.take()).0))); + } + + Poll::Pending } } diff --git a/vendor/futures-util/src/future/select_all.rs b/vendor/futures-util/src/future/select_all.rs index 106e50844..0a51d0da6 100644 --- a/vendor/futures-util/src/future/select_all.rs +++ b/vendor/futures-util/src/future/select_all.rs @@ -58,8 +58,9 @@ impl<Fut: Future + Unpin> Future for SelectAll<Fut> { }); match item { Some((idx, res)) => { + #[allow(clippy::let_underscore_future)] let _ = self.inner.swap_remove(idx); - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); Poll::Ready((res, idx, rest)) } None => Poll::Pending, diff --git a/vendor/futures-util/src/future/select_ok.rs b/vendor/futures-util/src/future/select_ok.rs index 0ad83c6db..5d5579930 100644 --- a/vendor/futures-util/src/future/select_ok.rs +++ b/vendor/futures-util/src/future/select_ok.rs @@ -59,7 +59,7 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> { drop(self.inner.remove(idx)); match res { Ok(e) => { - let rest = mem::replace(&mut self.inner, Vec::new()); + let rest = mem::take(&mut self.inner); return Poll::Ready(Ok((e, rest))); } Err(e) => { diff --git a/vendor/futures-util/src/future/try_future/mod.rs b/vendor/futures-util/src/future/try_future/mod.rs index fb3bdd8a0..e5bc70071 100644 --- a/vendor/futures-util/src/future/try_future/mod.rs +++ b/vendor/futures-util/src/future/try_future/mod.rs @@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture { /// assert_eq!(future.await, Ok(1)); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn map_err<E, F>(self, f: F) -> MapErr<Self, F> where F: FnOnce(Self::Error) -> E, @@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture { /// let future_err_i32 = future_err_u8.err_into::<i32>(); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn err_into<E>(self) -> ErrInto<Self, E> where Self: Sized, diff --git a/vendor/futures-util/src/future/try_join_all.rs b/vendor/futures-util/src/future/try_join_all.rs index 29244af83..506f45065 100644 --- a/vendor/futures-util/src/future/try_join_all.rs +++ b/vendor/futures-util/src/future/try_join_all.rs @@ -10,14 +10,11 @@ use core::mem; use core::pin::Pin; use core::task::{Context, Poll}; -use super::{assert_future, TryFuture, TryMaybeDone}; +use super::{assert_future, join_all, IntoFuture, TryFuture, TryMaybeDone}; -fn iter_pin_mut<T>(slice: Pin<&mut [T]>) -> impl Iterator<Item = Pin<&mut T>> { - // Safety: `std` _could_ make this unsound if it were to decide Pin's - // invariants aren't required to transmit through slices. Otherwise this has - // the same safety as a normal field pin projection. - unsafe { slice.get_unchecked_mut() }.iter_mut().map(|t| unsafe { Pin::new_unchecked(t) }) -} +#[cfg(not(futures_no_atomic_cas))] +use crate::stream::{FuturesOrdered, TryCollect, TryStreamExt}; +use crate::TryFutureExt; enum FinalState<E = ()> { Pending, @@ -31,7 +28,20 @@ pub struct TryJoinAll<F> where F: TryFuture, { - elems: Pin<Box<[TryMaybeDone<F>]>>, + kind: TryJoinAllKind<F>, +} + +enum TryJoinAllKind<F> +where + F: TryFuture, +{ + Small { + elems: Pin<Box<[TryMaybeDone<IntoFuture<F>>]>>, + }, + #[cfg(not(futures_no_atomic_cas))] + Big { + fut: TryCollect<FuturesOrdered<IntoFuture<F>>, Vec<F::Ok>>, + }, } impl<F> fmt::Debug for TryJoinAll<F> @@ -39,9 +49,16 @@ where F: TryFuture + fmt::Debug, F::Ok: fmt::Debug, F::Error: fmt::Debug, + F::Output: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("TryJoinAll").field("elems", &self.elems).finish() + match self.kind { + TryJoinAllKind::Small { ref elems } => { + f.debug_struct("TryJoinAll").field("elems", elems).finish() + } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { ref fut, .. } => fmt::Debug::fmt(fut, f), + } } } @@ -60,6 +77,20 @@ where /// This function is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. /// +/// # See Also +/// +/// `try_join_all` will switch to the more powerful [`FuturesOrdered`] for performance +/// reasons if the number of futures is large. You may want to look into using it or +/// it's counterpart [`FuturesUnordered`][crate::stream::FuturesUnordered] directly. +/// +/// Some examples for additional functionality provided by these are: +/// +/// * Adding new futures to the set even after it has been started. +/// +/// * Only polling the specific futures that have been woken. In cases where +/// you have a lot of futures this will result in much more efficient polling. +/// +/// /// # Examples /// /// ``` @@ -83,15 +114,37 @@ where /// assert_eq!(try_join_all(futures).await, Err(2)); /// # }); /// ``` -pub fn try_join_all<I>(i: I) -> TryJoinAll<I::Item> +pub fn try_join_all<I>(iter: I) -> TryJoinAll<I::Item> where I: IntoIterator, I::Item: TryFuture, { - let elems: Box<[_]> = i.into_iter().map(TryMaybeDone::Future).collect(); - assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( - TryJoinAll { elems: elems.into() }, - ) + let iter = iter.into_iter().map(TryFutureExt::into_future); + + #[cfg(futures_no_atomic_cas)] + { + let kind = TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } + + #[cfg(not(futures_no_atomic_cas))] + { + let kind = match iter.size_hint().1 { + Some(max) if max <= join_all::SMALL => TryJoinAllKind::Small { + elems: iter.map(TryMaybeDone::Future).collect::<Box<[_]>>().into(), + }, + _ => TryJoinAllKind::Big { fut: iter.collect::<FuturesOrdered<_>>().try_collect() }, + }; + + assert_future::<Result<Vec<<I::Item as TryFuture>::Ok>, <I::Item as TryFuture>::Error>, _>( + TryJoinAll { kind }, + ) + } } impl<F> Future for TryJoinAll<F> @@ -101,36 +154,46 @@ where type Output = Result<Vec<F::Ok>, F::Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - let mut state = FinalState::AllDone; - - for elem in iter_pin_mut(self.elems.as_mut()) { - match elem.try_poll(cx) { - Poll::Pending => state = FinalState::Pending, - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => { - state = FinalState::Error(e); - break; + match &mut self.kind { + TryJoinAllKind::Small { elems } => { + let mut state = FinalState::AllDone; + + for elem in join_all::iter_pin_mut(elems.as_mut()) { + match elem.try_poll(cx) { + Poll::Pending => state = FinalState::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(e)) => { + state = FinalState::Error(e); + break; + } + } } - } - } - match state { - FinalState::Pending => Poll::Pending, - FinalState::AllDone => { - let mut elems = mem::replace(&mut self.elems, Box::pin([])); - let results = - iter_pin_mut(elems.as_mut()).map(|e| e.take_output().unwrap()).collect(); - Poll::Ready(Ok(results)) - } - FinalState::Error(e) => { - let _ = mem::replace(&mut self.elems, Box::pin([])); - Poll::Ready(Err(e)) + match state { + FinalState::Pending => Poll::Pending, + FinalState::AllDone => { + let mut elems = mem::replace(elems, Box::pin([])); + let results = join_all::iter_pin_mut(elems.as_mut()) + .map(|e| e.take_output().unwrap()) + .collect(); + Poll::Ready(Ok(results)) + } + FinalState::Error(e) => { + let _ = mem::replace(elems, Box::pin([])); + Poll::Ready(Err(e)) + } + } } + #[cfg(not(futures_no_atomic_cas))] + TryJoinAllKind::Big { fut } => Pin::new(fut).poll(cx), } } } -impl<F: TryFuture> FromIterator<F> for TryJoinAll<F> { +impl<F> FromIterator<F> for TryJoinAll<F> +where + F: TryFuture, +{ fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self { try_join_all(iter) } diff --git a/vendor/futures-util/src/future/try_select.rs b/vendor/futures-util/src/future/try_select.rs index 4d0b7ff13..bc282f7db 100644 --- a/vendor/futures-util/src/future/try_select.rs +++ b/vendor/futures-util/src/future/try_select.rs @@ -12,6 +12,9 @@ pub struct TrySelect<A, B> { impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {} +type EitherOk<A, B> = Either<(<A as TryFuture>::Ok, B), (<B as TryFuture>::Ok, A)>; +type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::Error, A)>; + /// Waits for either one of two differently-typed futures to complete. /// /// This function will return a new future which awaits for either one of both @@ -52,10 +55,9 @@ where A: TryFuture + Unpin, B: TryFuture + Unpin, { - super::assert_future::< - Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>, - _, - >(TrySelect { inner: Some((future1, future2)) }) + super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect { + inner: Some((future1, future2)), + }) } impl<A: Unpin, B: Unpin> Future for TrySelect<A, B> @@ -63,8 +65,7 @@ where A: TryFuture, B: TryFuture, { - #[allow(clippy::type_complexity)] - type Output = Result<Either<(A::Ok, B), (B::Ok, A)>, Either<(A::Error, B), (B::Error, A)>>; + type Output = Result<EitherOk<A, B>, EitherErr<A, B>>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); diff --git a/vendor/futures-util/src/io/copy_buf_abortable.rs b/vendor/futures-util/src/io/copy_buf_abortable.rs new file mode 100644 index 000000000..fdbc4a5f0 --- /dev/null +++ b/vendor/futures-util/src/io/copy_buf_abortable.rs @@ -0,0 +1,124 @@ +use crate::abortable::{AbortHandle, AbortInner, Aborted}; +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::{AsyncBufRead, AsyncWrite}; +use pin_project_lite::pin_project; +use std::io; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +/// Creates a future which copies all the bytes from one object to another, with its `AbortHandle`. +/// +/// The returned future will copy all the bytes read from this `AsyncBufRead` into the +/// `writer` specified. This future will only complete once abort has been requested or the `reader` has hit +/// EOF and all bytes have been written to and flushed from the `writer` +/// provided. +/// +/// On success the number of bytes is returned. If aborted, `Aborted` is returned. Otherwise, the underlying error is returned. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::io::{self, AsyncWriteExt, Cursor}; +/// use futures::future::Aborted; +/// +/// let reader = Cursor::new([1, 2, 3, 4]); +/// let mut writer = Cursor::new(vec![0u8; 5]); +/// +/// let (fut, abort_handle) = io::copy_buf_abortable(reader, &mut writer); +/// let bytes = fut.await; +/// abort_handle.abort(); +/// writer.close().await.unwrap(); +/// match bytes { +/// Ok(Ok(n)) => { +/// assert_eq!(n, 4); +/// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]); +/// Ok(n) +/// }, +/// Ok(Err(a)) => { +/// Err::<u64, Aborted>(a) +/// } +/// Err(e) => panic!("{}", e) +/// } +/// # }).unwrap(); +/// ``` +pub fn copy_buf_abortable<R, W>( + reader: R, + writer: &mut W, +) -> (CopyBufAbortable<'_, R, W>, AbortHandle) +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + ?Sized, +{ + let (handle, reg) = AbortHandle::new_pair(); + (CopyBufAbortable { reader, writer, amt: 0, inner: reg.inner }, handle) +} + +pin_project! { + /// Future for the [`copy_buf()`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBufAbortable<'a, R, W: ?Sized> { + #[pin] + reader: R, + writer: &'a mut W, + amt: u64, + inner: Arc<AbortInner> + } +} + +macro_rules! ready_or_break { + ($e:expr $(,)?) => { + match $e { + $crate::task::Poll::Ready(t) => t, + $crate::task::Poll::Pending => break, + } + }; +} + +impl<R, W> Future for CopyBufAbortable<'_, R, W> +where + R: AsyncBufRead, + W: AsyncWrite + Unpin + Sized, +{ + type Output = Result<Result<u64, Aborted>, io::Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut this = self.project(); + loop { + // Check if the task has been aborted + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + + // Read some bytes from the reader, and if we have reached EOF, return total bytes read + let buffer = ready_or_break!(this.reader.as_mut().poll_fill_buf(cx))?; + if buffer.is_empty() { + ready_or_break!(Pin::new(&mut this.writer).poll_flush(cx))?; + return Poll::Ready(Ok(Ok(*this.amt))); + } + + // Pass the buffer to the writer, and update the amount written + let i = ready_or_break!(Pin::new(&mut this.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); + } + *this.amt += i as u64; + this.reader.as_mut().consume(i); + } + // Schedule the task to be woken up again. + // Never called unless Poll::Pending is returned from io objects. + this.inner.waker.register(cx.waker()); + + // Check to see if the task was aborted between the first check and + // registration. + // Checking with `Relaxed` is sufficient because + // `register` introduces an `AcqRel` barrier. + if this.inner.aborted.load(Ordering::Relaxed) { + return Poll::Ready(Ok(Err(Aborted))); + } + Poll::Pending + } +} diff --git a/vendor/futures-util/src/io/cursor.rs b/vendor/futures-util/src/io/cursor.rs index b6fb3724c..c6e2aeea2 100644 --- a/vendor/futures-util/src/io/cursor.rs +++ b/vendor/futures-util/src/io/cursor.rs @@ -1,6 +1,4 @@ use futures_core::task::{Context, Poll}; -#[cfg(feature = "read_initializer")] -use futures_io::Initializer; use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite, IoSlice, IoSliceMut, SeekFrom}; use std::io; use std::pin::Pin; @@ -159,12 +157,6 @@ where } impl<T: AsRef<[u8]> + Unpin> AsyncRead for Cursor<T> { - #[cfg(feature = "read_initializer")] - #[inline] - unsafe fn initializer(&self) -> Initializer { - io::Read::initializer(&self.inner) - } - fn poll_read( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, diff --git a/vendor/futures-util/src/io/lines.rs b/vendor/futures-util/src/io/lines.rs index 13e70df23..b5561bfa7 100644 --- a/vendor/futures-util/src/io/lines.rs +++ b/vendor/futures-util/src/io/lines.rs @@ -42,6 +42,6 @@ impl<R: AsyncBufRead> Stream for Lines<R> { this.buf.pop(); } } - Poll::Ready(Some(Ok(mem::replace(this.buf, String::new())))) + Poll::Ready(Some(Ok(mem::take(this.buf)))) } } diff --git a/vendor/futures-util/src/io/mod.rs b/vendor/futures-util/src/io/mod.rs index 4dd2e029b..8ce3ad644 100644 --- a/vendor/futures-util/src/io/mod.rs +++ b/vendor/futures-util/src/io/mod.rs @@ -66,6 +66,9 @@ pub use self::copy::{copy, Copy}; mod copy_buf; pub use self::copy_buf::{copy_buf, CopyBuf}; +mod copy_buf_abortable; +pub use self::copy_buf_abortable::{copy_buf_abortable, CopyBufAbortable}; + mod cursor; pub use self::cursor::Cursor; diff --git a/vendor/futures-util/src/io/read_exact.rs b/vendor/futures-util/src/io/read_exact.rs index 02e38c35b..cd0b20e59 100644 --- a/vendor/futures-util/src/io/read_exact.rs +++ b/vendor/futures-util/src/io/read_exact.rs @@ -30,7 +30,7 @@ impl<R: AsyncRead + ?Sized + Unpin> Future for ReadExact<'_, R> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &mut []).split_at_mut(n); + let (_, rest) = mem::take(&mut this.buf).split_at_mut(n); this.buf = rest; } if n == 0 { diff --git a/vendor/futures-util/src/io/read_line.rs b/vendor/futures-util/src/io/read_line.rs index c75af9471..e1b8fc945 100644 --- a/vendor/futures-util/src/io/read_line.rs +++ b/vendor/futures-util/src/io/read_line.rs @@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {} impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, read: 0 } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, read: 0 } } } diff --git a/vendor/futures-util/src/io/read_to_string.rs b/vendor/futures-util/src/io/read_to_string.rs index 457af59e4..c175396d8 100644 --- a/vendor/futures-util/src/io/read_to_string.rs +++ b/vendor/futures-util/src/io/read_to_string.rs @@ -22,7 +22,7 @@ impl<R: ?Sized + Unpin> Unpin for ReadToString<'_, R> {} impl<'a, R: AsyncRead + ?Sized + Unpin> ReadToString<'a, R> { pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { let start_len = buf.len(); - Self { reader, bytes: mem::replace(buf, String::new()).into_bytes(), buf, start_len } + Self { reader, bytes: mem::take(buf).into_bytes(), buf, start_len } } } diff --git a/vendor/futures-util/src/io/write_all.rs b/vendor/futures-util/src/io/write_all.rs index b134bf1b2..08c025f94 100644 --- a/vendor/futures-util/src/io/write_all.rs +++ b/vendor/futures-util/src/io/write_all.rs @@ -30,7 +30,7 @@ impl<W: AsyncWrite + ?Sized + Unpin> Future for WriteAll<'_, W> { while !this.buf.is_empty() { let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?; { - let (_, rest) = mem::replace(&mut this.buf, &[]).split_at(n); + let (_, rest) = mem::take(&mut this.buf).split_at(n); this.buf = rest; } if n == 0 { diff --git a/vendor/futures-util/src/lock/bilock.rs b/vendor/futures-util/src/lock/bilock.rs index 2f51ae7c9..7ddc66ad2 100644 --- a/vendor/futures-util/src/lock/bilock.rs +++ b/vendor/futures-util/src/lock/bilock.rs @@ -3,11 +3,11 @@ use alloc::boxed::Box; use alloc::sync::Arc; use core::cell::UnsafeCell; -use core::fmt; use core::ops::{Deref, DerefMut}; use core::pin::Pin; -use core::sync::atomic::AtomicUsize; +use core::sync::atomic::AtomicPtr; use core::sync::atomic::Ordering::SeqCst; +use core::{fmt, ptr}; #[cfg(feature = "bilock")] use futures_core::future::Future; use futures_core::task::{Context, Poll, Waker}; @@ -41,7 +41,7 @@ pub struct BiLock<T> { #[derive(Debug)] struct Inner<T> { - state: AtomicUsize, + state: AtomicPtr<Waker>, value: Option<UnsafeCell<T>>, } @@ -61,7 +61,10 @@ impl<T> BiLock<T> { /// Similarly, reuniting the lock and extracting the inner value is only /// possible when `T` is `Unpin`. pub fn new(t: T) -> (Self, Self) { - let arc = Arc::new(Inner { state: AtomicUsize::new(0), value: Some(UnsafeCell::new(t)) }); + let arc = Arc::new(Inner { + state: AtomicPtr::new(ptr::null_mut()), + value: Some(UnsafeCell::new(t)), + }); (Self { arc: arc.clone() }, Self { arc }) } @@ -87,7 +90,8 @@ impl<T> BiLock<T> { pub fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<BiLockGuard<'_, T>> { let mut waker = None; loop { - match self.arc.state.swap(1, SeqCst) { + let n = self.arc.state.swap(invalid_ptr(1), SeqCst); + match n as usize { // Woohoo, we grabbed the lock! 0 => return Poll::Ready(BiLockGuard { bilock: self }), @@ -96,8 +100,8 @@ impl<T> BiLock<T> { // A task was previously blocked on this lock, likely our task, // so we need to update that task. - n => unsafe { - let mut prev = Box::from_raw(n as *mut Waker); + _ => unsafe { + let mut prev = Box::from_raw(n); *prev = cx.waker().clone(); waker = Some(prev); }, @@ -105,9 +109,9 @@ impl<T> BiLock<T> { // type ascription for safety's sake! let me: Box<Waker> = waker.take().unwrap_or_else(|| Box::new(cx.waker().clone())); - let me = Box::into_raw(me) as usize; + let me = Box::into_raw(me); - match self.arc.state.compare_exchange(1, me, SeqCst, SeqCst) { + match self.arc.state.compare_exchange(invalid_ptr(1), me, SeqCst, SeqCst) { // The lock is still locked, but we've now parked ourselves, so // just report that we're scheduled to receive a notification. Ok(_) => return Poll::Pending, @@ -115,8 +119,8 @@ impl<T> BiLock<T> { // Oops, looks like the lock was unlocked after our swap above // and before the compare_exchange. Deallocate what we just // allocated and go through the loop again. - Err(0) => unsafe { - waker = Some(Box::from_raw(me as *mut Waker)); + Err(n) if n.is_null() => unsafe { + waker = Some(Box::from_raw(me)); }, // The top of this loop set the previous state to 1, so if we @@ -125,7 +129,7 @@ impl<T> BiLock<T> { // but we're trying to acquire the lock and there's only one // other reference of the lock, so it should be impossible for // that task to ever block itself. - Err(n) => panic!("invalid state: {}", n), + Err(n) => panic!("invalid state: {}", n as usize), } } } @@ -164,7 +168,8 @@ impl<T> BiLock<T> { } fn unlock(&self) { - match self.arc.state.swap(0, SeqCst) { + let n = self.arc.state.swap(ptr::null_mut(), SeqCst); + match n as usize { // we've locked the lock, shouldn't be possible for us to see an // unlocked lock. 0 => panic!("invalid unlocked state"), @@ -174,8 +179,8 @@ impl<T> BiLock<T> { // Another task has parked themselves on this lock, let's wake them // up as its now their turn. - n => unsafe { - Box::from_raw(n as *mut Waker).wake(); + _ => unsafe { + Box::from_raw(n).wake(); }, } } @@ -189,7 +194,7 @@ impl<T: Unpin> Inner<T> { impl<T> Drop for Inner<T> { fn drop(&mut self) { - assert_eq!(self.state.load(SeqCst), 0); + assert!(self.state.load(SeqCst).is_null()); } } @@ -224,6 +229,9 @@ pub struct BiLockGuard<'a, T> { bilock: &'a BiLock<T>, } +// We allow parallel access to T via Deref, so Sync bound is also needed here. +unsafe impl<T: Send + Sync> Sync for BiLockGuard<'_, T> {} + impl<T> Deref for BiLockGuard<'_, T> { type Target = T; fn deref(&self) -> &T { @@ -274,3 +282,12 @@ impl<'a, T> Future for BiLockAcquire<'a, T> { self.bilock.poll_lock(cx) } } + +// Based on core::ptr::invalid_mut. Equivalent to `addr as *mut T`, but is strict-provenance compatible. +#[allow(clippy::useless_transmute)] +#[inline] +fn invalid_ptr<T>(addr: usize) -> *mut T { + // SAFETY: every valid integer is also a valid pointer (as long as you don't dereference that + // pointer). + unsafe { core::mem::transmute(addr) } +} diff --git a/vendor/futures-util/src/lock/mod.rs b/vendor/futures-util/src/lock/mod.rs index cf374c016..0be72717c 100644 --- a/vendor/futures-util/src/lock/mod.rs +++ b/vendor/futures-util/src/lock/mod.rs @@ -4,11 +4,18 @@ //! library is activated, and it is activated by default. #[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "std")] -mod mutex; +#[cfg(any(feature = "sink", feature = "io"))] +#[cfg(not(feature = "bilock"))] +pub(crate) use self::bilock::BiLock; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "bilock")] +#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] +pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "std")] -pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; +pub use self::mutex::{ + MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture, OwnedMutexGuard, OwnedMutexLockFuture, +}; #[cfg(not(futures_no_atomic_cas))] #[cfg(any(feature = "bilock", feature = "sink", feature = "io"))] @@ -16,10 +23,5 @@ pub use self::mutex::{MappedMutexGuard, Mutex, MutexGuard, MutexLockFuture}; #[cfg_attr(not(feature = "bilock"), allow(unreachable_pub))] mod bilock; #[cfg(not(futures_no_atomic_cas))] -#[cfg(any(feature = "sink", feature = "io"))] -#[cfg(not(feature = "bilock"))] -pub(crate) use self::bilock::BiLock; -#[cfg(not(futures_no_atomic_cas))] -#[cfg(feature = "bilock")] -#[cfg_attr(docsrs, doc(cfg(feature = "bilock")))] -pub use self::bilock::{BiLock, BiLockAcquire, BiLockGuard, ReuniteError}; +#[cfg(feature = "std")] +mod mutex; diff --git a/vendor/futures-util/src/lock/mutex.rs b/vendor/futures-util/src/lock/mutex.rs index 85dcb1537..335ad1427 100644 --- a/vendor/futures-util/src/lock/mutex.rs +++ b/vendor/futures-util/src/lock/mutex.rs @@ -1,14 +1,16 @@ -use futures_core::future::{FusedFuture, Future}; -use futures_core::task::{Context, Poll, Waker}; -use slab::Slab; use std::cell::UnsafeCell; use std::marker::PhantomData; use std::ops::{Deref, DerefMut}; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Mutex as StdMutex; +use std::sync::{Arc, Mutex as StdMutex}; use std::{fmt, mem}; +use slab::Slab; + +use futures_core::future::{FusedFuture, Future}; +use futures_core::task::{Context, Poll, Waker}; + /// A futures-aware mutex. /// /// # Fairness @@ -107,6 +109,18 @@ impl<T: ?Sized> Mutex<T> { } } + /// Attempt to acquire the lock immediately. + /// + /// If the lock is currently held, this will return `None`. + pub fn try_lock_owned(self: &Arc<Self>) -> Option<OwnedMutexGuard<T>> { + let old_state = self.state.fetch_or(IS_LOCKED, Ordering::Acquire); + if (old_state & IS_LOCKED) == 0 { + Some(OwnedMutexGuard { mutex: self.clone() }) + } else { + None + } + } + /// Acquire the lock asynchronously. /// /// This method returns a future that will resolve once the lock has been @@ -115,6 +129,14 @@ impl<T: ?Sized> Mutex<T> { MutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } } + /// Acquire the lock asynchronously. + /// + /// This method returns a future that will resolve once the lock has been + /// successfully acquired. + pub fn lock_owned(self: Arc<Self>) -> OwnedMutexLockFuture<T> { + OwnedMutexLockFuture { mutex: Some(self), wait_key: WAIT_KEY_NONE } + } + /// Returns a mutable reference to the underlying data. /// /// Since this call borrows the `Mutex` mutably, no actual locking needs to @@ -173,7 +195,118 @@ impl<T: ?Sized> Mutex<T> { } // Sentinel for when no slot in the `Slab` has been dedicated to this object. -const WAIT_KEY_NONE: usize = usize::max_value(); +const WAIT_KEY_NONE: usize = usize::MAX; + +/// A future which resolves when the target mutex has been successfully acquired, owned version. +pub struct OwnedMutexLockFuture<T: ?Sized> { + // `None` indicates that the mutex was successfully acquired. + mutex: Option<Arc<Mutex<T>>>, + wait_key: usize, +} + +impl<T: ?Sized> fmt::Debug for OwnedMutexLockFuture<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexLockFuture") + .field("was_acquired", &self.mutex.is_none()) + .field("mutex", &self.mutex) + .field( + "wait_key", + &(if self.wait_key == WAIT_KEY_NONE { None } else { Some(self.wait_key) }), + ) + .finish() + } +} + +impl<T: ?Sized> FusedFuture for OwnedMutexLockFuture<T> { + fn is_terminated(&self) -> bool { + self.mutex.is_none() + } +} + +impl<T: ?Sized> Future for OwnedMutexLockFuture<T> { + type Output = OwnedMutexGuard<T>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let this = self.get_mut(); + + let mutex = this.mutex.as_ref().expect("polled OwnedMutexLockFuture after completion"); + + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + { + let mut waiters = mutex.waiters.lock().unwrap(); + if this.wait_key == WAIT_KEY_NONE { + this.wait_key = waiters.insert(Waiter::Waiting(cx.waker().clone())); + if waiters.len() == 1 { + mutex.state.fetch_or(HAS_WAITERS, Ordering::Relaxed); // released by mutex unlock + } + } else { + waiters[this.wait_key].register(cx.waker()); + } + } + + // Ensure that we haven't raced `MutexGuard::drop`'s unlock path by + // attempting to acquire the lock again. + if let Some(lock) = mutex.try_lock_owned() { + mutex.remove_waker(this.wait_key, false); + this.mutex = None; + return Poll::Ready(lock); + } + + Poll::Pending + } +} + +impl<T: ?Sized> Drop for OwnedMutexLockFuture<T> { + fn drop(&mut self) { + if let Some(mutex) = self.mutex.as_ref() { + // This future was dropped before it acquired the mutex. + // + // Remove ourselves from the map, waking up another waiter if we + // had been awoken to acquire the lock. + mutex.remove_waker(self.wait_key, true); + } + } +} + +/// An RAII guard returned by the `lock_owned` and `try_lock_owned` methods. +/// When this structure is dropped (falls out of scope), the lock will be +/// unlocked. +pub struct OwnedMutexGuard<T: ?Sized> { + mutex: Arc<Mutex<T>>, +} + +impl<T: ?Sized + fmt::Debug> fmt::Debug for OwnedMutexGuard<T> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("OwnedMutexGuard") + .field("value", &&**self) + .field("mutex", &self.mutex) + .finish() + } +} + +impl<T: ?Sized> Drop for OwnedMutexGuard<T> { + fn drop(&mut self) { + self.mutex.unlock() + } +} + +impl<T: ?Sized> Deref for OwnedMutexGuard<T> { + type Target = T; + fn deref(&self) -> &T { + unsafe { &*self.mutex.value.get() } + } +} + +impl<T: ?Sized> DerefMut for OwnedMutexGuard<T> { + fn deref_mut(&mut self) -> &mut T { + unsafe { &mut *self.mutex.value.get() } + } +} /// A future which resolves when the target mutex has been successfully acquired. pub struct MutexLockFuture<'a, T: ?Sized> { @@ -386,13 +519,25 @@ unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {} // It's safe to switch which thread the acquire is being attempted on so long as // `T` can be accessed on that thread. unsafe impl<T: ?Sized + Send> Send for MutexLockFuture<'_, T> {} + // doesn't have any interesting `&self` methods (only Debug) unsafe impl<T: ?Sized> Sync for MutexLockFuture<'_, T> {} +// It's safe to switch which thread the acquire is being attempted on so long as +// `T` can be accessed on that thread. +unsafe impl<T: ?Sized + Send> Send for OwnedMutexLockFuture<T> {} + +// doesn't have any interesting `&self` methods (only Debug) +unsafe impl<T: ?Sized> Sync for OwnedMutexLockFuture<T> {} + // Safe to send since we don't track any thread-specific details-- the inner // lock is essentially spinlock-equivalent (attempt to flip an atomic bool) unsafe impl<T: ?Sized + Send> Send for MutexGuard<'_, T> {} unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {} + +unsafe impl<T: ?Sized + Send> Send for OwnedMutexGuard<T> {} +unsafe impl<T: ?Sized + Sync> Sync for OwnedMutexGuard<T> {} + unsafe impl<T: ?Sized + Send, U: ?Sized + Send> Send for MappedMutexGuard<'_, T, U> {} unsafe impl<T: ?Sized + Sync, U: ?Sized + Sync> Sync for MappedMutexGuard<'_, T, U> {} diff --git a/vendor/futures-util/src/sink/drain.rs b/vendor/futures-util/src/sink/drain.rs index 5295115b6..1a5480c0d 100644 --- a/vendor/futures-util/src/sink/drain.rs +++ b/vendor/futures-util/src/sink/drain.rs @@ -32,6 +32,12 @@ pub fn drain<T>() -> Drain<T> { impl<T> Unpin for Drain<T> {} +impl<T> Clone for Drain<T> { + fn clone(&self) -> Self { + drain() + } +} + impl<T> Sink<T> for Drain<T> { type Error = Never; diff --git a/vendor/futures-util/src/sink/unfold.rs b/vendor/futures-util/src/sink/unfold.rs index 330a068c3..dea1307b6 100644 --- a/vendor/futures-util/src/sink/unfold.rs +++ b/vendor/futures-util/src/sink/unfold.rs @@ -73,7 +73,10 @@ where this.state.set(UnfoldState::Value { value: state }); Ok(()) } - Err(err) => Err(err), + Err(err) => { + this.state.set(UnfoldState::Empty); + Err(err) + } } } else { Ok(()) diff --git a/vendor/futures-util/src/stream/futures_ordered.rs b/vendor/futures-util/src/stream/futures_ordered.rs index f596b3b0e..618bf1b7b 100644 --- a/vendor/futures-util/src/stream/futures_ordered.rs +++ b/vendor/futures-util/src/stream/futures_ordered.rs @@ -19,7 +19,7 @@ pin_project! { struct OrderWrapper<T> { #[pin] data: T, // A future or a future's output - index: usize, + index: isize, } } @@ -58,7 +58,7 @@ where /// An unbounded queue of futures. /// -/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order +/// This "combinator" is similar to [`FuturesUnordered`], but it imposes a FIFO order /// on top of the set of futures. While futures in the set will race to /// completion in parallel, results will only be returned in the order their /// originating futures were added to the queue. @@ -95,8 +95,8 @@ where pub struct FuturesOrdered<T: Future> { in_progress_queue: FuturesUnordered<OrderWrapper<T>>, queued_outputs: BinaryHeap<OrderWrapper<T::Output>>, - next_incoming_index: usize, - next_outgoing_index: usize, + next_incoming_index: isize, + next_outgoing_index: isize, } impl<T: Future> Unpin for FuturesOrdered<T> {} @@ -135,11 +135,35 @@ impl<Fut: Future> FuturesOrdered<Fut> { /// This function will not call `poll` on the submitted future. The caller /// must ensure that `FuturesOrdered::poll` is called in order to receive /// task notifications. + #[deprecated(note = "use `push_back` instead")] pub fn push(&mut self, future: Fut) { + self.push_back(future); + } + + /// Pushes a future to the back of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. + pub fn push_back(&mut self, future: Fut) { let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; self.next_incoming_index += 1; self.in_progress_queue.push(wrapped); } + + /// Pushes a future to the front of the queue. + /// + /// This function submits the given future to the internal set for managing. + /// This function will not call `poll` on the submitted future. The caller + /// must ensure that `FuturesOrdered::poll` is called in order to receive + /// task notifications. This future will be the next future to be returned + /// complete. + pub fn push_front(&mut self, future: Fut) { + let wrapped = OrderWrapper { data: future, index: self.next_outgoing_index - 1 }; + self.next_outgoing_index -= 1; + self.in_progress_queue.push(wrapped); + } } impl<Fut: Future> Default for FuturesOrdered<Fut> { @@ -196,7 +220,7 @@ impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { { let acc = Self::new(); iter.into_iter().fold(acc, |mut acc, item| { - acc.push(item); + acc.push_back(item); acc }) } @@ -214,7 +238,7 @@ impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { I: IntoIterator<Item = Fut>, { for item in iter { - self.push(item); + self.push_back(item); } } } diff --git a/vendor/futures-util/src/stream/futures_unordered/iter.rs b/vendor/futures-util/src/stream/futures_unordered/iter.rs index 04db5ee75..20248c70f 100644 --- a/vendor/futures-util/src/stream/futures_unordered/iter.rs +++ b/vendor/futures-util/src/stream/futures_unordered/iter.rs @@ -2,6 +2,7 @@ use super::task::Task; use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; +use core::ptr; use core::sync::atomic::Ordering::Relaxed; /// Mutable iterator over all futures in the unordered set. @@ -58,6 +59,9 @@ impl<Fut: Unpin> Iterator for IntoIter<Fut> { // valid `next_all` checks can be skipped. let next = (**task).next_all.load(Relaxed); *task = next; + if !task.is_null() { + *(**task).prev_all.get() = ptr::null_mut(); + } self.len -= 1; Some(future) } diff --git a/vendor/futures-util/src/stream/futures_unordered/mod.rs b/vendor/futures-util/src/stream/futures_unordered/mod.rs index aab2bb446..6b5804dc4 100644 --- a/vendor/futures-util/src/stream/futures_unordered/mod.rs +++ b/vendor/futures-util/src/stream/futures_unordered/mod.rs @@ -6,7 +6,6 @@ use crate::task::AtomicWaker; use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::cmp; use core::fmt::{self, Debug}; use core::iter::FromIterator; use core::marker::PhantomData; @@ -23,6 +22,7 @@ use futures_task::{FutureObj, LocalFutureObj, LocalSpawn, Spawn, SpawnError}; mod abort; mod iter; +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/102352 pub use self::iter::{IntoIter, Iter, IterMut, IterPinMut, IterPinRef}; mod task; @@ -31,35 +31,11 @@ use self::task::Task; mod ready_to_run_queue; use self::ready_to_run_queue::{Dequeue, ReadyToRunQueue}; -/// Constant used for a `FuturesUnordered` to determine how many times it is -/// allowed to poll underlying futures without yielding. -/// -/// A single call to `poll_next` may potentially do a lot of work before -/// yielding. This happens in particular if the underlying futures are awoken -/// frequently but continue to return `Pending`. This is problematic if other -/// tasks are waiting on the executor, since they do not get to run. This value -/// caps the number of calls to `poll` on underlying futures a single call to -/// `poll_next` is allowed to make. -/// -/// The value itself is chosen somewhat arbitrarily. It needs to be high enough -/// that amortize wakeup and scheduling costs, but low enough that we do not -/// starve other tasks for long. -/// -/// See also https://github.com/rust-lang/futures-rs/issues/2047. -/// -/// Note that using the length of the `FuturesUnordered` instead of this value -/// may cause problems if the number of futures is large. -/// See also https://github.com/rust-lang/futures-rs/pull/2527. -/// -/// Additionally, polling the same future twice per iteration may cause another -/// problem. So, when using this value, it is necessary to limit the max value -/// based on the length of the `FuturesUnordered`. -/// (e.g., `cmp::min(self.len(), YIELD_EVERY)`) -/// See also https://github.com/rust-lang/futures-rs/pull/2333. -const YIELD_EVERY: usize = 32; - /// A set of futures which may complete in any order. /// +/// See [`FuturesOrdered`](crate::stream::FuturesOrdered) for a version of this +/// type that preserves a FIFO order. +/// /// This structure is optimized to manage a large number of futures. /// Futures managed by [`FuturesUnordered`] will only be polled when they /// generate wake-up notifications. This reduces the required amount of work @@ -149,8 +125,9 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Weak::new(), + woken: AtomicBool::new(false), }); - let stub_ptr = &*stub as *const Task<Fut>; + let stub_ptr = Arc::as_ptr(&stub); let ready_to_run_queue = Arc::new(ReadyToRunQueue { waker: AtomicWaker::new(), head: AtomicPtr::new(stub_ptr as *mut _), @@ -195,6 +172,7 @@ impl<Fut> FuturesUnordered<Fut> { next_ready_to_run: AtomicPtr::new(ptr::null_mut()), queued: AtomicBool::new(true), ready_to_run_queue: Arc::downgrade(&self.ready_to_run_queue), + woken: AtomicBool::new(false), }); // Reset the `is_terminated` flag if we've previously marked ourselves @@ -403,7 +381,7 @@ impl<Fut> FuturesUnordered<Fut> { // The `ReadyToRunQueue` stub is never inserted into the `head_all` // list, and its pointer value will remain valid for the lifetime of // this `FuturesUnordered`, so we can make use of its value here. - &*self.ready_to_run_queue.stub as *const _ as *mut _ + Arc::as_ptr(&self.ready_to_run_queue.stub) as *mut _ } } @@ -411,12 +389,12 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { type Item = Fut::Output; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { - // See YIELD_EVERY docs for more. - let yield_every = cmp::min(self.len(), YIELD_EVERY); + let len = self.len(); // Keep track of how many child futures we have polled, // in case we want to forcibly yield. let mut polled = 0; + let mut yielded = 0; // Ensure `parent` is correctly set. self.ready_to_run_queue.waker.register(cx.waker()); @@ -527,7 +505,11 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { // the internal allocation, appropriately accessing fields and // deallocating the task if need be. let res = { - let waker = Task::waker_ref(bomb.task.as_ref().unwrap()); + let task = bomb.task.as_ref().unwrap(); + // We are only interested in whether the future is awoken before it + // finishes polling, so reset the flag here. + task.woken.store(false, Relaxed); + let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); // Safety: We won't move the future ever again @@ -540,12 +522,17 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> { match res { Poll::Pending => { let task = bomb.task.take().unwrap(); + // If the future was awoken during polling, we assume + // the future wanted to explicitly yield. + yielded += task.woken.load(Relaxed) as usize; bomb.queue.link(task); - if polled == yield_every { - // We have polled a large number of futures in a row without yielding. - // To ensure we do not starve other tasks waiting on the executor, - // we yield here, but immediately wake ourselves up to continue. + // If a future yields, we respect it and yield here. + // If all futures have been polled, we also yield here to + // avoid starving other tasks waiting on the executor. + // (polling the same future twice per iteration may cause + // the problem: https://github.com/rust-lang/futures-rs/pull/2333) + if yielded >= 2 || polled == len { cx.waker().wake_by_ref(); return Poll::Pending; } diff --git a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 5ef6cde83..451870532 100644 --- a/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/vendor/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -83,7 +83,7 @@ impl<Fut> ReadyToRunQueue<Fut> { } pub(super) fn stub(&self) -> *const Task<Fut> { - &*self.stub + Arc::as_ptr(&self.stub) } // Clear the queue of tasks. diff --git a/vendor/futures-util/src/stream/futures_unordered/task.rs b/vendor/futures-util/src/stream/futures_unordered/task.rs index da2cd67d9..ec2114eff 100644 --- a/vendor/futures-util/src/stream/futures_unordered/task.rs +++ b/vendor/futures-util/src/stream/futures_unordered/task.rs @@ -1,6 +1,6 @@ use alloc::sync::{Arc, Weak}; use core::cell::UnsafeCell; -use core::sync::atomic::Ordering::{self, SeqCst}; +use core::sync::atomic::Ordering::{self, Relaxed, SeqCst}; use core::sync::atomic::{AtomicBool, AtomicPtr}; use super::abort::abort; @@ -31,6 +31,11 @@ pub(super) struct Task<Fut> { // Whether or not this task is currently in the ready to run queue pub(super) queued: AtomicBool, + + // Whether the future was awoken during polling + // It is possible for this flag to be set to true after the polling, + // but it will be ignored. + pub(super) woken: AtomicBool, } // `Task` can be sent across threads safely because it ensures that @@ -48,6 +53,8 @@ impl<Fut> ArcWake for Task<Fut> { None => return, }; + arc_self.woken.store(true, Relaxed); + // It's our job to enqueue this task it into the ready to run queue. To // do this we set the `queued` flag, and if successful we then do the // actual queueing operation, ensuring that we're only queued once. @@ -62,7 +69,7 @@ impl<Fut> ArcWake for Task<Fut> { // still. let prev = arc_self.queued.swap(true, SeqCst); if !prev { - inner.enqueue(&**arc_self); + inner.enqueue(Arc::as_ptr(arc_self)); inner.waker.wake(); } } diff --git a/vendor/futures-util/src/stream/mod.rs b/vendor/futures-util/src/stream/mod.rs index ec685b984..bf9506147 100644 --- a/vendor/futures-util/src/stream/mod.rs +++ b/vendor/futures-util/src/stream/mod.rs @@ -18,9 +18,10 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream}; #[allow(clippy::module_inception)] mod stream; pub use self::stream::{ - Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, - Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, SelectNextSome, - Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, Unzip, Zip, + All, Any, Chain, Collect, Concat, Count, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, + Fold, ForEach, Fuse, Inspect, Map, Next, NextIf, NextIfEq, Peek, PeekMut, Peekable, Scan, + SelectNextSome, Skip, SkipWhile, StreamExt, StreamFuture, Take, TakeUntil, TakeWhile, Then, + Unzip, Zip, }; #[cfg(feature = "std")] @@ -38,7 +39,9 @@ pub use self::stream::Forward; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] -pub use self::stream::{BufferUnordered, Buffered, ForEachConcurrent}; +pub use self::stream::{ + BufferUnordered, Buffered, FlatMapUnordered, FlattenUnordered, ForEachConcurrent, +}; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "sink")] @@ -60,7 +63,9 @@ pub use self::try_stream::IntoAsyncRead; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] -pub use self::try_stream::{TryBufferUnordered, TryBuffered, TryForEachConcurrent}; +pub use self::try_stream::{ + TryBufferUnordered, TryBuffered, TryFlattenUnordered, TryForEachConcurrent, +}; #[cfg(feature = "alloc")] pub use self::try_stream::{TryChunks, TryChunksError}; diff --git a/vendor/futures-util/src/stream/select_all.rs b/vendor/futures-util/src/stream/select_all.rs index 3474331ad..121b6a0e5 100644 --- a/vendor/futures-util/src/stream/select_all.rs +++ b/vendor/futures-util/src/stream/select_all.rs @@ -8,29 +8,24 @@ use futures_core::ready; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; -use pin_project_lite::pin_project; - use super::assert_stream; use crate::stream::{futures_unordered, FuturesUnordered, StreamExt, StreamFuture}; -pin_project! { - /// An unbounded set of streams - /// - /// This "combinator" provides the ability to maintain a set of streams - /// and drive them all to completion. - /// - /// Streams are pushed into this set and their realized values are - /// yielded as they become ready. Streams will only be polled when they - /// generate notifications. This allows to coordinate a large number of streams. - /// - /// Note that you can create a ready-made `SelectAll` via the - /// `select_all` function in the `stream` module, or you can start with an - /// empty set with the `SelectAll::new` constructor. - #[must_use = "streams do nothing unless polled"] - pub struct SelectAll<St> { - #[pin] - inner: FuturesUnordered<StreamFuture<St>>, - } +/// An unbounded set of streams +/// +/// This "combinator" provides the ability to maintain a set of streams +/// and drive them all to completion. +/// +/// Streams are pushed into this set and their realized values are +/// yielded as they become ready. Streams will only be polled when they +/// generate notifications. This allows to coordinate a large number of streams. +/// +/// Note that you can create a ready-made `SelectAll` via the +/// `select_all` function in the `stream` module, or you can start with an +/// empty set with the `SelectAll::new` constructor. +#[must_use = "streams do nothing unless polled"] +pub struct SelectAll<St> { + inner: FuturesUnordered<StreamFuture<St>>, } impl<St: Debug> Debug for SelectAll<St> { diff --git a/vendor/futures-util/src/stream/select_with_strategy.rs b/vendor/futures-util/src/stream/select_with_strategy.rs index bd86990cd..224d5f821 100644 --- a/vendor/futures-util/src/stream/select_with_strategy.rs +++ b/vendor/futures-util/src/stream/select_with_strategy.rs @@ -1,5 +1,4 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; use core::{fmt, pin::Pin}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -18,13 +17,15 @@ impl PollNext { /// Toggle the value and return the old one. pub fn toggle(&mut self) -> Self { let old = *self; + *self = self.other(); + old + } + fn other(&self) -> PollNext { match self { - PollNext::Left => *self = PollNext::Right, - PollNext::Right => *self = PollNext::Left, + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, } - - old } } @@ -34,14 +35,41 @@ impl Default for PollNext { } } +enum InternalState { + Start, + LeftFinished, + RightFinished, + BothFinished, +} + +impl InternalState { + fn finish(&mut self, ps: PollNext) { + match (&self, ps) { + (InternalState::Start, PollNext::Left) => { + *self = InternalState::LeftFinished; + } + (InternalState::Start, PollNext::Right) => { + *self = InternalState::RightFinished; + } + (InternalState::LeftFinished, PollNext::Right) + | (InternalState::RightFinished, PollNext::Left) => { + *self = InternalState::BothFinished; + } + _ => {} + } + } +} + pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] + #[project = SelectWithStrategyProj] pub struct SelectWithStrategy<St1, St2, Clos, State> { #[pin] - stream1: Fuse<St1>, + stream1: St1, #[pin] - stream2: Fuse<St2>, + stream2: St2, + internal_state: InternalState, state: State, clos: Clos, } @@ -120,9 +148,10 @@ where State: Default, { assert_stream::<St1::Item, _>(SelectWithStrategy { - stream1: stream1.fuse(), - stream2: stream2.fuse(), + stream1, + stream2, state: Default::default(), + internal_state: InternalState::Start, clos: which, }) } @@ -131,7 +160,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + (&self.stream1, &self.stream2) } /// Acquires a mutable reference to the underlying streams that this @@ -140,7 +169,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + (&mut self.stream1, &mut self.stream2) } /// Acquires a pinned mutable reference to the underlying streams that this @@ -150,7 +179,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + (this.stream1, this.stream2) } /// Consumes this combinator, returning the underlying streams. @@ -158,7 +187,7 @@ impl<St1, St2, Clos, State> SelectWithStrategy<St1, St2, Clos, State> { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + (self.stream1, self.stream2) } } @@ -169,47 +198,93 @@ where Clos: FnMut(&mut State) -> PollNext, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + match self.internal_state { + InternalState::BothFinished => true, + _ => false, + } } } -impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State> +#[inline] +fn poll_side<St1, St2, Clos, State>( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, + cx: &mut Context<'_>, +) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>, - Clos: FnMut(&mut State) -> PollNext, { - type Item = St1::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { - let this = self.project(); - - match (this.clos)(this.state) { - PollNext::Left => poll_inner(this.stream1, this.stream2, cx), - PollNext::Right => poll_inner(this.stream2, this.stream1, cx), - } + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), } } -fn poll_inner<St1, St2>( - a: Pin<&mut St1>, - b: Pin<&mut St2>, +#[inline] +fn poll_inner<St1, St2, Clos, State>( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State>, + side: PollNext, cx: &mut Context<'_>, ) -> Poll<Option<St1::Item>> where St1: Stream, St2: Stream<Item = St1::Item>, { - let a_done = match a.poll_next(cx) { + let first_done = match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => true, + Poll::Ready(None) => { + select.internal_state.finish(side); + true + } Poll::Pending => false, }; + let other = side.other(); + match poll_side(select, other, cx) { + Poll::Ready(None) => { + select.internal_state.finish(other); + if first_done { + Poll::Ready(None) + } else { + Poll::Pending + } + } + a => a, + } +} + +impl<St1, St2, Clos, State> Stream for SelectWithStrategy<St1, St2, Clos, State> +where + St1: Stream, + St2: Stream<Item = St1::Item>, + Clos: FnMut(&mut State) -> PollNext, +{ + type Item = St1::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St1::Item>> { + let mut this = self.project(); - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, + match this.internal_state { + InternalState::Start => { + let next_side = (this.clos)(this.state); + poll_inner(&mut this, next_side, cx) + } + InternalState::LeftFinished => match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::RightFinished => match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + *this.internal_state = InternalState::BothFinished; + Poll::Ready(None) + } + a => a, + }, + InternalState::BothFinished => Poll::Ready(None), + } } } diff --git a/vendor/futures-util/src/stream/stream/buffer_unordered.rs b/vendor/futures-util/src/stream/stream/buffer_unordered.rs index d64c142b4..91b0f6bcc 100644 --- a/vendor/futures-util/src/stream/stream/buffer_unordered.rs +++ b/vendor/futures-util/src/stream/stream/buffer_unordered.rs @@ -41,11 +41,7 @@ where St: Stream, St::Item: Future, { - pub(super) fn new(stream: St, n: usize) -> Self - where - St: Stream, - St::Item: Future, - { + pub(super) fn new(stream: St, n: usize) -> Self { Self { stream: super::Fuse::new(stream), in_progress_queue: FuturesUnordered::new(), diff --git a/vendor/futures-util/src/stream/stream/buffered.rs b/vendor/futures-util/src/stream/stream/buffered.rs index 6052a737b..5854eb7ea 100644 --- a/vendor/futures-util/src/stream/stream/buffered.rs +++ b/vendor/futures-util/src/stream/stream/buffered.rs @@ -1,4 +1,4 @@ -use crate::stream::{Fuse, FuturesOrdered, StreamExt}; +use crate::stream::{Fuse, FusedStream, FuturesOrdered, StreamExt}; use core::fmt; use core::pin::Pin; use futures_core::future::Future; @@ -64,7 +64,7 @@ where // our queue of futures. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx) { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut), Poll::Ready(None) | Poll::Pending => break, } } @@ -95,6 +95,16 @@ where } } +impl<St> FusedStream for Buffered<St> +where + St: Stream, + St::Item: Future, +{ + fn is_terminated(&self) -> bool { + self.stream.is_done() && self.in_progress_queue.is_terminated() + } +} + // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl<S, Item> Sink<Item> for Buffered<S> diff --git a/vendor/futures-util/src/stream/stream/chain.rs b/vendor/futures-util/src/stream/stream/chain.rs index c5da35e25..36ff1e533 100644 --- a/vendor/futures-util/src/stream/stream/chain.rs +++ b/vendor/futures-util/src/stream/stream/chain.rs @@ -50,8 +50,9 @@ where if let Some(item) = ready!(first.poll_next(cx)) { return Poll::Ready(Some(item)); } + + this.first.set(None); } - this.first.set(None); this.second.poll_next(cx) } diff --git a/vendor/futures-util/src/stream/stream/chunks.rs b/vendor/futures-util/src/stream/stream/chunks.rs index 845786999..2a71ebc6c 100644 --- a/vendor/futures-util/src/stream/stream/chunks.rs +++ b/vendor/futures-util/src/stream/stream/chunks.rs @@ -21,10 +21,7 @@ pin_project! { } } -impl<St: Stream> Chunks<St> -where - St: Stream, -{ +impl<St: Stream> Chunks<St> { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); @@ -66,7 +63,7 @@ impl<St: Stream> Stream for Chunks<St> { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -77,9 +74,9 @@ impl<St: Stream> Stream for Chunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/vendor/futures-util/src/stream/stream/collect.rs b/vendor/futures-util/src/stream/stream/collect.rs index b0e81b9ce..970ac26db 100644 --- a/vendor/futures-util/src/stream/stream/collect.rs +++ b/vendor/futures-util/src/stream/stream/collect.rs @@ -19,7 +19,7 @@ pin_project! { impl<St: Stream, C: Default> Collect<St, C> { fn finish(self: Pin<&mut Self>) -> C { - mem::replace(self.project().collection, Default::default()) + mem::take(self.project().collection) } pub(super) fn new(stream: St) -> Self { diff --git a/vendor/futures-util/src/stream/stream/filter.rs b/vendor/futures-util/src/stream/stream/filter.rs index ccf1a5122..997fe9977 100644 --- a/vendor/futures-util/src/stream/stream/filter.rs +++ b/vendor/futures-util/src/stream/stream/filter.rs @@ -93,7 +93,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/filter_map.rs b/vendor/futures-util/src/stream/stream/filter_map.rs index 02a0a4386..6b7d0070d 100644 --- a/vendor/futures-util/src/stream/stream/filter_map.rs +++ b/vendor/futures-util/src/stream/stream/filter_map.rs @@ -87,7 +87,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/flatten_unordered.rs b/vendor/futures-util/src/stream/stream/flatten_unordered.rs new file mode 100644 index 000000000..44c6ace2f --- /dev/null +++ b/vendor/futures-util/src/stream/stream/flatten_unordered.rs @@ -0,0 +1,536 @@ +use alloc::sync::Arc; +use core::{ + cell::UnsafeCell, + convert::identity, + fmt, + marker::PhantomData, + num::NonZeroUsize, + pin::Pin, + sync::atomic::{AtomicU8, Ordering}, +}; + +use pin_project_lite::pin_project; + +use futures_core::{ + future::Future, + ready, + stream::{FusedStream, Stream}, + task::{Context, Poll, Waker}, +}; +#[cfg(feature = "sink")] +use futures_sink::Sink; +use futures_task::{waker, ArcWake}; + +use crate::stream::FuturesUnordered; + +/// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) +/// method. +pub type FlattenUnordered<St> = FlattenUnorderedWithFlowController<St, ()>; + +/// There is nothing to poll and stream isn't being polled/waking/woken at the moment. +const NONE: u8 = 0; + +/// Inner streams need to be polled. +const NEED_TO_POLL_INNER_STREAMS: u8 = 1; + +/// The base stream needs to be polled. +const NEED_TO_POLL_STREAM: u8 = 0b10; + +/// Both base stream and inner streams need to be polled. +const NEED_TO_POLL_ALL: u8 = NEED_TO_POLL_INNER_STREAMS | NEED_TO_POLL_STREAM; + +/// The current stream is being polled at the moment. +const POLLING: u8 = 0b100; + +/// Stream is being woken at the moment. +const WAKING: u8 = 0b1000; + +/// The stream was waked and will be polled. +const WOKEN: u8 = 0b10000; + +/// Internal polling state of the stream. +#[derive(Clone, Debug)] +struct SharedPollState { + state: Arc<AtomicU8>, +} + +impl SharedPollState { + /// Constructs new `SharedPollState` with the given state. + fn new(value: u8) -> SharedPollState { + SharedPollState { state: Arc::new(AtomicU8::new(value)) } + } + + /// Attempts to start polling, returning stored state in case of success. + /// Returns `None` if either waker is waking at the moment. + fn start_polling( + &self, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + if value & WAKING == NONE { + Some(POLLING) + } else { + None + } + }) + .ok()?; + let bomb = PollStateBomb::new(self, SharedPollState::reset); + + Some((value, bomb)) + } + + /// Attempts to start the waking process and performs bitwise or with the given value. + /// + /// If some waker is already in progress or stream is already woken/being polled, waking process won't start, however + /// state will be disjuncted with the given value. + fn start_waking( + &self, + to_poll: u8, + ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let mut next_value = value | to_poll; + if value & (WOKEN | POLLING) == NONE { + next_value |= WAKING; + } + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .ok()?; + + // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already + if value & (WOKEN | POLLING | WAKING) == NONE { + let bomb = PollStateBomb::new(self, SharedPollState::stop_waking); + + Some((value, bomb)) + } else { + None + } + } + + /// Sets current state to + /// - `!POLLING` allowing to use wakers + /// - `WOKEN` if the state was changed during `POLLING` phase as waker will be called, + /// or `will_be_woken` flag supplied + /// - `!WAKING` as + /// * Wakers called during the `POLLING` phase won't propagate their calls + /// * `POLLING` phase can't start if some of the wakers are active + /// So no wrapped waker can touch the inner waker's cell, it's safe to poll again. + fn stop_polling(&self, to_poll: u8, will_be_woken: bool) -> u8 { + self.state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |mut value| { + let mut next_value = to_poll; + + value &= NEED_TO_POLL_ALL; + if value != NONE || will_be_woken { + next_value |= WOKEN; + } + next_value |= value; + + Some(next_value & !POLLING & !WAKING) + }) + .unwrap() + } + + /// Toggles state to non-waking, allowing to start polling. + fn stop_waking(&self) -> u8 { + let value = self + .state + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { + let next_value = value & !WAKING | WOKEN; + + if next_value != value { + Some(next_value) + } else { + None + } + }) + .unwrap_or_else(identity); + + debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING); + value + } + + /// Resets current state allowing to poll the stream and wake up wakers. + fn reset(&self) -> u8 { + self.state.swap(NEED_TO_POLL_ALL, Ordering::SeqCst) + } +} + +/// Used to execute some function on the given state when dropped. +struct PollStateBomb<'a, F: FnOnce(&SharedPollState) -> u8> { + state: &'a SharedPollState, + drop: Option<F>, +} + +impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { + /// Constructs new bomb with the given state. + fn new(state: &'a SharedPollState, drop: F) -> Self { + Self { state, drop: Some(drop) } + } + + /// Deactivates bomb, forces it to not call provided function when dropped. + fn deactivate(mut self) { + self.drop.take(); + } +} + +impl<F: FnOnce(&SharedPollState) -> u8> Drop for PollStateBomb<'_, F> { + fn drop(&mut self) { + if let Some(drop) = self.drop.take() { + (drop)(self.state); + } + } +} + +/// Will update state with the provided value on `wake_by_ref` call +/// and then, if there is a need, call `inner_waker`. +struct WrappedWaker { + inner_waker: UnsafeCell<Option<Waker>>, + poll_state: SharedPollState, + need_to_poll: u8, +} + +unsafe impl Send for WrappedWaker {} +unsafe impl Sync for WrappedWaker {} + +impl WrappedWaker { + /// Replaces given waker's inner_waker for polling stream/futures which will + /// update poll state on `wake_by_ref` call. Use only if you need several + /// contexts. + /// + /// ## Safety + /// + /// This function will modify waker's `inner_waker` via `UnsafeCell`, so + /// it should be used only during `POLLING` phase by one thread at the time. + unsafe fn replace_waker(self_arc: &mut Arc<Self>, cx: &Context<'_>) { + *self_arc.inner_waker.get() = cx.waker().clone().into(); + } + + /// Attempts to start the waking process for the waker with the given value. + /// If succeeded, then the stream isn't yet woken and not being polled at the moment. + fn start_waking(&self) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { + self.poll_state.start_waking(self.need_to_poll) + } +} + +impl ArcWake for WrappedWaker { + fn wake_by_ref(self_arc: &Arc<Self>) { + if let Some((_, state_bomb)) = self_arc.start_waking() { + // Safety: now state is not `POLLING` + let waker_opt = unsafe { self_arc.inner_waker.get().as_ref().unwrap() }; + + if let Some(inner_waker) = waker_opt.clone() { + // Stop waking to allow polling stream + drop(state_bomb); + + // Wake up inner waker + inner_waker.wake(); + } + } + } +} + +pin_project! { + /// Future which polls optional inner stream. + /// + /// If it's `Some`, it will attempt to call `poll_next` on it, + /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))` + /// or `None` in case of `Poll::Ready(None)`. + /// + /// If `poll_next` will return `Poll::Pending`, it will be forwarded to + /// the future and current task will be notified by waker. + #[must_use = "futures do nothing unless you `.await` or poll them"] + struct PollStreamFut<St> { + #[pin] + stream: Option<St>, + } +} + +impl<St> PollStreamFut<St> { + /// Constructs new `PollStreamFut` using given `stream`. + fn new(stream: impl Into<Option<St>>) -> Self { + Self { stream: stream.into() } + } +} + +impl<St: Stream + Unpin> Future for PollStreamFut<St> { + type Output = Option<(St::Item, PollStreamFut<St>)>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + let mut stream = self.project().stream; + + let item = if let Some(stream) = stream.as_mut().as_pin_mut() { + ready!(stream.poll_next(cx)) + } else { + None + }; + let next_item_fut = PollStreamFut::new(stream.get_mut().take()); + let out = item.map(|item| (item, next_item_fut)); + + Poll::Ready(out) + } +} + +pin_project! { + /// Stream for the [`flatten_unordered`](super::StreamExt::flatten_unordered) + /// method with ability to specify flow controller. + #[project = FlattenUnorderedWithFlowControllerProj] + #[must_use = "streams do nothing unless polled"] + pub struct FlattenUnorderedWithFlowController<St, Fc> where St: Stream { + #[pin] + inner_streams: FuturesUnordered<PollStreamFut<St::Item>>, + #[pin] + stream: St, + poll_state: SharedPollState, + limit: Option<NonZeroUsize>, + is_stream_done: bool, + inner_streams_waker: Arc<WrappedWaker>, + stream_waker: Arc<WrappedWaker>, + flow_controller: PhantomData<Fc> + } +} + +impl<St, Fc> fmt::Debug for FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream + fmt::Debug, + St::Item: Stream + fmt::Debug, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlattenUnorderedWithFlowController") + .field("poll_state", &self.poll_state) + .field("inner_streams", &self.inner_streams) + .field("limit", &self.limit) + .field("stream", &self.stream) + .field("is_stream_done", &self.is_stream_done) + .field("flow_controller", &self.flow_controller) + .finish() + } +} + +impl<St, Fc> FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>, + St::Item: Stream + Unpin, +{ + pub(crate) fn new( + stream: St, + limit: Option<usize>, + ) -> FlattenUnorderedWithFlowController<St, Fc> { + let poll_state = SharedPollState::new(NEED_TO_POLL_STREAM); + + FlattenUnorderedWithFlowController { + inner_streams: FuturesUnordered::new(), + stream, + is_stream_done: false, + limit: limit.and_then(NonZeroUsize::new), + inner_streams_waker: Arc::new(WrappedWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_INNER_STREAMS, + }), + stream_waker: Arc::new(WrappedWaker { + inner_waker: UnsafeCell::new(None), + poll_state: poll_state.clone(), + need_to_poll: NEED_TO_POLL_STREAM, + }), + poll_state, + flow_controller: PhantomData, + } + } + + delegate_access_inner!(stream, St, ()); +} + +/// Returns the next flow step based on the received item. +pub trait FlowController<I, O> { + /// Handles an item producing `FlowStep` describing the next flow step. + fn next_step(item: I) -> FlowStep<I, O>; +} + +impl<I, O> FlowController<I, O> for () { + fn next_step(item: I) -> FlowStep<I, O> { + FlowStep::Continue(item) + } +} + +/// Describes the next flow step. +#[derive(Debug, Clone)] +pub enum FlowStep<C, R> { + /// Just yields an item and continues standard flow. + Continue(C), + /// Immediately returns an underlying item from the function. + Return(R), +} + +impl<St, Fc> FlattenUnorderedWithFlowControllerProj<'_, St, Fc> +where + St: Stream, +{ + /// Checks if current `inner_streams` bucket size is greater than optional limit. + fn is_exceeded_limit(&self) -> bool { + self.limit.map_or(false, |limit| self.inner_streams.len() >= limit.get()) + } +} + +impl<St, Fc> FusedStream for FlattenUnorderedWithFlowController<St, Fc> +where + St: FusedStream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>, + St::Item: Stream + Unpin, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() && self.inner_streams.is_empty() + } +} + +impl<St, Fc> Stream for FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream, + Fc: FlowController<St::Item, <St::Item as Stream>::Item>, + St::Item: Stream + Unpin, +{ + type Item = <St::Item as Stream>::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let mut next_item = None; + let mut need_to_poll_next = NONE; + + let mut this = self.as_mut().project(); + + // Attempt to start polling, in case some waker is holding the lock, wait in loop + let (mut poll_state_value, state_bomb) = loop { + if let Some(value) = this.poll_state.start_polling() { + break value; + } + }; + + // Safety: now state is `POLLING`. + unsafe { + WrappedWaker::replace_waker(this.stream_waker, cx); + WrappedWaker::replace_waker(this.inner_streams_waker, cx) + }; + + if poll_state_value & NEED_TO_POLL_STREAM != NONE { + let mut stream_waker = None; + + // Here we need to poll the base stream. + // + // To improve performance, we will attempt to place as many items as we can + // to the `FuturesUnordered` bucket before polling inner streams + loop { + if this.is_exceeded_limit() || *this.is_stream_done { + // We either exceeded the limit or the stream is exhausted + if !*this.is_stream_done { + // The stream needs to be polled in the next iteration + need_to_poll_next |= NEED_TO_POLL_STREAM; + } + + break; + } else { + let mut cx = Context::from_waker( + stream_waker.get_or_insert_with(|| waker(this.stream_waker.clone())), + ); + + match this.stream.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(item)) => { + let next_item_fut = match Fc::next_step(item) { + // Propagates an item immediately (the main use-case is for errors) + FlowStep::Return(item) => { + need_to_poll_next |= NEED_TO_POLL_STREAM + | (poll_state_value & NEED_TO_POLL_INNER_STREAMS); + poll_state_value &= !NEED_TO_POLL_INNER_STREAMS; + + next_item = Some(item); + + break; + } + // Yields an item and continues processing (normal case) + FlowStep::Continue(inner_stream) => { + PollStreamFut::new(inner_stream) + } + }; + // Add new stream to the inner streams bucket + this.inner_streams.as_mut().push(next_item_fut); + // Inner streams must be polled afterward + poll_state_value |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(None) => { + // Mark the base stream as done + *this.is_stream_done = true; + } + Poll::Pending => { + break; + } + } + } + } + } + + if poll_state_value & NEED_TO_POLL_INNER_STREAMS != NONE { + let inner_streams_waker = waker(this.inner_streams_waker.clone()); + let mut cx = Context::from_waker(&inner_streams_waker); + + match this.inner_streams.as_mut().poll_next(&mut cx) { + Poll::Ready(Some(Some((item, next_item_fut)))) => { + // Push next inner stream item future to the list of inner streams futures + this.inner_streams.as_mut().push(next_item_fut); + // Take the received item + next_item = Some(item); + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + Poll::Ready(Some(None)) => { + // On the next iteration, inner streams must be polled again + need_to_poll_next |= NEED_TO_POLL_INNER_STREAMS; + } + _ => {} + } + } + + // We didn't have any `poll_next` panic, so it's time to deactivate the bomb + state_bomb.deactivate(); + + // Call the waker at the end of polling if + let mut force_wake = + // we need to poll the stream and didn't reach the limit yet + need_to_poll_next & NEED_TO_POLL_STREAM != NONE && !this.is_exceeded_limit() + // or we need to poll the inner streams again + || need_to_poll_next & NEED_TO_POLL_INNER_STREAMS != NONE; + + // Stop polling and swap the latest state + poll_state_value = this.poll_state.stop_polling(need_to_poll_next, force_wake); + // If state was changed during `POLLING` phase, we also need to manually call a waker + force_wake |= poll_state_value & NEED_TO_POLL_ALL != NONE; + + let is_done = *this.is_stream_done && this.inner_streams.is_empty(); + + if next_item.is_some() || is_done { + Poll::Ready(next_item) + } else { + if force_wake { + cx.waker().wake_by_ref(); + } + + Poll::Pending + } + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item, Fc> Sink<Item> for FlattenUnorderedWithFlowController<St, Fc> +where + St: Stream + Sink<Item>, +{ + type Error = St::Error; + + delegate_sink!(stream, Item); +} diff --git a/vendor/futures-util/src/stream/stream/mod.rs b/vendor/futures-util/src/stream/stream/mod.rs index 9cfcc09ba..558dc22bd 100644 --- a/vendor/futures-util/src/stream/stream/mod.rs +++ b/vendor/futures-util/src/stream/stream/mod.rs @@ -199,6 +199,25 @@ pub use self::buffered::Buffered; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] +pub(crate) mod flatten_unordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] +pub use self::flatten_unordered::FlattenUnordered; + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +delegate_all!( + /// Stream for the [`flat_map_unordered`](StreamExt::flat_map_unordered) method. + FlatMapUnordered<St, U, F>( + FlattenUnordered<Map<St, F>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + New[|x: St, limit: Option<usize>, f: F| FlattenUnordered::new(Map::new(x, f), limit)] + where St: Stream, U: Stream, U: Unpin, F: FnMut(St::Item) -> U +); + +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] mod for_each_concurrent; #[cfg(not(futures_no_atomic_cas))] #[cfg(feature = "alloc")] @@ -390,9 +409,9 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter(|x| future::ready(x % 2 == 0)); + /// let events = stream.filter(|x| future::ready(x % 2 == 0)); /// - /// assert_eq!(vec![2, 4, 6, 8, 10], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![2, 4, 6, 8, 10], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter<Fut, F>(self, f: F) -> Filter<Self, Fut, F> @@ -422,11 +441,11 @@ pub trait StreamExt: Stream { /// use futures::stream::{self, StreamExt}; /// /// let stream = stream::iter(1..=10); - /// let evens = stream.filter_map(|x| async move { + /// let events = stream.filter_map(|x| async move { /// if x % 2 == 0 { Some(x + 1) } else { None } /// }); /// - /// assert_eq!(vec![3, 5, 7, 9, 11], evens.collect::<Vec<_>>().await); + /// assert_eq!(vec![3, 5, 7, 9, 11], events.collect::<Vec<_>>().await); /// # }); /// ``` fn filter_map<Fut, T, F>(self, f: F) -> FilterMap<Self, Fut, F> @@ -754,13 +773,64 @@ pub trait StreamExt: Stream { assert_stream::<<Self::Item as Stream>::Item, _>(Flatten::new(self)) } + /// Flattens a stream of streams into just one continuous stream. Polls + /// inner streams produced by the base stream concurrently. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::StreamExt; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(1).unwrap(); + /// tx1.unbounded_send(2).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(3).unwrap(); + /// tx2.unbounded_send(4).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(rx1).unwrap(); + /// tx3.unbounded_send(rx2).unwrap(); + /// }); + /// + /// let mut output = rx3.flatten_unordered(None).collect::<Vec<i32>>().await; + /// output.sort(); + /// + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flatten_unordered(self, limit: impl Into<Option<usize>>) -> FlattenUnordered<Self> + where + Self::Item: Stream + Unpin, + Self: Sized, + { + assert_stream::<<Self::Item as Stream>::Item, _>(FlattenUnordered::new(self, limit.into())) + } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s. /// /// [`StreamExt::map`] is very useful, but if it produces a `Stream` instead, /// you would have to chain combinators like `.map(f).flatten()` while this /// combinator provides ability to write `.flat_map(f)` instead of chaining. /// - /// The provided closure which produce inner streams is executed over all elements + /// The provided closure which produces inner streams is executed over all elements /// of stream as last inner stream is terminated and next stream item is available. /// /// Note that this function consumes the stream passed into it and returns a @@ -788,6 +858,59 @@ pub trait StreamExt: Stream { assert_stream::<U::Item, _>(FlatMap::new(self, f)) } + /// Maps a stream like [`StreamExt::map`] but flattens nested `Stream`s + /// and polls them concurrently, yielding items in any order, as they made + /// available. + /// + /// [`StreamExt::map`] is very useful, but if it produces `Stream`s + /// instead, and you need to poll all of them concurrently, you would + /// have to use something like `for_each_concurrent` and merge values + /// by hand. This combinator provides ability to collect all values + /// from concurrently polled streams into one stream. + /// + /// The first argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// The provided closure which produces inner streams is executed over + /// all elements of stream as next stream item is available and limit + /// of concurrently processed streams isn't exceeded. + /// + /// Note that this function consumes the stream passed into it and + /// returns a wrapped version of it. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// + /// let stream = stream::iter(1..5); + /// let stream = stream.flat_map_unordered(1, |x| stream::iter(vec![x; x])); + /// let mut values = stream.collect::<Vec<_>>().await; + /// values.sort(); + /// + /// assert_eq!(vec![1usize, 2, 2, 3, 3, 3, 4, 4, 4, 4], values); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn flat_map_unordered<U, F>( + self, + limit: impl Into<Option<usize>>, + f: F, + ) -> FlatMapUnordered<Self, U, F> + where + U: Stream + Unpin, + F: FnMut(Self::Item) -> U, + Self: Sized, + { + assert_stream::<U::Item, _>(FlatMapUnordered::new(self, limit.into(), f)) + } + /// Combinator similar to [`StreamExt::fold`] that holds internal state /// and produces a new stream. /// @@ -1397,8 +1520,7 @@ pub trait StreamExt: Stream { /// be immediately returned. /// /// If the underlying stream ended and only a partial vector was created, - /// it'll be returned. Additionally if an error happens from the underlying - /// stream then the currently buffered items will be yielded. + /// it will be returned. /// /// This method is only available when the `std` or `alloc` feature of this /// library is activated, and it is activated by default. @@ -1558,6 +1680,8 @@ pub trait StreamExt: Stream { /// assert_eq!(total, 6); /// # }); /// ``` + /// + /// [`select!`]: crate::select fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream, diff --git a/vendor/futures-util/src/stream/stream/peek.rs b/vendor/futures-util/src/stream/stream/peek.rs index c72dfc366..ea3d6243f 100644 --- a/vendor/futures-util/src/stream/stream/peek.rs +++ b/vendor/futures-util/src/stream/stream/peek.rs @@ -204,7 +204,7 @@ impl<S: Stream> Stream for Peekable<S> { } fn size_hint(&self) -> (usize, Option<usize>) { - let peek_len = if self.peeked.is_some() { 1 } else { 0 }; + let peek_len = usize::from(self.peeked.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(peek_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/stream/ready_chunks.rs b/vendor/futures-util/src/stream/stream/ready_chunks.rs index 5ebc9582d..192054c4a 100644 --- a/vendor/futures-util/src/stream/stream/ready_chunks.rs +++ b/vendor/futures-util/src/stream/stream/ready_chunks.rs @@ -1,6 +1,5 @@ -use crate::stream::Fuse; +use crate::stream::{Fuse, StreamExt}; use alloc::vec::Vec; -use core::mem; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -15,23 +14,15 @@ pin_project! { pub struct ReadyChunks<St: Stream> { #[pin] stream: Fuse<St>, - items: Vec<St::Item>, cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475 } } -impl<St: Stream> ReadyChunks<St> -where - St: Stream, -{ +impl<St: Stream> ReadyChunks<St> { pub(super) fn new(stream: St, capacity: usize) -> Self { assert!(capacity > 0); - Self { - stream: super::Fuse::new(stream), - items: Vec::with_capacity(capacity), - cap: capacity, - } + Self { stream: stream.fuse(), cap: capacity } } delegate_access_inner!(stream, St, (.)); @@ -43,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.project(); + let mut items: Vec<St::Item> = Vec::new(); + loop { match this.stream.as_mut().poll_next(cx) { // Flush all collected data if underlying stream doesn't contain // more ready values Poll::Pending => { - return if this.items.is_empty() { - Poll::Pending - } else { - Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap)))) - } + return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) } } // Push the ready item into the buffer and check whether it is full. // If so, replace our buffer with a new and empty one and return // the full one. Poll::Ready(Some(item)) => { - this.items.push(item); - if this.items.len() >= *this.cap { - return Poll::Ready(Some(mem::replace( - this.items, - Vec::with_capacity(*this.cap), - ))); + if items.is_empty() { + items.reserve(*this.cap); + } + items.push(item); + if items.len() >= *this.cap { + return Poll::Ready(Some(items)); } } // Since the underlying stream ran out of values, return what we // have buffered, if we have anything. Poll::Ready(None) => { - let last = if this.items.is_empty() { - None - } else { - let full_buf = mem::replace(this.items, Vec::new()); - Some(full_buf) - }; + let last = if items.is_empty() { None } else { Some(items) }; return Poll::Ready(last); } @@ -85,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); - let upper = match upper { - Some(x) => x.checked_add(chunk_len), - None => None, - }; + let lower = lower / self.cap; (lower, upper) } } -impl<St: FusedStream> FusedStream for ReadyChunks<St> { +impl<St: Stream> FusedStream for ReadyChunks<St> { fn is_terminated(&self) -> bool { - self.stream.is_terminated() && self.items.is_empty() + self.stream.is_terminated() } } diff --git a/vendor/futures-util/src/stream/stream/skip_while.rs b/vendor/futures-util/src/stream/stream/skip_while.rs index 50a21a21a..dabd5eefa 100644 --- a/vendor/futures-util/src/stream/stream/skip_while.rs +++ b/vendor/futures-util/src/stream/stream/skip_while.rs @@ -99,7 +99,7 @@ where if self.done_skipping { self.stream.size_hint() } else { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/split.rs b/vendor/futures-util/src/stream/stream/split.rs index 3a72fee30..e2034e0c2 100644 --- a/vendor/futures-util/src/stream/stream/split.rs +++ b/vendor/futures-util/src/stream/stream/split.rs @@ -35,7 +35,7 @@ impl<S: Stream> Stream for SplitStream<S> { } } -#[allow(bad_style)] +#[allow(non_snake_case)] fn SplitSink<S: Sink<Item>, Item>(lock: BiLock<S>) -> SplitSink<S, Item> { SplitSink { lock, slot: None } } diff --git a/vendor/futures-util/src/stream/stream/take.rs b/vendor/futures-util/src/stream/stream/take.rs index b1c728e33..29d6c39ee 100644 --- a/vendor/futures-util/src/stream/stream/take.rs +++ b/vendor/futures-util/src/stream/stream/take.rs @@ -54,11 +54,11 @@ where let (lower, upper) = self.stream.size_hint(); - let lower = cmp::min(lower, self.remaining as usize); + let lower = cmp::min(lower, self.remaining); let upper = match upper { - Some(x) if x < self.remaining as usize => Some(x), - _ => Some(self.remaining as usize), + Some(x) if x < self.remaining => Some(x), + _ => Some(self.remaining), }; (lower, upper) diff --git a/vendor/futures-util/src/stream/stream/take_while.rs b/vendor/futures-util/src/stream/stream/take_while.rs index 01b27654b..925694301 100644 --- a/vendor/futures-util/src/stream/stream/take_while.rs +++ b/vendor/futures-util/src/stream/stream/take_while.rs @@ -91,7 +91,7 @@ where return (0, Some(0)); } - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/stream/then.rs b/vendor/futures-util/src/stream/stream/then.rs index d4531d4b9..9192c0b0c 100644 --- a/vendor/futures-util/src/stream/stream/then.rs +++ b/vendor/futures-util/src/stream/stream/then.rs @@ -78,7 +78,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/stream/unzip.rs b/vendor/futures-util/src/stream/stream/unzip.rs index 15f22e80b..a88cf0326 100644 --- a/vendor/futures-util/src/stream/stream/unzip.rs +++ b/vendor/futures-util/src/stream/stream/unzip.rs @@ -21,7 +21,7 @@ pin_project! { impl<St: Stream, FromA: Default, FromB: Default> Unzip<St, FromA, FromB> { fn finish(self: Pin<&mut Self>) -> (FromA, FromB) { let this = self.project(); - (mem::replace(this.left, Default::default()), mem::replace(this.right, Default::default())) + (mem::take(this.left), mem::take(this.right)) } pub(super) fn new(stream: St) -> Self { diff --git a/vendor/futures-util/src/stream/stream/zip.rs b/vendor/futures-util/src/stream/stream/zip.rs index 360a8b63b..25a47e96b 100644 --- a/vendor/futures-util/src/stream/stream/zip.rs +++ b/vendor/futures-util/src/stream/stream/zip.rs @@ -102,8 +102,8 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let queued1_len = if self.queued1.is_some() { 1 } else { 0 }; - let queued2_len = if self.queued2.is_some() { 1 } else { 0 }; + let queued1_len = usize::from(self.queued1.is_some()); + let queued2_len = usize::from(self.queued2.is_some()); let (stream1_lower, stream1_upper) = self.stream1.size_hint(); let (stream2_lower, stream2_upper) = self.stream2.size_hint(); diff --git a/vendor/futures-util/src/stream/try_stream/and_then.rs b/vendor/futures-util/src/stream/try_stream/and_then.rs index a7b50db0b..2f8b6f258 100644 --- a/vendor/futures-util/src/stream/try_stream/and_then.rs +++ b/vendor/futures-util/src/stream/try_stream/and_then.rs @@ -71,7 +71,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/try_stream/into_async_read.rs b/vendor/futures-util/src/stream/try_stream/into_async_read.rs index 914b277a0..ffbfc7eae 100644 --- a/vendor/futures-util/src/stream/try_stream/into_async_read.rs +++ b/vendor/futures-util/src/stream/try_stream/into_async_read.rs @@ -1,30 +1,26 @@ -use crate::stream::TryStreamExt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::cmp; use std::io::{Error, Result}; -/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. -#[derive(Debug)] -#[must_use = "readers do nothing unless polled"] -#[cfg_attr(docsrs, doc(cfg(feature = "io")))] -pub struct IntoAsyncRead<St> -where - St: TryStream<Error = Error> + Unpin, - St::Ok: AsRef<[u8]>, -{ - stream: St, - state: ReadState<St::Ok>, -} - -impl<St> Unpin for IntoAsyncRead<St> -where - St: TryStream<Error = Error> + Unpin, - St::Ok: AsRef<[u8]>, -{ +pin_project! { + /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless polled"] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + pub struct IntoAsyncRead<St> + where + St: TryStream<Error = Error>, + St::Ok: AsRef<[u8]>, + { + #[pin] + stream: St, + state: ReadState<St::Ok>, + } } #[derive(Debug)] @@ -36,7 +32,7 @@ enum ReadState<T: AsRef<[u8]>> { impl<St> IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { @@ -46,16 +42,18 @@ where impl<St> AsyncRead for IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll<Result<usize>> { + let mut this = self.project(); + loop { - match &mut self.state { + match this.state { ReadState::Ready { chunk, chunk_start } => { let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); @@ -64,23 +62,23 @@ where *chunk_start += len; if chunk.len() == *chunk_start { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(0)); } }, @@ -94,51 +92,52 @@ where impl<St> AsyncWrite for IntoAsyncRead<St> where - St: TryStream<Error = Error> + AsyncWrite + Unpin, + St: TryStream<Error = Error> + AsyncWrite, St::Ok: AsRef<[u8]>, { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll<Result<usize>> { - Pin::new(&mut self.stream).poll_write(cx, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { + let this = self.project(); + this.stream.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + this.stream.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { - Pin::new(&mut self.stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> { + let this = self.project(); + this.stream.poll_close(cx) } } impl<St> AsyncBufRead for IntoAsyncRead<St> where - St: TryStream<Error = Error> + Unpin, + St: TryStream<Error = Error>, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { - while let ReadState::PendingChunk = self.state { - match ready!(self.stream.try_poll_next_unpin(cx)) { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> { + let mut this = self.project(); + + while let ReadState::PendingChunk = this.state { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(&[])); } } } - if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { let chunk = chunk.as_ref(); return Poll::Ready(Ok(&chunk[chunk_start..])); } @@ -147,16 +146,18 @@ where Poll::Ready(Ok(&[])) } - fn consume(mut self: Pin<&mut Self>, amount: usize) { + fn consume(self: Pin<&mut Self>, amount: usize) { + let this = self.project(); + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 if amount == 0 { return; } - if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + if let ReadState::Ready { chunk, chunk_start } = this.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); if *chunk_start >= chunk.as_ref().len() { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } } else { debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); diff --git a/vendor/futures-util/src/stream/try_stream/mod.rs b/vendor/futures-util/src/stream/try_stream/mod.rs index 455ddca3f..414a40dbe 100644 --- a/vendor/futures-util/src/stream/try_stream/mod.rs +++ b/vendor/futures-util/src/stream/try_stream/mod.rs @@ -15,6 +15,7 @@ use crate::stream::{Inspect, Map}; #[cfg(feature = "alloc")] use alloc::vec::Vec; use core::pin::Pin; + use futures_core::{ future::{Future, TryFuture}, stream::TryStream, @@ -88,6 +89,14 @@ mod try_flatten; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_flatten::TryFlatten; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +mod try_flatten_unordered; +#[cfg(not(futures_no_atomic_cas))] +#[cfg(feature = "alloc")] +#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 +pub use self::try_flatten_unordered::TryFlattenUnordered; + mod try_collect; #[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411 pub use self::try_collect::TryCollect; @@ -711,6 +720,63 @@ pub trait TryStreamExt: TryStream { assert_stream::<Result<T, Self::Error>, _>(TryFilterMap::new(self, f)) } + /// Flattens a stream of streams into just one continuous stream. Produced streams + /// will be polled concurrently and any errors will be passed through without looking at them. + /// If the underlying base stream returns an error, it will be **immediately** propagated. + /// + /// The only argument is an optional limit on the number of concurrently + /// polled streams. If this limit is not `None`, no more than `limit` streams + /// will be polled at the same time. The `limit` argument is of type + /// `Into<Option<usize>>`, and so can be provided as either `None`, + /// `Some(10)`, or just `10`. Note: a limit of zero is interpreted as + /// no limit at all, and will have the same result as passing in `None`. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::channel::mpsc; + /// use futures::stream::{StreamExt, TryStreamExt}; + /// use std::thread; + /// + /// let (tx1, rx1) = mpsc::unbounded(); + /// let (tx2, rx2) = mpsc::unbounded(); + /// let (tx3, rx3) = mpsc::unbounded(); + /// + /// thread::spawn(move || { + /// tx1.unbounded_send(Ok(1)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx2.unbounded_send(Ok(2)).unwrap(); + /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); + /// }); + /// thread::spawn(move || { + /// tx3.unbounded_send(Ok(rx1)).unwrap(); + /// tx3.unbounded_send(Ok(rx2)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); + /// }); + /// + /// let stream = rx3.try_flatten_unordered(None); + /// let mut values: Vec<_> = stream.collect().await; + /// values.sort(); + /// + /// assert_eq!(values, vec![Ok(1), Ok(2), Ok(4), Err(3), Err(5)]); + /// # }); + /// ``` + #[cfg(not(futures_no_atomic_cas))] + #[cfg(feature = "alloc")] + fn try_flatten_unordered(self, limit: impl Into<Option<usize>>) -> TryFlattenUnordered<Self> + where + Self::Ok: TryStream + Unpin, + <Self::Ok as TryStream>::Error: From<Self::Error>, + Self: Sized, + { + assert_stream::<Result<<Self::Ok as TryStream>::Ok, <Self::Ok as TryStream>::Error>, _>( + TryFlattenUnordered::new(self, limit), + ) + } + /// Flattens a stream of streams into just one continuous stream. /// /// If this stream's elements are themselves streams then this combinator @@ -736,17 +802,21 @@ pub trait TryStreamExt: TryStream { /// thread::spawn(move || { /// tx2.unbounded_send(Ok(2)).unwrap(); /// tx2.unbounded_send(Err(3)).unwrap(); + /// tx2.unbounded_send(Ok(4)).unwrap(); /// }); /// thread::spawn(move || { /// tx3.unbounded_send(Ok(rx1)).unwrap(); /// tx3.unbounded_send(Ok(rx2)).unwrap(); - /// tx3.unbounded_send(Err(4)).unwrap(); + /// tx3.unbounded_send(Err(5)).unwrap(); /// }); /// /// let mut stream = rx3.try_flatten(); /// assert_eq!(stream.next().await, Some(Ok(1))); /// assert_eq!(stream.next().await, Some(Ok(2))); /// assert_eq!(stream.next().await, Some(Err(3))); + /// assert_eq!(stream.next().await, Some(Ok(4))); + /// assert_eq!(stream.next().await, Some(Err(5))); + /// assert_eq!(stream.next().await, None); /// # }); /// ``` fn try_flatten(self) -> TryFlatten<Self> @@ -914,7 +984,7 @@ pub trait TryStreamExt: TryStream { /// that matches the stream's `Error` type. /// /// This adaptor will buffer up to `n` futures and then return their - /// outputs in the order. If the underlying stream returns an error, it will + /// outputs in the same order as the underlying stream. If the underlying stream returns an error, it will /// be immediately propagated. /// /// The returned stream will be a stream of results, each containing either @@ -1001,6 +1071,7 @@ pub trait TryStreamExt: TryStream { /// Wraps a [`TryStream`] into a stream compatible with libraries using /// futures 0.1 `Stream`. Requires the `compat` feature to be enabled. /// ``` + /// # if cfg!(miri) { return; } // Miri does not support epoll /// use futures::future::{FutureExt, TryFutureExt}; /// # let (tx, rx) = futures::channel::oneshot::channel(); /// @@ -1026,12 +1097,7 @@ pub trait TryStreamExt: TryStream { Compat::new(self) } - /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). - /// - /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be - /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll - /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] - /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. + /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -1043,12 +1109,12 @@ pub trait TryStreamExt: TryStream { /// use futures::stream::{self, TryStreamExt}; /// use futures::io::AsyncReadExt; /// - /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); + /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); /// let mut reader = stream.into_async_read(); - /// let mut buf = Vec::new(); /// - /// assert!(reader.read_to_end(&mut buf).await.is_ok()); - /// assert_eq!(buf, &[1, 2, 3, 4, 5]); + /// let mut buf = Vec::new(); + /// reader.read_to_end(&mut buf).await.unwrap(); + /// assert_eq!(buf, [1, 2, 3, 4, 5]); /// # }) /// ``` #[cfg(feature = "io")] @@ -1056,7 +1122,7 @@ pub trait TryStreamExt: TryStream { #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead<Self> where - Self: Sized + TryStreamExt<Error = std::io::Error> + Unpin, + Self: Sized + TryStreamExt<Error = std::io::Error>, Self::Ok: AsRef<[u8]>, { crate::io::assert_read(IntoAsyncRead::new(self)) diff --git a/vendor/futures-util/src/stream/try_stream/or_else.rs b/vendor/futures-util/src/stream/try_stream/or_else.rs index cb69e8132..53aceb8e6 100644 --- a/vendor/futures-util/src/stream/try_stream/or_else.rs +++ b/vendor/futures-util/src/stream/try_stream/or_else.rs @@ -75,7 +75,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let future_len = if self.future.is_some() { 1 } else { 0 }; + let future_len = usize::from(self.future.is_some()); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(future_len); let upper = match upper { diff --git a/vendor/futures-util/src/stream/try_stream/try_buffered.rs b/vendor/futures-util/src/stream/try_stream/try_buffered.rs index 45bd3f8c7..9f48e5c0a 100644 --- a/vendor/futures-util/src/stream/try_stream/try_buffered.rs +++ b/vendor/futures-util/src/stream/try_stream/try_buffered.rs @@ -54,7 +54,7 @@ where // our queue of futures. Propagate errors from the stream immediately. while this.in_progress_queue.len() < *this.max { match this.stream.as_mut().poll_next(cx)? { - Poll::Ready(Some(fut)) => this.in_progress_queue.push(fut.into_future()), + Poll::Ready(Some(fut)) => this.in_progress_queue.push_back(fut.into_future()), Poll::Ready(None) | Poll::Pending => break, } } diff --git a/vendor/futures-util/src/stream/try_stream/try_chunks.rs b/vendor/futures-util/src/stream/try_stream/try_chunks.rs index 07d4425a8..ec53f4bd1 100644 --- a/vendor/futures-util/src/stream/try_stream/try_chunks.rs +++ b/vendor/futures-util/src/stream/try_stream/try_chunks.rs @@ -41,9 +41,10 @@ impl<St: TryStream> TryChunks<St> { delegate_access_inner!(stream, St, (. .)); } +type TryChunksStreamError<St> = TryChunksError<<St as TryStream>::Ok, <St as TryStream>::Error>; + impl<St: TryStream> Stream for TryChunks<St> { - #[allow(clippy::type_complexity)] - type Item = Result<Vec<St::Ok>, TryChunksError<St::Ok, St::Error>>; + type Item = Result<Vec<St::Ok>, TryChunksStreamError<St>>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { let mut this = self.as_mut().project(); @@ -70,7 +71,7 @@ impl<St: TryStream> Stream for TryChunks<St> { let last = if this.items.is_empty() { None } else { - let full_buf = mem::replace(this.items, Vec::new()); + let full_buf = mem::take(this.items); Some(full_buf) }; @@ -81,9 +82,9 @@ impl<St: TryStream> Stream for TryChunks<St> { } fn size_hint(&self) -> (usize, Option<usize>) { - let chunk_len = if self.items.is_empty() { 0 } else { 1 }; + let chunk_len = usize::from(!self.items.is_empty()); let (lower, upper) = self.stream.size_hint(); - let lower = lower.saturating_add(chunk_len); + let lower = (lower / self.cap).saturating_add(chunk_len); let upper = match upper { Some(x) => x.checked_add(chunk_len), None => None, diff --git a/vendor/futures-util/src/stream/try_stream/try_collect.rs b/vendor/futures-util/src/stream/try_stream/try_collect.rs index 5d3b3d766..3e5963f03 100644 --- a/vendor/futures-util/src/stream/try_stream/try_collect.rs +++ b/vendor/futures-util/src/stream/try_stream/try_collect.rs @@ -45,7 +45,7 @@ where Poll::Ready(Ok(loop { match ready!(this.stream.as_mut().try_poll_next(cx)?) { Some(x) => this.items.extend(Some(x)), - None => break mem::replace(this.items, Default::default()), + None => break mem::take(this.items), } })) } diff --git a/vendor/futures-util/src/stream/try_stream/try_filter.rs b/vendor/futures-util/src/stream/try_stream/try_filter.rs index 61e6105c3..11d58243f 100644 --- a/vendor/futures-util/src/stream/try_stream/try_filter.rs +++ b/vendor/futures-util/src/stream/try_stream/try_filter.rs @@ -90,7 +90,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_fut.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_fut.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs index bb1b5b9db..ed1201732 100644 --- a/vendor/futures-util/src/stream/try_stream/try_filter_map.rs +++ b/vendor/futures-util/src/stream/try_stream/try_filter_map.rs @@ -84,7 +84,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs new file mode 100644 index 000000000..a74dfc451 --- /dev/null +++ b/vendor/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -0,0 +1,176 @@ +use core::marker::PhantomData; +use core::pin::Pin; + +use futures_core::ready; +use futures_core::stream::{FusedStream, Stream, TryStream}; +use futures_core::task::{Context, Poll}; +#[cfg(feature = "sink")] +use futures_sink::Sink; + +use pin_project_lite::pin_project; + +use crate::future::Either; +use crate::stream::stream::flatten_unordered::{ + FlattenUnorderedWithFlowController, FlowController, FlowStep, +}; +use crate::stream::IntoStream; +use crate::TryStreamExt; + +delegate_all!( + /// Stream for the [`try_flatten_unordered`](super::TryStreamExt::try_flatten_unordered) method. + TryFlattenUnordered<St>( + FlattenUnorderedWithFlowController<NestedTryStreamIntoEitherTryStream<St>, PropagateBaseStreamError<St>> + ): Debug + Sink + Stream + FusedStream + AccessInner[St, (. .)] + + New[ + |stream: St, limit: impl Into<Option<usize>>| + FlattenUnorderedWithFlowController::new( + NestedTryStreamIntoEitherTryStream::new(stream), + limit.into() + ) + ] + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> +); + +pin_project! { + /// Emits either successful streams or single-item streams containing the underlying errors. + /// This's a wrapper for `FlattenUnordered` to reuse its logic over `TryStream`. + #[derive(Debug)] + #[must_use = "streams do nothing unless polled"] + pub struct NestedTryStreamIntoEitherTryStream<St> + where + St: TryStream, + St::Ok: TryStream, + St::Ok: Unpin, + <St::Ok as TryStream>::Error: From<St::Error> + { + #[pin] + stream: St + } +} + +impl<St> NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn new(stream: St) -> Self { + Self { stream } + } + + delegate_access_inner!(stream, St, ()); +} + +/// Emits a single item immediately, then stream will be terminated. +#[derive(Debug, Clone)] +pub struct Single<T>(Option<T>); + +impl<T> Single<T> { + /// Constructs new `Single` with the given value. + fn new(val: T) -> Self { + Self(Some(val)) + } + + /// Attempts to take inner item immediately. Will always succeed if the stream isn't terminated. + fn next_immediate(&mut self) -> Option<T> { + self.0.take() + } +} + +impl<T> Unpin for Single<T> {} + +impl<T> Stream for Single<T> { + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> { + Poll::Ready(self.0.take()) + } + + fn size_hint(&self) -> (usize, Option<usize>) { + self.0.as_ref().map_or((0, Some(0)), |_| (1, Some(1))) + } +} + +/// Immediately propagates errors occurred in the base stream. +#[derive(Debug, Clone, Copy)] +pub struct PropagateBaseStreamError<St>(PhantomData<St>); + +type BaseStreamItem<St> = <NestedTryStreamIntoEitherTryStream<St> as Stream>::Item; +type InnerStreamItem<St> = <BaseStreamItem<St> as Stream>::Item; + +impl<St> FlowController<BaseStreamItem<St>, InnerStreamItem<St>> for PropagateBaseStreamError<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn next_step(item: BaseStreamItem<St>) -> FlowStep<BaseStreamItem<St>, InnerStreamItem<St>> { + match item { + // A new successful inner stream received + st @ Either::Left(_) => FlowStep::Continue(st), + // An error encountered + Either::Right(mut err) => FlowStep::Return(err.next_immediate().unwrap()), + } + } +} + +type SingleStreamResult<St> = Single<Result<<St as TryStream>::Ok, <St as TryStream>::Error>>; + +impl<St> Stream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + // Item is either an inner stream or a stream containing a single error. + // This will allow using `Either`'s `Stream` implementation as both branches are actually streams of `Result`'s. + type Item = Either<IntoStream<St::Ok>, SingleStreamResult<St::Ok>>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let item = ready!(self.project().stream.try_poll_next(cx)); + + let out = match item { + Some(res) => match res { + // Emit successful inner stream as is + Ok(stream) => Either::Left(stream.into_stream()), + // Wrap an error into a stream containing a single item + err @ Err(_) => { + let res = err.map(|_: St::Ok| unreachable!()).map_err(Into::into); + + Either::Right(Single::new(res)) + } + }, + None => return Poll::Ready(None), + }; + + Poll::Ready(Some(out)) + } +} + +impl<St> FusedStream for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + FusedStream, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<St::Error>, +{ + fn is_terminated(&self) -> bool { + self.stream.is_terminated() + } +} + +// Forwarding impl of Sink from the underlying stream +#[cfg(feature = "sink")] +impl<St, Item> Sink<Item> for NestedTryStreamIntoEitherTryStream<St> +where + St: TryStream + Sink<Item>, + St::Ok: TryStream + Unpin, + <St::Ok as TryStream>::Error: From<<St as TryStream>::Error>, +{ + type Error = <St as Sink<Item>>::Error; + + delegate_sink!(stream, Item); +} diff --git a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs index a424b6c5b..52aa2d478 100644 --- a/vendor/futures-util/src/stream/try_stream/try_skip_while.rs +++ b/vendor/futures-util/src/stream/try_stream/try_skip_while.rs @@ -87,7 +87,7 @@ where } fn size_hint(&self) -> (usize, Option<usize>) { - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/stream/try_stream/try_take_while.rs b/vendor/futures-util/src/stream/try_stream/try_take_while.rs index 3375960ef..4b5ff1ad3 100644 --- a/vendor/futures-util/src/stream/try_stream/try_take_while.rs +++ b/vendor/futures-util/src/stream/try_stream/try_take_while.rs @@ -96,7 +96,7 @@ where return (0, Some(0)); } - let pending_len = if self.pending_item.is_some() { 1 } else { 0 }; + let pending_len = usize::from(self.pending_item.is_some()); let (_, upper) = self.stream.size_hint(); let upper = match upper { Some(x) => x.checked_add(pending_len), diff --git a/vendor/futures-util/src/task/spawn.rs b/vendor/futures-util/src/task/spawn.rs index f8779230e..d9e998530 100644 --- a/vendor/futures-util/src/task/spawn.rs +++ b/vendor/futures-util/src/task/spawn.rs @@ -34,6 +34,7 @@ pub trait SpawnExt: Spawn { /// today. Feel free to use this method in the meantime. /// /// ``` + /// # { /// use futures::executor::ThreadPool; /// use futures::task::SpawnExt; /// @@ -41,6 +42,8 @@ pub trait SpawnExt: Spawn { /// /// let future = async { /* ... */ }; /// executor.spawn(future).unwrap(); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "alloc")] fn spawn<Fut>(&self, future: Fut) -> Result<(), SpawnError> @@ -58,6 +61,7 @@ pub trait SpawnExt: Spawn { /// resolves to the output of the spawned future. /// /// ``` + /// # { /// use futures::executor::{block_on, ThreadPool}; /// use futures::future; /// use futures::task::SpawnExt; @@ -67,6 +71,8 @@ pub trait SpawnExt: Spawn { /// let future = future::ready(1); /// let join_handle_fut = executor.spawn_with_handle(future).unwrap(); /// assert_eq!(block_on(join_handle_fut), 1); + /// # } + /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371 /// ``` #[cfg(feature = "channel")] #[cfg_attr(docsrs, doc(cfg(feature = "channel")))] |