SpringBoot消息源碼解析:ActiveMQ自動(dòng)配置
ActiveMQ 自動(dòng)配置
ActiveMQ 是 Apache 提供的一個(gè)開源的消息系統(tǒng),很好地支持了 JMS 規(guī)范。在使用ActiveMQ 時(shí)需要在 pom 文件中引入
spring-boot-starter-activemq。
ActiveMQ 在 Spring Boot 的自動(dòng)配置類注冊同樣在 META-INF/spring.factories 中。
# Auto Configure
org. springframework . boot . autoconfigure . EnableAutoConfiguration=\
org. springframework . boot . autoconfigure . jms . activemq. ActiveMQAutoConfigurati
on, \ActiveMQAutoConfiguration 類沒有具體代碼實(shí)現(xiàn),主要通過注解完成功能的實(shí)現(xiàn)。
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore (JmsAutoConfiguration.class)
@AutoConfigureAfter({ JndiConnectionF actoryAutoConfiguration.class })
@Conditional0nClass({ ConnectionFactory. class, ActiveMQConnectionFactory.cl
@ConditionalOnMissingBean(ConnectionFactory.class)
@EnableConfigurationProperties({ ActiveMQProperties. class, JmsProperties.cl
ass })
@Import({ ActiveMQXAConnectionF actoryConfiguration. class,
ActiveMQConnectionFactoryConfiguration.class })public class ActiveMQAutoConfiguration {
}@AutoConfigureBefore 注解指定該類需在 JmsAutoConfiguration 配置之前進(jìn)行初始化。前面我們已經(jīng)講過,JmsAutoConfiguration 初始化時(shí)需 要用到 ActiveMQAutoConfiguration初始化的 ConnectionFactory,因此需要在 JmsAutoConfiguration 之前完成初始化。
@AutoConfigureAfter 中指定了在
JndiConnectionFactoryAutoConfiguration 配置完成之后進(jìn)行初始化。
JndiConnectionFactoryAutoConfiguration 中主要通過 JNDI 的方式獲得 ConnectionFactory實(shí)例下面我先看 JndiConnectionFactoryAutoConfiguration 的初始化 Condition 條件代碼。
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore (JmsAutoConfiguration. class)
@ConditionalOnClass (Jms Template.class)
@ConditionalOnMi ssingBean(ConnectionFactory. class)
@Conditional (JndiOrPropertyCondition. class)
@EnableConfigurationProperties ( JmsProperties. class)
public class JndiConnectionF actoryAutoConfiguration {
2
private static final String[] JNDI_ LOCATIONS = { "java:/JmsXA", "java:/XA
Conne -
ctionFactory" };
// JNDI 名稱或特定屬性的條
static class JndiOrProper
tyCondition extends AnyNestedCondition {
JndiOrPropertyCondition
() {
super(ConfigurationPh
ase. . PARSE_ CONFIGURATION);
@ConditionalOnJndi({ "j
ava:/JmsXA", "java:/XAConnectionFactory" })
static class Jndi {
@Conditiona lOnProperty
(prefix = "spring.jms", name = "jndi -name")
static class Property {
}
}
}我們不再贅述
JndiConnectionFactoryAutoConfiguration 初始化條件中的常規(guī)項(xiàng),重點(diǎn)看@Conditional 指 定 的 JndiOrPropertyCondition 條 件 。
該 類 的 實(shí) 現(xiàn) 就 在
JndiConnectionFactory-AutoConfiguration 內(nèi)部,通過繼承 AnyNestedCondition 來實(shí)現(xiàn)判斷條件:當(dāng)指定的 JNDI 生效或 spring.jmsjndi-name 被配置時(shí)才會(huì)生效。SpringBoot 默認(rèn)會(huì)檢查 java:/JmsXA 和 java:/XAConnectionFactory 兩個(gè) JNDI 地址。

看完注解部分再來看內(nèi)部的實(shí)現(xiàn)。
//省略炷解
public class JIndiConnectionF actoryAutoConfiguration {
//該地址數(shù)組,與下面的條件保持一致
private static final String[] JNDI_ LOCATIONS = { "java:/JmsXA", "java:/XA
Connec -
tionFactory" };
@Bean
public ConnectionFactory connectionF actory(JmsProperties properties) th
IndiLocatorDelegate jndiLocatorDelegate = JndiLocatorDelegate . create-
DefaultResourceRefLocator();
//如果配置文件中配置了 JNDI 名稱,則通過捐定的 INDI 名稱獲段 ConnectionFactor
if (StringUtils .hasLength(properties . getJndiName())) {
return jndiLocatorDelegate . lookup(properties . getJndiName(), Conne -
ctionFactory.class);
/如果配置文件中未配置 JNDI 名稱, 則使用默認(rèn)的名稱進(jìn)行查我
return findJndiConnectionF actory(jndiLocatorDelegate);
private Connect ionFactory
findJndiConnectionFactory(IndiLocatorDelegate jindilocator-
Delegate) {
//遍歷默認(rèn)值并進(jìn)行查找
for (String name : JNDI
_LOCATIONS) {
try {
return jndiLocatorD
elegate . lookup(name, ConnectionFactory.class);
} catch (NamingExcep
tion ex) {
//吞沒異常,并繼續(xù)
} throw new IllegalSta
teException("Unable to find Conne
ctionFactory in JNDI location
”+ Arrays.
asList(JNDI
ATION
S));
}
}獲得 ConnectionFactory 的過程比較簡單,首先判斷配置文件中是否配置了指定的 JNDI 名稱,如果配置了,便按照配置進(jìn)行查找;如果未配置,則遍歷默認(rèn) JNDI 名稱數(shù)組,進(jìn)行查找。
查找功能由 JndiL ocatorDelegate 提供,該類繼承自 JndiLocatorSupport,提供了公共的查找(lookup) 方法,可以方便地用作委托類。
我們繼續(xù)看 ActiveMQAutoConfiguration 的注解,@ConditionalOnClass 指定了 classpath中 必 須 有 ConnectionFactory 類 和 ActiveMQConnectionFactory 類 存 在 。同 時(shí) ,@Conditional-OnMissingBean 指定容器中不能存在 ConnectionFactory 的 Bean。
@
EnableConfigurationProperties 屬性導(dǎo)入了 ActiveMQ 的 ActiveMQProperties 配置和JMS 的 JmsProperties 配置。
最后@lmport 引入了自動(dòng)配置
ActiveMQXAConnectionFactoryConfiguration 和自動(dòng)配置ActiveMQConnectionFactoryConfiguration.
ActiveMQXAConnectionFactoryConfiguration 主要用來初始化 ConnectionFactory,包含兩部分內(nèi)容:創(chuàng)建 XA 連接工廠和創(chuàng)建普通連接工廠。
@Configuration(ConnectionFactory. class)
@Conditional0nClass (TransactionManager. class)
@ConditionalOnBean(XAConnectionFactoryWrapper . class)
@ConditionalOnMissingBean(ConnectionFactory. class)
class ActiveMQXAConnectionF actoryConfiguration {
//創(chuàng)建 XA 連接 I 廠
@Primary
@Bean(name = { "jmsConnectionFactory", "xaJmsConnectionFactory" })
public ConnectionFactory jmsConnectionF actory(ActiveMQProperties properties,
objectProviderectionFactoryCustomizer> factoryCustomizers ,
XAConnectionF actoryWrapper
wrapper) throws Exception {
ActiveMQXAConnectionFactory connectionFactory = new ActiveMQConnection-
FactoryFactory(
properties, factoryCustomizers . orderedStream(). collect(Collectors
ist()))“8. createConnectionFactory(ActiveMQXAConnectionFactory. class);
return wrapper . wrapConnectionFactory (connectionFactory);
//創(chuàng)建普通連接工廠
@Bean
@Conditiona lOnProperty(prefix = "spring. activemq. pool", name = "enabled" ,
havingValue = "false", matchIfMissing = true)
public ActiveMQConnectionF actory nonXaJmsConnectionF actory (ActiveMQProper
ties propertie5,
objectProvider
factoryCustomizers) {
return new ActiveMQConnectionF actoryFactory(properties,
factoryCustomizers . ordereds
tream(). collect(Collectors . tolist()))
. createConnectionFactory(ActiveMQConnectionFactory. class);
}
} 如 果 沒 有 干 預(yù) , 該 自 動(dòng) 配 置 類 是 不 會(huì) 自 動(dòng) 生 效 的 , 只 有 當(dāng) 滿 足
XAConnectionFactory-Wrapper 的 Bean 存在時(shí), 才會(huì)進(jìn)行實(shí)例化操作,默認(rèn)狀態(tài)下是沒有對應(yīng)的 Bean 的。
jmsConnectionFactory 中 直 接 創(chuàng) 建 了
ActiveMQXAConnectionFactory 類 , 然 后 通 過XAConnectionFactoryWrapper 包裝類的包裝將其注冊到 JTATransactionManager.nonXaJmsConnectionFactory 方法直接創(chuàng)建 ActiveMQConnectionFactoryFactory 對象,并將配置屬性和定制化操作傳入。ActiveMQConnectionFactoryFactory 本質(zhì)上也是一個(gè)ConnectionFactory。
另外一個(gè)導(dǎo)入類
ActiveMQConnectionFactoryConfiguration 主要用來配置 ActiveMQ 的ConnectionFactory,提供了基于池化的 ConnectionFactory、基于緩存的 ConnectionFactory和普通的 ActiveMQ ConnectionFactory.ActiveMQConnectionFactoryConfiguration 中關(guān)于以上 3 種 ConnectionFactory 的初始化操作都比較簡單,我們直接看核心代碼。首先看普通的 ActiveMQ 連接工廠的初始化代碼。
@Bean
@ConditionalOnProperty(prefix = "spring . jms .cache", name = "enabled", havin
Value = "false")
ActiveMQConnectionF actory jmsConnectionF actory(ActiveMQProperties propertie
5,
ObjectProviderctionFactoryCustomizer> factoryCustomizers) {return createJmsConnectionFactory(properties, factoryCustomizers);
private static ActiveMQConnectionFactory createJmsConnectionF actory(ActiveM
Propert
ies properties,
ObjectP
rovider factoryCustomizers) {
return new ActiveMQConnectionFactoryFactory(properties ,
factoryCustomizers . orderedStr
eam(). collect(Collectors . tolist()))
. createConnectionF actory(Act iveMQConnectionFactory.class);
} 該連接工廠的初始化需要指定配置 spring.jms cache .enabled=false 時(shí)才會(huì)進(jìn)行,初始化操作就是創(chuàng)建一個(gè)
ActiveMQConnectionFactoryFactory 對象,并將配置屬性和定制化操作傳入,然后調(diào)用其 createConnectionFactory 方法,完成 ActiveMQConnectionFactory 的創(chuàng)建。
這里簡單看一下
ActiveMQConnectionFactoryFactory 創(chuàng)建過程中的核心代碼。
private T createConnectionFactoryInstce(Class factoryClass)
throws InstantiationException, IllegalAccessException, InvocationTarget -
Exception, NoSuchMethodException {
String brokerUrl = determineBrokerUrl();
String user = this . properties. getUser();
String password this.properties.getPassword);
f (StringUtils.hasLength(user) & StringUtils.hasLength(password)) {
gclassot ”
factoryClass. getConstructor(String.class, String.class, Strin
newInstance(user, password, brokerUrl);
return factoryClass . getConstructor(String. class) . newInstance(brokerUr1);
) 上面代碼中獲得 brokerUrl、 用戶名、密碼等內(nèi)容,然后通過
ActiveMQConnection-Factory的 Class 對象 獲取構(gòu)造器并創(chuàng)建對象。
其中關(guān)于 brokerUrl 的獲取,如果配置文件中指定了 brokerUr,則使用指定的,如果未指定并且 inMemory 配 置 項(xiàng) 為 true ( 默 認(rèn) 為 true) , 則 brokerUr 默 認(rèn) 為“vm://ocalhost?broker.persistent=false"其他情況則 brokerUrl 為 tp:/localhost:61616",相關(guān)代碼如下。
class ActiveMOConnectionFactoryFactory {private static final String DEFAULT_ EMBEDDED BROKER URL = "vm://localhos
t?broker . persistent-false";
private static final String DEFAULT _NETWORK_ BROKER_ _URL = "tcp://localhos
t:61616";
str IngdetermineBrokerurto t
null) {
f (this.properties.getBrokerUr1()
return this.properties. getBrokerUrl();
if (this.properties. isInMemory()) {
return DEFAULT_ EMBEDDED_ BROKER_ URL ;
return DEFAULT_ NETWORK_ BROKER_ URL ;
}
}在 ActiveMQConnectionF actoryF actory 中完成 ActiveMQConnectionFactory 對象創(chuàng)建之后,返回之前還進(jìn)行了- -系列的定制化操作,相關(guān)代碼如下。
private T doCreateConnectionFactory(Class-
T> factoryClass) throws Exception {
//獲得 ActiveMQConnectionFactory
” factory = createConnectionFactoryInstance(factoryClass);
//設(shè)置關(guān)閉超時(shí)時(shí)間
if (this. properties . getCloseTimeout() != null) {
factory. setCloseTimeout((int) this . properties . getCloseTimeout() . toMilli
s());
(0);
設(shè)置發(fā)送超時(shí)時(shí)間
F (this. properties . getSendTimeout() != null) {
factory . setSendTimeout((int) this . properties . getSendTimeout() . toMillis
();
//設(shè)置信任范圍
Packages packages = this.properties . getPackages();
if (packages . getTrustAll() != nu1l)
factory . setTrustAllPackages (packages . getTrustAll());
if (!packages .getTrusted(). isEmpty()) {
factory . setTrustedPackages (packages . getTrusted());
//定制化
customize(factory);return factory;
} 在上述代碼中,主要是設(shè)置配置文件中指定的參數(shù)值(如:配置關(guān)閉超時(shí)時(shí)間、發(fā)送超時(shí)時(shí)間)和 定 制 化 操 作 ( 如 遍 歷 調(diào) 用 構(gòu) 造 對 象 時(shí) 傳 入 的 參 數(shù)List<
ActiveMQConnectionFactory-Customizer>中元素的 customize 方法)。
下面看基于緩存的 CachingConnectionFactory 的創(chuàng)建源碼。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass (CachingConnectionFactory. class)
@ConditionalOnProperty(prefix = "spring. jms .cache", name = "enabled", havin
gValue
= "true",
matchIfMissing = true )
static class CachingConnectionF actoryConfiguration {
@Bean
CachingConnectionFactory cachingJmsConnect ionFactory (JmsProperties jmsPro
perties,
ActiveMQProperties F
roperties,
ObjectProvidereMQConnectionFactoryCustomizer> factoryCusto-
mizers) {
JmsProperties . Cache cacheProperties = jmsProperties . getCache();
CachingConnectionFactory connectionFactory = new CachingConnectionF acto
ry(
createJmsConnectionFactory(properties, factoryCustomizers));
connectionFactory . setCacheConsumers(cacheProperties . isConsumers());
connectionFactory. setCacheProducers(cacheProperties . isProducers());
connectionFactory. setSessionCacheSize(cacheProperties . getSessionCacheSi
ze());
return connectionFactory;
}
}注解部分表明,如果未配置 spring.jms.cache.enabled 或配置其值為 true,均會(huì)使該自動(dòng)配置生效。創(chuàng)建 CachingConnectionFactory 的過程就是 new 一個(gè) CachingConnectionFactory對象,其參數(shù)包含上面我們講到的 ActiveMQConnectionFactory 對象(創(chuàng)建步驟完全一樣)和配置屬性。然后,根據(jù)配置參數(shù)設(shè)置是否緩存消費(fèi)者、生產(chǎn)者和預(yù)期的緩存大小。
CachingConnectionFactory 是 SingleConnectionFactory 的子類,它添加了會(huì)話( Session)緩 存 以 及 MessageProducer 緩 存 。默 認(rèn) 情 況 下 , 此 ConnectionFactory 還 會(huì) 將“reconnect-OnException" 屬 性 切 換 為 "true” , 從 而 允 許 發(fā) 生 異 常 時(shí) 自 動(dòng) 恢 復(fù) 重 置Connection。
CachingConnectionFactory 默認(rèn)情況下,只會(huì)緩存一個(gè)會(huì)話,其他進(jìn)一步的回話請求會(huì)按照需要?jiǎng)?chuàng)建并處理。在高并發(fā)環(huán)境下,要考慮提高“sessionCacheSize'的值。
最后再看一下基于 池化的 ConnectionFactory 的創(chuàng)建。
@Configuration(proxyBeanMethods = false)
@Conditional0nClass({ JmsPoolConnectionFactory . class, Pooled0bject. class })
static class PooledConnectionF actoryConfiguration {
@Bean(destroyMethod = "stop")
@Conditiona lOnProperty(prefix = "spring . activemq .pool", name = "enabled" ,
havingValue = "true")
JmsPoolConnectionFactory pooledJmsConnectionF actory(ActiveMQProperties pr
operties,
ObjectProviderMQConnectionF actoryCustomizer> factoryCustomizers) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnection-
FactoryFactory(properties,
factoryCustomizers . orderedStream(). collect(Collectors.to
List()))
. createConnectionF actory(ActiveMQConnectionFactory.class);
return new JmsPoolConnection actoryFactory(properties . getPool())
. createPooledConnection actory( connectionFactory);
}
} 下 面 講 解 JmsPoolConnectionFactory 的 創(chuàng) 建 步 驟 : 首 先 , 創(chuàng) 建 一 個(gè)
ActiveMQConnec-tionFactory 對象,與上面創(chuàng)建該類的方法完全一樣,不再贅述;隨后,創(chuàng)建 JmsPoolConne-ctionFactoryFactory 對象, 構(gòu)造參數(shù)為連接池的配置信息,然后調(diào)用對象的 createPooled-ConnectionFactory 方法,將 ActiveMQConnectionFactory 對象傳入。
在
createPooledConnectionFactory 方 法 中 主 要 就 是 new 出 一 個(gè)JmsPoolConnectionFactory 對象設(shè)置 ConnectionFactory 為 ActiveMQConnectionFactory,并進(jìn)行一些其他配置參數(shù)的判斷和設(shè)置。這里就不再展示相關(guān)代碼了。
至此,關(guān)于 ActiveMQ 自動(dòng)配置的講解已經(jīng)完成。
本文給大家講解的內(nèi)容是SpringBoot消息源碼解析:ActiveMQ自動(dòng)配置
下篇文章給大家講解的是@JmsL istener 注解解析;
覺得文章不錯(cuò)的朋友可以轉(zhuǎn)發(fā)此文關(guān)注小編;
感謝大家的支持!
本文就是愿天堂沒有BUG給大家分享的內(nèi)容,大家有收獲的話可以分享下,想學(xué)習(xí)更多的話可以到微信公眾號(hào)里找我,我等你哦。
