工控智汇

工控智汇

新一代异步IO框架 io_uring

admin 13 36
1LinuxIO模型分类


相比于kernelbypass模式需要结合具体的硬件支撑来讲,nativeIO是日常工作中接触到比较多的一种,其中同步IO在较长一段时间内被广泛使用,通常我们接触到的IO操作主要分为网络IO和存储IO。在大流量高并发的今天,提到网络IO,很容易想到大名鼎鼎的epoll以及reactor架构。但是epoll并不属于异步IO的范畴。本质上是一个同步非阻塞的架构。关于同步异步,阻塞与非阻塞的概念区别这里做简要概述:

什么是同步

指进程调用接口时需要等待接口处理完数据并相应进程才能继续执行。这里重点是数据处理活逻辑执行完成并返回,如果是异步则不必等待数据完成,亦可以继续执行。同步强调的是逻辑上的次序性;

什么是阻塞

当进程调用一个阻塞的系统函数时,该进程被置于睡眠(Sleep)状态,这时内核调度其它进程运行,直到该进程等待的事件发生了(比如网络上接收到数据包,或者调用sleep指定的睡眠时间到了)它才有可能继续运行。与睡眠状态相对的是运行(Running)状态,在Linux内核中,处于运行状态的进程分为两种情况,一种是进程正在被CPU调度,另一种是处于就绪状态随时可能被调度的进程;阻塞强调的是函数调用下进程的状态。

2Linux常见文件操作方式

2.1open/close/read/write

基本操作API如下:

includesys///返回值:成功返回新分配的文件描述符,出错返回-1并设置errnointopen(constchar*pathname,intflags);intopen(constchar*pathname,intflags,mode_tmode);//返回值:成功返回0,出错返回-1并设置errnointclose(intfd);//返回值:成功返回读取的字节数,出错返回-1并设置errno,如果在调read之前已到达文件末尾,则这次read返回0ssize_tread(intfd,void*buf,size_tcount);//返回值:成功返回写入的字节数,出错返回-1并设置errnossize_twrite(intfd,constvoid*buf,size_tcount);

在打开文件时可以指定为,只读,只写,读写等权限,以及阻塞或者非阻塞操作等;具体通过open函数的flags参数指定。这里以打开一个读写文件为例,同时定义了写文件的方式为追加写,以及使用直接IO模式操作文件,具体什么是直接IO下文会细述。open("/path/to/file",O_RDWR|O_APPEND|O_DIRECT);flags可选参数如下:

Flag参数

含义

O_CREATE

创建文件时,如果文件存在则出错返回

O_EXCL

如果同时指定了O_CREAT,并且文件已存在,则出错返回。

O_TRUC

把文件截断成0

O_RDONLY

只读

O_WRONLY

只写

O_RDWR

读写

O_APPEND

追加

O_NONBLOCK

非阻塞标记

O_SYNC

每次读写都等待物理IO操作完成

O_DIRECT

提供最直接IO支持

通常读写操作的数据首先从用户缓冲区进入内核缓冲区,然后由内核缓冲区完成与IO设备的同步:


2.2Mmap

//成功执行时,mmap()返回被映射区的指针。失败时,mmap()返回MAP_FAILED[其值为(void*)-1],//error被设为以下的某个值://1EACCES:访问出错//2EAGAIN:文件已被锁定,或者太多的内存已被锁定//3EBADF:fd不是有效的文件描述词//4EINVAL:一个或者多个参数无效//5ENFILE:已达到系统对打开文件的限制//6ENODEV:指定文件所在的文件系统不支持内存映射//7ENOMEM:内存不足,或者进程已超出最大内存映射数量//8EPERM:权能不足,操作不允许//9ETXTBSY:已写的方式打开文件,同时指定MAP_DENYWRITE标志//10SIGSEGV:试着向只读区写入//11SIGBUS:试着访问不属于进程的内存区void*mmap(void*start,size_tlength,intprot,intflags,intfd,off_toffset);//成功执行时,munmap()返回0。失败时,munmap返回-1,error返回标志和mmap一致;//该调用在进程地址空间中解除一个映射关系,addr是调用mmap()时返回的地址,len是映射区的大小;intmunmap(void*addr,size_tlen)//进程在映射空间的对共享内容的改变并不直接写回到磁盘文件中,往往在调用munmap()后才执行该操作。//如果期望内存的数据变化能够立刻反应到磁盘上,可以通过调用msync()实现。intmsync(void*addr,size_tlen,intflags)

Mmap是一种内存映射方法,通过将文件映射到内存的某个地址空间上,在对该地址空间的读写操作时,会触发相应的缺页异常以及脏页回写操作,从而实现文件数据的读写操作;


c++八股文重点,网络的posixapi实现原理

8个方面讲解io_uring,重塑对异步io的理解

epoll实战揭秘-支撑亿级IO的底层基石

需要C/C++Linux服务器架构师学习资料加qun579733396获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享


2.3直接IO

直接IO的方式比较简单,直接上文提及的open函数入参中指定O_DIRECT即可,相比普通IO操作,略过了内核的缓冲区直接操作下一层的文件文件。该操作比较底层,相比普通的文件读写少了一次数据复制,一般需要结合用户态缓存来使用;下图所示为DIO透过buffer层直接操作磁盘文件系统:


2.4sFile

严格来讲,sfile并不提供完整的读写能力,仅用于加速读取数据到网络的能力,由于数据不经过用户空间,因此无法对数据进行二次处理,也就是说从磁盘中读出来原封不动的发给网卡,下图展示了sFile的工作流程,

数据首先以DMA的方式从磁盘上读取到内核的文件缓冲区,

然后再从文件缓冲区读取到了socket的缓冲区,该过程由CPU负责完成。

接着网卡再以DMA的方式从socket缓冲区拷贝到自己网卡缓冲区,然后进行发送


Linux内核2.4版本以后对sFile进行了进一步优化,提供了带有scatter/gather的sfile操作,将仅有一次的CPU参与copy环节去掉,该操作需要网卡硬件的支持。其原理就是在内核空间ReadBuffer和SocketBuffer不做数据复制,而是将ReadBuffer的内存地址、偏移量记录到相应的SocketBuffer中。其本质和虚拟内存的解决方法思路一致,就是内存地址的记录。

2.5splice

splice调用和sfile很相似,应用程序必须拥有两个已经打开的文件描述符,一个表示输入设备,一个表示输出设备。splice允许任意两个文件互相连接,而并不只是文件与socket进行数据传输。对于从一个文件描述符发送数据到socket这种特例来说,简化为使用sfile系统调用,splice适用范围更广且不需要硬件支持,sfile是splice的一个子集。


用户进程调用pipe()陷入内核态;创建匿名单向管道pipe()返回,从内核态切换回用户态;

用户进程调用splice()从用户态陷入内核态,DMA控制器将数据从硬盘拷贝到内核缓冲区,从管道的写入端"拷贝"进管道,splice()返回,从内核态切换为用户态;

用户进程再次调用splice(),从用户态陷入内核态,内核把数据从管道的读取端拷贝到socket缓冲区,DMA控制器将数据从socket缓冲区拷贝到网卡,splice()返回,上下文从内核态切换回用户态。

3IO_URING是什么

io_uring是Linux提供的一个异步非阻塞I/O接口,他既能支持磁盘IO也能支持网络IO,只是存储IO支持的比较早较为成熟。IO_URING的使用需要较高的linux内核版本,一般建议5.12版本以后。下面会分别从存储和网络两个角度来介绍IO_URING。

3.1IO_URING架构


应用程序提交的IO请求会直接进入submissionqueue队列的尾部,内核进程会不断的从SQ队列的头部消费请求

内核处理完的SQ后会更新CQtail部分,应用程序读取到CQ的head时,会更新CQ的head

SQ中的任务称之为SQE(entry),CQ中的任务称之为CQE

3.2系统调用API
//创建一个SQ和一个CQ,queuesize至少entries个元素//返回一个文件描述符,随后用于在这个io_uring实例上执行操作。//参数p有两个作用://1.作为入参:应用用来配置io_uring的一些行为//2.作为出参:内核返回的SQ/CQ地址信息等也通过它带回来。intio_uring_setup(u32entries,structio_uring_params*p);//注册用于异步I/O的文件或用户缓冲区(filesoruserbuffers):intio_uring_register(unsignedintfd,unsignedintopcode,void*arg,unsignedintnr_args);//用于初始化和完成I/O,使用共享的SQ和CQ。单次调用同时提交新的I/O请求和等待I/O完成操作//fd是io_uring_setup()返回的文件描述符;//to_submit指定了SQ中提交的I/O数量;//默认模式下如果指定了min_complete,会等待这个数量的I/O事件完成再返回;//轮询模式(2种)://0:要求内核返回当前以及完成的所有events,无阻塞;//非0:如果有事件完成,内核仍然立即返回;如果没有完成事件,内核会poll,等待指定的次数完成,或者这个进程的时间片用完。intio_uring_enter(unsignedintfd,unsignedintto_submit,unsignedintmin_complete,unsignedintflags,sigset_t*sig);

3.3三种工作模式

3.3.1中断驱动模式:

默认模式。可通过io_uring_enter()提交I/O请求,然后直接检查CQ状态判断是否完成。也可以通过min_complete来睡在enter方法上,等待完成事件到达;

Mmap机制减少了内存复制

内核轮询模式下,没有用户态和内核态的切换降低了损耗

基于SQ和CQ机制下的数据竞争消除,即没有并发竞争损耗

3.4liburing

io_uring的核心系统调用只有三个,但使用起来较为复杂,开发者在io_uring之上封装了新的liburing库,简化使用。

//io_uring结构体中包含需要使用到的SQ和CQ,以及需要关联的文件FD,和相关的配置参数falgs;structio_uring{structio_uring_sqsq;structio_uring_cqcq;unsignedflags;intring_fd;};structio_uring_sq{unsigned*khead;unsigned*ktail;unsigned*kring_mask;unsigned*kring_entries;unsigned*kflags;unsigned*kdropped;unsigned*array;structio_uring_sqe*sqes;unsignedsqe_head;unsignedsqe_tail;size_tring_sz;void*ring_ptr;};structio_uring_cq{unsigned*khead;unsigned*ktail;unsigned*kring_mask;unsigned*kring_entries;unsigned*koverflow;structio_uring_cqe*cqes;size_tring_sz;void*ring_ptr;};//用户初始化io_uring。该方法中包含了内存空间的初始化以及mmap调用,entries:队列深度intio_uring_queue_init(unsignedentries,structio_uring*ring,unsignedflags);//为了提交IO请求,需要获取里面queue的一个空闲项structio_uring_sqe*io_uring_get_sqe(structio_uring*ring);//非系统调用,准备阶段,和libaio封装的io_prep_writev一样voidio_uring_prep_writev(structio_uring_sqe*sqe,intfd,conststructiovec*iovecs,unsignednr_vecs,off_toffset)//非系统调用,准备阶段,和libaio封装的io_prep_readv一样voidio_uring_prep_readv(structio_uring_sqe*sqe,intfd,conststructiovec*iovecs,unsignednr_vecs,off_toffset)//提交sq的entry,不会阻塞等到其完成,内核在其完成后会自动将sqe的偏移信息加入到cq,在提交时需要加锁intio_uring_submit(structio_uring*ring);//提交sq的entry,阻塞等到其完成,在提交时需要加锁。intio_uring_submit_and_wait(structio_uring*ring,unsignedwait_nr);//非系统调用遍历时,可以获取cqe的datavoid*io_uring_cqe_get_data(conststructio_uring_cqe*cqe)//清理io_uringvoidio_uring_queue_exit(structio_uring*ring);

liburinggithub地址:

3.5使用方式

3.5.1读取文件

调用io_uring_queue_init初始化

获取一个空SQE用于提交任务

io_uring_prep_readv方法填充SQE任务内容

io_uring_submit提交SQE

io_uring_wait_cqe获取已完成的CQE

io_uring_cqe_seen更新CQ队列的head,避免CQE被重复处理

io_uring_queue_exit退出io_uring

下面是liburinggithub上的example代码适当精简后的代码

/""//""defineBACKLOG512defineBUFFERS_COUNTMAX_CONNECTIONSvoidadd_accept(structio_uring*ring,intfd,structsockaddr*client_addr,socklen_t*client_len,unsignedflags);voidadd_socket_read(structio_uring*ring,intfd,unsignedgid,size_tsize,unsignedflags);voidadd_socket_write(structio_uring*ring,intfd,__u16bid,size_tsize,unsignedflags);voidadd_provide_buf(structio_uring*ring,__u16bid,unsignedgid);enum{ACCEPT,READ,WRITE,PROV_BUF,};typedefstructconn_info{__u32fd;__u16type;__u16bid;}conn_info;charbufs[BUFFERS_COUNT][MAX_MESSAGE_LEN]={0};intgroup_id=1337;intmain(intargc,char*argv[]){if(argc2){printf("Pleasegiveaportnumber:./io_uring_echo_server[port]\n");exit(0);}//somevariablesweneedintportno=strtol(argv[1],NULL,10);structsockaddr_inserv_addr,client_addr;socklen_tclient_len=sizeof(client_addr);//setupsocketintsock_listen_fd=socket(AF_INET,SOCK_STREAM,0);constintval=1;setsockopt(sock_listen_fd,SOL_SOCKET,SO_REUSEADDR,val,sizeof(val));memset(serv_addr,0,sizeof(serv_addr));serv__family=AF_INET;serv__port=htons(portno);serv___addr=INADDR_ANY;//bindandlistenif(bind(sock_listen_fd,(structsockaddr*)serv_addr,sizeof(serv_addr))0){perror("Errorbindingsocket\n");exit(1);}if(listen(sock_listen_fd,BACKLOG)0){perror("Errorlisteningonsocket\n");exit(1);}printf("io_uringechoserverlisteningforconnectionsonport:%d\n",portno);//initializeio_uringstructio_uring_paramsparams;structio_uringring;memset(¶ms,0,sizeof(params));if(io_uring_queue_init_params(2048,ring,¶ms)0){perror("io_uring_init_failed\n");exit(1);}//checkifIORING_FEAT_FAST_POLLissupportedif(!(_FEAT_FAST_POLL)){printf("IORING_FEAT_FAST_POLLnotavailableinthekernel,quiting\n");exit(0);}//checkifbufferselectionissupportedstructio_uring_probe*probe;probe=io_uring_get_probe_ring(ring);if(!probe||!io_uring_opcode_supported(probe,IORING_OP_PROVIDE_BUFFERS)){printf("Bufferselectnotsupported,skipping\n");exit(0);}free(probe);//registerbuffersforbufferselectionstructio_uring_sqe*sqe;structio_uring_cqe*cqe;sqe=io_uring_get_sqe(ring);io_uring_prep_provide_buffers(sqe,bufs,MAX_MESSAGE_LEN,BUFFERS_COUNT,group_id,0);io_uring_submit(ring);io_uring_wait_cqe(ring,cqe);if(cqe-res0){printf("cqe-res=%d\n",cqe-res);exit(1);}io_uring_cqe_seen(ring,cqe);//addfirstacceptSQEtomonitorfornewincomingconnectionsadd_accept(ring,sock_listen_fd,(structsockaddr*)client_addr,client_len,0);//starteventloopwhile(1){io_uring_submit_and_wait(ring,1);structio_uring_cqe*cqe;unsignedhead;unsignedcount=0;//gothroughallCQEsio_uring_for_each_cqe(ring,head,cqe){++count;structconn_infoconn_i;memcpy(conn_i,cqe-user_data,sizeof(conn_i));inttype=conn_;if(cqe-res==-ENOBUFS){fprintf(stdout,"bufsinautomaticbufferselectionempty,thisshouldnothappen\n");fflush(stdout);exit(1);}elseif(type==PROV_BUF){if(cqe-res0){printf("cqe-res=%d\n",cqe-res);exit(1);}}elseif(type==ACCEPT){intsock_conn_fd=cqe-res;//onlyreadwhenthereisnoerror,=0if(sock_conn_fd=0){add_socket_read(ring,sock_conn_fd,group_id,MAX_MESSAGE_LEN,IOSQE_BUFFER_SELECT);}//newconnectedclient;readdatafromsocketandre-addaccepttomonitorfornewconnectionsadd_accept(ring,sock_listen_fd,(structsockaddr*)client_addr,client_len,0);}elseif(type==READ){intbytes_read=cqe-res;intbid=cqe-flags16;if(cqe-res=0){//readfailed,re-addthebufferadd_provide_buf(ring,bid,group_id);//connectionclosedorerrorclose(conn_);}else{//byteshavebeenreadintobufs,nowaddwritetosocketsqeadd_socket_write(ring,conn_,bid,bytes_read,0);}}elseif(type==WRITE){//writehasbeencompleted,firstre-addthebufferadd_provide_buf(ring,conn_,group_id);//addanewreadfortheexistingconnectionadd_socket_read(ring,conn_,group_id,MAX_MESSAGE_LEN,IOSQE_BUFFER_SELECT);}}io_uring_cq_advance(ring,count);}}voidadd_accept(structio_uring*ring,intfd,structsockaddr*client_addr,socklen_t*client_len,unsignedflags){structio_uring_sqe*sqe=io_uring_get_sqe(ring);io_uring_prep_accept(sqe,fd,client_addr,client_len,0);io_uring_sqe_set_flags(sqe,flags);conn_infoconn_i={.fd=fd,.type=ACCEPT,};memcpy(sqe-user_data,conn_i,sizeof(conn_i));}voidadd_socket_read(structio_uring*ring,intfd,unsignedgid,size_tmessage_size,unsignedflags){structio_uring_sqe*sqe=io_uring_get_sqe(ring);io_uring_prep_recv(sqe,fd,NULL,message_size,0);io_uring_sqe_set_flags(sqe,flags);sqe-buf_group=gid;conn_infoconn_i={.fd=fd,.type=READ,};memcpy(sqe-user_data,conn_i,sizeof(conn_i));}voidadd_socket_write(structio_uring*ring,intfd,__u16bid,size_tmessage_size,unsignedflags){structio_uring_sqe*sqe=io_uring_get_sqe(ring);io_uring_prep_s(sqe,fd,bufs[bid],message_size,0);io_uring_sqe_set_flags(sqe,flags);conn_infoconn_i={.fd=fd,.type=WRITE,.bid=bid,};memcpy(sqe-user_data,conn_i,sizeof(conn_i));}voidadd_provide_buf(structio_uring*ring,__u16bid,unsignedgid){structio_uring_sqe*sqe=io_uring_get_sqe(ring);io_uring_prep_provide_buffers(sqe,bufs[bid],MAX_MESSAGE_LEN,1,gid,bid);conn_infoconn_i={.fd=0,.type=PROV_BUF,};memcpy(sqe-user_data,conn_i,sizeof(conn_i));}
4性能对比

4.1存储IO

SynchronousI/O、Libaio和IO_uring特性对比


io_uring和spdk的特性对比

SPDK全名StoragePerformanceDevelopmentKit,是一种存储性能开发套件。针对于支持nvme协议的SSD设备。是一种高性能的解决方案。


io_uring和spdk的性能对比


非polling模式,io_uring相比libaio提升不是很明显;在polling模式下,io_uring能与spdk接近,甚至在queuedepth较高时性能更好,性能超越libaio。


在queuedepth较低时有约7%的差距,但在queuedepth较高时基本接近。


对比结论:

io_uring在非polling模式下,相比libaio,性能提升不是非常显著。io_uring在polling模式下,性能提升显著,与spdk接近,在队列深度较高时性能更好。

4.2网络IO

Epoll性能对比

与epoll的性能对比差异还是很大的,参考这篇文章的数据:wsl2,内核版本5.10.60.1,发行版为Debian硬件:I5-9400,16gDDR4使用webbench进行简易测试,模拟10500、30500台客户端,持续时间为5s,分别在正常访问和不等待返回两种模式下进行测试,两个客户端均关闭日志记录,epoll开启双ET模式,比较每分钟发送页面数,结果如下:


对比结论:

毋庸置疑,碾压性的结果。

5总结

得益于精妙的设计,io_uring的性能基本超越linux内核以往任何软件层面的IO解决方案,达到了与硬件级解决方案媲美的性能。io_uring需要较高版本的内核支持,目前还没有大面积普及,但可以预料他是linux内核IO未来的核心发展方向。