Libevent Java版实现

What is libevent? Read more.

Libevent Java版实现

纸上得来终觉浅,demo一下

Event 结构

/**
 * Shared constants for the event library.
 *
 * <p>Two independent bit sets live here: the {@code EV_*} bits describe what
 * an event is interested in (and what happened to it), while the
 * {@code EVLIST_*} bits record which internal queues an event currently sits on.
 * The two sets reuse the same numeric values, so they must never be mixed in
 * one field.
 */
public interface EventConfig {

	// ---- event kinds (stored in ev_events / reported in ev_res) ----
	int EV_READ = 1 << 0;
	int EV_WRITE = 1 << 1;
	/** Timer event. */
	int EV_TIMEOUT = 1 << 2;
	int EV_ACCEPT = 1 << 3;
	int EV_CONNECT = 1 << 4;

	/** Auxiliary option, combined with one of the event kinds above. */
	int EV_PERSIST = 1 << 5;

	// ---- queue membership (stored in ev_flags) ----
	/** The event is in the timer heap. */
	int EVLIST_TIMEOUT = 1 << 0;
	/** The event is in the list of registered events. */
	int EVLIST_INSERTED = 1 << 1;
	/** The event is in an active list. */
	int EVLIST_ACTIVE = 1 << 2;
	/** Marker reserved for internal use. */
	int EVLIST_INTERNAL = 1 << 3;
	/** The event has been initialised. */
	int EVLIST_INIT = 1 << 4;

}
/**
 * Event 主体
 */
public class Event  implements Comparable<Event>, EventConfig {
    EventBase eventBase;
    SelectableChannel selectableChannel;
    int ev_events;//event status,such as read write accept
    int ev_ncalls;//callback counts
    AtomicInteger ev_pncalls;//Allows deletes in callback
    int ev_res;//result passed to event callback
    int ev_flags=EVLIST_INIT; //libevent status
    long ev_timeout;//
    public int compareTo(Event o) {
        if (this.ev_timeout > o.ev_timeout)
            return 1;
        else if (this.ev_timeout == o.ev_timeout)
            return 0;
        else
            return -1;
    }
    int ev_pri;  // smaller numbers are higher priority
    Object[] args;
    EventCallBackHandler eventCallBackHandler;
」

EventBase 表示一个 libevent 实例(instance),与 event 相关的 list 均保存在此对象中。例如,当向写 socket 写入数据时,读 socket 就会得到通知,触发读事件,从而 event_base 也能相应地得到通知。

    // State of one event base. eventQueueInsert() fills the three containers
    // below depending on which EVLIST_* queue is requested.

    // Registered events (EVLIST_INSERTED).
    public LinkedList<Event> events_queue;//event register list
    // Active events, one list per priority; indexed by Event.ev_pri.
    public LinkedList<Event>[] active_queues;//active list
    // Timer events (EVLIST_TIMEOUT); ordered by Event.compareTo, i.e. by ev_timeout.
    public PriorityQueue<Event> timeheap;//timer event mini-heap
    // Number of events currently on the active lists; when > 0 the loop does not block waiting for I/O.
    public int event_count_active=0;
    // NOTE(review): presumably the total number of registered events; not updated in the code shown here.
    public int event_count=0;
    // I/O demultiplexer used by the event loop to wait for ready channels.
    public EventOp eventOp;
    // When true, the event loop returns right after the current callback.
    public boolean event_break;
    // Cached current time in ms; a value <= 0 means "no cached time" (see getTime).
    public long timecache=0;

EventOp 为libevent提供I/O demultiplex机制

先看下EventOp的全局属性

    // NIO selector every channel is registered with.
    // NOTE(review): it is assigned outside this excerpt - confirm it is opened before add()/dispatch() run.
    private Selector selector;

    // Event facade that ready events are handed to (dispatch() calls event_active on it).
    private LibEvent libEvent;

    // Channel -> event lookup tables, one per I/O kind; filled by add() and read by dispatch().
    private Map<SelectableChannel, Event> acceptEventByChannel = new HashMap<SelectableChannel, Event>();

    private Map<SelectableChannel,Event> readEventByChannel = new HashMap<SelectableChannel, Event>();

    private Map<SelectableChannel,Event> writeEventByChannel = new HashMap<SelectableChannel, Event>();

    /**
     * Creates the demultiplexer.
     *
     * @param libEvent the event facade that will receive activated events
     */
    public EventOp(LibEvent libEvent) {
        this.libEvent = libEvent;
    }

/**
 * The libevent abstraction; it can be thought of as the single, unified
 * interface for event handling.
 */
public interface LibEvent {

    /**
     * Initialises the library instance; must be called before the other methods
     * in the usage example of this file.
     *
     * @throws IOException if initialisation fails
     */
    void init() throws IOException;

    /**
     * Registers an event.
     *
     * @param event   the event to register, as returned by {@link #event_set}
     * @param timeout timeout for the event; NOTE(review): presumably milliseconds
     *                (the example passes 2000) - confirm against the implementation
     * @throws IOException if the underlying channel registration fails
     */
    void event_add(Event event,long timeout) throws IOException;

    /**
     * Removes an event. The event loop shown in this file calls it for
     * non-persistent events before running their callback and for expired timers.
     *
     * @param event the event to remove
     */
    void event_del(Event event);

    /**
     * Runs the event loop: waits for I/O and timers and invokes the callbacks
     * of the events that became active.
     */
    void event_loop();

    /**
     * Creates an event for a channel.
     *
     * @param channel              the channel to watch
     * @param ev_events            bit mask of {@code EventConfig.EV_*} values
     * @param eventCallBackHandler callback to invoke when the event fires
     * @param args                 extra arguments kept with the event for the callback
     * @return the new event, ready to be passed to {@link #event_add}
     */
    Event event_set(SelectableChannel channel,int ev_events,EventCallBackHandler eventCallBackHandler,Object... args);

    /**
     * Marks an event as active so that the event loop runs its callback.
     *
     * @param event  the event to activate
     * @param ev_res result bits ({@code EventConfig.EV_*}) to hand to the callback
     * @param ncalls number of times the callback should be invoked
     */
    void event_active(Event event,int ev_res,int ncalls);

}

下面以 Add Event 为例

/**
 * Adds an event to the reactor's demultiplexer - in Java, registers the
 * event's channel with the selector - and remembers which event to activate
 * for that channel.
 *
 * <p>EV_ACCEPT, EV_READ and EV_WRITE are handled independently of each other,
 * so an event that asks for several kinds (for example EV_READ | EV_WRITE) is
 * registered for all of them. A {@code null} event is ignored.
 *
 * @param event the event to register; may be {@code null}
 * @throws IOException if the channel cannot be registered with the selector
 */
 public void add(Event event) throws IOException {
        if (event == null) {
            return;
        }
        if ((event.ev_events & EventConfig.EV_ACCEPT) != 0) {
            registerInterest(event, SelectionKey.OP_ACCEPT, acceptEventByChannel);
        }
        if ((event.ev_events & EventConfig.EV_READ) != 0) {
            registerInterest(event, SelectionKey.OP_READ, readEventByChannel);
        }
        if ((event.ev_events & EventConfig.EV_WRITE) != 0) {
            registerInterest(event, SelectionKey.OP_WRITE, writeEventByChannel);
        }
    }

    /**
     * Adds {@code op} to the selector interest set of the event's channel
     * (registering the channel first if it has no key yet) and records the
     * event in the given channel-to-event table.
     */
    private void registerInterest(Event event, int op, Map<SelectableChannel, Event> eventByChannel)
            throws IOException {
        SelectableChannel channel = event.selectableChannel;
        SelectionKey key = channel.keyFor(selector);
        if (key == null) {
            channel.register(selector, op);
        } else {
            // Keep whatever the channel is already registered for.
            key.interestOps(key.interestOps() | op);
        }
        eventByChannel.put(channel, event);
    }

Dispatch 处理 I/O 事件就绪的过程:遍历 selector.selectedKeys,根据事件的类型,交给 LibEvent 接口的实现来处理

 /**
  * Waits for I/O readiness and activates the events registered for every
  * channel that became ready.
  *
  * <p>Only the operations the selector reports as ready
  * ({@link SelectionKey#readyOps()}) are turned into EV_* result bits; the
  * combined result is passed to each activated event of that channel.
  *
  * @param timeout maximum time to block in milliseconds; when it is not
  *                positive the selector is not consulted at all
  * @return the number of keys the selector reported, or 0 when
  *         {@code timeout <= 0}
  * @throws IOException if the selector fails
  */
 public int dispatch(long timeout) throws IOException {
        if (timeout <= 0) {
            return 0;
        }
        int n = selector.select(timeout);
        if (n <= 0) {
            return n;
        }
        Iterator<SelectionKey> it = selector.selectedKeys().iterator();
        while (it.hasNext()) {
            SelectionKey key = it.next();
            // Always take the key out of the selected set, even when it is skipped below.
            it.remove();
            if (!key.isValid()) {
                continue;
            }
            int ready = key.readyOps();
            int ev_res = 0;
            Event accept = null;
            Event read = null;
            Event write = null;
            if ((ready & SelectionKey.OP_ACCEPT) != 0) {
                ev_res |= EventConfig.EV_ACCEPT;
                accept = acceptEventByChannel.get(key.channel());
            }
            if ((ready & SelectionKey.OP_READ) != 0) {
                ev_res |= EventConfig.EV_READ;
                read = readEventByChannel.get(key.channel());
            }
            if ((ready & SelectionKey.OP_WRITE) != 0) {
                ev_res |= EventConfig.EV_WRITE;
                write = writeEventByChannel.get(key.channel());
            }
            // Hand each ready event that still wants this kind of I/O to the event layer.
            if (accept != null && (accept.ev_events & EventConfig.EV_ACCEPT) != 0) {
                libEvent.event_active(accept, ev_res, 1);
            }
            if (read != null && (read.ev_events & EventConfig.EV_READ) != 0) {
                libEvent.event_active(read, ev_res, 1);
            }
            if (write != null && (write.ev_events & EventConfig.EV_WRITE) != 0) {
                libEvent.event_active(write, ev_res, 1);
            }
        }
        return n;
    }

event active处理

 /**
  * Marks an event as active so the event loop will run its callback.
  *
  * <p>If the event is already on an active list, the new result bits are
  * merged into its pending result and nothing else changes. The bits are
  * merged into {@code ev_res}, never into {@code ev_flags}: the EV_* and
  * EVLIST_* constants share numeric values, so mixing them would corrupt the
  * event's queue membership.
  *
  * @param event  the event to activate
  * @param ev_res result bits ({@code EventConfig.EV_*}) to pass to the callback
  * @param ncalls number of times the callback should be invoked
  */
 public void event_active(Event event, int ev_res, int ncalls) {
        if ((event.ev_flags & EventConfig.EVLIST_ACTIVE) != 0) {
            // Different kinds of readiness for an already queued event are added together.
            event.ev_res |= ev_res;
            return;
        }
        event.ev_res = ev_res;
        event.ev_ncalls = ncalls;
        event.ev_pncalls = null;
        eventQueueInsert(event, EventConfig.EVLIST_ACTIVE);
    }
/**
 * Puts the event on the data structure selected by {@code queue} and records
 * the membership in {@code ev_flags}.
 *
 * <ul>
 *   <li>{@code EVLIST_INSERTED} - the list of registered events</li>
 *   <li>{@code EVLIST_TIMEOUT} - the timer heap</li>
 *   <li>{@code EVLIST_ACTIVE} - the active list of the event's priority</li>
 * </ul>
 *
 * <p>The call is idempotent: an event that is already on the requested queue
 * is left untouched, so it can never appear twice in a queue and the active
 * counter stays accurate. The flag is only set after the insertion succeeded.
 *
 * @param event the event to queue
 * @param queue exactly one of the {@code EVLIST_*} queue constants listed above
 * @throws IllegalArgumentException if {@code queue} is not one of those constants
 */
public void eventQueueInsert(Event event,int queue) {
        boolean alreadyQueued = (event.ev_flags & queue) != 0;
        switch (queue) {
            case EventConfig.EVLIST_INSERTED:
                if (!alreadyQueued) {
                    eventBase.events_queue.add(event);
                }
                break;
            case EventConfig.EVLIST_TIMEOUT:
                if (!alreadyQueued) {
                    eventBase.timeheap.add(event);
                }
                break;
            case EventConfig.EVLIST_ACTIVE:
                if (!alreadyQueued) {
                    eventBase.active_queues[event.ev_pri].add(event);
                    eventBase.event_count_active++;
                }
                break;
            default:
                throw new IllegalArgumentException("unknown queue" + queue);
        }
        event.ev_flags |= queue;
    }

下面使用libevent创建server,并启动

// Create the event library instance and initialise it.
final LibEvent libEvent = new LibEventHandler();
libEvent.init();
// Open a non-blocking server socket bound to port 8080; the channel is put
// into non-blocking mode before it is handed to the event library.
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8080));
// Register the event: a persistent accept event whose callback is AcceptHandler.
Event event = libEvent.event_set(serverSocketChannel, EventConfig.EV_ACCEPT|EventConfig.EV_PERSIST, new AcceptHandler(libEvent));
libEvent.event_add(event,2000);
System.out.println("server starting...");
// Hand control to the event loop.
libEvent.event_loop();

event_loop 处理过程:

/**
 * Runs the event loop until {@code done} becomes true or a callback requests
 * a stop through {@code eventBase.event_break}.
 *
 * <p>Each iteration:
 * <ol>
 *   <li>clears the cached clock, so the iteration works with the real time;</li>
 *   <li>computes how long the demultiplexer may block, from the earliest timer;</li>
 *   <li>dispatches ready I/O events onto the active lists;</li>
 *   <li>runs the callbacks of the highest-priority non-empty active list
 *       (smaller {@code ev_pri} means higher priority, so the lists are scanned
 *       from index 0 upwards);</li>
 *   <li>moves expired timers onto the active lists for a later iteration.</li>
 * </ol>
 *
 * <p>An {@link IOException} raised while dispatching is printed and the loop continues.
 */
public void event_loop() {
        while (!done) {
            // The cache must be dropped every iteration: getTime() keeps returning a
            // positive cached value, so without this the clock would freeze after the
            // first pass and timers would never expire.
            this.eventBase.timecache = 0;
            // Derive the wait time from the timer with the smallest timeout and the current time.
            long timeout = getTimeoutNext();
            try {
                // Wait for I/O readiness: this calls the system I/O demultiplexer
                // (the selector here; epoll_wait or select/kqueue in native libevent).
                // dispatch() inserts the ready events into the active lists.
                // NOTE(review): the EventOp.dispatch shown in this file takes only the
                // timeout - confirm that this two-argument overload exists.
                this.eventOp.dispatch(eventBase, timeout);
                // Cache the time once so everything below sees the same "now".
                this.eventBase.timecache = getTime(eventBase);
                LinkedList<Event> activeList = null;
                // Smaller index = higher priority: take the first non-empty list.
                for (int i = 0; i < eventBase.active_queues.length; i++) {
                    if (eventBase.active_queues[i].peek() != null) {
                        activeList = eventBase.active_queues[i];
                        break;
                    }
                }
                if (activeList != null) {
                    Event event = null;
                    // Event handling: drain the chosen active list.
                    while ((event = activeList.peek()) != null) {
                        if ((event.ev_events & EventConfig.EV_PERSIST) > 0) {
                            // Persistent events stay registered; only leave the active list.
                            eventQueueRemove(event, EventConfig.EVLIST_ACTIVE);
                        } else {
                            event_del(event);
                        }
                        // Publish the live counter on the event while its callbacks run
                        // (ev_pncalls exists to allow deletes from inside a callback).
                        AtomicInteger ncalls = new AtomicInteger(event.ev_ncalls);
                        event.ev_pncalls = ncalls;
                        while (ncalls.get() > 0) {
                            event.ev_ncalls = ncalls.decrementAndGet();
                            event.eventCallBackHandler.callback(event.selectableChannel, event.ev_res, event.args);
                            if (eventBase.event_break) {
                                return;
                            }
                        }
                    }
                }
                if (!eventBase.timeheap.isEmpty()) {
                    long now = getTime(eventBase);
                    Event event = null;
                    // The heap is ordered by timeout, so stop at the first timer still in the future.
                    while ((event = eventBase.timeheap.peek()) != null) {
                        if (event.ev_timeout > now) {
                            break;
                        }
                        this.event_del(event);
                        event_active(event, EventConfig.EV_TIMEOUT, 1);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

/**
 * Returns the current time in milliseconds, preferring the time cached on the
 * event base.
 *
 * @param eventBase the event base whose {@code timecache} is consulted
 * @return the cached time when it is positive, otherwise
 *         {@link System#currentTimeMillis()}
 */
public long getTime(EventBase eventBase) {
        long cached = eventBase.timecache;
        return cached > 0 ? cached : System.currentTimeMillis();
    }
/**
 * Computes how long the next dispatch may wait.
 *
 * @return -1 when events are already active or the earliest timer is overdue;
 *         the time remaining until the earliest timer when one is pending;
 *         1000 when there is neither an active event nor a timer
 */
private long getTimeoutNext() {
    // Pending callbacks: do not wait at all.
    if (this.eventBase.event_count_active > 0) {
        return -1;
    }
    Event nearest = eventBase.timeheap.peek();
    if (nearest == null) {
        // No timers registered: fall back to the default wait.
        return 1000L;
    }
    long currentTime = getTime(this.eventBase);
    return nearest.ev_timeout < currentTime ? -1L : nearest.ev_timeout - currentTime;
}