熔断器

首先定义一个 CircuitBreaker, 下面是它的属性:

  • state: CircuitBreaker 当前状态, 有三种状态:

    • CloseState: 闭合状态, 此时正常提供服务;
    • OpenState: 断开状态, 此时拒绝服务请求;
    • HalfOpenState: 半开状态, 是 CloseState 和 OpenState 的中间态, 可提供服务, 但会根据服务请求的数量, 向上述两种状态转换
  • counter: 计数器, 存储请求的次数, CircuitBreaker 根据请求的次数去切换状态

  • thresholdForOpen: CircuitBreaker 切换到 OpenState 的阈值

  • idleTimeForOpen: CircuitBreaker 切换到 OpenState 保持的时间

  • thresholdForHalfOpen: CircuitBreaker 切换到 HalfOpenState 的阈值

首先来实现一个单机版的熔断器

state.js

/**
 * 抽象 state 父类
 */
class AbstractState {

    constructor(time = Date.now()) {
        this.startTime = time;
        console.info(`${this.getName()} --> startTime = ${this.startTime / 1000}`);
    }

    getName() {
        return this.constructor.name;
    }

    canPass() {
        return true;
    }

    checkout(breaker) {
    }
}


/**
 * 闭合
 */
class CloseState extends AbstractState {
    constructor() {
        super();
    }

    canPass(breaker) {
        return true;
    }

    checkout(breaker) {
        let period = breaker.thresholdForOpen[1] * 1000;
        let now = Date.now();
        if (now >= this.startTime + period) { // 过了这段校验时间, 清零等待重新开始
            this.startTime = Date.now();
            breaker.reset();
        }

        console.info('checkout -->  = ', breaker.getCount());
        if (breaker.getCount() >= breaker.thresholdForOpen[0]) { // 在这段校验时间内, 超过断路阈值, 切换到 `OpenState`
            breaker.reset();
            breaker.setState(new OpenState())
        }
    }
}


/**
 * 半开
 */
class HalfOpenState extends AbstractState {
    constructor() {
        super();
    }

    canPass(breaker) {
        let limit = breaker.thresholdForHalfOpen[0];
        return breaker.getCount() <= limit;
    }

    checkout(breaker) {
        console.info('checkout --> count = ', breaker.getCount());
        let period = breaker.thresholdForHalfOpen[1] * 1000;
        let now = Date.now();
        if (now >= this.startTime + period) {
            breaker.reset();
            if (breaker.getCount() > breaker.thresholdForHalfOpen[0]) { // 依然超过断路阈值, 切到 `OpenState`
                breaker.setState(new OpenState());
            } else { // 低于断路阈值, 切到 `CloseState`
                breaker.setState(new CloseState());
            }
        }
    }

}

/**
 * 断路
 */
class OpenState extends AbstractState {
    constructor() {
        super();
    }

    canPass() {
        return false;
    }

    checkout(breaker) {
        let period = breaker.idleTimeForOpen * 1000;
        let now = Date.now();
        if (now >= this.startTime + period) { // 过了这段校验时间, 切换到 `HalfOpenState`
            breaker.reset();
            breaker.setState(new HalfOpenState());
        }
    }
}


module.exports = {
    AbstractState: AbstractState,
    CloseState: CloseState,
    HalfOpenState: HalfOpenState,
    OpenState: OpenState,
};

counter.js

class Counter {
    constructor(num = 0) {
        this.num = num;
    }

    get() {
        return this.num;
    }

    increase() {
        this.num += 1;
    }

    reset() {
        this.num = 0;
    }
}

module.exports = {
    Counter: Counter,
};

CircuitBreaker.js

const {CloseState, HalfOpenState, OpenState} = require('./state');
const {Counter} = require('./counter');


class CircuitBreaker {

    /**
     * @param thresholdForOpen {string} format: '600/60'
     *              '600/60' 例如,这意味着最大允许请求为每60秒600次,或者断路器将切换到OpenState
     * @param idleTimeForOpen {number} unit: second
     *              600 例如,这意味着如果断路器切换到OpenState,它将保持600秒
     * @param thresholdForHalfOpen {string} format: '300/60'
     *              '300/60' 例如,这意味着如果最大请求数超过每60秒300个,或断路器切换至关闭状态
     */
    constructor(thresholdForOpen = '600/60', idleTimeForOpen = 5 * 60, thresholdForHalfOpen = '300/60') {
        this.idleTimeForOpen = idleTimeForOpen;
        this.thresholdForOpen = thresholdForOpen.split('/');
        this.thresholdForHalfOpen = thresholdForHalfOpen.split('/');
        this.counter = new Counter();   // 每60秒的最大次数
        this.state = new CloseState();  // 默认状态
    }

    getState() {
        return this.state;
    }

    setState(state) {
        console.info(`switch state from ${this.getState().getName()} to ${state.getName()}`);
        this.state = state;
    }

    reset() {
        this.counter.reset();
    }

    canPass() {
        return this.getState().canPass(this);
    }

    count() {
        // 计数器 +1, 同时让 当前的 state 去做条件校验
        this.counter.increase();
        this.getState().checkout(this);
    }

    getCount() {
        return this.counter.get();
    }
}


module.exports = CircuitBreaker;

abstract_server.js

const restify = require('restify');


class AbstractServer {

    constructor(name, version, port) {
        this.catchUnhandleException();
        this.name = name;
        this.version = version;
        this.port = port;
    }


    /**
     * 根据配置项启动服务
     */
    run() {
        this.server = restify.createServer({
            name: this.name,
            version: this.version,
            // log: log
        });

        this.setFork();
        this.setPlugin();

        this.initApi();
        this.initPort();
        this.onNotFound();
    }


    onNotFound() {
        this.server.on('NotFound', function (req, res, error, cb) {
            console.info(`Request for ${req.url} not found`);
            res.send('not found')
        });
    }


    setFork() {
        this.server.pre((req, res, next) => {
            res.charSet('utf-8');
            next();
        });
    }


    setPlugin() {
        this.server.use(restify.pre.userAgentConnection());
        this.server.use(restify.plugins.acceptParser(this.server.acceptable));
        this.server.use(restify.plugins.queryParser());
        this.server.use(restify.plugins.bodyParser({
            maxBodySize: 5120,
            mapParams: true,
            mapFiles: false,
            overrideParams: false
        }));

        this.server.use((req, res, next) => {
            console.info(`Request received: method: ${req.method}, URL: ${req.url}`);
            next();
        });
    }


    /**
     * 设置服务端口
     */
    initPort() {
        this.server.listen(this.port, () => {
            console.info(`http server started and listening on port ${this.port}`);
        });
    }


    /**
     * 初始化 Api, 子类必须重写
     */
    initApi() {
        // 下为 example
        // this.server.post('task', function (req, res, next) {
        //
        // });
    }


    /**
     * 捕获异常
     */
    catchUnhandleException() {
        // 捕获 uncaughtException
        process.on('uncaughtException', function (err) {
            console.error(`uncaughtException: ${err}`);
        });

        // 捕获 unhandledRejection
        process.on('unhandledRejection', function (err, p) {
            console.error(`unhandledRejection: ${err}`);
        });
    }
}

module.exports = AbstractServer;

写个 Web 服务测试下

const AbstractServer = require('./abstract_server');
const CircuitBreaker = require('./breaker/circuit_breaker');

class App extends AbstractServer {

    constructor(name, version, port) {
        super(name, version, port);
        // 为方便测试, 观察效果, 我们使用较小的阈值
        this.breaker = new CircuitBreaker('10/10', 20, '5/10');
    }

    checkFlow(req, res, next) {
        console.debug('come into check flow');

        // 调用真实的业务逻辑之前, 先让熔断器计数并校验
        this.breaker.count();
        if (this.breaker.canPass()) {
            next();
        } else {
            res.end('reject\n');
        }
    }

    setPlugin() {
        super.setPlugin();
        this.server.use(this.checkFlow.bind(this));
    }


    initApi() {
        this.server.get('/test', this.test.bind(this));
    }

    test(req, res, next) {
        console.debug('come into test');
        res.end('ok\n');
    }
}

let app = new App('test_server', '1.0.0', 6666);
app.run();

上面会对所有请求限流, 我的需求实际上是想要针对部分调用者限流, 所以会针对不同的调用者, 有不同的熔断器

const AbstractServer = require('./abstract_server');
const CircuitBreaker = require('./breaker/circuit_breaker');

let globalBreakers = new Map();

// setInterval(() => {
//     console.log('global -->  = ', globalBreakers);
// }, 3000);


class App extends AbstractServer {

    constructor(name, version, port) {
        super(name, version, port);
    }

    checkFlow(req, res, next) {
        console.debug('come into check flow');
        let appid = req.query['appid'];
        if (!appid) {
            console.error('no appid');
            res.end('no appid\n');
            return;
        }
        console.debug('checkFlow --> appid = ' + appid);

        let breaker;
        if (globalBreakers.has(appid)) {
            breaker = globalBreakers.get(appid);
        } else {
            breaker = new CircuitBreaker('10/10', 20, '5/10');
            globalBreakers.set(appid, breaker);
        }

        breaker.count();
        if (breaker.canPass()) {
            next();
        } else {
            res.end('reject\n');
        }
    }

    setPlugin() {
        super.setPlugin();
        this.server.use(this.checkFlow.bind(this));
    }


    initApi() {
        this.server.get('/test', this.test.bind(this));
    }

    test(req, res, next) {
        console.debug('come into test');
        res.end('ok\n');
    }
}

let app = new App('test_server', '1.0.0', 6666);
app.run();
贡献者: mankueng