Linux 原生 AIO 實(shí)現(xiàn)(Native AIO)
上一篇文章 主要分析了 Linux 原生 AIO 的原理和使用,而這篇要介紹的是 Linux 原生 AIO 的實(shí)現(xiàn)過(guò)程。
本文基于 Linux-2.6.0 版本內(nèi)核源碼
一般來(lái)說(shuō),使用 Linux 原生 AIO 需要 3 個(gè)步驟:
1) 調(diào)用
io_setup函數(shù)創(chuàng)建一個(gè)一般 IO 上下文。2) 調(diào)用
io_submit函數(shù)向內(nèi)核提交一個(gè)異步 IO 操作。3) 調(diào)用
io_getevents函數(shù)獲取異步 IO 操作結(jié)果。
所以,我們可以通過(guò)分析這三個(gè)函數(shù)的實(shí)現(xiàn)來(lái)理解 Linux 原生 AIO 的實(shí)現(xiàn)。
Linux 原生 AIO 實(shí)現(xiàn)在源碼文件
/fs/aio.c中。
創(chuàng)建異步 IO 上下文
要使用 Linux 原生 AIO,首先需要?jiǎng)?chuàng)建一個(gè)異步 IO 上下文,在內(nèi)核中,異步 IO 上下文使用 kioctx 結(jié)構(gòu)表示,定義如下:
struct kioctx {atomic_t users; // 引用計(jì)數(shù)器int dead; // 是否已經(jīng)關(guān)閉struct mm_struct *mm; // 對(duì)應(yīng)的內(nèi)存管理對(duì)象unsigned long user_id; // 唯一的ID,用于標(biāo)識(shí)當(dāng)前上下文, 返回給用戶(hù)struct kioctx *next;wait_queue_head_t wait; // 等待隊(duì)列spinlock_t ctx_lock; // 鎖int reqs_active; // 正在進(jìn)行的異步IO請(qǐng)求數(shù)struct list_head active_reqs; // 正在進(jìn)行的異步IO請(qǐng)求對(duì)象struct list_head run_list;unsigned max_reqs; // 最大IO請(qǐng)求數(shù)struct aio_ring_info ring_info; // 環(huán)形緩沖區(qū)struct work_struct wq;};
在 kioctx 結(jié)構(gòu)中,比較重要的成員為 active_reqs 和 ring_info。active_reqs 保存了所有正在進(jìn)行的異步 IO 操作,而 ring_info 成員用于存放異步 IO 操作的結(jié)果。
kioctx 結(jié)構(gòu)如 圖1 所示:

如 圖1 所示,active_reqs 成員保存的異步 IO 操作隊(duì)列是以 kiocb 結(jié)構(gòu)為單元的,而 ring_info 成員指向一個(gè)類(lèi)型為 aio_ring_info 結(jié)構(gòu)的環(huán)形緩沖區(qū)(Ring Buffer)。
所以我們先來(lái)看看 kiocb 結(jié)構(gòu)和 aio_ring_info 結(jié)構(gòu)的定義:
struct kiocb {...struct file *ki_filp; // 異步IO操作的文件對(duì)象struct kioctx *ki_ctx; // 指向所屬的異步IO上下文...struct list_head ki_list; // 用于連接所有正在進(jìn)行的異步IO操作對(duì)象__u64 ki_user_data; // 用戶(hù)提供的數(shù)據(jù)指針(可用于區(qū)分異步IO操作)loff_t ki_pos; // 異步IO操作的文件偏移量...};
kiocb 結(jié)構(gòu)比較簡(jiǎn)單,主要用于保存異步 IO 操作的一些信息,如:
ki_filp:用于保存進(jìn)行異步 IO 的文件對(duì)象。ki_ctx:指向所屬的異步 IO 上下文對(duì)象。ki_list:用于連接當(dāng)前異步 IO 上下文中的所有 IO 操作對(duì)象。ki_user_data:這個(gè)字段主要提供給用戶(hù)自定義使用,比如區(qū)分異步 IO 操作,或者設(shè)置一個(gè)回調(diào)函數(shù)等。ki_pos:用于保存異步 IO 操作的文件偏移量。
而 aio_ring_info 結(jié)構(gòu)是一個(gè)環(huán)形緩沖區(qū)的實(shí)現(xiàn),其定義如下:
struct aio_ring_info {unsigned long mmap_base; // 環(huán)形緩沖區(qū)的虛擬內(nèi)存地址unsigned long mmap_size; // 環(huán)形緩沖區(qū)的大小struct page **ring_pages; // 環(huán)形緩沖區(qū)所使用的內(nèi)存頁(yè)數(shù)組spinlock_t ring_lock; // 保護(hù)環(huán)形緩沖區(qū)的自旋鎖long nr_pages; // 環(huán)形緩沖區(qū)所占用的內(nèi)存頁(yè)數(shù)unsigned nr, tail;// 如果環(huán)形緩沖區(qū)不大于 8 個(gè)內(nèi)存頁(yè)時(shí)// ring_pages 就指向 internal_pages 字段struct page *internal_pages[AIO_RING_PAGES];};
這個(gè)環(huán)形緩沖區(qū)主要用于保存已經(jīng)完成的異步 IO 操作的結(jié)果,異步 IO 操作的結(jié)果使用 io_event 結(jié)構(gòu)表示。如 圖2 所示:

圖2 中的 head 代表環(huán)形緩沖區(qū)的開(kāi)始位置,而 tail 代表環(huán)形緩沖區(qū)的結(jié)束位置,如果 tail 大于 head,則表示有完成的異步 IO 操作結(jié)果可以獲取。如果 head 等于 tail,則表示沒(méi)有完成的異步 IO 操作。
環(huán)形緩沖區(qū)的 head 和 tail 位置保存在 aio_ring 的結(jié)構(gòu)中,其定義如下:
struct aio_ring {unsigned id;unsigned nr; // 環(huán)形緩沖區(qū)可容納的 io_event 數(shù)unsigned head; // 環(huán)形緩沖區(qū)的開(kāi)始位置unsigned tail; // 環(huán)形緩沖區(qū)的結(jié)束位置...};
上面介紹了那么多數(shù)據(jù)結(jié)構(gòu),只是為了接下來(lái)的源碼分析更加容易明白。
現(xiàn)在,我們開(kāi)始分析異步 IO 上下文的創(chuàng)建過(guò)程,異步 IO 上下文的創(chuàng)建通過(guò)調(diào)用 io_setup 函數(shù)完成,而 io_setup 函數(shù)會(huì)調(diào)用內(nèi)核函數(shù) sys_io_setup,其實(shí)現(xiàn)如下:
asmlinkage long sys_io_setup(unsigned nr_events, aio_context_t *ctxp){struct kioctx *ioctx = NULL;unsigned long ctx;long ret;...ioctx = ioctx_alloc(nr_events); // 調(diào)用 ioctx_alloc 函數(shù)創(chuàng)建異步IO上下文ret = PTR_ERR(ioctx);if (!IS_ERR(ioctx)) {ret = put_user(ioctx->user_id, ctxp); // 把異步IO上下文的標(biāo)識(shí)符返回給調(diào)用者if (!ret)return 0;io_destroy(ioctx);}out:return ret;}
sys_io_setup 函數(shù)的實(shí)現(xiàn)比較簡(jiǎn)單,首先調(diào)用 ioctx_alloc 申請(qǐng)一個(gè)異步 IO 上下文對(duì)象,然后把異步 IO 上下文對(duì)象的標(biāo)識(shí)符返回給調(diào)用者。
所以,sys_io_setup 函數(shù)的核心過(guò)程是調(diào)用 ioctx_alloc 函數(shù),我們繼續(xù)分析 ioctx_alloc 函數(shù)的實(shí)現(xiàn):
static struct kioctx *ioctx_alloc(unsigned nr_events){struct mm_struct *mm;struct kioctx *ctx;...ctx = kmem_cache_alloc(kioctx_cachep, GFP_KERNEL); // 申請(qǐng)一個(gè) kioctx 對(duì)象...INIT_LIST_HEAD(&ctx->active_reqs); // 初始化異步 IO 操作隊(duì)列...if (aio_setup_ring(ctx) < 0) // 初始化環(huán)形緩沖區(qū)goto out_freectx;...return ctx;...}
ioctx_alloc 函數(shù)主要完成以下工作:
調(diào)用
kmem_cache_alloc函數(shù)向內(nèi)核申請(qǐng)一個(gè)異步 IO 上下文對(duì)象。初始化異步 IO 上下文各個(gè)成員變量,如初始化異步 IO 操作隊(duì)列。
調(diào)用
aio_setup_ring函數(shù)初始化環(huán)形緩沖區(qū)。
環(huán)形緩沖區(qū)初始化函數(shù) aio_setup_ring 的實(shí)現(xiàn)有點(diǎn)小復(fù)雜,主要涉及內(nèi)存管理的知識(shí)點(diǎn),所以這里跳過(guò)這部分的分析,有興趣的可以私聊我一起討論。
提交異步 IO 操作
提交異步 IO 操作是通過(guò) io_submit 函數(shù)完成的,io_submit 需要提供一個(gè)類(lèi)型為 iocb 結(jié)構(gòu)的數(shù)組,表示要進(jìn)行的異步 IO 操作相關(guān)的信息,我們先來(lái)看看 iocb 結(jié)構(gòu)的定義:
struct iocb {__u64 aio_data; // 用戶(hù)自定義數(shù)據(jù), 可用于標(biāo)識(shí)IO操作或者設(shè)置回調(diào)函數(shù)...__u16 aio_lio_opcode; // IO操作類(lèi)型, 如讀(IOCB_CMD_PREAD)或者寫(xiě)(IOCB_CMD_PWRITE)操作__s16 aio_reqprio;__u32 aio_fildes; // 進(jìn)行IO操作的文件句柄__u64 aio_buf; // 進(jìn)行IO操作的緩沖區(qū)(如寫(xiě)操作的話(huà)就是寫(xiě)到文件的數(shù)據(jù))__u64 aio_nbytes; // 緩沖區(qū)的大小__s64 aio_offset; // IO操作的文件偏移量...};
io_submit 函數(shù)最終會(huì)調(diào)用內(nèi)核函數(shù) sys_io_submit 來(lái)實(shí)現(xiàn)提供異步 IO 操作,我們來(lái)分析 sys_io_submit 函數(shù)的實(shí)現(xiàn):
asmlinkage longsys_io_submit(aio_context_t ctx_id, long nr,struct iocb __user **iocbpp){struct kioctx *ctx;long ret = 0;int i;...ctx = lookup_ioctx(ctx_id); // 通過(guò)異步IO上下文標(biāo)識(shí)符獲取異步IO上下文對(duì)象...for (i = 0; i < nr; i++) {struct iocb __user *user_iocb;struct iocb tmp;if (unlikely(__get_user(user_iocb, iocbpp+i))) {ret = -EFAULT;break;}// 從用戶(hù)空間復(fù)制異步IO操作到內(nèi)核空間if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {ret = -EFAULT;break;}// 調(diào)用 io_submit_one 函數(shù)提交異步IO操作ret = io_submit_one(ctx, user_iocb, &tmp);if (ret)break;}put_ioctx(ctx);return i ? i : ret;}
sys_io_submit 函數(shù)的實(shí)現(xiàn)比較簡(jiǎn)單,主要從用戶(hù)空間復(fù)制異步 IO 操作信息到內(nèi)核空間,然后調(diào)用 io_submit_one 函數(shù)提交異步 IO 操作。我們重點(diǎn)分析 io_submit_one 函數(shù)的實(shí)現(xiàn):
int io_submit_one(struct kioctx *ctx,struct iocb __user *user_iocb,struct iocb *iocb){struct kiocb *req;struct file *file;ssize_t ret;char *buf;...file = fget(iocb->aio_fildes); // 通過(guò)文件句柄獲取文件對(duì)象...req = aio_get_req(ctx); // 獲取一個(gè)異步IO操作對(duì)象...req->ki_filp = file; // 要進(jìn)行異步IO的文件對(duì)象req->ki_user_obj = user_iocb; // 指向用戶(hù)空間的iocb對(duì)象req->ki_user_data = iocb->aio_data; // 設(shè)置用戶(hù)自定義數(shù)據(jù)req->ki_pos = iocb->aio_offset; // 設(shè)置異步IO操作的文件偏移量buf = (char *)(unsigned long)iocb->aio_buf; // 要進(jìn)行異步IO操作的數(shù)據(jù)緩沖區(qū)// 根據(jù)不同的異步IO操作類(lèi)型來(lái)進(jìn)行不同的處理switch (iocb->aio_lio_opcode) {case IOCB_CMD_PREAD: // 異步讀操作...ret = -EINVAL;// 發(fā)起異步IO操作, 會(huì)根據(jù)不同的文件系統(tǒng)調(diào)用不同的函數(shù):// 如ext3文件系統(tǒng)會(huì)調(diào)用 generic_file_aio_read 函數(shù)if (file->f_op->aio_read)ret = file->f_op->aio_read(req, buf, iocb->aio_nbytes, req->ki_pos);break;...}...// 異步IO操作或許會(huì)在調(diào)用 aio_read 時(shí)已經(jīng)完成, 或者會(huì)被添加到IO請(qǐng)求隊(duì)列中。// 所以, 如果異步IO操作被提交到IO請(qǐng)求隊(duì)列中, 直接返回if (likely(-EIOCBQUEUED == ret)) return 0;aio_complete(req, ret, 0); // 如果IO操作已經(jīng)完成, 調(diào)用 aio_complete 函數(shù)完成收尾工作return 0;}
上面代碼已經(jīng)對(duì) io_submit_one 函數(shù)進(jìn)行了詳細(xì)的注釋?zhuān)@里總結(jié)一下 io_submit_one 函數(shù)主要完成的工作:
通過(guò)調(diào)用
fget函數(shù)獲取文件句柄對(duì)應(yīng)的文件對(duì)象。調(diào)用
aio_get_req函數(shù)獲取一個(gè)類(lèi)型為kiocb結(jié)構(gòu)的異步 IO 操作對(duì)象,這個(gè)結(jié)構(gòu)前面已經(jīng)分析過(guò)。另外,aio_get_req函數(shù)還會(huì)把異步 IO 操作對(duì)象添加到異步 IO 上下文的active_reqs隊(duì)列中。根據(jù)不同的異步 IO 操作類(lèi)型來(lái)進(jìn)行不同的處理,如
異步讀操作會(huì)調(diào)用文件對(duì)象的aio_read方法來(lái)進(jìn)行處理。不同的文件系統(tǒng),其aio_read方法的實(shí)現(xiàn)不一樣,如 Ext3 文件系統(tǒng)的aio_read方法會(huì)指向generic_file_aio_read函數(shù)。如果異步 IO 操作被添加到內(nèi)核的 IO 請(qǐng)求隊(duì)列中,那么就直接返回。否則就代表 IO 操作已經(jīng)完成,那么就調(diào)用
aio_complete函數(shù)完成收尾工作。
io_submit_one 函數(shù)的操作過(guò)程如 圖3 所示:

所以,io_submit_one 函數(shù)的主要任務(wù)就是向內(nèi)核提交 IO 請(qǐng)求。
異步 IO 操作完成
當(dāng)異步 IO 操作完成后,內(nèi)核會(huì)調(diào)用 aio_complete 函數(shù)來(lái)把處理結(jié)果放進(jìn)異步 IO 上下文的環(huán)形緩沖區(qū) ring_info 中,我們來(lái)分析一下 aio_complete 函數(shù)的實(shí)現(xiàn):
int aio_complete(struct kiocb *iocb, long res, long res2){struct kioctx *ctx = iocb->ki_ctx;struct aio_ring_info *info;struct aio_ring *ring;struct io_event *event;unsigned long flags;unsigned long tail;int ret;...info = &ctx->ring_info; // 環(huán)形緩沖區(qū)對(duì)象spin_lock_irqsave(&ctx->ctx_lock, flags); // 對(duì)異步IO上下文進(jìn)行上鎖ring = kmap_atomic(info->ring_pages[0], KM_IRQ1); // 對(duì)內(nèi)存頁(yè)進(jìn)行虛擬內(nèi)存地址映射tail = info->tail; // 環(huán)形緩沖區(qū)下一個(gè)空閑的位置event = aio_ring_event(info, tail, KM_IRQ0); // 從環(huán)形緩沖區(qū)獲取空閑的位置保存結(jié)果tail = (tail + 1) % info->nr; // 更新下一個(gè)空閑的位置// 保存異步IO結(jié)果到環(huán)形緩沖區(qū)中event->obj = (u64)(unsigned long)iocb->ki_user_obj;event->data = iocb->ki_user_data;event->res = res;event->res2 = res2;...info->tail = tail;ring->tail = tail; // 更新環(huán)形緩沖區(qū)下一個(gè)空閑的位置put_aio_ring_event(event, KM_IRQ0); // 解除虛擬內(nèi)存地址映射kunmap_atomic(ring, KM_IRQ1); // 解除虛擬內(nèi)存地址映射// 釋放異步IO對(duì)象ret = __aio_put_req(ctx, iocb);spin_unlock_irqrestore(&ctx->ctx_lock, flags);...return ret;}
aio_complete 函數(shù)的 iocb 參數(shù)是我們通過(guò)調(diào)用 io_submit_once 函數(shù)提交的異步 IO 對(duì)象,而參數(shù) res 和 res2 是用內(nèi)核進(jìn)行 IO 操作完成后返回的結(jié)果。
aio_complete 函數(shù)的主要工作如下:
根據(jù)環(huán)形緩沖區(qū)的
tail指針獲取一個(gè)空閑的io_event對(duì)象來(lái)保存 IO 操作的結(jié)果。對(duì)環(huán)形緩沖區(qū)的
tail指針進(jìn)行加一操作,指向下一個(gè)空閑的位置。
當(dāng)把異步 IO 操作的結(jié)果保存到環(huán)形緩沖區(qū)后,用戶(hù)層就可以通過(guò)調(diào)用 io_getevents 函數(shù)來(lái)讀取 IO 操作的結(jié)果,io_getevents 函數(shù)最終會(huì)調(diào)用 sys_io_getevents 函數(shù)。
我們來(lái)分析 sys_io_getevents 函數(shù)的實(shí)現(xiàn):
asmlinkage long sys_io_getevents(aio_context_t ctx_id,long min_nr,long nr,struct io_event *events,struct timespec *timeout){struct kioctx *ioctx = lookup_ioctx(ctx_id);long ret = -EINVAL;...if (likely(NULL != ioctx)) {// 調(diào)用 read_events 函數(shù)讀取IO操作的結(jié)果ret = read_events(ioctx, min_nr, nr, events, timeout);put_ioctx(ioctx);}return ret;}
從上面的代碼可以看出,sys_io_getevents 函數(shù)主要調(diào)用 read_events 函數(shù)來(lái)讀取異步 IO 操作的結(jié)果,我們接著分析 read_events 函數(shù):
static int read_events(struct kioctx *ctx,long min_nr, long nr,struct io_event *event,struct timespec *timeout){long start_jiffies = jiffies;struct task_struct *tsk = current;DECLARE_WAITQUEUE(wait, tsk);int ret;int i = 0;struct io_event ent;struct timeout to;memset(&ent, 0, sizeof(ent));ret = 0;while (likely(i < nr)) {ret = aio_read_evt(ctx, &ent); // 從環(huán)形緩沖區(qū)中讀取一個(gè)IO處理結(jié)果if (unlikely(ret <= 0)) // 如果環(huán)形緩沖區(qū)沒(méi)有IO處理結(jié)果, 退出循環(huán)break;ret = -EFAULT;// 把IO處理結(jié)果復(fù)制到用戶(hù)空間if (unlikely(copy_to_user(event, &ent, sizeof(ent)))) {break;}ret = 0;event++;i++;}if (min_nr <= i)return i;if (ret)return ret;...}
read_events 函數(shù)主要還是調(diào)用 aio_read_evt 函數(shù)來(lái)從環(huán)形緩沖區(qū)中讀取異步 IO 操作的結(jié)果,如果讀取成功,就把結(jié)果復(fù)制到用戶(hù)空間中。
aio_read_evt 函數(shù)是從環(huán)形緩沖區(qū)中讀取異步 IO 操作的結(jié)果,其實(shí)現(xiàn)如下:
static int aio_read_evt(struct kioctx *ioctx, struct io_event *ent){struct aio_ring_info *info = &ioctx->ring_info;struct aio_ring *ring;unsigned long head;int ret = 0;ring = kmap_atomic(info->ring_pages[0], KM_USER0);// 如果環(huán)形緩沖區(qū)的head指針與tail指針相等, 代表環(huán)形緩沖區(qū)為空, 所以直接返回if (ring->head == ring->tail)goto out;spin_lock(&info->ring_lock);head = ring->head % info->nr;if (head != ring->tail) {// 根據(jù)環(huán)形緩沖區(qū)的head指針從環(huán)形緩沖區(qū)中讀取結(jié)果struct io_event *evp = aio_ring_event(info, head, KM_USER1);*ent = *evp; // 將結(jié)果保存到ent參數(shù)中head = (head + 1) % info->nr; // 移動(dòng)環(huán)形緩沖區(qū)的head指針到下一個(gè)位置ring->head = head; // 保存環(huán)形緩沖區(qū)的head指針ret = 1;put_aio_ring_event(evp, KM_USER1);}spin_unlock(&info->ring_lock);out:kunmap_atomic(ring, KM_USER0);return ret;}
aio_read_evt 函數(shù)的主要工作就是判斷環(huán)形緩沖區(qū)是否為空,如果不為空就從環(huán)形緩沖區(qū)中讀取異步 IO 操作的結(jié)果,并且保存到參數(shù) ent 中,并且移動(dòng)環(huán)形緩沖區(qū)的 head 指針到下一個(gè)位置。
總結(jié)
本文主要分析了 Linux 原生 AIO 的實(shí)現(xiàn),但為了不陷入太多的實(shí)現(xiàn)細(xì)節(jié)中,本文并沒(méi)有涉及到磁盤(pán) IO 相關(guān)的知識(shí)點(diǎn)。然而磁盤(pán) IO 也是 AIO 實(shí)現(xiàn)中不可或缺的一部分,所以有興趣的朋友可以繼續(xù)通過(guò)閱讀 Linux 的源碼來(lái)分析其實(shí)現(xiàn)原理。
