读写分离实现原理:

读写分离原理

1.配置app.properties

首先在app.properties文件中配置多个数据库连接的基本数据。


#主库
ds.master.type=mysql
ds.master.jdbc.driver=com.mysql.jdbc.Driver
ds.master.url=${profile.ds.master.url}
ds.master.username=${profile.jdbc.username}
ds.master.password=${profile.jdbc.password}

#从库1
ds.slave1.type=mysql
ds.slave1.jdbc.driver=com.mysql.jdbc.Driver
ds.slave1.url=${profile.ds.slave1.url}
ds.slave1.username=${profile.slave1.username}
ds.slave1.password=${profile.slave1.password}

#从库2
ds.slave2.type=mysql
ds.slave2.jdbc.driver=com.mysql.jdbc.Driver
ds.slave2.url=${profile.ds.slave2.url}
ds.slave2.username=${profile.slave2.username}
ds.slave2.password=${profile.slave2.password}

jdbc.testSql=SELECT 'x'
jdbc.pool.init=1
jdbc.pool.minIdle=3
jdbc.pool.maxActive=50

2.配置spring-context.xml

    
    <!--主库-->
    <!-- 数据源配置, 使用 Druid 数据库连接池 -->
	<bean id="dataSourceMaster" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
	    <!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
	    <property name="driverClassName" value="${ds.master.jdbc.driver}" />
	    
		<!-- 基本属性 url、user、password -->
		<property name="url" value="${ds.master.url}" />
		<property name="username" value="${ds.master.username}" />
		<property name="password" value="${ds.master.password}" />
		
		<!-- 配置初始化大小、最小、最大 -->
		<property name="initialSize" value="${jdbc.pool.init}" />
		<property name="minIdle" value="${jdbc.pool.minIdle}" /> 
		<property name="maxActive" value="${jdbc.pool.maxActive}" />
		
		<!-- 配置获取连接等待超时的时间 -->
		<property name="maxWait" value="60000" />
		
		<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
		<property name="timeBetweenEvictionRunsMillis" value="60000" />
		
		<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
		<property name="minEvictableIdleTimeMillis" value="300000" />
		
		<property name="validationQuery" value="${jdbc.testSql}" />
		<property name="testWhileIdle" value="true" />
		<property name="testOnBorrow" value="false" />
		<property name="testOnReturn" value="false" />
		
		<!-- 打开PSCache,并且指定每个连接上PSCache的大小(Oracle使用)
		<property name="poolPreparedStatements" value="true" />
		<property name="maxPoolPreparedStatementPerConnectionSize" value="20" /> -->
		
		<!-- 配置监控统计拦截的filters -->
	    <property name="filters" value="stat" /> 
	</bean>


    <!--从库1-->
	<!-- 数据源配置, 使用 Druid 数据库连接池 -->
	<bean id="dataSourceSlave1" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
		<!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
		<property name="driverClassName" value="${ds.slave1.jdbc.driver}" />

		<!-- 基本属性 url、user、password -->
		<property name="url" value="${ds.slave1.url}" />
		<property name="username" value="${ds.slave1.username}" />
		<property name="password" value="${ds.slave1.password}" />

		<!-- 配置初始化大小、最小、最大 -->
		<property name="initialSize" value="${jdbc.pool.init}" />
		<property name="minIdle" value="${jdbc.pool.minIdle}" />
		<property name="maxActive" value="${jdbc.pool.maxActive}" />

		<!-- 配置获取连接等待超时的时间 -->
		<property name="maxWait" value="60000" />

		<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
		<property name="timeBetweenEvictionRunsMillis" value="60000" />

		<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
		<property name="minEvictableIdleTimeMillis" value="300000" />

		<property name="validationQuery" value="${jdbc.testSql}" />
		<property name="testWhileIdle" value="true" />
		<property name="testOnBorrow" value="false" />
		<property name="testOnReturn" value="false" />

		<!-- 打开PSCache,并且指定每个连接上PSCache的大小(Oracle使用)
		<property name="poolPreparedStatements" value="true" />
		<property name="maxPoolPreparedStatementPerConnectionSize" value="20" /> -->

		<!-- 配置监控统计拦截的filters -->
		<property name="filters" value="stat" />
	</bean>


    <!--从库2-->
	<!-- 数据源配置, 使用 BoneCP 数据库连接池 -->
	<bean id="dataSourceSlave2" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
		<!-- 数据源驱动类可不写,Druid默认会自动根据URL识别DriverClass -->
		<property name="driverClassName" value="${ds.slave2.jdbc.driver}" />

		<!-- 基本属性 url、user、password -->
		<property name="url" value="${ds.slave2.url}" />
		<property name="username" value="${ds.slave2.username}" />
		<property name="password" value="${ds.slave2.password}" />

		<!-- 配置初始化大小、最小、最大 -->
		<property name="initialSize" value="${jdbc.pool.init}" />
		<property name="minIdle" value="${jdbc.pool.minIdle}" />
		<property name="maxActive" value="${jdbc.pool.maxActive}" />

		<!-- 配置获取连接等待超时的时间 -->
		<property name="maxWait" value="60000" />

		<!-- 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 -->
		<property name="timeBetweenEvictionRunsMillis" value="60000" />

		<!-- 配置一个连接在池中最小生存的时间,单位是毫秒 -->
		<property name="minEvictableIdleTimeMillis" value="300000" />

		<property name="validationQuery" value="${jdbc.testSql}" />
		<property name="testWhileIdle" value="true" />
		<property name="testOnBorrow" value="false" />
		<property name="testOnReturn" value="false" />

		<!-- 打开PSCache,并且指定每个连接上PSCache的大小(Oracle使用)
		<property name="poolPreparedStatements" value="true" />
		<property name="maxPoolPreparedStatementPerConnectionSize" value="20" /> -->

		<!-- 配置监控统计拦截的filters -->
		<property name="filters" value="stat" />
	</bean>


    <!--将ThreadLocalRountingDataSource.java类交给Spring进行管理-->
	<bean id="dataSource" class="com.jeefw.core.persistence.datasource.ThreadLocalRountingDataSource">
		<property name="targetDataSources">
			<map key-type="com.jeefw.core.persistence.datasource.DataSources">
				<entry key="MASTER" value-ref="dataSourceMaster"/>
				<entry key="SLAVE1" value-ref="dataSourceSlave1"/>
				<entry key="SLAVE2" value-ref="dataSourceSlave2"/>
			</map>
		</property>
		<property name="defaultTargetDataSource" ref="dataSourceMaster"></property>
	</bean>

3.新建DataSources.java类

    /**
     * 定义一个enum来表示不同的数据源
     */
    public enum DataSources {
        MASTER, SLAVE1, SLAVE2
    }

4.新建ThreadLocalRountingDataSource.java类

自定义一个数据源类新建ThreadLocalRountingDataSource.java,该类继承org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource 并重写determineCurrentLookupKey()方法。

    
    public class ThreadLocalRountingDataSource extends AbstractRoutingDataSource {

        @Override
        protected Object determineCurrentLookupKey() {
            // TODO Auto-generated method stub
            return DataSourceTypeManager.get();
        }
    }

5.新建数据源持有类DataSourceTypeManager.java

    //通过 TheadLocal 来保存每个线程选择哪个数据源的标志(key)
    public class DataSourceTypeManager {

        private static final ThreadLocal<DataSources> dataSourceTypes = new ThreadLocal<>();

        public static DataSources get() {
            return dataSourceTypes.get();
        }

        public static void set(DataSources dataSourceType) {
            dataSourceTypes.set(dataSourceType);
        }

        public static void reset() {
            dataSourceTypes.set(DataSources.MASTER);
        }

        public static void clear() {
            dataSourceTypes.remove();
        }
    }

6.启用AOP

在 spring-context.xml头中加入以下内容中开启AOP,

<beans 
	xmlns:aop="http://www.springframework.org/schema/aop"
	xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">

<!-- 配置自动为匹配 @AspectJ 注解的 Java 类生成代理对象--><aop:aspectj-autoproxy proxy-target-class="true"/>

spring-context.xml的其他配置如下:

    <!-- 使用Annotation自动注册Bean,解决事物失效问题:在主容器中不扫描@Controller注解,在SpringMvc中只扫描@Controller注解。  -->
	<context:component-scan base-package="com.jeefw"><!-- base-package 如果多个,用“,”分隔 -->
		<context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
	</context:component-scan>
	
 	<!-- MyBatis begin -->
    <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="typeAliasesPackage" value="com.jeefw"/>
        <property name="typeAliasesSuperType" value="com.jeefw.core.persistence.BaseEntity"/>
        <property name="mapperLocations" value="classpath*:com/jeefw/modules/**/*Mapper.xml"/>
	<property name="configLocation" value="classpath:/mybatis/mybatis-config.xml"></property>
	<property name="configurationProperties">
		<props>
			<prop key="dual">${jdbc.dual}</prop>
		</props>
	</property>
    </bean>
    
    <!-- 扫描basePackage下所有以@MyBatisMapper注解的接口 -->
    <bean id="mapperScannerConfigurer" class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="sqlSessionFactoryBeanName" value="sqlSessionFactory" />
        <property name="basePackage" value="com.jeefw"/>
        <property name="annotationClass" value="com.jeefw.core.persistence.annotation.MyBatisMapper"/>
    </bean>
    
    <!-- 定义事务 -->
	<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSourceMaster" />
	</bean>

	<!-- 事务控制 -->
	<tx:advice id="txAdvice" transaction-manager="transactionManager">
		<tx:attributes>
			<tx:method name="find*" read-only="true" />
			<tx:method name="get*" read-only="true" />
			<tx:method name="sync*" read-only="true" />
			<tx:method name="create*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
			<tx:method name="save*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
			<tx:method name="add*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
			<tx:method name="update*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
			<tx:method name="insert*" propagation="REQUIRED" rollback-for="java.lang.Exception" />
			<tx:method name="delete*" propagation="REQUIRED" rollback-for="java.lang.Exception" />

			<!-- 一个事务涉及一个数据源不能在事务内部去切换数据源成功,所以对多数据源的方法暂不开启事务~分布式事务 -->
			<!-- <tx:method name="crud*" propagation="REQUIRED" rollback-for="java.lang.Exception" /> -->
			<!-- <tx:method name="*" /> -->
		</tx:attributes>
	</tx:advice>

7.动态切换数据源

新建

        package com.jeefw.core.persistence.datasource;

        import com.google.common.collect.Lists;
        import com.jeefw.core.persistence.annotation.MyBatisMapper;
        import com.jeefw.modules.monitor.mapper.ScheduleJobMapper;
        import org.apache.log4j.Logger;
        import org.aspectj.lang.JoinPoint;
        import org.aspectj.lang.annotation.Aspect;
        import org.aspectj.lang.annotation.Before;
        import org.aspectj.lang.annotation.Pointcut;
        import org.aspectj.lang.reflect.MethodSignature;
        import org.springframework.core.Ordered;
        import org.springframework.stereotype.Component;

        import java.lang.reflect.Method;
        import java.util.List;
        import java.util.Random;
        import java.util.regex.Matcher;
        import java.util.regex.Pattern;

        /**
         * 数据源自动切换AOP
         * <p>
         * 这里我们定义了一个 Aspect 类,我们使用 @Before 来在符合@Pointcut 中的方法被调用之前,
         * 调用 DataSourceTypeManager.set(DataSources.MASTER) 设置了 key 的类型为 DataSources.MASTER,
         * 所以 dataSource 会根据key=DataSources.MASTER 选择 dataSourceSlave 这个dataSource。
         * 所以该方法对于的sql语句会在master数据库上执行.
         * 我们可以不断的扩充 DataSourceInterceptor这个 Aspect,在中进行各种各样的定义,
         * 来为某个service的某个方法指定合适的数据源对应的dataSource。
         * 这样我们就可以使用 Spring AOP 的强大功能来,十分灵活进行配置了。
         */
        @Aspect
        @Component
        //@Order(-100) // execute before @Transactional   在执行sql前就切换数据源
        public class DataSourceInterceptor implements Ordered {
            private Logger logger=Logger.getLogger(getClass());
            private final static String SLAVE_KEY_WORD = "SLAVE";

            /**
             * 拦截目标方法,获取由@DataSource指定的数据源标识,设置到线程存储中以便切换数据源
             *
             * @throws Exception
             */
        //    @Pointcut("@target(com.jeefw.core.persistence.annotation.MyBatisMapper)")
        //    @Pointcut("@within(com.jeefw.core.persistence.annotation.MyBatisMapper)")
        //    @Pointcut("@annotation(com.jeefw.core.persistence.annotation.MyBatisMapper)")
            @Pointcut("execution(public * com.jeefw.modules..*Mapper.*(..))")  //此处为拦截Mapper包下的所有方法
            public void intercept() {

            }

            @Before("intercept()")
            public void before(JoinPoint point) {
                Class<?> target = point.getTarget().getClass();
                MethodSignature signature = (MethodSignature) point.getSignature();
                // 默认使用目标类型的注解,如果没有则使用其实现接口的注解
                for (Class<?> clazz : target.getInterfaces()) {
                    resolveDataSource(clazz, signature.getMethod());
                }
            }

            /**
             * 提取目标对象方法注解和类型注解中的数据源标识
             *
             * @param clazz
             * @param method
             */
            private void resolveDataSource(Class<?> clazz, Method method) {
                try {
                    // 默认使用类型注解
                    if (clazz.isAnnotationPresent(MyBatisMapper.class)) {
                        MyBatisMapper source = clazz.getAnnotation(MyBatisMapper.class);
                        DataSources ds = source.dataSource();
                        //定时任务ScheduleJobMapper不读写分离,在主库读写
                        if (ds == DataSources.MASTER && !clazz.getName().equals(ScheduleJobMapper.class.getName())) {//主库
                            //可在此处自己根据需求编写数据库自动切换的规则
                            Matcher matcher = Pattern.compile("^(find|get|execSelect)", Pattern.CASE_INSENSITIVE).matcher(method.getName());
                            if (matcher1.find()) {
                                //查询方法使用随机从库
                                DataSourceTypeManager.set(randomSlave());
                                return;
                            }
                        }
                        DataSourceTypeManager.clear();
                        DataSourceTypeManager.set(ds);
                        logger.warn("=============【多数据源】已切换至主库==============");
                    }

                } catch (Exception e) {
                    System.out.println(clazz + ":" + e.getMessage());
                    e.printStackTrace();
                }
            }

            /**
             * 随机从库
             *
             * @return
             */
            private DataSources randomSlave() {
                List<DataSources> slaves = Lists.newArrayList();
                for (DataSources ds : DataSources.values()) {
                    if (ds.toString().contains(SLAVE_KEY_WORD)) {
                        slaves.add(ds);
                    }
                }
                int r = new Random().nextInt(slaves.size());
                return slaves.get(r);
            }

            @Override
            public int getOrder() {
                return -100;
            }
        }

8. 配置主从数据库

主数据库my.ini:

[mysqld]
#禁用DNS解析,解决响应缓慢的问题
skip-name-resolve   
server-id=1
#开启GTID复制
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
master-verify-checksum = 1
log-bin=mysql-bin
log_bin_index = mysql-bin.index
binlog_format= mixed
max_binlog_size = 100M
binlog-checksum = CRC32

从数据库1my.ini:

[mysqld]
server-id=2
#开启GTID复制
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
skip-slave-start = true
slave-parallel-type = LOGICAL_CLOCK # slave-parallel-type有两个之,DATABASE和LOGICAL_CLOCK,DATABASE: 默认值,兼容5.6以schema维度的并行复制, LOGICAL_CLOCK: MySQL 5.7基于组提交的并行复制机制。
slave-parallel-workers = 8 #太多的线程会增加线程间同步的开销,建议4-8个slave线程
master_info_repository = TABLE
relay_log_info_repository = TABLE
relay_log_recovery = ON
#从库只读
read_only = ON
super_read_only = ON

从数据库2my.ini:

[mysqld]
server-id=3
#开启GTID复制
gtid-mode = ON
enforce-gtid-consistency = ON
log-slave-updates = ON
skip-slave-start = true
slave-parallel-type = LOGICAL_CLOCK # slave-parallel-type有两个之,DATABASE和LOGICAL_CLOCK,DATABASE: 默认值,兼容5.6以schema维度的并行复制, LOGICAL_CLOCK: MySQL 5.7基于组提交的并行复制机制。
slave-parallel-workers = 8 #太多的线程会增加线程间同步的开销,建议4-8个slave线程
master_info_repository = TABLE
relay_log_info_repository = TABLE
relay_log_recovery = ON
#从库只读
read_only = ON
super_read_only = ON

重启所有主从服务器: 进入主数据库

show master status;

mysql> show master status;

    +------------------+----------+--------------+------------------+-------------------+
    | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
    +------------------+----------+--------------+------------------+-------------------+
    | mysql-bin.000001 |      629 |              |                  |                   |
    +------------------+----------+--------------+------------------+-------------------+
    1 row in set (0.00 sec)

每个从库执行

CHANGE MASTER TO master_host = '127.0.0.1',
 master_port = 3306,
 master_user = 'root',
 master_password = 'root',
 master_log_file = 'mysql-bin.000001',
 master_log_pos = 629;

START SLAVE;

SHOW SLAVE STATUS;

查看Slave_IO_Running和Slave_SQL_Running:Yes 则操作成功

MySQL同步故障:” Slave_SQL_Running:No” 解决办法

STOP SLAVE;

CHANGE MASTER TO master_auto_position = 0;

STOP SLAVE;

CHANGE MASTER TO master_host = '127.0.0.1',
 master_port = 3306,
 master_user = 'root',
 master_password = 'root',
 master_log_file = 'mysql-bin.000001',
 master_log_pos = 629;

START SLAVE;

SHOW SLAVE STATUS;
赞 赏