熔断器
首先定义一个 CircuitBreaker, 下面是它的属性:
state: CircuitBreaker 当前状态, 有三种状态:
- CloseState: 闭合状态, 此时正常提供服务;
- OpenState: 断开状态, 此时拒绝服务请求;
- HalfOpenState: 半开状态, 是 CloseState 和 OpenState 的中间态, 可提供服务, 但会根据服务请求的数量, 向上述两种状态转换
counter: 计数器, 存储请求的次数, CircuitBreaker 根据请求的次数去切换状态
thresholdForOpen: CircuitBreaker 切换到 OpenState 的阈值
idleTimeForOpen: CircuitBreaker 切换到 OpenState 保持的时间
thresholdForHalfOpen: CircuitBreaker 切换到 HalfOpenState 的阈值
首先来实现一个单机版的熔断器
state.js
/**
* 抽象 state 父类
*/
class AbstractState {
constructor(time = Date.now()) {
this.startTime = time;
console.info(`${this.getName()} --> startTime = ${this.startTime / 1000}`);
}
getName() {
return this.constructor.name;
}
canPass() {
return true;
}
checkout(breaker) {
}
}
/**
* 闭合
*/
class CloseState extends AbstractState {
constructor() {
super();
}
canPass(breaker) {
return true;
}
checkout(breaker) {
let period = breaker.thresholdForOpen[1] * 1000;
let now = Date.now();
if (now >= this.startTime + period) { // 过了这段校验时间, 清零等待重新开始
this.startTime = Date.now();
breaker.reset();
}
console.info('checkout --> = ', breaker.getCount());
if (breaker.getCount() >= breaker.thresholdForOpen[0]) { // 在这段校验时间内, 超过断路阈值, 切换到 `OpenState`
breaker.reset();
breaker.setState(new OpenState())
}
}
}
/**
* 半开
*/
class HalfOpenState extends AbstractState {
constructor() {
super();
}
canPass(breaker) {
let limit = breaker.thresholdForHalfOpen[0];
return breaker.getCount() <= limit;
}
checkout(breaker) {
console.info('checkout --> count = ', breaker.getCount());
let period = breaker.thresholdForHalfOpen[1] * 1000;
let now = Date.now();
if (now >= this.startTime + period) {
breaker.reset();
if (breaker.getCount() > breaker.thresholdForHalfOpen[0]) { // 依然超过断路阈值, 切到 `OpenState`
breaker.setState(new OpenState());
} else { // 低于断路阈值, 切到 `CloseState`
breaker.setState(new CloseState());
}
}
}
}
/**
* 断路
*/
class OpenState extends AbstractState {
constructor() {
super();
}
canPass() {
return false;
}
checkout(breaker) {
let period = breaker.idleTimeForOpen * 1000;
let now = Date.now();
if (now >= this.startTime + period) { // 过了这段校验时间, 切换到 `HalfOpenState`
breaker.reset();
breaker.setState(new HalfOpenState());
}
}
}
module.exports = {
AbstractState: AbstractState,
CloseState: CloseState,
HalfOpenState: HalfOpenState,
OpenState: OpenState,
};
counter.js
class Counter {
constructor(num = 0) {
this.num = num;
}
get() {
return this.num;
}
increase() {
this.num += 1;
}
reset() {
this.num = 0;
}
}
module.exports = {
Counter: Counter,
};
CircuitBreaker.js
const {CloseState, HalfOpenState, OpenState} = require('./state');
const {Counter} = require('./counter');
class CircuitBreaker {
/**
* @param thresholdForOpen {string} format: '600/60'
* '600/60' 例如,这意味着最大允许请求为每60秒600次,或者断路器将切换到OpenState
* @param idleTimeForOpen {number} unit: second
* 600 例如,这意味着如果断路器切换到OpenState,它将保持600秒
* @param thresholdForHalfOpen {string} format: '300/60'
* '300/60' 例如,这意味着如果最大请求数超过每60秒300个,或断路器切换至关闭状态
*/
constructor(thresholdForOpen = '600/60', idleTimeForOpen = 5 * 60, thresholdForHalfOpen = '300/60') {
this.idleTimeForOpen = idleTimeForOpen;
this.thresholdForOpen = thresholdForOpen.split('/');
this.thresholdForHalfOpen = thresholdForHalfOpen.split('/');
this.counter = new Counter(); // 每60秒的最大次数
this.state = new CloseState(); // 默认状态
}
getState() {
return this.state;
}
setState(state) {
console.info(`switch state from ${this.getState().getName()} to ${state.getName()}`);
this.state = state;
}
reset() {
this.counter.reset();
}
canPass() {
return this.getState().canPass(this);
}
count() {
// 计数器 +1, 同时让 当前的 state 去做条件校验
this.counter.increase();
this.getState().checkout(this);
}
getCount() {
return this.counter.get();
}
}
module.exports = CircuitBreaker;
abstract_server.js
const restify = require('restify');
class AbstractServer {
constructor(name, version, port) {
this.catchUnhandleException();
this.name = name;
this.version = version;
this.port = port;
}
/**
* 根据配置项启动服务
*/
run() {
this.server = restify.createServer({
name: this.name,
version: this.version,
// log: log
});
this.setFork();
this.setPlugin();
this.initApi();
this.initPort();
this.onNotFound();
}
onNotFound() {
this.server.on('NotFound', function (req, res, error, cb) {
console.info(`Request for ${req.url} not found`);
res.send('not found')
});
}
setFork() {
this.server.pre((req, res, next) => {
res.charSet('utf-8');
next();
});
}
setPlugin() {
this.server.use(restify.pre.userAgentConnection());
this.server.use(restify.plugins.acceptParser(this.server.acceptable));
this.server.use(restify.plugins.queryParser());
this.server.use(restify.plugins.bodyParser({
maxBodySize: 5120,
mapParams: true,
mapFiles: false,
overrideParams: false
}));
this.server.use((req, res, next) => {
console.info(`Request received: method: ${req.method}, URL: ${req.url}`);
next();
});
}
/**
* 设置服务端口
*/
initPort() {
this.server.listen(this.port, () => {
console.info(`http server started and listening on port ${this.port}`);
});
}
/**
* 初始化 Api, 子类必须重写
*/
initApi() {
// 下为 example
// this.server.post('task', function (req, res, next) {
//
// });
}
/**
* 捕获异常
*/
catchUnhandleException() {
// 捕获 uncaughtException
process.on('uncaughtException', function (err) {
console.error(`uncaughtException: ${err}`);
});
// 捕获 unhandledRejection
process.on('unhandledRejection', function (err, p) {
console.error(`unhandledRejection: ${err}`);
});
}
}
module.exports = AbstractServer;
写个 Web 服务测试下
const AbstractServer = require('./abstract_server');
const CircuitBreaker = require('./breaker/circuit_breaker');
class App extends AbstractServer {
constructor(name, version, port) {
super(name, version, port);
// 为方便测试, 观察效果, 我们使用较小的阈值
this.breaker = new CircuitBreaker('10/10', 20, '5/10');
}
checkFlow(req, res, next) {
console.debug('come into check flow');
// 调用真实的业务逻辑之前, 先让熔断器计数并校验
this.breaker.count();
if (this.breaker.canPass()) {
next();
} else {
res.end('reject\n');
}
}
setPlugin() {
super.setPlugin();
this.server.use(this.checkFlow.bind(this));
}
initApi() {
this.server.get('/test', this.test.bind(this));
}
test(req, res, next) {
console.debug('come into test');
res.end('ok\n');
}
}
let app = new App('test_server', '1.0.0', 6666);
app.run();
上面会对所有请求限流, 我的需求实际上是想要针对部分调用者限流, 所以会针对不同的调用者, 有不同的熔断器
const AbstractServer = require('./abstract_server');
const CircuitBreaker = require('./breaker/circuit_breaker');
let globalBreakers = new Map();
// setInterval(() => {
// console.log('global --> = ', globalBreakers);
// }, 3000);
class App extends AbstractServer {
constructor(name, version, port) {
super(name, version, port);
}
checkFlow(req, res, next) {
console.debug('come into check flow');
let appid = req.query['appid'];
if (!appid) {
console.error('no appid');
res.end('no appid\n');
return;
}
console.debug('checkFlow --> appid = ' + appid);
let breaker;
if (globalBreakers.has(appid)) {
breaker = globalBreakers.get(appid);
} else {
breaker = new CircuitBreaker('10/10', 20, '5/10');
globalBreakers.set(appid, breaker);
}
breaker.count();
if (breaker.canPass()) {
next();
} else {
res.end('reject\n');
}
}
setPlugin() {
super.setPlugin();
this.server.use(this.checkFlow.bind(this));
}
initApi() {
this.server.get('/test', this.test.bind(this));
}
test(req, res, next) {
console.debug('come into test');
res.end('ok\n');
}
}
let app = new App('test_server', '1.0.0', 6666);
app.run();