简单实现CircuitBreaker

CircuitBreaker java 简单实现

纸上得来终觉浅,Demo一下

CircuitBreaker.java 主体,维护 resetMillis、openCount、lastFailure 等状态字段

    public AtomicLong openCount = new AtomicLong(0L);// number of times the breaker has transitioned into OPEN (incremented in trip())

    public AtomicLong lastFailure = new AtomicLong(0L);// wall-clock time (ms) recorded by trip() when the breaker was last tripped

    public AtomicLong resetMillis = new AtomicLong(5 * 1000L);// how long (ms) after lastFailure an OPEN breaker waits before a trial request is allowed

    // States of the breaker.
    public enum BreakerState{
        OPEN,        // set by trip(); requests are rejected until resetMillis has elapsed
        HALF_CLOSED, // set by isAllowRequest() once resetMillis has elapsed since lastFailure
        CLOSED       // initial state; isAllowRequest() lets every request through
    }
    public volatile BreakerState state = BreakerState.CLOSED;// current state; starts CLOSED

    public boolean isHardTrip=false;// when true, isAllowRequest() rejects every request

    public boolean bypass = false;// when true, invoke() runs the task directly without consulting the breaker

    public boolean isAttemptLive = false;// set to true by invoke() before running the task, reset to false by trip(); canAttempt() refuses while it is true

    private Throwable tripException = null;// the throwable that most recently tripped the breaker (assigned in handleFailure())

    private FailureInterpreter failureInterpreter = new DefaultFailureInterpreter();// decides via isTrip() whether a failure should trip the breaker

    // NOTE(review): presumably consumed by mapException(...) to translate the rejection exception;
    // that method is not visible here - confirm.
    public CircuitBreakerExceptionMapper<? extends Exception> exceptionMapper;

Runnable执行,通过handleFailure方法处理异常,是否启动熔断等

/**
 * Runs the given task under the protection of this circuit breaker.
 * <p>
 * When {@code bypass} is set the task is run directly and no breaker state is touched.
 * Otherwise the request is first checked with {@code isAllowRequest()}; a refused request
 * fails with the mapped {@code CircuitBreakerException}. An allowed request marks an attempt
 * as live, runs the task and calls {@code close()} on success. Any throwable raised along
 * the way is handed to {@code handleFailure(Throwable)}.
 *
 * @param r the task to execute
 * @throws Exception the mapped rejection exception, or whatever {@code handleFailure} rethrows
 */
public void invoke(Runnable r) throws Exception {
        if(!bypass){
            if(!isAllowRequest()){
                throw mapException(new CircuitBreakerException());
            }
            try{
                isAttemptLive = true;
                r.run();
                close();
                // The success path must end here. Without this return, control fell through
                // to the IllegalStateException below and every successful call threw.
                return;
            }catch (Throwable throwable){
                handleFailure(throwable);// processes the failure (may trip the breaker) and rethrows
            }
            // Guard: only reachable if handleFailure returned normally instead of rethrowing.
            throw new IllegalStateException("can not goto here too.");
        }else {
            r.run();
        }
    }
 /**
  * Processes a failed call and then rethrows the failure to the caller.
  * <p>
  * If no failure interpreter is configured, or the interpreter reports that the throwable
  * should trip the breaker, the throwable is remembered as the trip cause and {@code trip()}
  * is called. Otherwise, if an attempt is currently live, {@code close()} is called.
  * This method never returns normally.
  *
  * @param throwable the failure raised by the protected call
  * @throws Exception the original throwable when it is an {@code Exception}; an {@code Error}
  *                   is rethrown unchanged; any other {@code Throwable} is rethrown wrapped in
  *                   a {@code RuntimeException} with the original as its cause
  */
 public void handleFailure(Throwable throwable) throws Exception{
        if (failureInterpreter == null||failureInterpreter.isTrip(throwable)) {
            this.tripException = throwable;
            trip();
        }else if(isAttemptLive){
            close();
        }
        if (throwable instanceof Exception) {
            throw (Exception)throwable;
        } else if (throwable instanceof Error) {
            throw (Error)throwable;
        } else {
            // A RuntimeException is an Exception and was already rethrown above, so anything
            // reaching this branch is a direct Throwable subclass. Casting it to
            // RuntimeException always failed with ClassCastException and hid the real
            // failure; wrap it instead so the cause is preserved.
            throw new RuntimeException(throwable);
        }

    }
/**
 * Trips the breaker: moves it to the OPEN state, clears the live-attempt flag and
 * records the current time as the moment of the last failure.
 * The open counter advances only when the breaker was not already OPEN.
 */
public void trip(){
        final boolean alreadyOpen = (BreakerState.OPEN == state);
        if(!alreadyOpen){
            // count transitions into OPEN, not repeated trips while already open
            openCount.incrementAndGet();
        }
        state = BreakerState.OPEN;
        isAttemptLive = false;
        final long trippedAt = System.currentTimeMillis();
        lastFailure.set(trippedAt);
    }

如何判断是否可以打开熔断器呢?处理逻辑在FailureInterpreter

通过一个基于滑动时间窗口的计数器 counter(EventCounter)统计窗口内的失败次数

/**
 * Default {@link FailureInterpreter}.
 * <p>
 * A failure whose type is in the configured exception set never trips the breaker.
 * For every other failure: when both {@code limit} and {@code windowMills} are positive,
 * the failure is recorded in an {@link EventCounter} and the breaker trips once the counter
 * reports more than {@code limit} events; when either is not positive, the failure trips
 * the breaker immediately.
 */
public class DefaultFailureInterpreter implements FailureInterpreter {
    // Throwable types that are ignored, i.e. never trip the breaker.
    private Set<Class<? extends Throwable>> exceptions = new HashSet<Class<? extends Throwable>>();
    // Number of counted failures that is still tolerated; 0 means "no threshold configured".
    private int limit=0;
    // Window length (ms) handed to the EventCounter; 0 means "no threshold configured".
    private long windowMills=0;
    // Records failure timestamps; only created once a threshold is configured.
    private EventCounter counter;

    /** Creates an interpreter without a threshold: any non-ignored failure trips. */
    public DefaultFailureInterpreter() {
    }

    /**
     * Creates an interpreter with a failure threshold.
     *
     * @param limit       number of failures tolerated before tripping
     * @param windowMills window length in milliseconds passed to the counter
     */
    public DefaultFailureInterpreter(int limit, long windowMills) {
        this.limit = limit;
        this.windowMills = windowMills;
        initCounter();
    }

    /**
     * Creates the counter, or reconfigures the existing one, from the current
     * {@code limit} and {@code windowMills}. Does nothing unless both are positive.
     */
    public void initCounter(){
        if (!hasThreshold()) {
            return;
        }
        // one slot more than the limit so that "count > limit" can become true
        final int slots = limit + 1;
        if (counter != null) {
            counter.setCapacity(slots);
            counter.setWindowMillis(windowMills);
            return;
        }
        counter = new EventCounter(slots, windowMills);
    }

    /** Returns true when both limit and window are positive. */
    private boolean hasThreshold() {
        return this.limit > 0 && this.windowMills > 0;
    }

    /** @return the set of throwable types that never trip the breaker */
    public Set<Class<? extends Throwable>> getExceptions() {
        return exceptions;
    }

    /** @param exceptions the set of throwable types that never trip the breaker */
    public void setExceptions(Set<Class<? extends Throwable>> exceptions) {
        this.exceptions = exceptions;
    }

    /** @return the configured failure limit */
    public int getLimit() {
        return limit;
    }

    /** Sets the failure limit and re-initialises the counter. */
    public void setLimit(int limit) {
        this.limit = limit;
        initCounter();
    }

    /** @return the configured window length in milliseconds */
    public long getWindowMills() {
        return windowMills;
    }

    /** Sets the window length in milliseconds and re-initialises the counter. */
    public void setWindowMills(long windowMills) {
        this.windowMills = windowMills;
        initCounter();
    }

    /** @return the counter used to record failures, possibly {@code null} */
    public EventCounter getCounter() {
        return counter;
    }

    /** @param counter the counter used to record failures */
    public void setCounter(EventCounter counter) {
        this.counter = counter;
    }

    /**
     * Decides whether the given failure should trip the breaker.
     *
     * @param cause the failure raised by the protected call
     * @return {@code false} for ignored types; otherwise {@code true} when no threshold is
     *         configured, or when the recorded failure count exceeds {@code limit}
     */
    @Override
    public boolean isTrip(Throwable cause) {
        for (Class<?> ignoredType : exceptions) {
            if (ignoredType.isInstance(cause)) {
                return false;
            }
        }
        if (!hasThreshold()) {
            // a failure with no limit configured trips immediately
            return true;
        }
        counter.mark();
        final int recorded = counter.count();
        return recorded > limit;
    }
}

EventCounter 实现

用 LinkedList 队列简单模拟滑动时间窗口:例如 windowMillis = 5s 时,count() 先移除 5 秒之前的记录,再返回队列大小,即最近 5 秒内发生了多少次错误

/**
 * Counts events inside a sliding time window.
 * <p>
 * Each {@link #mark()} appends the current timestamp to a FIFO queue that is bounded by
 * {@code capacity}; {@link #count()} drops timestamps older than {@code windowMillis} and
 * returns how many remain. All state is guarded by one private lock.
 */
public class EventCounter {

    // Dedicated monitor. The queue itself must not be used as the lock because setQueue()
    // can replace the queue instance, which would leave threads holding different monitors.
    private final Object lock = new Object();

    private long windowMillis;// window length in milliseconds

    private int capacity;// maximum number of timestamps retained

    private LinkedList<Long> queue = new LinkedList<Long>();

    /**
     * @param capacity     maximum number of timestamps retained
     * @param windowMillis window length in milliseconds
     */
    public EventCounter(int capacity, long windowMillis) {
        this.windowMillis = windowMillis;
        this.capacity = capacity;
    }

    /**
     * Records one event at the current time, evicting the oldest timestamps first so that
     * the queue does not exceed {@code capacity}. With a non-positive capacity only the
     * most recent event is retained.
     */
    public void mark() {
        long currentTimeMillis = System.currentTimeMillis();
        synchronized (lock) {
            // ">=" in a loop (rather than a single "==" check) also shrinks a queue that
            // is already over capacity, and the isEmpty() guard keeps removeFirst() from
            // throwing NoSuchElementException when capacity is 0.
            while (!queue.isEmpty() && queue.size() >= capacity) {
                queue.removeFirst();
            }
            queue.addLast(currentTimeMillis);
        }
    }

    /**
     * Drops timestamps that have fallen out of the window and returns how many remain.
     *
     * @return number of events recorded within the last {@code windowMillis} milliseconds
     */
    public int count(){
        long currentTimeMillis = System.currentTimeMillis();
        synchronized (lock){
            long removeTimesBeforeMillis = currentTimeMillis - windowMillis;
            while(!queue.isEmpty()&&queue.peek()<removeTimesBeforeMillis){
                queue.removeFirst();
            }
            return queue.size();
        }
    }

    /** @return the window length in milliseconds */
    public long getWindowMillis() {
        synchronized (lock) {
            return windowMillis;
        }
    }

    /** @param windowMillis the new window length in milliseconds */
    public void setWindowMillis(long windowMillis) {
        synchronized (lock) {
            this.windowMillis = windowMillis;
        }
    }

    /** @return the maximum number of timestamps retained */
    public int getCapacity() {
        synchronized (lock) {
            return capacity;
        }
    }

    /**
     * Changes the capacity, discarding the oldest timestamps if the queue is larger than
     * the new capacity. Non-positive values are ignored.
     *
     * @param capacity the new capacity
     */
    public void setCapacity(int capacity) {
        if (capacity <=0) {
            return;
        }
        synchronized (lock){
            while (capacity < queue.size()) {
                queue.removeFirst();
            }
            // assigned under the lock so mark() always sees a capacity consistent with the queue
            this.capacity = capacity;
        }
    }

    /**
     * @return the live backing queue; mutating it directly is not guarded by this
     *         counter's lock
     */
    public LinkedList<Long> getQueue() {
        synchronized (lock) {
            return queue;
        }
    }

    /**
     * Replaces the backing queue.
     *
     * @param queue the new queue of timestamps; {@code null} is treated as an empty queue
     */
    public void setQueue(LinkedList<Long> queue) {
        synchronized (lock) {
            this.queue = (queue != null) ? queue : new LinkedList<Long>();
        }
    }
}

CircuitBreaker的Half-Closed如何实现呢?

/**
 * Decides whether a request may pass through the breaker right now.
 * <ul>
 *   <li>A hard trip rejects every request.</li>
 *   <li>A CLOSED breaker accepts every request.</li>
 *   <li>An OPEN breaker whose reset interval has elapsed since the last failure moves to
 *       HALF_CLOSED and lets this request through as the trial call.</li>
 *   <li>Otherwise the decision is delegated to {@code canAttempt()}.</li>
 * </ul>
 *
 * @return {@code true} if the request may be executed
 */
public boolean isAllowRequest(){
        if (isHardTrip) {
            return false;
        }
        if(state==BreakerState.CLOSED){
            return true;
        }
        // Leftover debug System.out.println calls were removed here; they wrote to stdout on
        // every request made while the breaker was not CLOSED.
        if(state==BreakerState.OPEN&&System.currentTimeMillis()-lastFailure.get()>=resetMillis.get()){
            state = BreakerState.HALF_CLOSED;
            return true;
        }
        return canAttempt();
    }
    /**
     * Reports whether a trial request may be started.
     *
     * @return {@code true} only when the breaker is HALF_CLOSED and no attempt is live
     */
    private synchronized boolean canAttempt(){
        final boolean halfClosed = (state == BreakerState.HALF_CLOSED);
        return halfClosed && !isAttemptLive;
    }

invoke时 调用isAllowRequest即可

/**
 * Runs the given task through the breaker, consulting {@code isAllowRequest()} first.
 * <p>
 * With {@code bypass} set the task is simply run. Otherwise a refused request fails with
 * the mapped {@code CircuitBreakerException}; an allowed request marks an attempt as live,
 * runs the task and calls {@code close()} on success, while any throwable is handed to
 * {@code handleFailure(Throwable)}.
 *
 * @param r the task to execute
 * @throws Exception the mapped rejection exception, or whatever {@code handleFailure} rethrows
 */
public void invoke(Runnable r) throws Exception {
        if(!bypass){
            if(!isAllowRequest()){
                throw mapException(new CircuitBreakerException());
            }
            try{
                isAttemptLive = true;
                r.run();
                close();
                // Return on success. Previously control fell through to the
                // IllegalStateException below, so every successful call threw.
                return;
            }catch (Throwable throwable){
                handleFailure(throwable);
            }
            // Guard: only reachable if handleFailure returned normally instead of rethrowing.
            throw new IllegalStateException("can not goto here too.");
        }else {
            r.run();
        }
    }

总结

这里只是demo

Hystrix 实现得更为优雅,使用了 RxJava,API 非常友好流畅