、代码结构(4) I/O “小”写流程

上一篇,介绍了dm dedup的写流程,这一篇,介绍它的一个特殊流程

如果我们接收到的对齐bio但是它的size < block_size,那么这时候是不能直接进行hash的。

需要将它的缺少的部分读出来,填充成一个完整的block_size才能计算hash。

接下来我们就介绍这一部分的代码流程。

static int handle_write(struct dedup_config *dc, struct bio *bio){    u64 lbn;    u8 hash[MAX_DIGEST_SIZE];    struct hash_pbn_value hashpbn_value;    u32 vsize;    struct bio *new_bio = NULL;    int r;    /* If there is a data corruption make the device read-only */    if (dc->corrupted_blocks > dc->fec_fixed)    return -EIO;    dc->writes++;    /* Read-on-write handling */    if (bio->bi_iter.bi_size < dc->block_size) {        dc->reads_on_writes++;        new_bio = prepare_bio_on_write(dc, bio);        if (!new_bio || IS_ERR(new_bio))        return -ENOMEM;        bio = new_bio;    }    /*.....*/}

对于“小”写这种操作,也被叫做reads_on_writes,或者read_motify_write

一起看看这个new_bio是如何被构造出来的。

struct bio *prepare_bio_on_write(struct dedup_config *dc, struct bio *bio){    int r;    sector_t lbn;    uint32_t vsize;    struct lbn_pbn_value lbnpbn_value;    struct bio *clone;    //DMINFO("\nEntered prepare bio on write");    lbn = compute_sector(bio, dc);    (void) sector_div(lbn, dc->sectors_per_block);    /* check for old or new lbn and fetch the appropriate pbn */    r = dc->kvs_lbn_pbn->kvs_lookup(dc->kvs_lbn_pbn, (void *)&lbn,    sizeof(lbn), (void *)&lbnpbn_value, &vsize);    if (r == -ENODATA)    clone = prepare_bio_without_pbn(dc, bio);    else if (r == 0)    clone = prepare_bio_with_pbn(dc, bio,         lbnpbn_value.pbn * dc->sectors_per_block);    else    return ERR_PTR(r);    //DMINFO("\nExiting prpare_bio_on_write");    return clone;}

我们注意:这里compute_sector算出来的,是bio sector的lbn

①:它有以下几种情况:

[x---y][x--------z]   ->   x/block_size=lbn     [x----y][a--------y]   ->   x/block_size=lbn     [x--y][a----------z]  ->   x/block_size=lbn

他们都会得到相同的lbn=x/block_size

        ② dc->kvs_lbn_pbn->kvs_lookup

这里需要将刚才算出来的lbn来求出是否存在,不存在填充zero。

接下啦就是两种情况了,这个lbn是否存在pbn。

    一、 lbn without pbn  "clone 

         prepare_bio_without_pbn(dc, bio);"

static struct bio *prepare_bio_without_pbn(struct dedup_config *dc,   struct bio *bio){    int r = 0;    struct bio *clone = NULL;    clone = create_bio(dc, bio);    if (!clone)    goto out;    zero_fill_bio(clone);    r = merge_data(dc, bio_page(clone), bio);    if (r < 0)    return ERR_PTR(r);    out:    return clone;}

这个很容易理解,既然不存在lbn_pbn的对应关系,那么这里就是直接填充zero。

这里用了内核提供的函数zero_fill_bio。如果我做,我可能不知道这个函数,大家要利用这个函数。

那么这里很简单,就是先创建一个null的bio,然后把这个bio的page全部填充成zero。

在和bio的进行合并。这里merge_data是代码实现的,我们看一下。

static int merge_data(struct dedup_config *dc, struct page *page,      struct bio *bio){    sector_t bi_sector = bio->bi_iter.bi_sector;    void *src_page_vaddr, *dest_page_vaddr;    int position, err = 0;    struct bvec_iter iter;    struct bio_vec bvec;    /* Relative offset in terms of sector size */    position = sector_div(bi_sector, dc->sectors_per_block);    if (!page || !bio_page(bio)) {        err = -EINVAL;        goto out;    }    /* Locating the right sector to merge */    dest_page_vaddr = page_address(page) + to_bytes(position);    bio_for_each_segment(bvec, bio, iter) {        src_page_vaddr = page_address(bio_iter_page(bio, iter)) + bio_iter_offset(bio, iter);        /* Merging Data */        memmove(dest_page_vaddr, src_page_vaddr, bio_iter_len(bio, iter));        /* Updating destinaion address */        dest_page_vaddr += bio_iter_len(bio, iter);    }    out:    return err;}

我们做块级功能研发的人,要对内存和bio sectors操作也要熟悉,

我曾经有段时间自己写代码就被segments和sectors各种对应绕晕。

这个代码写的还是比较鲜明的,先找出bi_sector在block_size内的位置

比如                       

sector [01234567]     [01234567]s                    free<>  page   <00000000>    ---><000data0>

                   

                               position就是3

        这时候,用page_address将[position]的数据转换成ptr

        然后将data memmove到3456的<0-0>的ptr上面就可以了。

        那么有一点复杂的是<  data >他也可能是割裂的比如:

       

sector   [01234567]      [01234567]      [01234567]s1          
        free
        free
s2            
             
       free
page     <00000000>   --><000da000>    --><000data0>

                

       这里需要将多个segments的page,s1和s2的内容,分别memmove到page里。

  一、 lbn without pbn  "clone 

         prepare_bio_with_pbn(dc, bio);"

    这里实现的思路,是既然发现了pbn,那么就把pbn读出来,然后把新的内容覆盖再写

这种方式就是标准的读-改-写,通常在I/O小于region_size的情况下,需要这么做。

因为需要block_size对齐,所以从pbn读一整个block_size出来。

static int fetch_whole_block(struct dedup_config *dc,     uint64_t pbn, struct page_list *pl){    struct dm_io_request iorq;    struct dm_io_region where;    unsigned long error_bits;    where.bdev = dc->data_dev->bdev;    where.sector = pbn;    where.count = dc->sectors_per_block;    iorq.bi_op = REQ_OP_READ;    iorq.bi_op_flags = 0;    iorq.mem.type = DM_IO_PAGE_LIST;    iorq.mem.ptr.pl = pl;    iorq.mem.offset = 0;    iorq.notify.fn = NULL;    iorq.client = dc->io_client;    return dm_io(&iorq, 1, &where, &error_bits);}
static struct bio *prepare_bio_with_pbn(struct dedup_config *dc,struct bio *bio, uint64_t pbn){    int r = 0;    struct page_list *pl;    struct bio *clone = NULL;    pl = kmalloc(sizeof(*pl), GFP_NOIO);    if (!pl)        goto out;    /*     * Since target I/O size is 4KB currently, we need only one page to     * store the data. However, if the target I/O size increases, we need     * to allocate more pages and set this linked list correctly.     */    pl->page = alloc_pages(GFP_NOIO, 0);    if (!pl->page)        goto out_allocfail;    pl->next = NULL;    r = fetch_whole_block(dc, pbn, pl);    if (r < 0)        goto out_fail;        r = merge_data(dc, pl->page, bio);    if (r < 0)        goto out_fail;        clone = create_bio(dc, bio);    if (!clone)        goto out_fail;        copy_pages(pl->page, clone);out_fail:    free_pages((unsigned long) page_address(pl->page), 0);    out_allocfail:    kfree(pl);out:    if (r < 0)        return ERR_PTR(r);    return clone;}

已经读出来的pl->page的pbn的内容和bio做merge,这个和上面zero的情况一下。

最后再将merge好的,数据拷贝到bio_new中,去做下一轮handle_write处理,也就是上一篇的内容。

这一篇的内容比较简单,所以没有图示,

--------------未完待续--------------

【本文只在51cto博客作者 “底层存储技术”  个人发布,公众号发布:存储之谷】,如需转载,请于本人联系,谢谢。