Redis Source Code - Event Handling, Part 2

This continues from Part 1.

Redis file events come from network I/O, i.e. client requests.

This article walks through how Redis receives data, tying together network sockets and epoll.

Part 2 can be seen as a prequel to Part 1.

Every beginning is hard, so let's start with the socket.

What is a socket?

First, look at the socket structure:

/**
 *  struct socket - general BSD socket
 *  @state: socket state (%SS_CONNECTED, etc)
 *  @type: socket type (%SOCK_STREAM, etc)
 *  @flags: socket flags (%SOCK_NOSPACE, etc)
 *  @ops: protocol specific socket operations
 *  @file: File back pointer for gc
 *  @sk: internal networking protocol agnostic socket representation
 *  @wq: wait queue for several uses
 */
struct socket {
	socket_state		state;

	short			type;

	unsigned long		flags;

	struct socket_wq	*wq;

	struct file		*file;
	struct sock		*sk;
	const struct proto_ops	*ops;
};

Two parts of the socket structure are worth introducing:

The part closely tied to the filesystem: file


struct file {
	union {
		struct llist_node	fu_llist;
		struct rcu_head 	fu_rcuhead;
	} f_u;
	struct path		f_path;
	struct inode		*f_inode;	/* cached value */
	const struct file_operations	*f_op;

	/*
	 * Protects f_ep_links, f_flags.
	 * Must not be taken from IRQ context.
	 */
	spinlock_t		f_lock;
    ......
}

The part closely tied to communication: sock

struct tcp_sock {
	/* inet_connection_sock has to be the first member of tcp_sock */
	struct inet_connection_sock	inet_conn;
}

struct inet_connection_sock {
	/* inet_sock has to be the first member! */
	struct inet_sock	  icsk_inet;
	struct request_sock_queue icsk_accept_queue;
	struct inet_bind_bucket	  *icsk_bind_hash;
}
struct inet_sock {
	/* sk and pinet6 has to be the first two members of inet_sock */
	struct sock		sk;
}
struct sock {
	/*
	 * Now struct inet_timewait_sock also uses sock_common, so please just
	 * don't add nothing before this first member (__sk_common) --acme
	 */
	struct sock_common	__sk_common;
    struct sk_buff_head	sk_error_queue;
	struct sk_buff_head	sk_receive_queue;
    union {
		struct socket_wq __rcu	*sk_wq;
		struct socket_wq	*sk_wq_raw;
	};
    .....
}
struct sock_common {
	/* skc_daddr and skc_rcv_saddr must be grouped on a 8 bytes aligned
	 * address on 64bit arches : cf INET_MATCH()
	 */
	union {
		__addrpair	skc_addrpair;
		struct {
			__be32	skc_daddr;
			__be32	skc_rcv_saddr;
		};
	};
	union  {
		unsigned int	skc_hash;
		__u16		skc_u16hashes[2];
	};
	/* skc_dport && skc_num must be grouped as well */
	union {
		__portpair	skc_portpair;
		struct {
			__be16	skc_dport;
			__u16	skc_num;
		};
	};
    .....
}

The socket creation process

Creating a socket associates it with an open file, so the process can manage it like any other file descriptor.
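
As a quick standalone illustration (user-space C, not Redis code): the fd returned by socket(2) is an ordinary file descriptor, which is exactly why the kernel backs it with a struct file and an inode.

#include <stdio.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>

int main(void) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);   /* ends up in __sys_socket in the kernel */
    if (fd < 0) { perror("socket"); return 1; }

    struct stat st;
    fstat(fd, &st);                             /* works because the socket is backed by an inode */
    printf("fd=%d  is a socket: %d\n", fd, S_ISSOCK(st.st_mode) != 0);

    close(fd);                                  /* released like any other open file */
    return 0;
}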

#------------------- *user space* ---------------------------
socket
#------------------- *kernel space* ---------------------------
__x64_sys_socket # kernel system call
__sys_socket # net/socket.c
    |-- sock_create # net/socket.c
        |-- __sock_create # net/socket.c
#------------------- file part ---------------------------
            |-- sock_alloc # net/socket.c
                |-- new_inode_pseudo # fs/inode.c
                    |-- alloc_inode # fs/inode.c
                        |-- sock_alloc_inode # net/socket.c
                            |-- kmem_cache_alloc
#------------------- network part ---------------------------
            |-- inet_create # pf->create -- af_inet.c
                |-- sk_alloc # net/core/sock.c
                    |-- sk_prot_alloc # net/core/sock.c
                        |-- kmem_cache_alloc
                |-- inet_sk
                |-- sock_init_data # net/core/sock.c
                    |-- sk_init_common # net/core/sock.c
                    |-- timer_setup
                |-- sk->sk_prot->init(sk) # tcp_v4_init_sock -- net/ipv4/tcp_ipv4.c
                    |-- tcp_init_sock
#------------------- file + network + process association ------------------------
    |-- sock_map_fd # net/socket.c
        |-- get_unused_fd_flags # fs/file.c -- allocate a free fd in the process
        |-- sock_alloc_file # net/socket.c
            |-- alloc_file_pseudo # fs/file_table.c
        |-- fd_install # fs/file.c
            |-- __fd_install # fs/file.c
                |-- fdt = rcu_dereference_sched(files->fdt);
                |-- rcu_assign_pointer(fdt->fd[fd], file); # associate the file with the process

  • how the sock is linked to an inode file node
  • how the sock is linked to the process

sock_map_fd handles the file and process association:
static int sock_map_fd(struct socket *sock, int flags) {
    struct file *newfile;

    int fd = get_unused_fd_flags(flags);
    if (unlikely(fd < 0)) {
        sock_release(sock);
        return fd;
    }

    newfile = sock_alloc_file(sock, flags, NULL);
    if (likely(!IS_ERR(newfile))) {
        fd_install(fd, newfile);
        return fd;
    }
    ...
}

When does Redis create a socket?

The socket that handles a client connection is created during accept.

#------------------- *user space* ---------------------------
accept
#------------------- *kernel space* ---------------------------
__sys_accept4 # net/socket.c - kernel system call
|-- sockfd_lookup_light # look up the listen socket's struct socket pointer by fd
|-- sock_alloc # allocate a new socket object for the ready connection to be taken from the listen socket's accept queue
|-- get_unused_fd_flags # get a free file fd from the process
|-- sock_alloc_file # create a new file in the process, since the file must be associated with the socket
|-- inet_accept # take a ready sock from the listen socket's accept (full-connection) queue and associate it with the newly created socket
    |-- inet_csk_accept
        |-- reqsk_queue_empty # if the accept queue is empty, block, or return EAGAIN in non-blocking mode
        |-- sock_rcvtimeo
        |-- inet_csk_wait_for_connect # wait in the blocking case
        # if the accept queue is non-empty, take one connection from it to process
        |-- reqsk_queue_remove # remove a request_sock from the listen socket's accept queue and handle it
    |-- sock_graft # link the socket with the sock
|-- inet_getname
|-- move_addr_to_user # copy the accepted connection's ip/port to user space
|-- fd_install # associate the file with the process: __fd_install(current->files, fd, file);

Once the TCP three-way handshake finishes and the server's accept completes, the socket abstraction for the client connection is established.

In the Redis implementation this is anetTcpAccept; here is a GDB backtrace of a client connection being accepted:

Thread 1 "redis-server" hit Breakpoint 5, anetTcpAccept (err=0x750ca8 <server+520> "accept: Resource temporarily unavailable", s=4, ip=0x7fffffffe1f0 "127.0.0.1", ip_len=46, port=0x7fffffffe224)
    at anet.c:549
(gdb) bt
#0  anetTcpAccept (err=0x750ca8 <server+520> "accept: Resource temporarily unavailable", s=4, ip=0x7fffffffe1f0 "127.0.0.1", ip_len=46, port=0x7fffffffe224) at anet.c:549
#1  0x0000000000438b5f in acceptTcpHandler (el=0x7ffff6a2f0a0, fd=4, privdata=0x0, mask=1) at networking.c:694
#2  0x0000000000421c85 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:431
#3  0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#4  0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
    int fd;
    while(1) {
        fd = accept(s,sa,len);//user space tcp accept
        if (fd == -1) {
            if (errno == EINTR)
                continue;
            else {
                anetSetError(err, "accept: %s", strerror(errno));
                return ANET_ERR;
            }
        }
        break;
    }
    return fd;
}

Now the call path is clear.

How Redis creates the server (listening) socket

The process is as follows (condensed into a standalone sketch after this list):

  1. Create the epoll fd: server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); the core call is epoll_create(1024);
  2. int listenToPort(int port, int *fds, int *count), taking TCP over IPv4 as an example
  3. Create the server's socket: socket(p->ai_family, p->ai_socktype, p->ai_protocol)
  4. anetListen: bind (omitted)
  5. anetListen: listen (omitted)
  6. while(!server.el->stop) aeProcessEvents(eventLoop, AE_ALL_EVENTS) waits for fd events
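
The sketch below is a standalone condensation of these steps, not Redis code (the port 6379 and the 511 backlog merely mirror Redis defaults); error handling is trimmed for brevity:

#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>

int main(void) {
    int epfd = epoll_create(1024);                      /* step 1: what aeCreateEventLoop/aeApiCreate boil down to */

    int sfd = socket(AF_INET, SOCK_STREAM, 0);          /* step 3: the server socket */
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(6379);
    bind(sfd, (struct sockaddr *)&addr, sizeof(addr));  /* step 4 */
    listen(sfd, 511);                                   /* step 5 */

    struct epoll_event ee = { .events = EPOLLIN, .data.fd = sfd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, sfd, &ee);           /* watch the listen fd, like aeCreateFileEvent */

    struct epoll_event ready[64];
    for (;;) {                                          /* step 6: the event loop */
        int n = epoll_wait(epfd, ready, 64, -1);
        for (int i = 0; i < n; i++) {
            if (ready[i].data.fd == sfd) {
                int cfd = accept(sfd, NULL, NULL);      /* what acceptTcpHandler does via anetTcpAccept */
                struct epoll_event ce = { .events = EPOLLIN, .data.fd = cfd };
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ce);
            } else {
                /* a client fd is readable; Redis would call readQueryFromClient here */
            }
        }
    }
}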

How does Redis manage socket fds?

Redis uses epoll to manage them.

struct eventpoll {
	/*
	 * This mutex is used to ensure that files are not removed
	 * while epoll is using them. This is held during the event
	 * collection loop, the file cleanup path, the epoll file exit
	 * code and the ctl operations.
	 */
	struct mutex mtx;

	/* Wait queue used by sys_epoll_wait() */
	wait_queue_head_t wq;

	/* Wait queue used by file->poll() */
	wait_queue_head_t poll_wait;

	/* List of ready file descriptors */
	struct list_head rdllist;

	/* Lock which protects rdllist and ovflist */
	rwlock_t lock;

	/* RB tree root used to store monitored fd structs */
	struct rb_root_cached rbr;

	/*
	 * This is a single linked list that chains all the "struct epitem" that
	 * happened while transferring ready events to userspace w/out
	 * holding ->lock.
	 */
	struct epitem *ovflist;

	/* wakeup_source used when ep_scan_ready_list is running */
	struct wakeup_source *ws;

	/* The user that created the eventpoll descriptor */
	struct user_struct *user;

	struct file *file;

	/*
	 * usage count, used together with epitem->dying to
	 * orchestrate the disposal of this struct
	 */
	refcount_t refcount;

#ifdef CONFIG_NET_RX_BUSY_POLL
	/* used to track busy poll napi_id */
	unsigned int napi_id;
#endif

#ifdef CONFIG_DEBUG_LOCK_ALLOC
	/* tracks wakeup nests for lockdep validation */
	u8 nests;
#endif
};
  • wq: wait queue for tasks blocked in epoll_wait
  • poll_wait: wait queue used when this epoll fd is itself monitored by another epoll fd via epoll_ctl
  • rdllist: the ready list; a list of epi entries whose registered fds have pending read/write events
  • ovflist: while ready events are being transferred to user space, events that become ready in that window are temporarily chained here and merged back into rdllist once the transfer finishes
  • rbr: root of the red-black tree that stores the monitored fd nodes
  • file: the file structure backing this eventpoll instance (in Linux, everything is a file)
  • lock: protects rdllist and ovflist

The epoll instance uses the red-black tree rbr to manage the registered fds it monitors; these fds are the client connections accepted above 👆.

How does Redis handle fd readiness events?

When an accepted fd is added to the epoll instance, epoll calls the poll method of the corresponding file.

(The Redis polling call chain is aeMain -> aeProcessEvents -> aeApiPoll; the code was covered in Part 1, so it is skipped here.)

The poll method serves two main purposes:

  • register a callback that is invoked when the file produces new readiness events
  • return the file's currently ready events

In Redis, the flow that adds an fd to epoll is as follows:

  1. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask)
  2. acceptCommonHandler(cfd,0,cip)
  3. client *createClient(int fd) sets the callback: readQueryFromClient (see the sketch below)
  4. int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
  5. aeApiAddEvent(eventLoop, fd, mask) registers the fd with its read/write event mask; the core call is epoll_ctl(state->epfd,op,fd,&ee)
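
For reference, an abridged sketch of createClient (based on the pre-6.0, fd-based networking.c; most of the client-state initialization is trimmed). It shows where readQueryFromClient is registered as the AE_READABLE handler:

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));

    /* fd == -1 is used for fake clients (e.g. Lua scripting); real connections
     * register a read handler so the event loop invokes readQueryFromClient
     * whenever the socket becomes readable. */
    if (fd != -1) {
        anetNonBlock(NULL, fd);
        anetEnableTcpNoDelay(NULL, fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL, fd, server.tcpkeepalive);
        if (aeCreateFileEvent(server.el, fd, AE_READABLE,
                              readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    /* ... initialize query buffer, reply list, and other client state ... */
    return c;
}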

The callback is the readQueryFromClient function.

What follows is reading the data and parsing the command, which is not expanded here; the next article continues from there.

When are the fd readiness events mentioned above 👆 actually produced?

When data arrives at the NIC, the low-level interrupt path ends up executing ep_poll_callback, which adds the event to rdllist (or ovflist) and wakes up epoll.

That is how readiness events are produced.

The path is:

driver -> ep_poll_callback -> epoll_wait(wake up)

/*
 * This is the callback that is passed to the wait queue wakeup
 * mechanism. It is called by the stored file descriptors when they
 * have events to report.
 *
 * This callback takes a read lock in order not to contend with concurrent
 * events from another file descriptor, thus all modifications to ->rdllist
 * or ->ovflist are lockless.  Read lock is paired with the write lock from
 * ep_scan_ready_list(), which stops all list modifications and guarantees
 * that lists state is seen correctly.
 *
 * Another thing worth to mention is that ep_poll_callback() can be called
 * concurrently for the same @epi from different CPUs if poll table was inited
 * with several wait queues entries.  Plural wakeup from different CPUs of a
 * single wait queue is serialized by wq.lock, but the case when multiple wait
 * queues are used should be detected accordingly.  This is detected using
 * cmpxchg() operation.
 */
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key){
    ......
    // While events are being copied from kernel space to user space, chain this epi onto ovflist instead.
    if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
        if (epi->next == EP_UNACTIVE_PTR && chain_epi_lockless(epi))
            ep_pm_stay_awake_rcu(epi);
        goto out_unlock;
    }

    if (!ep_is_linked(epi) &&
        list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) {
        ep_pm_stay_awake_rcu(epi);
    }
    // If the callback event is one the user registered, wake up the waiting process.
    // ep->wq is populated by epoll_wait: when no events are ready, epoll_wait sleeps on it until woken.
    if (waitqueue_active(&ep->wq)) {
        if ((epi->event.events & EPOLLEXCLUSIVE) &&
            !(pollflags & POLLFREE)) {
            // #define EPOLLINOUT_BITS (EPOLLIN | EPOLLOUT)
            switch (pollflags & EPOLLINOUT_BITS) {
            case EPOLLIN:
                if (epi->event.events & EPOLLIN)
                    ewake = 1;
                break;
            case EPOLLOUT:
                if (epi->event.events & EPOLLOUT)
                    ewake = 1;
                break;
            case 0:
                ewake = 1;
                break;
            }
        }
        wake_up(&ep->wq);
    }
    ......
}

What happens after epoll_wait is woken up?

epoll_wait calls ep_events_available to check the ready queues rdllist and ovflist; ready events are then delivered to user space by ep_send_events. See the code below 👇.

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	struct timespec64 to;
	return do_epoll_wait(epfd, events, maxevents,
			     ep_timeout_to_timespec(&to, timeout));
}

/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_pwait(2).
 */
static int do_epoll_pwait(int epfd, struct epoll_event __user *events,
			  int maxevents, struct timespec64 *to,
			  const sigset_t __user *sigmask, size_t sigsetsize)
{

	error = do_epoll_wait(epfd, events, maxevents, to);

	restore_saved_sigmask_unless(error == -EINTR);

	return error;
}

/*
 * Implement the event wait interface for the eventpoll file. It is the kernel
 * part of the user space epoll_wait(2).
 */
static int do_epoll_wait(int epfd, struct epoll_event __user *events,
			 int maxevents, struct timespec64 *to){
    ......

	/*
	 * We have to check that the file structure underneath the fd
	 * the user passed to us _is_ an eventpoll file.
	 */
	error = -EINVAL;
	if (!is_file_epoll(f.file))
		goto error_fput;

	/*
	 * At this point it is safe to assume that the "private_data" contains
	 * our own data structure.
	 */
	ep = f.file->private_data;

	/* Time to fish for events ... */
	// block for up to the timeout, then return the ready events
	error = ep_poll(ep, events, maxevents, to);
}

/**
 * ep_poll - Retrieves ready events, and delivers them to the caller-supplied
 *           event buffer.
 *
 * @ep: Pointer to the eventpoll context.
 * @events: Pointer to the userspace buffer where the ready events should be
 *          stored.
 * @maxevents: Size (in terms of number of events) of the caller event buffer.
 * @timeout: Maximum timeout for the ready events fetch operation, in
 *           timespec. If the timeout is zero, the function will not block,
 *           while if the @timeout ptr is NULL, the function will block
 *           until at least one event has been retrieved (or an error
 *           occurred).
 *
 * Return: the number of ready events which have been fetched, or an
 *          error code, in case of error.
 */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, struct timespec64 *timeout){
    ......
    // check the ready queue
    eavail = ep_events_available(ep);

	while (1) {
		if (eavail) {
			/*
			 * Try to transfer events to user space. In case we get
			 * 0 events and there's still timeout left over, we go
			 * trying again in search of more luck.
			 */
			// events are ready, deliver them to user space
			res = ep_send_events(ep, events, maxevents);
			if (res)
				return res;
		}
        ......
    }
}

static int ep_send_events(struct eventpoll *ep,
			  struct epoll_event __user *events, int maxevents){
......
/*
		 * If the event mask intersect the caller-requested one,
		 * deliver the event to userspace. Again, we are holding ep->mtx,
		 * so no operations coming from userspace can change the item.
		 */
		// get the ready events of the fd associated with this epi
		revents = ep_item_poll(epi, &pt, 1);
		if (!revents)
			continue;
		// copy the event data from kernel space to user space
		events = epoll_put_uevent(revents, epi->event.data, events);
		if (!events) {
			// if the copy fails, keep the event on the ready list
			list_add(&epi->rdllink, &txlist);
			ep_pm_stay_awake(epi);
			if (!res)
				res = -EFAULT;
			break;
		}
		res++;
        if (epi->event.events & EPOLLONESHOT)
			epi->event.events &= EP_PRIVATE_BITS;
		else if (!(epi->event.events & EPOLLET)) {
			/*
			 * If this file has been added with Level
			 * Trigger mode, we need to insert back inside
			 * the ready list, so that the next call to
			 * epoll_wait() will check again the events
			 * availability. At this point, no one can insert
			 * into ep->rdllist besides us. The epoll_ctl()
			 * callers are locked out by
			 * ep_scan_ready_list() holding "mtx" and the
			 * poll callback will queue them in ep->ovflist.
			 */
			/* In level-triggered (LT) mode the event is not removed from the
			 * ready list after delivery; it is queued again so the next
			 * epoll_wait call re-checks whether events remain and only then
			 * drops it. While we iterate, ep->rdllist cannot be written to
			 * (mtx is held), so newly ready events go onto ep->ovflist. */
			list_add_tail(&epi->rdllink, &ep->rdllist);
			ep_pm_stay_awake(epi);
		}
    ......
}

The flow after ep_send_events

  • ep_send_events -> ep_item_poll -> vfs_poll -> tcp_poll -> sock_poll_wait -> poll_wait

    • ep_send_events calls ep_item_poll to fetch the fd's ready events
static __poll_t ep_item_poll(const struct epitem *epi, poll_table *pt,
				 int depth)
{
	struct file *file = epi->ffd.file;
	__poll_t res;

	pt->_key = epi->event.events;
	if (!is_file_epoll(file))
		res = vfs_poll(file, pt);
	else
		res = __ep_eventpoll_poll(file, pt, depth);
	return res & epi->event.events;
}

static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) {
    if (unlikely(!file->f_op->poll))
        return DEFAULT_POLLMASK;
    // for a TCP socket, f_op->poll points to tcp_poll
    return file->f_op->poll(file, pt);
}
/*
 *	Wait for a TCP event.
 *
 *	Note that we don't need to lock the socket, as the upper poll layers
 *	take care of normal races (between the test and the event) and we don't
 *	go look at any of the socket buffers directly.
 */
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)


/**
 * sock_poll_wait - place memory barrier behind the poll_wait call.
 * @filp:           file
 * @sock:           socket to wait on
 * @p:              poll_table
 *
 * See the comments in the wq_has_sleeper function.
 */
static inline void sock_poll_wait(struct file *filp, struct socket *sock,
				  poll_table *p)
{
	if (!poll_does_not_wait(p)) {
		poll_wait(filp, &sock->wq.wait, p);
		/* We need to be sure we are in sync with the
		 * socket flags modification.
		 *
		 * This memory barrier is paired in the wq_has_sleeper.
		 */
		smp_mb();
	}
}

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
	if (p && p->_qproc && wait_address)
		p->_qproc(filp, wait_address, p);
}

What is p->_qproc above?

It is ep_ptable_queue_proc, initialized in ep_insert:

static inline void init_poll_funcptr(poll_table *pt, poll_queue_proc qproc)
{
	pt->_qproc = qproc;
	pt->_key   = ~(__poll_t)0; /* all events enabled */
}

static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
		     struct file *tfile, int fd, int full_check){
    ......
    /* Initialize the poll table using the queue callback */
	epq.epi = epi;
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    ......
}

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	struct ep_pqueue *epq = container_of(pt, struct ep_pqueue, pt);
	struct epitem *epi = epq->epi;
	struct eppoll_entry *pwq;

	if (unlikely(!epi))	// an earlier allocation has failed
		return;

	pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL);
	if (unlikely(!pwq)) {
		epq->epi = NULL;
		return;
	}
	// register ep_poll_callback as the wakeup function for this wait queue entry
	init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
	pwq->whead = whead;
	pwq->base = epi;
	if (epi->event.events & EPOLLEXCLUSIVE)
		add_wait_queue_exclusive(whead, &pwq->wait);
	else
		add_wait_queue(whead, &pwq->wait);
	pwq->next = epi->pwqlist;
	epi->pwqlist = pwq;
}

When is ep_insert called?

From the epoll_ctl syscall (on EPOLL_CTL_ADD).

In Redis, after accept, the call to aeApiAddEvent invokes epoll_ctl; the call path is as follows (a sketch of aeApiAddEvent follows below):

aeApiAddEvent -> epoll_ctl -> ep_insert -> init_poll_funcptr -> ep_ptable_queue_proc
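
For reference, an abridged sketch of aeApiAddEvent (based on ae_epoll.c; aeApiState is Redis's small wrapper holding the epoll fd and the event array). It merges the new mask with the existing one, maps AE_READABLE/AE_WRITABLE to EPOLLIN/EPOLLOUT, and chooses EPOLL_CTL_ADD or EPOLL_CTL_MOD:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    /* If the fd is not yet monitored we need an ADD, otherwise a MOD
     * that merges the old mask with the new one. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    mask |= eventLoop->events[fd].mask; /* Merge old events. */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd, op, fd, &ee) == -1) return -1;
    return 0;
}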

How does data received by the driver end up triggering ep_poll_callback?

The NIC is hardware; the kernel interacts with it through the NIC driver.

  • At startup the kernel brings up the NIC, allocates the resources it needs (ring buffers), and registers a hard-interrupt handler
  • The NIC receives data and raises a hard interrupt to notify the CPU
  • The CPU handles the hard interrupt by calling the handler provided by the driver
  • The handler then raises the NET_RX_SOFTIRQ softirq, serviced by net_rx_action
  • The softirq context consumes the data that the NIC wrote into main memory via DMA
  • It walks softnet_data.poll_list and calls each napi_struct.poll to read that DMA'd data
  • The driver walks the ring buffer, reads the data via dma_sync_single_for_cpu, and copies it into skb packets
  • Once the driver has an skb, the packet has to be handed to the network layer
  • If the kernel has RPS (Receive Packet Steering) enabled, the skb is load-balanced across CPUs to use multiple cores: enqueue_to_backlog hashes it onto some CPU's receive queue (softnet_data.input_pkt_queue), and the softirq later calls the softnet_data NAPI handler process_backlog (softnet_data.backlog.poll), which passes the queued packets to the network layer via __netif_receive_skb
  • In this way the driver reads the data written by the NIC and hands the packets to the protocol stack (IP/TCP), eventually reaching tcp_v4_rcv:
tcp_v4_rcv
    tcp_v4_do_rcv
        tcp_rcv_state_process
            sk_state_change
                sock_def_wakeup
                    wake_up_interruptible_all
                        __wake_up
                            __wake_up_common

sock_def_wakeup is the default callback invoked through sk_state_change when the socket's state changes:

/*
 *	Default Socket Callbacks
 */
// sock.c
static void sock_def_wakeup(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (skwq_has_sleeper(wq))
		wake_up_interruptible_all(&wq->wait);
	rcu_read_unlock();
}
//wait.h

#define wake_up_interruptible_all(x)	__wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)
//wait.c
/**
 * __wake_up - wake up threads blocked on a waitqueue.
 * @wq_head: the waitqueue
 * @mode: which threads
 * @nr_exclusive: how many wake-one or wake-many threads to wake up
 * @key: is directly passed to the wakeup function
 *
 * If this function wakes up a task, it executes a full memory barrier
 * before accessing the task state.  Returns the number of exclusive
 * tasks that were awaken.
 */
int __wake_up(struct wait_queue_head *wq_head, unsigned int mode,
	      int nr_exclusive, void *key)
{
	return __wake_up_common_lock(wq_head, mode, nr_exclusive, 0, key);
}

static int __wake_up_common_lock(struct wait_queue_head *wq_head, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key)
{
	unsigned long flags;
	int remaining;

	spin_lock_irqsave(&wq_head->lock, flags);
	remaining = __wake_up_common(wq_head, mode, nr_exclusive, wake_flags,
			key);
	spin_unlock_irqrestore(&wq_head->lock, flags);

	return nr_exclusive - remaining;
}

/*
 * The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just
 * wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve
 * number) then we wake that number of exclusive tasks, and potentially all
 * the non-exclusive tasks. Normally, exclusive tasks will be at the end of
 * the list and any non-exclusive tasks will be woken first. A priority task
 * may be at the head of the list, and can consume the event without any other
 * tasks being woken.
 *
 * There are circumstances in which we can try to wake a task which has already
 * started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
 * zero in this (rare) case, and we handle it by continuing to scan the queue.
 */
static int __wake_up_common(struct wait_queue_head *wq_head, unsigned int mode,
			int nr_exclusive, int wake_flags, void *key)
{
	wait_queue_entry_t *curr, *next;

	lockdep_assert_held(&wq_head->lock);

	curr = list_first_entry(&wq_head->head, wait_queue_entry_t, entry);

	if (&curr->entry == &wq_head->head)
		return nr_exclusive;
	// iterate over the wait queue entries and invoke each entry's callback
	list_for_each_entry_safe_from(curr, next, &wq_head->head, entry) {
		unsigned flags = curr->flags;
		int ret;
		// for epoll wait entries, curr->func is ep_poll_callback
		ret = curr->func(curr, mode, wake_flags, key);
		if (ret < 0)
			break;
		if (ret && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
			break;
	}

	return nr_exclusive;
}

Ultimately epoll's ep_poll_callback is called to handle the event.