Govern EventBus事件驅(qū)動(dòng)架構(gòu)框架
Govern EventBus 是一個(gè)歷經(jīng)四年生產(chǎn)環(huán)境驗(yàn)證的事件驅(qū)動(dòng)架構(gòu)框架, 通過事件總線機(jī)制來治理微服務(wù)間的遠(yuǎn)程過程調(diào)用。 使用本地事務(wù)來支持微服務(wù)內(nèi)強(qiáng)一致性,事件總線來實(shí)現(xiàn)微服務(wù)間的最終一致性,另外還提供了事件發(fā)布/訂閱失敗的自動(dòng)補(bǔ)償機(jī)制。
執(zhí)行流
安裝
初始化 db
create table compensate_leader ( name varchar(16) not null primary key, term_start bigint unsigned not null, term_end bigint unsigned not null, transition_period bigint unsigned not null, leader_id varchar(100) not null, version int unsigned not null ); create table publish_event ( id bigint unsigned auto_increment primary key, event_name varchar(100) not null, event_data mediumtext not null, status smallint unsigned not null, published_time bigint unsigned default 0 not null, version smallint unsigned not null, create_time bigint unsigned not null ); create index idx_status on publish_event (status); create table publish_event_compensate ( id bigint unsigned auto_increment primary key, publish_event_id bigint unsigned not null, start_time bigint unsigned not null, taken bigint unsigned not null, failed_msg text null ); create table publish_event_failed ( id bigint unsigned auto_increment primary key, publish_event_id bigint unsigned not null, failed_msg text not null, create_time bigint unsigned not null ); create table subscribe_event ( id bigint unsigned auto_increment primary key, subscribe_name varchar(100) not null, status smallint unsigned not null, subscribe_time bigint unsigned not null, event_id bigint unsigned not null, event_name varchar(100) not null, event_data mediumtext not null, event_create_time bigint unsigned not null, version smallint unsigned not null, create_time bigint unsigned not null, constraint uk_subscribe_name_even_id_event_name unique (subscribe_name, event_id, event_name) ); create index idx_status on subscribe_event (status); create table subscribe_event_compensate ( id bigint unsigned auto_increment primary key, subscribe_event_id bigint unsigned not null, start_time bigint unsigned not null, taken int unsigned not null, failed_msg text null ); create table subscribe_event_failed ( id bigint unsigned auto_increment primary key, subscribe_event_id bigint unsigned not null, failed_msg text not null, create_time bigint unsigned not null ); insert into compensate_leader (name, term_start, term_end, transition_period, leader_id, version) values ('publish_leader', 0, 0, 0, '', 0); insert into compensate_leader (name, term_start, term_end, transition_period, leader_id, version) values ('subscribe_leader', 0, 0, 0, '', 0);
Gradle
val eventbusVersion = "0.9.2";
implementation("me.ahoo.eventbus:eventbus-spring-boot-starter:${eventbusVersion}")
implementation("me.ahoo.eventbus:eventbus-spring-boot-autoconfigure:${eventbusVersion}") {
capabilities {
requireCapability("me.ahoo.eventbus:rabbit-bus-support")
//requireCapability("me.ahoo.eventbus:kafka-bus-support")
}
}
Maven
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <artifactId>demo</artifactId> <properties> <eventbus.version>0.9.2</eventbus.version> </properties> <dependencies> <dependency> <groupId>me.ahoo.eventbus</groupId> <artifactId>eventbus-spring-boot-starter</artifactId> <version>${eventbus.version}</version> </dependency> <dependency> <groupId>me.ahoo.eventbus</groupId> <artifactId>eventbus-rabbit</artifactId> <version>${eventbus.version}</version> </dependency> <!--<dependency>--> <!-- <groupId>me.ahoo.eventbus</groupId>--> <!-- <artifactId>eventbus-kafka</artifactId>--> <!-- <version>${eventbus.version}</version>--> <!--</dependency>--> </dependencies> </project>
Spring Boot Application Config
spring: application: name: eventbus-demo datasource: url: jdbc:mysql://localhost:3306/eventbus_db?serverTimezone=GMT%2B8&characterEncoding=utf-8 username: root password: root rabbitmq: host: localhost username: eventbus password: eventbus govern: eventbus: rabbit: exchange: eventbus compensate: db: publish: schedule: initial-delay: 30 period: 10 subscribe: schedule: initial-delay: 30 period: 10 enabled: true subscriber: prefix: ${spring.application.name}.
快速上手
一般情況下 Publisher 與 Subscriber 不在同一個(gè)應(yīng)用服務(wù)內(nèi)。 這里只是作為演示用途。
Publisher
/** * 定義發(fā)布事件 */ public class OrderCreatedEvent { private long orderId; public long getOrderId() { return orderId; } public void setOrderId(long orderId) { this.orderId = orderId; } @Override public String toString() { return "OrderCreatedEvent{" + "orderId=" + orderId + '}'; } }
package me.ahoo.eventbus.demo.service; import me.ahoo.eventbus.core.annotation.Publish; import me.ahoo.eventbus.demo.event.OrderCreatedEvent; import org.springframework.stereotype.Service; /** * @author ahoo wang */ @Service public class OrderService { @Publish public OrderCreatedEvent createOrder() { OrderCreatedEvent orderCreatedEvent = new OrderCreatedEvent(); orderCreatedEvent.setOrderId(1L); return orderCreatedEvent; } }
Subscriber
package me.ahoo.eventbus.demo.service;
import lombok.extern.slf4j.Slf4j;
import me.ahoo.eventbus.core.annotation.Subscribe;
import me.ahoo.eventbus.demo.event.OrderCreatedEvent;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class NoticeService {
@Subscribe
public void handleOrderCreated(OrderCreatedEvent orderCreatedEvent) {
log.info("handleOrderCreated - event:[{}].", orderCreatedEvent);
/**
* 執(zhí)行相應(yīng)的業(yè)務(wù)代碼
* send sms / email ?
*/
}
}
評(píng)論
圖片
表情
