一、epoll系统调用
epoll是Linux特有的I/O复用函数。它的实现和使用上与select、poll有很大的差异。注意epoll是使用一组函数来完成任务的,而不是单个函数。其次,epoll把用户关心的文件描述符上的事件放在内核的一个事件表里面,从而无需像select和poll那样每次调用都要重复传入文件描述符或事件集。
二、内核事件表
- 首先,epoll需要使用一个额外的文件描述符,来唯一表示内核中的这个事件表,这个文件描述符使用如下函数epoll_create函数来创建:
#include<sys/epoll.h>
int epoll_create(int size);
- size参数并不起作用,只是给内核一个提示,告诉他事件表需要多大。该函数返回的文件描述符将作为其他epoll系统调用的第一个参数。以指定要访问的内核事件表。
- 通过epoll_ctl来操作内核事件表
#include<sys/epoll.h>
int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event)
- fd参数是要操作的文件描述符
- op参数则指定操作类型,操作类型有一下三种:
EPOLL_CTL_ADD //往事件表上注册fd的事件
EPOLL_CTL_MOD //修改fd上的注册事件
EPOLL_CTL_DEL //删除fd上的注册事件
- event参数指定事件,它是一个epoll_event结构指针类型,epoll_event定义如下:
struct epoll_event
{
_uint32_t events; //epoll事件
epolla_data_t data; //用户数据
}
其中event成员描述事件类型。epoll支持的事件类型和poll基本相同。表示epoll事件类型的宏是在poll对应的宏前面加上”E”,比如epoll的数据可读事件是EPOLLIN。但epoll有两个额外的事件类型—-EPOLLET和EPOLLONESHOT。data成员用于存储用户数据。
typedef union epoll_data
{
void* ptr;
int fd;
uint32_t u32;
unit64_t u64;
} epoll_data_t;
epoll_data_t是一个联合体,四个成员中用的最多的是fd,它指定事件所从属的目标的文件描述符。ptr成员可用来指定与fd相关的用户数据。但由于epoll_data_t是一个联合体,所以不能同时使用其ptr成员和fd成员,因此,如果要将文件描述符和用户数据关联起来,实现快速的数据访问,只能使用其他的手段,比如放弃epoll_data_t的fd成员,而在ptr指向的用户数据中包含fd。
epoll_ctl成功时返回0,失败返回-1并设置error。
三、epoll_wait函数
epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:
#include<sys/epoll.h>
int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);
该函数成功时返回就绪的文件描述符的个数,失败时返回-1并设置error。
- timeout参数的含义与poll接口的timeout参数相同。
- maxevents参数指定最多监听多少个事件,该参数必须大于0;
- epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪事件,以下代码可以清楚的看到这个差别:
//如何索引poll返回的就绪文件描述符
int ret=poll(fds,MAX_EVENT_NUMBER,-1);
//必须遍历所有已注册文件描述符并找到其中的就绪者
for(int i=0;i<MAX_EVENT_NUMBET;++i)
{
if(fds[i].revents &POLLIN)
{
int sockfd=fds[i].fd;
}
}
//如何索引epoll返回的就绪文件描述符
int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
//仅遍历就绪的ret个文件描述符
for(int i=0;i<ret;++i)
{
int sockfd=events[i].data.fd;
//sockfd肯定就绪,直接处理。
}
四、LT和ET模式
epoll对文件描述符的操作有两种模式:LT(水平触发)和 ET(边沿触发)模式。LT是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。
- LT模式:当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下次调用epoll_wait时,epoll_wait还会再次向应用程序通知此事件,直到该事件被处理。
- ET模式:当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。
可见,ET模式在很大程度上降低了同一个epoll事件呗重复触发的次数,因此效率要比LT模式高。
下面代码可以清除的体现LT和ET正在工作方式上面的差异:
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<sys/epoll.h>
#include<pthread.h>
#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
//将文件描述符设置为非阻塞的
int setnonblocking(int fd)
{
int old_option=fcntl(fd,F_GETFL);
int new_option=old_option | O_NONBLOCK;
fcntl(fd,F_SETFL,new_option);
return old_option;
}
//将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,
//参数enable_et指定是否对fd启用ET模式
void addfd(int epollfd,int fd,bool enable_et)
{
epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN;
if(enable_et)
{
event.events|=EPOLLET;
}
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
setnonblocking(fd);
}
//LT模式的工作流程
void lt(epoll_event* events,int number,int epollfd,int listenfd)
{
char buf[BUFFER_SIZE];
for(int i=0;i<number;i++)
{
int sockfd=events[i].data.fd;
if(sockfd==listenfd)
{
struct sockaddr_in client_address;
socklen_t client_addrlength=sizeof(client_address);
int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
addfd(epollfd,connfd,false);//对connfd禁用ET模式
}
else if(events[i].events & EPOLLIN)
{
//只要socket读缓存中还有未读出的数据,这段代码就会被触发
printf("event trigger once\n");
memset(buf,'\0',BUFFER_SIZE);
int ret=recv(sockfd,buf,BUFFER_SIZE-1,0);
if(ret<=0)
{
close(sockfd);
continue;
}
printf("get %d bytes of content:%s\n",ret,buf);
}
else
{
printf("something else happened \n");
}
}
}
//ET工作模式
void et(epoll_event* events,int number,int epollfd,int listenfd)
{
char buf[BUFFER_SIZE];
printf("number:%d\n",number);
for(int i=0;i<number;i++)
{
printf("ET begin!\n");
int sockfd=events[i].data.fd;
if(sockfd==listenfd)
{
struct sockaddr_in client_address;
socklen_t client_addrlength=sizeof(client_address);
int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
addfd(epollfd,connfd,true);//开启ET模式
}
else if (events[i].events & EPOLLIN)
{
/*这段代码不会被重复触发,
所以我们循环读取数据,以确保把socket读缓存中的所有数据读出*/
printf("event trigger once\n");
while(1)
{
memset(buf,'\0',BUFFER_SIZE);
int ret=recv(sockfd,buf,BUFFER_SIZE,0);
if(ret<0)
{
/*对于非阻塞IO,下面的条件成立表示数据已经完全读取完毕。此后,
epoll就能再次触发epoll_fd上的EPOLLIN事件,以驱动下一次读事件
*/
if((errno==EAGAIN)||(errno==EWOULDBLOCK))
{
printf("read later\n");
break;
}
close(sockfd);
break;
}
else if(ret==0)
{
close(sockfd);
}
else
{
printf("get %d bytes of content: %s\n",ret,buf);
}
}
}
else
{
printf("something else happened\n");
}
}
}
int main(int argc,char* argv[])
{
struct sockaddr_in address;
socklen_t server_addr_len;
bzero(&address,sizeof(address));
address.sin_family=AF_INET;
address.sin_addr.s_addr=htons(INADDR_ANY);
address.sin_port=htons(15000);
int ret=0;
int listenfd=socket(PF_INET,SOCK_STREAM,0);
assert(listenfd>=0);
printf("Server socket !\n");
ret=bind(listenfd,(struct sockaddr*)&address,sizeof(address));
assert(ret!=-1);
printf("Server bind !\n");
ret=listen(listenfd,5);
assert(ret!=-1);
printf("Server listen !\n");
epoll_event events[MAX_EVENT_NUMBER];
int epollfd=epoll_create(5);
assert(epollfd!=-1);
addfd(epollfd,listenfd,true);
while(1)
{
int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
if(ret<0)
{
printf("epoll failure\n");
break;
}
//lt(events,ret,epollfd,listenfd);
et(events,ret,epollfd,listenfd);
}
close(listenfd);
return 0;
}
- 注意:每个使用ET模式的文件描述符都应该是非阻塞的。二u过文件描述符是阻塞的,那么读或者写操作将会因为没有后续的事件二一直处于阻塞状态。
五、EPOLLET和EPOLLONESHOT事件
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新的数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理,这一点可以使用EPOLLONESHOT事件。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。
六、epoll示例:Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。
#include<netinet/in.h>
#include<sys/types.h> //socket
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<sys/ioctl.h>
#include<sys/time.h>
#include<iostream>
#include<vector>
#include<string>
#include<cstdio>
#include<cstdlib>
#include<cstring>
using namespace std;
#define BUFFER_SIZE 1024
#define SIZE 100
#define EPOLLSIZE 100
struct PACKET_HEAD
{
int length;
};
class Server
{
private:
struct sockaddr_in server_addr;
socklen_t server_addr_len;
int listen_fd;
int epfd; //epoll fd
struct epoll_event events[EPOLLSIZE];
public:
Server(int port);
~Server();
void Bind();
void Listen(int queue_len=20);
void Accept();
void Run();
void Recv(int fd);
};
Server::Server(int port)
{
bzero(&server_addr,sizeof(server_addr));
server_addr.sin_family=AF_INET;
server_addr.sin_addr.s_addr=htons(INADDR_ANY);
server_addr.sin_port=htons(port);
//create socket to listen
listen_fd=socket(PF_INET,SOCK_STREAM,0);
if(listen_fd<0)
{
cout<<"Create listenSocket Failed!";
exit(1);
}
int opt=1;
// 一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
}
Server::~Server()
{
close(epfd);
}
void Server::Bind()
{
if(-1==(bind(listen_fd,(struct sockaddr*)&server_addr,sizeof(server_addr))))
{
cout<<"Server Bind Failed!";
exit(1);
}
cout<<"Bind Successfully!\n";
}
void Server::Listen(int queue_len)
{
if(-1==listen(listen_fd,queue_len))
{
cout<<"Server Listen Failed!";
exit(1);
}
cout<<"Listen Successfully!\n";
}
void Server::Accept()
{
struct sockaddr_in client_addr;
socklen_t client_addr_len=sizeof(client_addr);
int new_fd=accept(listen_fd,(struct sockaddr*)&client_addr,&client_addr_len);
if(new_fd<0)
{
cout<<"Server Accept Failed!";
exit(1);
}
cout<<"new connection was accepted!\n";
//在epfd中注册新建立的连接
struct epoll_event event;
event.data.fd=new_fd;
event.events=EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_ADD,new_fd,&event);
}
void Server::Run()
{
//创建epoll句柄
epfd=epoll_create(SIZE);
struct epoll_event event;
event.data.fd=listen_fd;
event.events=EPOLLIN;
//注册listen_fd
epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&event);
while(1)
{
int nums=epoll_wait(epfd,events,EPOLLSIZE,-1);
if(nums<0)
{
cout<<"epoll_wait Failed!";
exit(1);
}
if(nums==0)
{
continue;
}
for(int i=0;i<nums;++i) //遍历所有就绪事件
{
int fd=events[i].data.fd;
if((fd==listen_fd)&&(events[i].events & EPOLLIN))
Accept();
else if(events[i].events & EPOLLIN)
Recv(fd);
else ;
}
}
}
void Server::Recv(int fd)
{
bool close_conn=false;
PACKET_HEAD head;
recv(fd,&head,sizeof(head),0);
char* buffer=new char[head.length];
bzero(buffer,head.length);
int total=0;
while(total<head.length)
{
int len=recv(fd,buffer+total,head.length-total,0);
if(len<0)
{
cout<<"recv() error!";
close_conn=true;
break;
}
total+=len;
}
if(total==head.length)
{
int ret1=send(fd,&head,sizeof(head),0);
int ret2=send(fd,buffer,head.length,0);
if(ret1<0||ret2<0)
{
cout<<"send() error!";
close_conn=true;
}
else
{
cout<<"send() Successfully!\n";
}
}
delete buffer;
if(close_conn)
{
close(fd);
struct epoll_event event;
event.data.fd=fd;
event.events=EPOLLIN;
epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&event);//删除一个fd
}
}
int main()
{
Server server(15000);
server.Bind();
server.Listen();
server.Run();
return 0;
}