工控智汇

工控智汇

fastdfs源码阅读:文件传输原理与网络IO模型

admin 79 1
一、fastdfs网络IO模型的结构

fdfs文件服务器主要有3种线程,accept线程、work线程(网络io处理)、dio线程(处理文件)


nio是netio的意思(网络io)

dio是dataio的意思(文件io)


1.accept线程在接受新的连接后,封装成一个任务对象,选择worker线程,写入管道pipe,pipe是能够触发work线程中的epoll。(在接受新连接前,在work线程中就已经把pipe加入了监听)

2.文件io线程,通过从队列中取任务,然后写入到磁盘中。

那么如何通知文件io线程呢,直接往文件处理队列(阻塞队列)中添加即可,在文件io线程中就能取到(锁和条件变量,取消阻塞)。没有直接在文件io线程中检测网络io事件,主要是为了将功能解耦合,让work线程(处理网络io)去做这件事,每个线程只做自己的事,使得逻辑清楚。

3.dio线程不会直接给nio线程设置各种读写事件,⽽是通过

FDFS_STORAGE_STAGE_NIO_INIT、FDFS_STORAGE_STAGE_NIO_RECV、

FDFS_STORAGE_STAGE_NIO_SEND、FDFS_STORAGE_STAGE_NIO_CLOSE、

FDFS_STORAGE_STAGE_DIO_THREAD等状态+通过pipe通知nio线程响应storage_recv_notify_read

进⾏io事件的处理。

只要文件io线程读取完了文件处理队列中所有的数据,那么就会请求fd加入可写事件(通过pipe向work线程发送信号,触发epoll去做这件事),让网络io去读取新的事件。


不同线程之间通过以下方式进行通信

1.队列(阻塞队列,锁+条件变量)

2.管道(通过pipe创建)

二、服务端的一些逻辑

新连接:

首先通过accept线程去接受新的连接incomingsock,让后去获取它的信息,并从对象池取出一个pTask,将新的连接fd以及它的信息封装成pTask。通过管道将pTask发送出去,在work线程epoll中就已经监听了该管道的读fd,accept线程发送的pipe,在work线程中epoll能检测到pipe事件,并读取信息。然后将该客户端fd读事件加入到epoll中。

上传:

work线程的epoll检测到客户端发送的信息,通过解析协议,获取它的数据,如果其中的CMD是上传的标志,那么就会执行client_sock_read–storage_deal_task

–storage_upload_file–storage_write_to_file

然后会将协议解析的数据和上传回调函数(保存到服务器磁盘)dio_write_file,上传完成的回调函数storage_upload_file_done_callback等信息都封装到pTask,然后将它加入队列中。

然后dio线程中队列就能检测到新的任务,就会执行deal_func也就是dio_write_file,进行写入磁盘。

linux下的epoll实战揭秘——支撑亿级IO的底层基石

手把手带你实现epoll组件,为tcp并发的实现epoll

需要C/C++Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享


三、源码阅读

1、fastdfs/storage/fdfs_

这一部分是进行启动storage服务端

一些初始化工作(如创建socket对象(进行listen),等)

创建work线程(读写网络io)

创建dio线程(dataIO线程,也就是处理文件的,比如将上传的内容写入到磁盘)

创建accept线程(用于接受新的连接)

当然还有写同步的功能,但不在本文介绍。

intmain(intargc,char*argv[]){sock=socketServer(g_bind_addr,g_server_port,result);//socket、bind、listenif(sock0){logCrit("exitabnormally!\n");delete_pid_file(pidFilename);log_destroy();returnresult;}if((result=storage_service_init())!=0)//storage_service初始化,包含work线程初始化(网络IO部分){logCrit("file:"__FILE__",line:%d,"\"storage_service_initfail,programexit!",__LINE__);g_continue_flag=false;returnresult;}if((result=storage_dio_init())!=0)//初始化dio线程(dataIO线程,就也是处理文件的){logCrit("exitabnormally!\n");log_destroy();returnresult;}log_set_cache(true);bTerminateFlag=false;accept_stage=ACCEPT_STAGE_DOING;storage_accept_loop(sock);//初始化accept线程accept_stage=ACCEPT_STAGE_DONE;}

2、storage_accept_loop

创建accept线程(数量由g_accept_threads决定)

线程的执行函数为accept_thread_entrance

voidstorage_accept_loop(intserver_sock){if(g_accept_threads1){pthread_ttid;pthread_attr_tthread_attr;intresult;inti;if((result=init_pthread_attr(thread_attr,g_thread_stack_size))!=0){logWarning("file:"__FILE__",line:%d,"\"init_pthread_attrfail!",__LINE__);}else{for(i=1;ig_accept_threads;i++){if((result=pthread_create(tid,thread_attr,\accept_thread_entrance,\(void*)(long)server_sock))!=0)//创建accept线程{logError("file:"__FILE__",line:%d,"\"createthreadfailed,startupthreads:%d,"\"errno:%d,errorinfo:%s",\__LINE__,i,result,STRERROR(result));break;}}pthread_attr_destroy(thread_attr);}}accept_thread_entrance((void*)(long)server_sock);}

1)accept_thread_entrance

流程:


其中获取任务对象是从对象池中获取,然后将新连接的信息封装到任务对象。

通过轮询的方式指定work线程,然后将任务对象发送(write)给该work线程

源码:

staticvoid*accept_thread_entrance(void*arg){intserver_sock;intincomesock;structsockaddr_ininaddr;socklen_tsockaddr_len;in_addr_tclient_addr;charszClientIp[IP_ADDRESS_SIZE];longtask_addr;structfast_task_info*pTask;StorageClientInfo*pClientInfo;structstorage_nio_thread_data*pThreadData;server_sock=(long)arg;while(g_continue_flag){sockaddr_len=sizeof(inaddr);incomesock=accept(server_sock,(structsockaddr*)inaddr,\sockaddr_len);//acceptif(incomesock0)//error{if(!(errno==EINTR||errno==EAGAIN)){logError("file:"__FILE__",line:%d,"\"acceptfailed,"\"errno:%d,errorinfo:%s",\__LINE__,errno,STRERROR(errno));}continue;}client_addr=getPeerIpaddr(incomesock,\szClientIp,IP_ADDRESS_SIZE);if(g_allow_ip_count=0){if(bsearch(client_addr,g_allow_ip_addrs,\g_allow_ip_count,sizeof(in_addr_t),\cmp_by_ip_addr_t)==NULL){logError("file:"__FILE__",line:%d,"\"ipaddr%sisnotallowedtoaccess",\__LINE__,szClientIp);close(incomesock);continue;}}if(tcpsetnonblockopt(incomesock)!=0){close(incomesock);continue;}pTask=free_queue_pop();//取task对象(从对象池中取)if(pTask==NULL){logError("file:"__FILE__",line:%d,""malloctaskbufffail,youshould""increasetheparameter\"max_connections\""",orcheckyourapplications""forconnectionleaks",__LINE__);close(incomesock);continue;}pClientInfo=(StorageClientInfo*)pTask-arg;//封装客户端信息=incomesock;//socketfdpClientInfo-stage=FDFS_STORAGE_STAGE_NIO_INIT;//初始化client的状态pClientInfo-nio_thread_index=%g_work_threads;//通过轮询的方式,发送给对应的work线程pThreadData=g_nio_thread_data+pClientInfo-nio_thread_index;//g_nio_thread_data是一个全局的信息(指针),加上一个index就可以指向,具体的线程信息strcpy(pTask-client_ip,szClientIp);task_addr=(long)pTask;if(write(pThreadData-thread__fds[1],task_addr,\sizeof(task_addr))!=sizeof(task_addr))//写入管道{close(incomesock);free_queue_push(pTask);//如果写入失败,就把对象重新放入对象池中logError("file:"__FILE__",line:%d,"\"callwritefailed,"\"errno:%d,errorinfo:%s",\__LINE__,errno,STRERROR(errno));}else{intcurrent_connections;current_connections=__sync_add_and_fetch(g_storage__count,1);//连接数量+1(CAS)if(current_connectionsg_storage__count){g_storage__count=current_connections;}++g_stat_change_count;}}returnNULL;}

注意pipe_fds[1]是管道的写端,pipe_fds[0]是读端。因此如果pipe_fds[0]加入epoll的话,往pipe_fds[1]中写入数据,那么epoll就能监听到。

3、storage_service_init

这部分的主要内容是创建work线程,work线程的执行函数为work_thread_entrance

intstorage_service_init(){bytes=sizeof(structstorage_nio_thread_data)*g_work_threads;//work线程默认个数是4,也可以从配置文件中读出来g_nio_thread_data=(structstorage_nio_thread_data*)malloc(bytes);if(g_nio_thread_data==NULL){logError("file:"__FILE__",line:%d,"\"malloc%dbytesfail,errno:%d,errorinfo:%s",\__LINE__,bytes,errno,STRERROR(errno));returnerrno!=0?errno:ENOMEM;}memset(g_nio_thread_data,0,bytes);g_storage_thread_count=0;pData=g_nio_thread_data+g_work_threads;for(pThreadData=g_nio_thread_data;pThreadDatapData;pThreadData++){if(pipe(pThreadData-thread__fds)!=0)//创建管道{result=errno!=0?errno:EPERM;logError("file:"__FILE__",line:%d,"\"callpipefail,"\"errno:%d,errorinfo:%s",\__LINE__,result,STRERROR(result));break;}if((result=pthread_create(tid,thread_attr,\work_thread_entrance,pThreadData))!=0)//创建work线程{logError("file:"__FILE__",line:%d,"\"createthreadfailed,startupthreads:%d,"\"errno:%d,errorinfo:%s",\__LINE__,g_storage_thread_count,\result,STRERROR(result));break;}else{if((result=pthread_mutex_lock(g_storage_thread_lock))!=0){logError("file:"__FILE__",line:%d,"\"callpthread_mutex_lockfail,"\"errno:%d,errorinfo:%s",\__LINE__,result,STRERROR(result));}g_storage_thread_count++;//创建线程成功,因此+1if((result=pthread_mutex_unlock(g_storage_thread_lock))!=0){logError("file:"__FILE__",line:%d,"\"callpthread_mutex_lockfail,"\"errno:%d,errorinfo:%s",\__LINE__,result,STRERROR(result));}}}}

1)work_thread_entrance

ioevent_loop是事件循环所在

staticvoid*work_thread_entrance(void*arg){intresult;structstorage_nio_thread_data*pThreadData;pThreadData=(structstorage_nio_thread_data*)arg;//事件循环所在ioevent_loop(pThreadData-thread_data,storage_recv_notify_read,task_finish_clean_up,g_continue_flag);ioevent_destroy(pThreadData-thread__puller);returnNULL;}

2)ioevent_loop

事件循环所在

比如将pipe_fds[0]管道的读端fd加入epoll管理

进行epoll_wait

如果事件触发,就执行相应的回调函数

intioevent_loop(structnio_thread_data*pThreadData,IOEventCallbackrecv_notify_callback,TaskCleanUpCallbackclean_up_callback,volatilebool*continue_flag){intresult;structioevent_notify_entryev_notify;FastTimerEntryhead;structfast_task_info*task;time_tlast_check_time;intcount;memset(ev_notify,0,sizeof(ev_notify));ev_=FC_NOTIFY_READ_FD(pThreadData);//socketfdev_=recv_notify_callback;//对应的是storage_recv_notify_readev__data=pThreadData;//自己所属的线程if(ioevent_attach(pThreadData-ev_puller,pThreadData-pipe_fds[0],IOEVENT_READ,ev_notify)!=0)//管道添加到epoll管理{result=errno!=0?errno:ENOMEM;logCrit("file:"__FILE__",line:%d,"\"ioevent_attachfail,"\"errno:%d,errorinfo:%s",\__LINE__,result,STRERROR(result));returnresult;}pThreadData-deleted_list=NULL;last_check_time=g_current_time;while(*continue_flag){pThreadData-ev_=ioevent_poll(//实际是调用epoll_waitpThreadData-ev_puller);if(pThreadData-ev_){deal_ioevents(pThreadData-ev_puller);//真正有数据来进入该函数(执行回调函数)}elseif(pThreadData-ev_){result=errno!=0?errno:EINVAL;if(result!=EINTR){logError("file:"__FILE__",line:%d,"\"ioevent_pollfail,"\"errno:%d,errorinfo:%s",\__LINE__,result,STRERROR(result));returnresult;}}}return0;}

3)storage_recv_notify_read


//这里的socket实际是pipevoidstorage_recv_notify_read(intsock,shortevent,void*arg)//数据服务器socket事件回调,比如说在上传文件时,接收了一部分之后,调用storage_nio_notify(pTask){structfast_task_info*pTask;StorageClientInfo*pClientInfo;//注意这个参数是不同的,一个是跟踪服务器参数,一个是数据服务器参数longtask_addr;//读取task的地址int64_tremain_bytes;intbytes;intresult;while(1)//循环读取task任务{if((bytes=read(sock,task_addr,sizeof(task_addr)))0)//读取task任务{if(!(errno==EAGAIN||errno==EWOULDBLOCK)){logError("file:"__FILE__",line:%d,"\"callreadfailed,"\"errno:%d,errorinfo:%s",\__LINE__,errno,STRERROR(errno));}break;//没有task可读}elseif(bytes==0){logError("file:"__FILE__",line:%d,"\"callreadfailed,offile",__LINE__);break;}pTask=(structfast_task_info*)task_addr;//还原任务pClientInfo=(StorageClientInfo*)pTask-arg;if()//quitflag,这个是对应的socketfd{return;}/*//logInfo("=====threadindex:%d,=%d",\pClientInfo-nio_thread_index,);*/if(pClientInfo-stageFDFS_STORAGE_STAGE_DIO_THREAD){pClientInfo-stage=~FDFS_STORAGE_STAGE_DIO_THREAD;}switch(pClientInfo-stage){caseFDFS_STORAGE_STAGE_NIO_INIT://数据服务器服务端socket接收过来的任务的pClientInfo-stage=FDFS_STORAGE_STAGE_NIO_INITresult=storage_nio_init(pTask);//因此在这里在重新绑定读写事件//每连接一个客户端,在这里都会触发这个动作break;caseFDFS_STORAGE_STAGE_NIO_RECV:pTask-offset=0;//在次接受包体时pTask-offset偏移量被重置remain_bytes=pClientInfo-total_length-\pClientInfo-total_offset;//任务的长度=包的总长度-包的总偏移量if(remain_bytespTask-size)//总是试图将余下的自己一次接收收完{pTask-length=pTask-size;//pTask-size是每次最大的数据接收长度}else{pTask-length=remain_bytes;}if(set_recv_event(pTask)==0){client_sock_read(,//通过socketfd读取数据IOEVENT_READ,pTask);//读取数据}result=0;break;caseFDFS_STORAGE_STAGE_NIO_SEND:result=storage_s_add_event(pTask);//数据发送break;caseFDFS_STORAGE_STAGE_NIO_CLOSE:result=EIO;//closethissocketbreak;default:logError("file:"__FILE__",line:%d,"\"invalidstage:%d",__LINE__,\pClientInfo-stage);result=EINVAL;break;}if(result!=0){ioevent_add_to_deleted_list(pTask);//如果出错再将对应的task加入到删除队列进行处理}}}

(1)client_sock_read

从socket读取数据(也可以是pipe)

大部分内容都是读取数据的操作,最后一步才是关键的


recv读取完数据后,分为两种,

1、storage_deal_task(pTask)

这里面可以解析协议中的CMD,后续调用到storage_upload_file进行文件上传的初始化工作

2、data数据处理

读取完的数据交由dio(dataIO线程)进行处理storage_dio_queue_push(pTask),放入队列中,那么dio线程就能读取到,并执行dio_write_file写入磁盘中

staticvoidclient_sock_read(intsock,shortevent,void*arg){while(1){if(pClientInfo-total_length==0)//recvheader//初始时pClientInfo-total_length=0pTask-offset=0{recv_bytes=sizeof(TrackerHeader)-pTask-offset;}else//至少读到了10个字节后sizeof(TrackerHeader){recv_bytes=pTask-length-pTask-offset;//在次接受上传文件的数据包时,因为发生storage_nio_notify(pTask)}/*logInfo("total_length=%"PRId64",recv_bytes=%d,""pTask-length=%d,pTask-offset=%d",pClientInfo-total_length,recv_bytes,pTask-length,pTask-offset);*/bytes=recv(sock,pTask-data+pTask-offset,recv_bytes,0);//根据buffer情况读取数据if(bytes0){if(errno==EAGAIN||errno==EWOULDBLOCK){}elseif(errno==EINTR){continue;}else{logError("file:"__FILE__",line:%d,"\"clientip:%s,recvfailed,"\"errno:%d,errorinfo:%s",\__LINE__,pTask-client_ip,\errno,STRERROR(errno));task_finish_clean_up(pTask);}return;}elseif(bytes==0){logDebug("file:"__FILE__",line:%d,"\"clientip:%s,recvfailed,"\"connectiondisconnected.",\__LINE__,pTask-client_ip);task_finish_clean_up(pTask);return;}if(pClientInfo-total_length==0)//header{//要来解析headerif(pTask-offset+bytessizeof(TrackerHeader))//还没有读够header{pTask-offset+=bytes;return;}pClientInfo-total_length=buff2long(((TrackerHeader*)\//确定包data的总长度:比如下载文件时,接收的包,就只有包的长度,这里不包括headerpTask-data)-pkg_len);if(pClientInfo-total_length0){logError("file:"__FILE__",line:%d,"\"clientip:%s,pkglength:"\"%"PRId64"0",\__LINE__,pTask-client_ip,\pClientInfo-total_length);task_finish_clean_up(pTask);return;}//包的总长度=包头+包体的长度//设想发送的场景:包头+包体+包体+(其中在包头里面含有多个包体的总长度)pClientInfo-total_length+=sizeof(TrackerHeader);//因为默认的接收缓冲只有K,所以会分次发送,计算出来包括header的长度if(pClientInfo-total_lengthpTask-size){pTask-length=pTask-size;//如果包的总长大于包的分配的长度,那么任务长度等于任务分配的长度,读到对应的数据就去触发dio}else{pTask-length=pClientInfo-total_length;//确定任务的长度}}pTask-offset+=bytes;//offset增加if(pTask-offset=pTask-length)//recvcurrentpkgdone//接收到当前包完成{if(pClientInfo-total_offset+pTask-length=\//上次操作接收的总的偏移量+这次接收的数据长度,如果大于包的总长度,那么说明包接收完毕pClientInfo-total_length){/*currentreqrecvdone*/pClientInfo-stage=FDFS_STORAGE_STAGE_NIO_SEND;pTask-req_count++;}if(pClientInfo-total_offset==0){//说明还没有开始处理pClientInfo-total_offset=pTask-length;//数据服务器进行处理storage_deal_task(pTask);//解析header以及我们协议附加的信息}else{pClientInfo-total_offset+=pTask-length;//否则继续写文件/*continuewritetofile*/storage_dio_queue_push(pTask);//比如文件增加}return;}}return;}

(2)client_sock_write

服务端发送数据write给客户端,对于客户端来说,就是下载文件

一个下载的任务,要分成好几个ptask,让网络io线程去发送(work线程)。

如果ptask的数据发送完了(ptask-offset=ptask-length),那么看总任务的是不是都发送完了。如果总任务都发送完成了,那么就切换接受RECV状态。

如果总任务还没发送,那么就加入队列中storage_dio_queue_push(pTask),让dio线程再去读取数据。

staticvoidclient_sock_write(intsock,shortevent,void*arg){intbytes;structfast_task_info*pTask;StorageClientInfo*pClientInfo;pTask=(structfast_task_info*)arg;pClientInfo=(StorageClientInfo*)pTask-arg;if(pTask-canceled){return;}if(eventIOEVENT_TIMEOUT){logError("file:"__FILE__",line:%d,""clientip:%s,stimeout,offset:%d,""remainbytes:%d",__LINE__,pTask-client_ip,pTask-offset,pTask-length-pTask-offset);task_finish_clean_up(pTask);return;}if(eventIOEVENT_ERROR){logDebug("file:"__FILE__",line:%d,""clientip:%s,recverrorevent:%d,""closeconnection",__LINE__,pTask-client_ip,event);task_finish_clean_up(pTask);return;}while(1){fast_timer_modify(pTask-thread_data-timer,,g_current_time+g_fdfs_network_timeout);bytes=s(sock,pTask-data+pTask-offset,\pTask-length-pTask-offset,0);//printf("%08Xsed%dbytes\n",(int)pTask,bytes);if(bytes0){if(errno==EAGAIN||errno==EWOULDBLOCK){set_s_event(pTask);}elseif(errno==EINTR){continue;}else{logError("file:"__FILE__",line:%d,"\"clientip:%s,recvfailed,"\"errno:%d,errorinfo:%s",\__LINE__,pTask-client_ip,\errno,STRERROR(errno));task_finish_clean_up(pTask);}return;}elseif(bytes==0){logWarning("file:"__FILE__",line:%d,"\"sfailed,connectiondisconnected.",\__LINE__);task_finish_clean_up(pTask);return;}pTask-offset+=bytes;if(pTask-offset=pTask-length){if(set_recv_event(pTask)!=0){return;}pClientInfo-total_offset+=pTask-length;if(pClientInfo-total_offset=pClientInfo-total_length){if(pClientInfo-total_length==sizeof(TrackerHeader)((TrackerHeader*)pTask-data)-status==EINVAL){logDebug("file:"__FILE__",line:%d,"\"closeconn:diowritebytes:%d,pTask-length=%d,buff_offset=%d",\write_bytes,pTask-length,pFileContext-buff_offset);*/pFileContext-offset+=write_bytes;//增加写入文件的字数数量if(pFileContext-offsetpFileContext-)//pFileContext-实际是指文件的大小。{pFileContext-buff_offset=0;//为什么设置为0?因为下一次传输的数据全部为文件内容了pFileContext-continue_callback(pTask);//等待下一次的继续触发,比如storage_nio_notify}else//文件已经写入完毕{if(pFileContext-calc_crc32){pFileContext-crc32=CRC32_FINAL(\pFileContext-crc32);}if(pFileContext-calc_file_hash){if(g_file_signature_method==STORAGE_FILE_SIGNATURE_METHOD_HASH){FINISH_HASH_CODES4(pFileContext-file_hash_codes)}else{my_md5_final((unsignedchar*)(pFileContext-\file_hash_codes),pFileContext-md5_context);}}if(pFileContext-extra__close_callback!=NULL){result=pFileContext-extra_\before_close_callback(pTask);}/*filewritedone,closeit*/close(pFileContext-fd);pFileContext-fd=-1;if(pFileContext-done_callback!=NULL){pFileContext-done_callback(pTask,result);//比如storage_upload_file_done_callback}}return0;}while(0);pClientInfo-clean_func(pTask);if(pFileContext-done_callback!=NULL){pFileContext-done_callback(pTask,result);}returnresult;}