diff options
author | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-18 17:35:05 +0000 |
---|---|---|
committer | Daniel Baumann <daniel.baumann@progress-linux.org> | 2024-05-18 17:39:31 +0000 |
commit | 85c675d0d09a45a135bddd15d7b385f8758c32fb (patch) | |
tree | 76267dbc9b9a130337be3640948fe397b04ac629 /drivers/block/ublk_drv.c | |
parent | Adding upstream version 6.6.15. (diff) | |
download | linux-85c675d0d09a45a135bddd15d7b385f8758c32fb.tar.xz linux-85c675d0d09a45a135bddd15d7b385f8758c32fb.zip |
Adding upstream version 6.7.7.upstream/6.7.7
Signed-off-by: Daniel Baumann <daniel.baumann@progress-linux.org>
Diffstat (limited to 'drivers/block/ublk_drv.c')
-rw-r--r-- | drivers/block/ublk_drv.c | 342 |
1 files changed, 215 insertions, 127 deletions
diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c index 630ddfe665..83600b45e1 100644 --- a/drivers/block/ublk_drv.c +++ b/drivers/block/ublk_drv.c @@ -75,6 +75,7 @@ struct ublk_rq_data { struct ublk_uring_cmd_pdu { struct ublk_queue *ubq; + u16 tag; }; /* @@ -115,6 +116,9 @@ struct ublk_uring_cmd_pdu { */ #define UBLK_IO_FLAG_NEED_GET_DATA 0x08 +/* atomic RW with ubq->cancel_lock */ +#define UBLK_IO_FLAG_CANCELED 0x80000000 + struct ublk_io { /* userspace buffer address from io cmd */ __u64 addr; @@ -138,13 +142,13 @@ struct ublk_queue { unsigned int max_io_sz; bool force_abort; bool timeout; + bool canceling; unsigned short nr_io_ready; /* how many ios setup */ + spinlock_t cancel_lock; struct ublk_device *dev; struct ublk_io ios[]; }; -#define UBLK_DAEMON_MONITOR_PERIOD (5 * HZ) - struct ublk_device { struct gendisk *ub_disk; @@ -166,7 +170,7 @@ struct ublk_device { struct mutex mutex; - spinlock_t mm_lock; + spinlock_t lock; struct mm_struct *mm; struct ublk_params params; @@ -175,11 +179,6 @@ struct ublk_device { unsigned int nr_queues_ready; unsigned int nr_privileged_daemon; - /* - * Our ubq->daemon may be killed without any notification, so - * monitor each queue's daemon periodically - */ - struct delayed_work monitor_work; struct work_struct quiesce_work; struct work_struct stop_work; }; @@ -190,10 +189,11 @@ struct ublk_params_header { __u32 types; }; +static bool ublk_abort_requests(struct ublk_device *ub, struct ublk_queue *ubq); + static inline unsigned int ublk_req_build_flags(struct request *req); static inline struct ublksrv_io_desc *ublk_get_iod(struct ublk_queue *ubq, int tag); - static inline bool ublk_dev_is_user_copy(const struct ublk_device *ub) { return ub->dev_info.flags & UBLK_F_USER_COPY; @@ -470,6 +470,7 @@ static DEFINE_MUTEX(ublk_ctl_mutex); * It can be extended to one per-user limit in future or even controlled * by cgroup. */ +#define UBLK_MAX_UBLKS UBLK_MINORS static unsigned int ublks_max = 64; static unsigned int ublks_added; /* protected by ublk_ctl_mutex */ @@ -1083,13 +1084,10 @@ static void __ublk_fail_req(struct ublk_queue *ubq, struct ublk_io *io, { WARN_ON_ONCE(io->flags & UBLK_IO_FLAG_ACTIVE); - if (!(io->flags & UBLK_IO_FLAG_ABORTED)) { - io->flags |= UBLK_IO_FLAG_ABORTED; - if (ublk_queue_can_use_recovery_reissue(ubq)) - blk_mq_requeue_request(req, false); - else - ublk_put_req_ref(ubq, req); - } + if (ublk_queue_can_use_recovery_reissue(ubq)) + blk_mq_requeue_request(req, false); + else + ublk_put_req_ref(ubq, req); } static void ubq_complete_io_cmd(struct ublk_io *io, int res, @@ -1118,8 +1116,6 @@ static inline void __ublk_abort_rq(struct ublk_queue *ubq, blk_mq_requeue_request(rq, false); else blk_mq_end_request(rq, BLK_STS_IOERR); - - mod_delayed_work(system_wq, &ubq->dev->monitor_work, 0); } static inline void __ublk_rq_task_work(struct request *req, @@ -1212,15 +1208,6 @@ static inline void ublk_forward_io_cmds(struct ublk_queue *ubq, __ublk_rq_task_work(blk_mq_rq_from_pdu(data), issue_flags); } -static inline void ublk_abort_io_cmds(struct ublk_queue *ubq) -{ - struct llist_node *io_cmds = llist_del_all(&ubq->io_cmds); - struct ublk_rq_data *data, *tmp; - - llist_for_each_entry_safe(data, tmp, io_cmds, node) - __ublk_abort_rq(ubq, blk_mq_rq_from_pdu(data)); -} - static void ublk_rq_task_work_cb(struct io_uring_cmd *cmd, unsigned issue_flags) { struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd); @@ -1232,38 +1219,19 @@ static void ublk_rq_task_work_cb(struct io_uring_cmd *cmd, unsigned issue_flags) static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq) { struct ublk_rq_data *data = blk_mq_rq_to_pdu(rq); - struct ublk_io *io; - if (!llist_add(&data->node, &ubq->io_cmds)) - return; + if (llist_add(&data->node, &ubq->io_cmds)) { + struct ublk_io *io = &ubq->ios[rq->tag]; - io = &ubq->ios[rq->tag]; - /* - * If the check pass, we know that this is a re-issued request aborted - * previously in monitor_work because the ubq_daemon(cmd's task) is - * PF_EXITING. We cannot call io_uring_cmd_complete_in_task() anymore - * because this ioucmd's io_uring context may be freed now if no inflight - * ioucmd exists. Otherwise we may cause null-deref in ctx->fallback_work. - * - * Note: monitor_work sets UBLK_IO_FLAG_ABORTED and ends this request(releasing - * the tag). Then the request is re-started(allocating the tag) and we are here. - * Since releasing/allocating a tag implies smp_mb(), finding UBLK_IO_FLAG_ABORTED - * guarantees that here is a re-issued request aborted previously. - */ - if (unlikely(io->flags & UBLK_IO_FLAG_ABORTED)) { - ublk_abort_io_cmds(ubq); - } else { - struct io_uring_cmd *cmd = io->cmd; - struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd); - - pdu->ubq = ubq; - io_uring_cmd_complete_in_task(cmd, ublk_rq_task_work_cb); + io_uring_cmd_complete_in_task(io->cmd, ublk_rq_task_work_cb); } } static enum blk_eh_timer_return ublk_timeout(struct request *rq) { struct ublk_queue *ubq = rq->mq_hctx->driver_data; + unsigned int nr_inflight = 0; + int i; if (ubq->flags & UBLK_F_UNPRIVILEGED_DEV) { if (!ubq->timeout) { @@ -1274,6 +1242,29 @@ static enum blk_eh_timer_return ublk_timeout(struct request *rq) return BLK_EH_DONE; } + if (!ubq_daemon_is_dying(ubq)) + return BLK_EH_RESET_TIMER; + + for (i = 0; i < ubq->q_depth; i++) { + struct ublk_io *io = &ubq->ios[i]; + + if (!(io->flags & UBLK_IO_FLAG_ACTIVE)) + nr_inflight++; + } + + /* cancelable uring_cmd can't help us if all commands are in-flight */ + if (nr_inflight == ubq->q_depth) { + struct ublk_device *ub = ubq->dev; + + if (ublk_abort_requests(ub, ubq)) { + if (ublk_can_use_recovery(ub)) + schedule_work(&ub->quiesce_work); + else + schedule_work(&ub->stop_work); + } + return BLK_EH_DONE; + } + return BLK_EH_RESET_TIMER; } @@ -1301,13 +1292,12 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx, if (ublk_queue_can_use_recovery(ubq) && unlikely(ubq->force_abort)) return BLK_STS_IOERR; - blk_mq_start_request(bd->rq); - - if (unlikely(ubq_daemon_is_dying(ubq))) { + if (unlikely(ubq->canceling)) { __ublk_abort_rq(ubq, rq); return BLK_STS_OK; } + blk_mq_start_request(bd->rq); ublk_queue_cmd(ubq, rq); return BLK_STS_OK; @@ -1357,12 +1347,12 @@ static int ublk_ch_mmap(struct file *filp, struct vm_area_struct *vma) unsigned long pfn, end, phys_off = vma->vm_pgoff << PAGE_SHIFT; int q_id, ret = 0; - spin_lock(&ub->mm_lock); + spin_lock(&ub->lock); if (!ub->mm) ub->mm = current->mm; if (current->mm != ub->mm) ret = -EINVAL; - spin_unlock(&ub->mm_lock); + spin_unlock(&ub->lock); if (ret) return ret; @@ -1411,17 +1401,14 @@ static void ublk_commit_completion(struct ublk_device *ub, } /* - * When ->ubq_daemon is exiting, either new request is ended immediately, - * or any queued io command is drained, so it is safe to abort queue - * lockless + * Called from ubq_daemon context via cancel fn, meantime quiesce ublk + * blk-mq queue, so we are called exclusively with blk-mq and ubq_daemon + * context, so everything is serialized. */ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq) { int i; - if (!ublk_get_device(ub)) - return; - for (i = 0; i < ubq->q_depth; i++) { struct ublk_io *io = &ubq->ios[i]; @@ -1433,72 +1420,114 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq) * will do it */ rq = blk_mq_tag_to_rq(ub->tag_set.tags[ubq->q_id], i); - if (rq) + if (rq && blk_mq_request_started(rq)) { + io->flags |= UBLK_IO_FLAG_ABORTED; __ublk_fail_req(ubq, io, rq); + } } } - ublk_put_device(ub); } -static void ublk_daemon_monitor_work(struct work_struct *work) +static bool ublk_abort_requests(struct ublk_device *ub, struct ublk_queue *ubq) { - struct ublk_device *ub = - container_of(work, struct ublk_device, monitor_work.work); - int i; + struct gendisk *disk; - for (i = 0; i < ub->dev_info.nr_hw_queues; i++) { - struct ublk_queue *ubq = ublk_get_queue(ub, i); + spin_lock(&ubq->cancel_lock); + if (ubq->canceling) { + spin_unlock(&ubq->cancel_lock); + return false; + } + ubq->canceling = true; + spin_unlock(&ubq->cancel_lock); - if (ubq_daemon_is_dying(ubq)) { - if (ublk_queue_can_use_recovery(ubq)) - schedule_work(&ub->quiesce_work); - else - schedule_work(&ub->stop_work); + spin_lock(&ub->lock); + disk = ub->ub_disk; + if (disk) + get_device(disk_to_dev(disk)); + spin_unlock(&ub->lock); - /* abort queue is for making forward progress */ - ublk_abort_queue(ub, ubq); - } - } + /* Our disk has been dead */ + if (!disk) + return false; - /* - * We can't schedule monitor work after ub's state is not UBLK_S_DEV_LIVE. - * after ublk_remove() or __ublk_quiesce_dev() is started. - * - * No need ub->mutex, monitor work are canceled after state is marked - * as not LIVE, so new state is observed reliably. - */ - if (ub->dev_info.state == UBLK_S_DEV_LIVE) - schedule_delayed_work(&ub->monitor_work, - UBLK_DAEMON_MONITOR_PERIOD); -} + /* Now we are serialized with ublk_queue_rq() */ + blk_mq_quiesce_queue(disk->queue); + /* abort queue is for making forward progress */ + ublk_abort_queue(ub, ubq); + blk_mq_unquiesce_queue(disk->queue); + put_device(disk_to_dev(disk)); -static inline bool ublk_queue_ready(struct ublk_queue *ubq) -{ - return ubq->nr_io_ready == ubq->q_depth; + return true; } -static void ublk_cmd_cancel_cb(struct io_uring_cmd *cmd, unsigned issue_flags) +static void ublk_cancel_cmd(struct ublk_queue *ubq, struct ublk_io *io, + unsigned int issue_flags) { - io_uring_cmd_done(cmd, UBLK_IO_RES_ABORT, 0, issue_flags); + bool done; + + if (!(io->flags & UBLK_IO_FLAG_ACTIVE)) + return; + + spin_lock(&ubq->cancel_lock); + done = !!(io->flags & UBLK_IO_FLAG_CANCELED); + if (!done) + io->flags |= UBLK_IO_FLAG_CANCELED; + spin_unlock(&ubq->cancel_lock); + + if (!done) + io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, 0, issue_flags); } -static void ublk_cancel_queue(struct ublk_queue *ubq) +/* + * The ublk char device won't be closed when calling cancel fn, so both + * ublk device and queue are guaranteed to be live + */ +static void ublk_uring_cmd_cancel_fn(struct io_uring_cmd *cmd, + unsigned int issue_flags) { - int i; + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd); + struct ublk_queue *ubq = pdu->ubq; + struct task_struct *task; + struct ublk_device *ub; + bool need_schedule; + struct ublk_io *io; - if (!ublk_queue_ready(ubq)) + if (WARN_ON_ONCE(!ubq)) return; - for (i = 0; i < ubq->q_depth; i++) { - struct ublk_io *io = &ubq->ios[i]; + if (WARN_ON_ONCE(pdu->tag >= ubq->q_depth)) + return; + + task = io_uring_cmd_get_task(cmd); + if (WARN_ON_ONCE(task && task != ubq->ubq_daemon)) + return; + + ub = ubq->dev; + need_schedule = ublk_abort_requests(ub, ubq); + + io = &ubq->ios[pdu->tag]; + WARN_ON_ONCE(io->cmd != cmd); + ublk_cancel_cmd(ubq, io, issue_flags); - if (io->flags & UBLK_IO_FLAG_ACTIVE) - io_uring_cmd_complete_in_task(io->cmd, - ublk_cmd_cancel_cb); + if (need_schedule) { + if (ublk_can_use_recovery(ub)) + schedule_work(&ub->quiesce_work); + else + schedule_work(&ub->stop_work); } +} - /* all io commands are canceled */ - ubq->nr_io_ready = 0; +static inline bool ublk_queue_ready(struct ublk_queue *ubq) +{ + return ubq->nr_io_ready == ubq->q_depth; +} + +static void ublk_cancel_queue(struct ublk_queue *ubq) +{ + int i; + + for (i = 0; i < ubq->q_depth; i++) + ublk_cancel_cmd(ubq, &ubq->ios[i], IO_URING_F_UNLOCKED); } /* Cancel all pending commands, must be called after del_gendisk() returns */ @@ -1545,16 +1574,6 @@ static void __ublk_quiesce_dev(struct ublk_device *ub) blk_mq_quiesce_queue(ub->ub_disk->queue); ublk_wait_tagset_rqs_idle(ub); ub->dev_info.state = UBLK_S_DEV_QUIESCED; - ublk_cancel_dev(ub); - /* we are going to release task_struct of ubq_daemon and resets - * ->ubq_daemon to NULL. So in monitor_work, check on ubq_daemon causes UAF. - * Besides, monitor_work is not necessary in QUIESCED state since we have - * already scheduled quiesce_work and quiesced all ubqs. - * - * Do not let monitor_work schedule itself if state it QUIESCED. And we cancel - * it here and re-schedule it in END_USER_RECOVERY to avoid UAF. - */ - cancel_delayed_work_sync(&ub->monitor_work); } static void ublk_quiesce_work_fn(struct work_struct *work) @@ -1568,6 +1587,7 @@ static void ublk_quiesce_work_fn(struct work_struct *work) __ublk_quiesce_dev(ub); unlock: mutex_unlock(&ub->mutex); + ublk_cancel_dev(ub); } static void ublk_unquiesce_dev(struct ublk_device *ub) @@ -1593,6 +1613,8 @@ static void ublk_unquiesce_dev(struct ublk_device *ub) static void ublk_stop_dev(struct ublk_device *ub) { + struct gendisk *disk; + mutex_lock(&ub->mutex); if (ub->dev_info.state == UBLK_S_DEV_DEAD) goto unlock; @@ -1602,14 +1624,18 @@ static void ublk_stop_dev(struct ublk_device *ub) ublk_unquiesce_dev(ub); } del_gendisk(ub->ub_disk); + + /* Sync with ublk_abort_queue() by holding the lock */ + spin_lock(&ub->lock); + disk = ub->ub_disk; ub->dev_info.state = UBLK_S_DEV_DEAD; ub->dev_info.ublksrv_pid = -1; - put_disk(ub->ub_disk); ub->ub_disk = NULL; + spin_unlock(&ub->lock); + put_disk(disk); unlock: - ublk_cancel_dev(ub); mutex_unlock(&ub->mutex); - cancel_delayed_work_sync(&ub->monitor_work); + ublk_cancel_dev(ub); } /* device can only be started after all IOs are ready */ @@ -1660,6 +1686,21 @@ static inline void ublk_fill_io_cmd(struct ublk_io *io, io->addr = buf_addr; } +static inline void ublk_prep_cancel(struct io_uring_cmd *cmd, + unsigned int issue_flags, + struct ublk_queue *ubq, unsigned int tag) +{ + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd); + + /* + * Safe to refer to @ubq since ublk_queue won't be died until its + * commands are completed + */ + pdu->ubq = ubq; + pdu->tag = tag; + io_uring_cmd_mark_cancelable(cmd, issue_flags); +} + static int __ublk_ch_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags, const struct ublksrv_io_cmd *ub_cmd) @@ -1775,6 +1816,7 @@ static int __ublk_ch_uring_cmd(struct io_uring_cmd *cmd, default: goto out; } + ublk_prep_cancel(cmd, issue_flags, ubq, tag); return -EIOCBQUEUED; out: @@ -1814,7 +1856,8 @@ fail_put: return NULL; } -static int ublk_ch_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags) +static inline int ublk_ch_uring_cmd_local(struct io_uring_cmd *cmd, + unsigned int issue_flags) { /* * Not necessary for async retry, but let's keep it simple and always @@ -1828,9 +1871,33 @@ static int ublk_ch_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags) .addr = READ_ONCE(ub_src->addr) }; + WARN_ON_ONCE(issue_flags & IO_URING_F_UNLOCKED); + return __ublk_ch_uring_cmd(cmd, issue_flags, &ub_cmd); } +static void ublk_ch_uring_cmd_cb(struct io_uring_cmd *cmd, + unsigned int issue_flags) +{ + ublk_ch_uring_cmd_local(cmd, issue_flags); +} + +static int ublk_ch_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags) +{ + if (unlikely(issue_flags & IO_URING_F_CANCEL)) { + ublk_uring_cmd_cancel_fn(cmd, issue_flags); + return 0; + } + + /* well-implemented server won't run into unlocked */ + if (unlikely(issue_flags & IO_URING_F_UNLOCKED)) { + io_uring_cmd_complete_in_task(cmd, ublk_ch_uring_cmd_cb); + return -EIOCBQUEUED; + } + + return ublk_ch_uring_cmd_local(cmd, issue_flags); +} + static inline bool ublk_check_ubuf_dir(const struct request *req, int ubuf_dir) { @@ -1962,6 +2029,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id) void *ptr; int size; + spin_lock_init(&ubq->cancel_lock); ubq->flags = ub->dev_info.flags; ubq->q_id = q_id; ubq->q_depth = ub->dev_info.queue_depth; @@ -2026,7 +2094,8 @@ static int ublk_alloc_dev_number(struct ublk_device *ub, int idx) if (err == -ENOSPC) err = -EEXIST; } else { - err = idr_alloc(&ublk_index_idr, ub, 0, 0, GFP_NOWAIT); + err = idr_alloc(&ublk_index_idr, ub, 0, UBLK_MAX_UBLKS, + GFP_NOWAIT); } spin_unlock(&ublk_idr_lock); @@ -2151,8 +2220,6 @@ static int ublk_ctrl_start_dev(struct ublk_device *ub, struct io_uring_cmd *cmd) if (wait_for_completion_interruptible(&ub->completion) != 0) return -EINTR; - schedule_delayed_work(&ub->monitor_work, UBLK_DAEMON_MONITOR_PERIOD); - mutex_lock(&ub->mutex); if (ub->dev_info.state == UBLK_S_DEV_LIVE || test_bit(UB_STATE_USED, &ub->state)) { @@ -2305,6 +2372,12 @@ static int ublk_ctrl_add_dev(struct io_uring_cmd *cmd) return -EINVAL; } + if (header->dev_id != U32_MAX && header->dev_id >= UBLK_MAX_UBLKS) { + pr_warn("%s: dev id is too large. Max supported is %d\n", + __func__, UBLK_MAX_UBLKS - 1); + return -EINVAL; + } + ublk_dump_dev_info(&info); ret = mutex_lock_killable(&ublk_ctl_mutex); @@ -2320,10 +2393,9 @@ static int ublk_ctrl_add_dev(struct io_uring_cmd *cmd) if (!ub) goto out_unlock; mutex_init(&ub->mutex); - spin_lock_init(&ub->mm_lock); + spin_lock_init(&ub->lock); INIT_WORK(&ub->quiesce_work, ublk_quiesce_work_fn); INIT_WORK(&ub->stop_work, ublk_stop_work_fn); - INIT_DELAYED_WORK(&ub->monitor_work, ublk_daemon_monitor_work); ret = ublk_alloc_dev_number(ub, header->dev_id); if (ret < 0) @@ -2569,13 +2641,15 @@ static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq) int i; WARN_ON_ONCE(!(ubq->ubq_daemon && ubq_daemon_is_dying(ubq))); + /* All old ioucmds have to be completed */ - WARN_ON_ONCE(ubq->nr_io_ready); + ubq->nr_io_ready = 0; /* old daemon is PF_EXITING, put it now */ put_task_struct(ubq->ubq_daemon); /* We have to reset it to NULL, otherwise ub won't accept new FETCH_REQ */ ubq->ubq_daemon = NULL; ubq->timeout = false; + ubq->canceling = false; for (i = 0; i < ubq->q_depth; i++) { struct ublk_io *io = &ubq->ios[i]; @@ -2661,7 +2735,6 @@ static int ublk_ctrl_end_recovery(struct ublk_device *ub, __func__, header->dev_id); blk_mq_kick_requeue_list(ub->ub_disk->queue); ub->dev_info.state = UBLK_S_DEV_LIVE; - schedule_delayed_work(&ub->monitor_work, UBLK_DAEMON_MONITOR_PERIOD); ret = 0; out_unlock: mutex_unlock(&ub->mutex); @@ -2932,7 +3005,22 @@ static void __exit ublk_exit(void) module_init(ublk_init); module_exit(ublk_exit); -module_param(ublks_max, int, 0444); +static int ublk_set_max_ublks(const char *buf, const struct kernel_param *kp) +{ + return param_set_uint_minmax(buf, kp, 0, UBLK_MAX_UBLKS); +} + +static int ublk_get_max_ublks(char *buf, const struct kernel_param *kp) +{ + return sysfs_emit(buf, "%u\n", ublks_max); +} + +static const struct kernel_param_ops ublk_max_ublks_ops = { + .set = ublk_set_max_ublks, + .get = ublk_get_max_ublks, +}; + +module_param_cb(ublks_max, &ublk_max_ublks_ops, &ublks_max, 0644); MODULE_PARM_DESC(ublks_max, "max number of ublk devices allowed to add(default: 64)"); MODULE_AUTHOR("Ming Lei <ming.lei@redhat.com>"); |