读写分离实现原理:
1.配置app.properties
首先在app.properties文件中配置多个数据库连接的基本数据。
#主库
ds.master.type=mysql
ds.master.jdbc.driver=com.mysql.jdbc.Driver
ds.master.url=${profile.ds.master.url}
ds.master.username=${profile.jdbc.username}
ds.master.password=${profile.jdbc.password}
#从库1
ds.slave1.type=mysql
ds.slave1.jdbc.driver=com.mysql.jdbc.Driver
ds.slave1.url=${profile.ds.slave1.url}
ds.slave1.username=${profile.slave1.username}
ds.slave1.password=${profile.slave1.password}
#从库2
ds.slave2.type=mysql
ds.slave2.jdbc.driver=com.mysql.jdbc.Driver
ds.slave2.url=${profile.ds.slave2.url}
ds.slave2.username=${profile.slave2.username}
ds.slave2.password=${profile.slave2.password}
jdbc.testSql=SELECT 'x'
jdbc.pool.init=1
jdbc.pool.minIdle=3
jdbc.pool.maxActive=50
2.配置spring-context.xml
<!--主库-->
<!-- 数据源配置, 使用 Druid 数据库连接池 -->
<bean id="dataSourceMaster" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
<property name="driverClassName" value="${ds.master.jdbc.driver}" />
<!-- 基本属性 url、user、password -->
<property name="url" value="${ds.master.url}" />
<property name="username" value="${ds.master.username}" />
<property name="password" value="${ds.master.password}" />
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="${jdbc.pool.init}" />
<property name="minIdle" value="${jdbc.pool.minIdle}" />
<property name="maxActive" value="${jdbc.pool.maxActive}" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="${jdbc.testSql}" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<!-- 打开PSCache,并且指定每个连接上PSCache的大小(Oracle使用)
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="20" /> -->
<!-- 配置监控统计拦截的filters -->
<property name="filters" value="stat" />
</bean>
<!--从库1-->
<!-- 数据源配置, 使用 Druid 数据库连接池 -->
<bean id="dataSourceSlave1" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
<property name="driverClassName" value="${ds.slave1.jdbc.driver}" />
<!-- 基本属性 url、user、password -->
<property name="url" value="${ds.slave1.url}" />
<property name="username" value="${ds.slave1.username}" />
<property name="password" value="${ds.slave1.password}" />
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="${jdbc.pool.init}" />
<property name="minIdle" value="${jdbc.pool.minIdle}" />
<property name="maxActive" value="${jdbc.pool.maxActive}" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="${jdbc.testSql}" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<!-- 打开PSCache,并且指定每个连接上PSCache的大小(Oracle使用)
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="20" /> -->
<!-- 配置监控统计拦截的filters -->
<property name="filters" value="stat" />
</bean>
<!--从库2-->
<!-- 数据源配置, 使用 BoneCP 数据库连接池 -->
<bean id="dataSourceSlave2" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
<!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
<property name="driverClassName" value="${ds.slave2.jdbc.driver}" />
<!-- 基本属性 url、user、password -->
<property name="url" value="${ds.slave2.url}" />
<property name="username" value="${ds.slave2.username}" />
<property name="password" value="${ds.slave2.password}" />
<!-- 配置初始化大小、最小、最大 -->
<property name="initialSize" value="${jdbc.pool.init}" />
<property name="minIdle" value="${jdbc.pool.minIdle}" />
<property name="maxActive" value="${jdbc.pool.maxActive}" />
<!-- 配置获取连接等待超时的时间 -->
<property name="maxWait" value="60000" />
<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
<property name="timeBetweenEvictionRunsMillis" value="60000" />
<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
<property name="minEvictableIdleTimeMillis" value="300000" />
<property name="validationQuery" value="${jdbc.testSql}" />
<property name="testWhileIdle" value="true" />
<property name="testOnBorrow" value="false" />
<property name="testOnReturn" value="false" />
<!-- 打开PSCache,并且指定每个连接上PSCache的大小(Oracle使用)
<property name="poolPreparedStatements" value="true" />
<property name="maxPoolPreparedStatementPerConnectionSize" value="20" /> -->
<!-- 配置监控统计拦截的filters -->
<property name="filters" value="stat" />
</bean>
<!--将ThreadLocalRountingDataSource.java类交给Spring进行管理-->
<bean id="dataSource" class="com.jeefw.core.persistence.datasource.ThreadLocalRountingDataSource">
<property name="targetDataSources">
<map key-type="com.jeefw.core.persistence.datasource.DataSources">
<entry key="MASTER" value-ref="dataSourceMaster"/>
<entry key="SLAVE1" value-ref="dataSourceSlave1"/>
<entry key="SLAVE2" value-ref="dataSourceSlave2"/>
</map>
</property>
<property name="defaultTargetDataSource" ref="dataSourceMaster"></property>
</bean>
3.新建DataSources.java类
/**
* 定义一个enum来表示不同的数据源
*/
public enum DataSources {
MASTER, SLAVE1, SLAVE2
}
4.新建ThreadLocalRountingDataSource.java类
自定义一个数据源类新建ThreadLocalRountingDataSource.java,该类继承org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource 并重写determineCurrentLookupKey()方法。
public class ThreadLocalRountingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
// TODO Auto-generated method stub
return DataSourceTypeManager.get();
}
}
5.新建数据源持有类DataSourceTypeManager.java
//通过 TheadLocal 来保存每个线程选择哪个数据源的标志(key)
public class DataSourceTypeManager {
private static final ThreadLocal<DataSources> dataSourceTypes = new ThreadLocal<>();
public static DataSources get() {
return dataSourceTypes.get();
}
public static void set(DataSources dataSourceType) {
dataSourceTypes.set(dataSourceType);
}
public static void reset() {
dataSourceTypes.set(DataSources.MASTER);
}
public static void clear() {
dataSourceTypes.remove();
}
}
6.启用AOP
在 spring-context.xml头中加入以下内容中开启AOP,
<beans
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">
<!-- 配置自动为匹配 @AspectJ 注解的 Java 类生成代理对象--><aop:aspectj-autoproxy proxy-target-class="true"/>
spring-context.xml的其他配置如下:
<!-- 使用Annotation自动注册Bean,解决事物失效问题:在主容器中不扫描@Controller注解,在SpringMvc中只扫描@Controller注解。 -->
<context:component-scan base-package="com.jeefw"><!-- base-package 如果多个,用“,”分隔 -->
<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
</context:component-scan>
<!-- MyBatis begin -->
<bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
<property name="dataSource" ref="dataSource"/>
<property name="typeAliasesPackage" value="com.jeefw"/>
<property name="typeAliasesSuperType" value="com.jeefw.core.persistence.BaseEntity"/>
<property name="mapperLocations" value="classpath*:com/jeefw/modules/**/*Mapper.xml"/>
<property name="configLocation" value="classpath:/mybatis/mybatis-config.xml"></property>
<property name="configurationProperties">
<props>
<prop key="dual">${jdbc.dual}</prop>
</props>
</property>
</bean>
<!-- 扫描basePackage下所有以@MyBatisMapper注解的接口 -->
<bean id="mapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
<property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
<property name="basePackage" value="com.jeefw"/>
<property name="annotationClass" value="com.jeefw.core.persistence.annotation.MyBatisMapper"/>
</bean>
<!-- 定义事务 -->
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSourceMaster" />
</bean>
<!-- 事务控制 -->
<tx:advice id="txAdvice" transaction-manager="transactionManager">
<tx:attributes>
<tx:method name="find*" read-only="true" />
<tx:method name="get*" read-only="true" />
<tx:method name="sync*" read-only="true" />
<tx:method name="create*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
<tx:method name="save*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
<tx:method name="add*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
<tx:method name="update*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
<tx:method name="insert*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
<tx:method name="delete*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
<!-- 一个事务涉及一个数据源不能在事务内部去切换数据源成功,所以对多数据源的方法暂不开启事务~分布式事务 -->
<!-- <tx:method name="crud*" propagation="REQUIRED" rollback-for="java.lang.Exception" /> -->
<!-- <tx:method name="*" /> -->
</tx:attributes>
</tx:advice>
7.动态切换数据源
新建
package com.jeefw.core.persistence.datasource;
import com.google.common.collect.Lists;
import com.jeefw.core.persistence.annotation.MyBatisMapper;
import com.jeefw.modules.monitor.mapper.ScheduleJobMapper;
import org.apache.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* 数据源自动切换AOP
* <p>
* 这里我们定义了一个 Aspect 类,我们使用 @Before 来在符合@Pointcut 中的方法被调用之前,
* 调用 DataSourceTypeManager.set(DataSources.MASTER) 设置了 key 的类型为 DataSources.MASTER,
* 所以 dataSource 会根据key=DataSources.MASTER 选择 dataSourceSlave 这个dataSource。
* 所以该方法对于的sql语句会在master数据库上执行.
* 我们可以不断的扩充 DataSourceInterceptor这个 Aspect,在中进行各种各样的定义,
* 来为某个service的某个方法指定合适的数据源对应的dataSource。
* 这样我们就可以使用 Spring AOP 的强大功能来,十分灵活进行配置了。
*/
@Aspect
@Component
//@Order(-100) // execute before @Transactional 在执行sql前就切换数据源
public class DataSourceInterceptor implements Ordered {
private Logger logger=Logger.getLogger(getClass());
private final static String SLAVE_KEY_WORD = "SLAVE";
/**
* 拦截目标方法,获取由@DataSource指定的数据源标识,设置到线程存储中以便切换数据源
*
* @throws Exception
*/
// @Pointcut("@target(com.jeefw.core.persistence.annotation.MyBatisMapper)")
// @Pointcut("@within(com.jeefw.core.persistence.annotation.MyBatisMapper)")
// @Pointcut("@annotation(com.jeefw.core.persistence.annotation.MyBatisMapper)")
@Pointcut("execution(public * com.jeefw.modules..*Mapper.*(..))") //此处为拦截Mapper包下的所有方法
public void intercept() {
}
@Before("intercept()")
public void before(JoinPoint point) {
Class<?> target = point.getTarget().getClass();
MethodSignature signature = (MethodSignature) point.getSignature();
// 默认使用目标类型的注解,如果没有则使用其实现接口的注解
for (Class<?> clazz : target.getInterfaces()) {
resolveDataSource(clazz, signature.getMethod());
}
}
/**
* 提取目标对象方法注解和类型注解中的数据源标识
*
* @param clazz
* @param method
*/
private void resolveDataSource(Class<?> clazz, Method method) {
try {
// 默认使用类型注解
if (clazz.isAnnotationPresent(MyBatisMapper.class)) {
MyBatisMapper source = clazz.getAnnotation(MyBatisMapper.class);
DataSources ds = source.dataSource();
//定时任务ScheduleJobMapper不读写分离,在主库读写
if (ds == DataSources.MASTER && !clazz.getName().equals(ScheduleJobMapper.class.getName())) {//主库
//可在此处自己根据需求编写数据库自动切换的规则
Matcher matcher = Pattern.compile("^(find|get|execSelect)", Pattern.CASE_INSENSITIVE).matcher(method.getName());
if (matcher1.find()) {
//查询方法使用随机从库
DataSourceTypeManager.set(randomSlave());
return;
}
}
DataSourceTypeManager.clear();
DataSourceTypeManager.set(ds);
logger.warn("=============【多数据源】已切换至主库==============");
}
} catch (Exception e) {
System.out.println(clazz + ":" + e.getMessage());
e.printStackTrace();
}
}
/**
* 随机从库
*
* @return
*/
private DataSources randomSlave() {
List<DataSources> slaves = Lists.newArrayList();
for (DataSources ds : DataSources.values()) {
if (ds.toString().contains(SLAVE_KEY_WORD)) {
slaves.add(ds);
}
}
int r = new Random().nextInt(slaves.size());
return slaves.get(r);
}
@Override
public int getOrder() {
return -100;
}
}
8. 配置主从数据库
主数据库my.ini:
[mysqld]
#禁用DNS解析,解决响应缓慢的问题
skip-name-resolve
server-id=1
#开启GTID复制
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
master-verify-checksum = 1
log-bin=mysql-bin
log_bin_index = mysql-bin.index
binlog_format= mixed
max_binlog_size = 100M
binlog-checksum = CRC32
从数据库1my.ini:
[mysqld]
server-id=2
#开启GTID复制
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
skip-slave-start = true
slave-parallel-type = LOGICAL_CLOCK # slave-parallel-type有两个之,DATABASE和LOGICAL_CLOCK,DATABASE: 默认值,兼容5.6以schema维度的并行复制, LOGICAL_CLOCK: MySQL 5.7基于组提交的并行复制机制。
slave-parallel-workers = 8 #太多的线程会增加线程间同步的开销,建议4-8个slave线程
master_info_repository = TABLE
relay_log_info_repository = TABLE
relay_log_recovery = ON
#从库只读
read_only = ON
super_read_only = ON
从数据库2my.ini:
[mysqld]
server-id=3
#开启GTID复制
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
skip-slave-start = true
slave-parallel-type = LOGICAL_CLOCK # slave-parallel-type有两个之,DATABASE和LOGICAL_CLOCK,DATABASE: 默认值,兼容5.6以schema维度的并行复制, LOGICAL_CLOCK: MySQL 5.7基于组提交的并行复制机制。
slave-parallel-workers = 8 #太多的线程会增加线程间同步的开销,建议4-8个slave线程
master_info_repository = TABLE
relay_log_info_repository = TABLE
relay_log_recovery = ON
#从库只读
read_only = ON
super_read_only = ON
重启所有主从服务器: 进入主数据库
show master status;
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 | 629 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
每个从库执行
CHANGE MASTER TO master_host = '127.0.0.1',
master_port = 3306,
master_user = 'root',
master_password = 'root',
master_log_file = 'mysql-bin.000001',
master_log_pos = 629;
START SLAVE;
SHOW SLAVE STATUS;
查看Slave_IO_Running和Slave_SQL_Running:Yes 则操作成功
MySQL同步故障:” Slave_SQL_Running:No” 解决办法
STOP SLAVE;
CHANGE MASTER TO master_auto_position = 0;
STOP SLAVE;
CHANGE MASTER TO master_host = '127.0.0.1',
master_port = 3306,
master_user = 'root',
master_password = 'root',
master_log_file = 'mysql-bin.000001',
master_log_pos = 629;
START SLAVE;
SHOW SLAVE STATUS;