一、epoll系统调用
epoll是Linux特有的I/O复用函数。它的实现和使用上与select、poll有很大的差异。注意epoll是使用一组函数来完成任务的,而不是单个函数。其次,epoll把用户关心的文件描述符上的事件放在内核的一个事件表里面,从而无需像select和poll那样每次调用都要重复传入文件描述符或事件集。

二、内核事件表

  1. 首先,epoll需要使用一个额外的文件描述符,来唯一表示内核中的这个事件表,这个文件描述符使用如下函数epoll_create函数来创建:
#include<sys/epoll.h>
int epoll_create(int size);
  • size参数并不起作用,只是给内核一个提示,告诉他事件表需要多大。该函数返回的文件描述符将作为其他epoll系统调用的第一个参数。以指定要访问的内核事件表。
  1. 通过epoll_ctl来操作内核事件表
#include<sys/epoll.h>
int epoll_ctl(int epfd,int op,int fd,struct epoll_event *event)
  • fd参数是要操作的文件描述符
  • op参数则指定操作类型,操作类型有一下三种:
EPOLL_CTL_ADD  //往事件表上注册fd的事件
EPOLL_CTL_MOD  //修改fd上的注册事件
EPOLL_CTL_DEL  //删除fd上的注册事件
  • event参数指定事件,它是一个epoll_event结构指针类型,epoll_event定义如下:
struct epoll_event
{
   _uint32_t events;   //epoll事件
   epolla_data_t data; //用户数据
}

其中event成员描述事件类型。epoll支持的事件类型和poll基本相同。表示epoll事件类型的宏是在poll对应的宏前面加上”E”,比如epoll的数据可读事件是EPOLLIN。但epoll有两个额外的事件类型—-EPOLLET和EPOLLONESHOT。data成员用于存储用户数据。

typedef union epoll_data
{
   void* ptr;
   int fd;
   uint32_t u32;
   unit64_t u64;
} epoll_data_t;

epoll_data_t是一个联合体,四个成员中用的最多的是fd,它指定事件所从属的目标的文件描述符。ptr成员可用来指定与fd相关的用户数据。但由于epoll_data_t是一个联合体,所以不能同时使用其ptr成员和fd成员,因此,如果要将文件描述符和用户数据关联起来,实现快速的数据访问,只能使用其他的手段,比如放弃epoll_data_t的fd成员,而在ptr指向的用户数据中包含fd。
epoll_ctl成功时返回0,失败返回-1并设置error。

三、epoll_wait函数

epoll系列系统调用的主要接口是epoll_wait函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:

#include<sys/epoll.h>
int epoll_wait(int epfd,struct epoll_event* events,int maxevents,int timeout);

该函数成功时返回就绪的文件描述符的个数,失败时返回-1并设置error。

  • timeout参数的含义与poll接口的timeout参数相同。
  • maxevents参数指定最多监听多少个事件,该参数必须大于0;
  • epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪事件,以下代码可以清楚的看到这个差别:
//如何索引poll返回的就绪文件描述符
int ret=poll(fds,MAX_EVENT_NUMBER,-1);
//必须遍历所有已注册文件描述符并找到其中的就绪者
for(int i=0;i<MAX_EVENT_NUMBET;++i)
{
     if(fds[i].revents &POLLIN)
     {
         int sockfd=fds[i].fd;
     }
}

//如何索引epoll返回的就绪文件描述符
int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
//仅遍历就绪的ret个文件描述符
for(int i=0;i<ret;++i)
{
     int sockfd=events[i].data.fd;
     //sockfd肯定就绪,直接处理。
}

四、LT和ET模式
epoll对文件描述符的操作有两种模式:LT(水平触发)和 ET(边沿触发)模式。LT是默认的工作模式,这种模式下epoll相当于一个效率较高的poll。当往epoll内核事件表中注册一个文件描述符上的EPOLLET事件时,epoll将以ET模式来操作该文件描述符。

  • LT模式:当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下次调用epoll_wait时,epoll_wait还会再次向应用程序通知此事件,直到该事件被处理。
  • ET模式:当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。

可见,ET模式在很大程度上降低了同一个epoll事件呗重复触发的次数,因此效率要比LT模式高。
下面代码可以清除的体现LT和ET正在工作方式上面的差异:

#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<sys/epoll.h>
#include<pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10
//将文件描述符设置为非阻塞的
int setnonblocking(int fd)
{
    int old_option=fcntl(fd,F_GETFL);
    int new_option=old_option | O_NONBLOCK;
    fcntl(fd,F_SETFL,new_option);
    return old_option;
}
//将文件描述符fd上的EPOLLIN注册到epollfd指示的epoll内核事件表中,
//参数enable_et指定是否对fd启用ET模式
void addfd(int epollfd,int fd,bool enable_et)
{
    epoll_event event;
    event.data.fd=fd;
    event.events=EPOLLIN;
    if(enable_et)
    {
        event.events|=EPOLLET;
    }
    epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
    setnonblocking(fd);
}

//LT模式的工作流程
void lt(epoll_event* events,int number,int epollfd,int listenfd)
{
    char buf[BUFFER_SIZE];
    for(int i=0;i<number;i++)
    {
        int sockfd=events[i].data.fd;
        if(sockfd==listenfd)
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength=sizeof(client_address);
            int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
            addfd(epollfd,connfd,false);//对connfd禁用ET模式
        }
        else if(events[i].events & EPOLLIN)
        {
            //只要socket读缓存中还有未读出的数据,这段代码就会被触发
            printf("event trigger once\n");
            memset(buf,'\0',BUFFER_SIZE);
            int ret=recv(sockfd,buf,BUFFER_SIZE-1,0);
            if(ret<=0)
            {
                close(sockfd);
                continue;
            }
            printf("get %d bytes of content:%s\n",ret,buf);
        }
        else
        {
            printf("something else happened \n");
        }
    }
}

//ET工作模式
void et(epoll_event* events,int number,int epollfd,int listenfd)
{
    char buf[BUFFER_SIZE];
    printf("number:%d\n",number);
    for(int i=0;i<number;i++)
    {
        printf("ET begin!\n");
        int sockfd=events[i].data.fd;
        if(sockfd==listenfd)
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength=sizeof(client_address);
            int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);
            addfd(epollfd,connfd,true);//开启ET模式
        }
        else if (events[i].events & EPOLLIN)
        {
            /*这段代码不会被重复触发,
            所以我们循环读取数据,以确保把socket读缓存中的所有数据读出*/
            printf("event trigger once\n");
            while(1)
            {
                memset(buf,'\0',BUFFER_SIZE);
                int ret=recv(sockfd,buf,BUFFER_SIZE,0);
                if(ret<0)
                {
                    /*对于非阻塞IO,下面的条件成立表示数据已经完全读取完毕。此后,
                    epoll就能再次触发epoll_fd上的EPOLLIN事件,以驱动下一次读事件
                    */
                   if((errno==EAGAIN)||(errno==EWOULDBLOCK))
                   {
                       printf("read later\n");
                       break;
                   }
                   close(sockfd);
                   break;
                }
                else if(ret==0)
                {
                    close(sockfd);
                }
                else
                {
                    printf("get %d bytes of content: %s\n",ret,buf);
                }
            }
        }
        else
        {
            printf("something else happened\n");
        }
    }
}

int main(int argc,char* argv[])
{

    struct sockaddr_in address;
    socklen_t server_addr_len;
    bzero(&address,sizeof(address));
    address.sin_family=AF_INET;
    address.sin_addr.s_addr=htons(INADDR_ANY);
    address.sin_port=htons(15000);

    int ret=0;

    int listenfd=socket(PF_INET,SOCK_STREAM,0);
    assert(listenfd>=0);
    printf("Server  socket !\n");

    ret=bind(listenfd,(struct sockaddr*)&address,sizeof(address));
    assert(ret!=-1);
    printf("Server  bind !\n");

    ret=listen(listenfd,5);
    assert(ret!=-1);
    printf("Server  listen !\n");

    epoll_event events[MAX_EVENT_NUMBER];
    int epollfd=epoll_create(5);
    assert(epollfd!=-1);
    addfd(epollfd,listenfd,true);

    while(1)
    {
        int ret=epoll_wait(epollfd,events,MAX_EVENT_NUMBER,-1);
        if(ret<0)
        {
            printf("epoll failure\n");
            break;
        }
        //lt(events,ret,epollfd,listenfd);
        et(events,ret,epollfd,listenfd);
    }
    close(listenfd);
    return 0;
}
  • 注意:每个使用ET模式的文件描述符都应该是非阻塞的。二u过文件描述符是阻塞的,那么读或者写操作将会因为没有后续的事件二一直处于阻塞状态。

五、EPOLLET和EPOLLONESHOT事件
即使我们使用ET模式,一个socket上的某个事件还是可能被触发多次。这在并发程序中就会引起一个问题。比如一个线程在读取完某个socket上的数据后开始处理这些数据,而在数据的处理过程中该socket上又有新的数据可读(EPOLLIN再次被触发),此时另外一个线程被唤醒来读取这些新的数据。于是就出现了两个线程同时操作一个socket的局面。这当然不是我们期望的。我们期望的是一个socket连接在任一时刻都只被一个线程处理,这一点可以使用EPOLLONESHOT事件。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。这样,当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。

六、epoll示例:Client向Server发送消息,Server接收消息并原样发送给Client,Client再把消息输出到终端。

#include<netinet/in.h>
#include<sys/types.h>  //socket
#include<sys/socket.h>
#include<arpa/inet.h>
#include<unistd.h>
#include<sys/epoll.h>
#include<sys/ioctl.h>
#include<sys/time.h>
#include<iostream>
#include<vector>
#include<string>
#include<cstdio>
#include<cstdlib>
#include<cstring>
using namespace std;

#define BUFFER_SIZE 1024
#define SIZE 100
#define EPOLLSIZE 100

struct PACKET_HEAD
{
    int length;
};

class Server
{
private:
    struct sockaddr_in server_addr;
    socklen_t server_addr_len;
    int listen_fd;
    int epfd;  //epoll fd
    struct epoll_event events[EPOLLSIZE];
public:
    Server(int port);
    ~Server();
    void Bind();
    void Listen(int queue_len=20);
    void Accept();
    void Run();
    void Recv(int fd);
};

Server::Server(int port)
{
    bzero(&server_addr,sizeof(server_addr));
    server_addr.sin_family=AF_INET;
    server_addr.sin_addr.s_addr=htons(INADDR_ANY);
    server_addr.sin_port=htons(port);

    //create socket to listen
    listen_fd=socket(PF_INET,SOCK_STREAM,0);
    if(listen_fd<0)
    {
        cout<<"Create listenSocket Failed!";
        exit(1);
    }
    
    int opt=1;
    // 一般来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。
    setsockopt(listen_fd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));

}

Server::~Server()
{
    close(epfd);
}

void Server::Bind()
{
    if(-1==(bind(listen_fd,(struct sockaddr*)&server_addr,sizeof(server_addr))))
    {
        cout<<"Server Bind Failed!";
        exit(1);
    }
    cout<<"Bind Successfully!\n";
}

void Server::Listen(int queue_len)
{
    if(-1==listen(listen_fd,queue_len))
    {
        cout<<"Server Listen Failed!";
        exit(1);
    }
    cout<<"Listen Successfully!\n";
}

void Server::Accept()
{
    struct sockaddr_in client_addr;
    socklen_t client_addr_len=sizeof(client_addr);

    int new_fd=accept(listen_fd,(struct sockaddr*)&client_addr,&client_addr_len);
    if(new_fd<0)
    {
        cout<<"Server Accept Failed!";
        exit(1);
    }
    cout<<"new connection was accepted!\n";

    //在epfd中注册新建立的连接
    struct epoll_event event;
    event.data.fd=new_fd;
    event.events=EPOLLIN;

    epoll_ctl(epfd,EPOLL_CTL_ADD,new_fd,&event);
}

void Server::Run()
{
    //创建epoll句柄
    epfd=epoll_create(SIZE);

    struct epoll_event event;
    event.data.fd=listen_fd;
    event.events=EPOLLIN;
    //注册listen_fd
    epoll_ctl(epfd,EPOLL_CTL_ADD,listen_fd,&event);

    while(1)
    {
        int nums=epoll_wait(epfd,events,EPOLLSIZE,-1);
        if(nums<0)
        {
            cout<<"epoll_wait Failed!";
            exit(1);
        }

        if(nums==0)
        {
            continue;
        }

        for(int i=0;i<nums;++i)  //遍历所有就绪事件
        {
            int fd=events[i].data.fd;
            if((fd==listen_fd)&&(events[i].events & EPOLLIN))
                Accept();
            else if(events[i].events & EPOLLIN)
                Recv(fd);
            else ;
        }
    }

}

void Server::Recv(int fd)
{
    bool close_conn=false;

    PACKET_HEAD head;
    recv(fd,&head,sizeof(head),0);

    char* buffer=new char[head.length];
    bzero(buffer,head.length);
    int total=0;
    while(total<head.length)
    {
        int len=recv(fd,buffer+total,head.length-total,0);
        if(len<0)
        {
            cout<<"recv() error!";
            close_conn=true;
            break;
        }
        total+=len;
    }

    if(total==head.length)
    {
        int ret1=send(fd,&head,sizeof(head),0);
        int ret2=send(fd,buffer,head.length,0);
        if(ret1<0||ret2<0)
        {
            cout<<"send() error!";
            close_conn=true;
        }
        else
        {
            cout<<"send() Successfully!\n";
        }
    }

    delete buffer;
    if(close_conn)
    {
        close(fd);
        struct epoll_event event;
        event.data.fd=fd;
        event.events=EPOLLIN;
        epoll_ctl(epfd,EPOLL_CTL_DEL,fd,&event);//删除一个fd
    }
}

int main()
{
    Server server(15000);
    server.Bind();
    server.Listen();
    server.Run();
    return 0;
}

版权声明:本文为weixin_46566893原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_46566893/article/details/121546472