欢迎 云海归来博客网!

我的名片

网名:YunHaiGuiL

技能:Web前端、Java后端

现居:未知

Email:2405870111@qq.com

最新发布

基于单节点的redis分布式锁

单实例的分布式锁: - 加锁-设计思路是通过setnx命令(只有key不存中才可以加锁)给key设置一个value值(用UUID/雪花算法)并设置一个有效时间(避免死锁),注意value值必须是唯一的字符串(后期作为解锁条件之一的判断依据(自动失效也是一种)) - 在加锁的时候还需要设置一个加锁的超时时间,若超过这个时间则放弃获取锁 - 释放锁的时候通过UUID判断该线程的锁是不是该锁,若不是则放弃解锁,否则执行delete - 核心代码 ``` import redis.clients.jedis.Jedis; import util.JedisUtil; import java.util.UUID; public class LockUtil { public String LockTimeout(long lockTime, int lockExpire, String lockName) { Jedis jedis = null; try { jedis = new JedisUtil().getInstance(); String value; if (lockExpire <= 0 || lockTime <= 0) { return null; } value = UUID.randomUUID().toString(); //参试获取锁 Long islock = jedis.setnx(LockConst.LOCK_NAME_PRIFIX + lockName, value); if (islock > 0) { //成功获取锁后,加有效时间防止死锁 jedis.expire(LockConst.LOCK_NAME_PRIFIX + lockName, lockExpire); System.out.println("加锁成功"); return value; } else { //获取锁失败后,指定时间内重试 while (System.currentTimeMillis() < (lockTime + lockExpire)) { // System.out.println("在尝试获取锁!!"); islock = jedis.setnx(LockConst.LOCK_NAME_PRIFIX + lockName, value.toString()); if (islock > 0) { jedis.expire(LockConst.LOCK_NAME_PRIFIX + lockName, lockExpire); System.out.println("在尝试,加锁成功"); return value; } } return null; } } finally { if (jedis != null) { jedis.close(); } } } public boolean unLock(String locakName, String value) { Jedis jedis = new JedisUtil().getInstance(); if (value == null || locakName == null) { return false; } try { //value只根据解锁一致的则解锁 String val = jedis.get(LockConst.LOCK_NAME_PRIFIX + locakName); if (value.equals(val)) { jedis.del(LockConst.LOCK_NAME_PRIFIX + locakName); System.out.println("unlock成功"); return true; } System.out.println("unlock失败,该锁不是该线程上的"); return false; } catch (Exception e) { e.printStackTrace(); } finally { if (jedis != null) jedis.close(); } return false; } } ```

RocketMQ环境搭建以及基本消息生成和消费

# 消息队列(MQ) ### 1.作用 1. 削峰填谷 - 秒杀。可以将峰值的流量转移到其他地方(谷) - 主要解决瞬时压力大于应用服务能力导致消息丢失、系统奔溃等问题 2. 解耦合 - 生成者和消费者可以通过MQ解耦合 - 生成者-mq-消费者 3. 异步消息 - 生成者-mq-消费者 ### 2.主流MQ产品 1. 协议:AMQP(Advanced Message Queuing Protocol,高级消息队列协议)、XMPP、SMTP、STMP 2.主流产品: - ActiveMQ: apache,多种协议(AMQP,MQTT,OpenWire,Stomp),java,将数据直接持久化到数据库中,可靠性较低 - RabbitMQ:基于AMQP,运行Erlang语言,性能高,高并发,可靠性高 - kafaka: linkedin开源MQ,完全分布式架构,高吞吐量(单机10w/s),设计,是为了处理日志后来改为大数据的数据收集业务。 - RockeyMQ:阿里巴巴->Apache,双十一就采用这种MQ,java/c++/go语言,支持多协议(AMQP ,XMPP,STMP,STOP),分布式 易扩展,支持亿级消息堆积能力(单机1w以上持久化队列) - 其他MQ:Redis(也支持),ZeroMQ ### RockeyMQ - ##### 角色 - producer :生成者 - Consumer:消费者 - Push Consumer:Broker将消息 推给消费者 - Pull Consumer: 消费者请求Broker 将消息发送给我(拉) - Broker:MQ消息服务 - Producer Group:生成者集合 ### 搭建RoketMQ环境 - 需要linux环境下:centos7 :192.168.2.128 root/root - 下载MQ: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip - 上传到centos7中:cd /usr - 解压 :zip: unzip 文件名 ;tar: tar zxvf 文件名 - 配置: - nameserver:协调多个rocketmq - master:rocketmq主节点 - 域名映射:vi /etc/hosts 192.168.2.2.128 mqnameserver1 192.168.2.2.128 mqmaster1 存储路径: 1. 先进入mq: cd rocketmq ; 2. 然后创建文件夹:mkdir mqstore 3. mkdir mqstore /commitlog -提交日志 4. mkdir mqstore /consumequeue --消费者队列 5. mkdir mqstore /index -索引 配置消息队列:broker 路径:cd /usr/rocketmq/conf 2m-2s-async :2m 两个master ,2s两个slaver,async:异步 选择一个进入 cd 2m-2s-async ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-11-21fa49fc4a-8816-49cd-99ab-f223f07599b9.png) 目前单机就选择一个进入 vi broker-a.properties broker-a.properties: brokerId: 0表示master >0表示slaver 在broker-a.properties:配置如下 ``` brokerClusterName=DefaultCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH nameserverAddr=mqnameserver1:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true listenPort=10911 deleteWhen=04 fileReservedTime=48 #以下是配置的/usr/rocketmq/mqstore storePathRootDir=/usr/rocketmq/mqstore storePathCommitLog=/usr/rocketmq/mqstore/commitlog storePathConsuQueue=/usr/rocketmq/mqstore/consumequeue storePathIndex=/usr/rocketmq/mqstore/index maxMessageSize=65536 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH ``` 配置日志:创建日志文件: mkdir /usr/rocketmq/logs ​ 路径: /usr/rocketmq/logs ![image-20191030181428013](C:\Users\WJ\AppData\Roaming\Typora\typora-user-images\image-20191030181428013.png) 把xml中${user.home}替换为/usr/rocketmq/logs 使用linux命令一次性修改: sed -i 's#${user.home}#/usr/rocketmq/#g' *.xml 修改启动参数: 将bin/runbroker.sh runserver.sh:改为1g JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" 启动mq: 在bin目录 nohup sh mqnamesrv & --后台启动 //这些bin 目录下的文件 ./filename 输入: jps 查看进程:NamesrvStartup 说明启动成功 启动BrokerServer: nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties& 查看进程出现: BrokerStartup 说明启动成功 - cat nohup.out ### 控制台(web界面) - 下载 https://github.com/apache/rocketmq-externals - 解压缩 - 导入工程 - 把springboot中配置文件的rocketmq.config.namesrvAddr=192.168.2.128:9876 - 修改工程maven改为自己的maven不用系统的maven ,此外可以把maven的镜像改为阿里云镜像 - 进入web界面 http://localhost:7111/#/ #### 创建MQ工程 Maven 批量删除下载失败的:for /r %i in (*.lastUpdated) do del %i MQ集群: 将master关闭:在bin目录里: 关闭mq : ./mqshutdown broker 关闭服务器server: ./mqshutdown namesrv ​ 主从同步: 之前已经搭建了master,配置slaver: ``` 先配域名映射:vi /etc/hosts 192.168.2.2.128 mqnameserver1 192.168.2.2.128 mqmaster1 //通过两个nameserver防止 单点故障 //下面这两个也有在master里面添加下面的映射来告诉master和nameserver1:129是master的从节点,已经另外一台nameserver 192.168.2.2.129 mqnameserver1 192.168.2.2.129 mqmaster1slaver1 将master节点上的 mq远程复制到slaver节点上: 命令: 进入的rocketmq目录: scp -r rocketmq/ root@192.168.2.129/usr/ ``` 修改master配置消息队列:broker 路径:cd /usr/rocketmq/conf 2m-2s-async :2m 两个master ,2s两个slaver,async:异步 选择一个进入 cd 2m-2s-async ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-11-21c4be382b-37ab-4615-a47a-ee69d832c13d.png) 选择一个进入 vi broker-a.properties broker-a.properties: brokerId: 0表示master >0表示slaver 在broker-a.properties:配置如下 ``` brokerClusterName=DefaultCluster brokerName=broker-a #brokerId:0表示master >0表示slaver brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH #添加nameserver2的地址 namesrvAddr=mqnameserver1:9876;mqnameserver2:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true listenPort=10911 deleteWhen=04 fileReservedTime=48 #以下是配置的/usr/rocketmq/mqstore storePathRootDir=/usr/rocketmq/mqstore storePathCommitLog=/usr/rocketmq/mqstore/commitlog storePathConsuQueue=/usr/rocketmq/mqstore/consumequeue storePathIndex=/usr/rocketmq/mqstore/index maxMessageSize=65536 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH ``` 再修改master中的broker-a-s.properties: 先进入 vi broker-a-s.properties 修改: ``` brokerClusterName=DefaultCluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH namesrvAddr=mqnameserver1:9876;mqnameserver2:9876 defaultTopicQueueNums=4 autoCreateTopicEnable=true listenPort=10911 deleteWhen=04 fileReservedTime=48 #以下是配置的/usr/rocketmq/mqstore storePathRootDir=/usr/rocketmq/mqstore storePathCommitLog=/usr/rocketmq/mqstore/commitlog storePathConsuQueue=/usr/rocketmq/mqstore/consumequeue storePathIndex=/usr/rocketmq/mqstore/index maxMessageSize=65536 flushDiskType=ASYNC_FLUSH ``` 再在把master节点上 broke-a.properties 和 broker-a-s.properties的文件拷贝到slaver中: scp broker-a.properties root@192.168.2.129:/usr/rocketmq/conf/2m-2s-async scp broker-a-s.properties root@192.168.2.129:/usr/rocketmq/conf/2m-2s-async 为什么master节点配了还有在slaver中配 : 防止某一个挂掉了,另外一个可以直接启动不用重新配 master:主配置文件、从配置文件 slaver:主配置文件、从配置文件 启动master-slaver: 先启动master-128: 启动nameserv: 在bin中:nohup sh mqnamesrv & (在后台启动) 显式启动: ./mqnamesrv 启动BrokerServer: ​ nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a.properties & ​ 再启动slaver-129: 启动nameserv: 在bin中:nohup sh mqnamesrv & (在后台启动) 显式启动: ./mqnamesrv 启动BrokerServer: ​ nohup sh mqbroker -c /usr/rocketmq/conf/2m-2s-async/broker-a-s.properties & 修改项目的nameserver地址:rocketmq.config.namesrvAddr=192.168.2.128:9876;192.168.2.129:9876 然后启动可以看到: ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-11-2193ac0775-fcff-4e29-8496-fbfd04ef03d4.png) 说明搭建成功了; 验证集群: 关闭master 模拟节点宕机 ,看是否能从slaver中取数据 一般:向Master中写数据,从slaver中读数据 杀掉kill-9 进程号 使用消费者消费运行程序读取正常 为了方便可以把MQ控制台打包成jar :mvn clear package -Dmaven.test.skip=true 执行: java -jar jar名 #### 发送同步异步和单向消息、 看着官方文档例子进行修改即可 http://rocketmq.apache.org/docs/simple-example/ - 同步: ``` //发送同步消息 // SendResult send =producer.send(message); // System.out.println("发送成功:"+send); ``` - 异步 ``` //发送异步消息之后: 有两条线程:a.Main线程,发送完毕,立刻执行后面的程序; //b.处理消息的线程,并在处理完毕后,触发回调函数onSuccess(); // producer.send(message, new SendCallback() { // @Override // public void onSuccess(SendResult sendResult) { // System.out.println("发送成功:"+sendResult); // } // // @Override // public void onException(Throwable throwable) { // // } // }); ``` - 单向 ``` producer.sendOneway(message);//只发送消息,不接受返回值,不可靠的消息;不是特别重要的日志; ``` #### Push Consumer消费者模式 默认集群模式:(每个消费者都可以得到部分消息数据) ``` consumer.setMessageModel(MessageModel.CLUSTERING); ``` 搭建消费者集群:只需要将消费者的groupName设置相同即可 延迟: 不支持任意的时间精度,只支持以下几种 默认配置,在/usr/rocketmq/conf/broker.conf中配置 messageDelayLevel= 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 定义:生成者立刻将消息发送到队列,在队列中停留5s后发给消费者; ​ 延迟时间=消费者获取消息的时间-去消息入队的时间 #### 广播模式 特点:最大的不同,将全部消息内容,给每个消费者各一份(每个消费者,拥有一套网站的消息数据) 只需要将消费者中设置: ``` consumer.setMessageModel(MessageModel.BROADCASTING); ``` 设置订阅标签 ``` consumer.subscribe("Topic","tag1");订阅tag1 consumer.subscribe("Topic","*");//订阅所有 consumer.subscribe("Topic","tag1||tag2");//订阅消息1/2 ``` #### 偏移量Offset offset是消费的进度,指向某个topic 中 下一条数据在队列中存放的位置 offset存储在哪里?本地/远程/ - 集群模式:(远程存储)负载均衡,每个消费者 只消费一部分数据,因为各个消费者 需要通过一个共同的偏移量offset来指定每次消费的位置,因此此时offset就需要存放在远程(队列服务器中) - 广播模式:(本地存储)因为每个消费者都有一套完整消息数据,因此每个消费者就需要使用一个指针offset(偏移量)来维护当前消费的位置 - API层面: - 远程存储:org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore - 本地存储:org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore - 二者共同父类:OffsetStore #### Pull Consumer消费者模式 ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-11-217f98a14a-ba05-4e37-bfe0-a9809a95c224.png) idea中shif+f6批量改名字 pull方法一 ``` private static HashMap<MessageQueue,Long>offset=new HashMap<>(); public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { String groupName="pullGroup"; DefaultMQPullConsumer consumer=new DefaultMQPullConsumer(groupName); consumer.setNamesrvAddr(ConstT.NAMESERVER_ADDR); try { consumer.start(); System.out.println("pull start ..."); } catch (MQClientException e) { e.printStackTrace(); } Set<MessageQueue>mqs=consumer.fetchSubscribeMessageQueues("mytopic1"); for (MessageQueue mq:mqs) { //分别获取每条mq中的数据 while (true){ //pullBlockIfNotFound 1队列 2null 3偏移量 4每次最多pull消息的个数 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getOffset(mq), 2); System.out.println(pullResult); //当消费pull了若干个消息之后,需要重新设置偏移量 setOffset(mq,pullResult.getNextBeginOffset()); if (pullResult.getPullStatus()== PullStatus.FOUND){ List<MessageExt>messageExts=pullResult.getMsgFoundList(); for (MessageExt m :messageExts) { System.out.println(m); } }else if (pullResult.getPullStatus()== PullStatus.NO_MATCHED_MSG){ break; }else{ System.out.println("error"); } } } consumer.shutdown(); } private static void setOffset(MessageQueue mq, long mqoffset){ offset.put(mq,mqoffset); } //获取某个mq的偏移量 private static long getOffset(MessageQueue mq){ return offset.get(mq)==null?0:offset.get(mq); } ``` pull 方法二(自己设置调度时间) ``` public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { String groupName="pullGroup"; MQPullConsumerScheduleService consumerservice=new MQPullConsumerScheduleService(groupName); consumerservice.getDefaultMQPullConsumer().setNamesrvAddr(ConstT.NAMESERVER_ADDR); consumerservice.setMessageModel(MessageModel.CLUSTERING); consumerservice.registerPullTaskCallback("mytopic1", new PullTaskCallback() { @Override public void doPullTask(MessageQueue messageQueue, PullTaskContext pullTaskContext) { System.out.println("---"+messageQueue.getQueueId()); MQPullConsumer consumer = pullTaskContext.getPullConsumer(); try { long offset=consumer.fetchConsumeOffset(messageQueue,false); System.out.println("offset="+offset); PullResult pullResult= consumer.pull(messageQueue,"*",offset,2); System.out.println(pullResult); //当消费pull了若干个消息之后,需要重新设置偏移量 if (pullResult.getPullStatus()== PullStatus.FOUND){ List<MessageExt>messageExts=pullResult.getMsgFoundList(); for (MessageExt m :messageExts) { System.out.println(m); } }else{ System.out.println("error"); } consumer.updateConsumeOffset(messageQueue,pullResult.getNextBeginOffset()); pullTaskContext.setPullNextDelayTimeMillis(3000); } catch (MQClientException e) { e.printStackTrace(); }catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }); try { consumerservice.start(); System.out.println("pull start ..."); } catch (MQClientException e) { e.printStackTrace(); } // consumerservice.shutdown(); } ``` #### 消息局部顺序消费问题 生成者: ``` public class MyProducerOrder { public static final String NAMESERVER_ADDR="192.168.2.128:9876"; public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { //创建生成者 DefaultMQProducer producer=new DefaultMQProducer("myProducer"); producer.setNamesrvAddr(NAMESERVER_ADDR); try { producer.start(); //启动生产者 } catch (MQClientException e) { e.printStackTrace(); } //准备消息 List<RequestInfo>requests= RequestService.getRequest(); for (RequestInfo requestInfo:requests){ Message message=new Message("requestTopic","request",requestInfo.getRequestId()+"",requestInfo.getRequestDesc().getBytes()); //send(final Message msg, final MessageQueueSelector selector, final Object arg) //selector(final List<MessageQueue> mqs, final Message msg, final Object arg); producer.send(message, (mqs,msg,id)->{ long rId=(long)id; long index=rId%mqs.size(); return mqs.get((int) index); }, requestInfo.getRequestId()); System.out.println(message); } System.out.println("发送成功"); producer.shutdown(); } } ``` 消费者: ``` 基础类: public class RequestInfo { private long requestId; private String requestDesc; @Override public String toString() { return "RequestInfo{" + "requestId=" + requestId + ", requestDesc='" + requestDesc + '\'' + '}'; } public RequestInfo() { } public RequestInfo(long requestId, String requestDesc) { this.requestId = requestId; this.requestDesc = requestDesc; } public long getRequestId() { return requestId; } public void setRequestId(long requestId) { this.requestId = requestId; } public String getRequestDesc() { return requestDesc; } public void setRequestDesc(String requestDesc) { this.requestDesc = requestDesc; } } public class RequestService { public static List<RequestInfo>request; static{ request=new ArrayList<>(); RequestInfo requestzsXd=new RequestInfo(1,"下单"); RequestInfo requestzsZf=new RequestInfo(1,"支付"); RequestInfo requestzsTj=new RequestInfo(1,"推荐"); RequestInfo requestlsXd=new RequestInfo(2,"下单"); RequestInfo requestlsZf=new RequestInfo(2,"支付"); RequestInfo requestlsTj=new RequestInfo(2,"推荐"); RequestInfo requestwwXd=new RequestInfo(3,"下单"); RequestInfo requestwwZf=new RequestInfo(3,"支付"); RequestInfo requestwwTj=new RequestInfo(3,"推荐"); request.add(requestzsXd); request.add(requestzsZf); request.add(requestzsTj); request.add(requestlsXd); request.add(requestlsZf); request.add(requestlsTj); request.add(requestwwXd); request.add(requestwwZf); request.add(requestwwTj); } public static List<RequestInfo> getRequest() { return request; } public static void setRequest(List<RequestInfo> request) { RequestService.request = request; } } //生成者: public class MyConsumerOrder { public static void main(String[] args) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group1"); consumer.setNamesrvAddr(ConstT.NAMESERVER_ADDR); try { consumer.subscribe("requestTopic","*"); //设置监听器 :当生成者有消息时将消息推送给消费者 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt m:msgs) { System.out.println(Thread.currentThread().getName()+"-"+new String(m.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println(); } catch (MQClientException e) { e.printStackTrace(); } } } ``` #### 生成者批量发送 ``` public static final String NAMESERVER_ADDR="192.168.2.128:9876"; public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { //创建生成者 DefaultMQProducer producer=new DefaultMQProducer("myProducer"); producer.setNamesrvAddr(NAMESERVER_ADDR); try { producer.start(); //启动生产者 } catch (MQClientException e) { e.printStackTrace(); } //准备消息 List<RequestInfo> requests= RequestService.getRequest(); List<Message>mgs=new ArrayList<>(); for (RequestInfo requestInfo:requests){ Message message=new Message("requestTopic","request",requestInfo.getRequestId()+"",requestInfo.getRequestDesc().getBytes()); //send(final Message msg, final MessageQueueSelector selector, final Object arg) //selector(final List<MessageQueue> mqs, final Message msg, final Object arg); mgs.add(message); System.out.println(message); } SendResult send = producer.send(mgs); System.out.println("发送成功"+send); producer.shutdown(); } ``` #### 消息的事务 <img src="C:\Users\WJ\AppData\Roaming\Typora\typora-user-images\image-20191102170539051.png" alt="image-20191102170539051" style="zoom:50%;" /> 事务补偿机制: 生产者再向MQ Server提交事务时,有两次机会,如果第一次失败,则进行回查,回查后还会进行第二次提交/回滚。 在MQserver中消息由三种状态: 提交状态:可以被消费者消费 回滚状态:不可以被消费者消费 未知状态(中间状态):需要借助本地事务,进一步回查,事务补偿机制 ``` public class MyProducerTransction { public static final String NAMESERVER_ADDR="192.168.2.128:9876"; public static void main(String[] args) { //创建生成者(支持事务) TransactionMQProducer producer=new TransactionMQProducer("myproducer"); producer.setNamesrvAddr(NAMESERVER_ADDR); producer.setTransactionListener(new TransactionListener() { //将MQServer的响应状态写入到本地事务中 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //将消息的发送状态(commit,rollback,未知),写入本地事务 if (msg.getTags().equals("msg0")){ return LocalTransactionState.COMMIT_MESSAGE; }else if(msg.getTags().equals("msg1")){ return LocalTransactionState.ROLLBACK_MESSAGE; }else { return LocalTransactionState.UNKNOW; } } //事务在执行补偿机制时的检测本地事务 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println("事务的会查机制"+msg.getTags()); return LocalTransactionState.COMMIT_MESSAGE; } }); try { producer.start();//启动生成者 } catch (MQClientException e) { e.printStackTrace(); } //发送消息 for (int i=0;i<3;i++) { Message msg = new Message("txTopi", "msg" + i, ("msg" + i + "Content").getBytes()); try { TransactionSendResult result = producer.sendMessageInTransaction(msg,null ); System.out.println("发送:"+result); } catch (MQClientException e) { e.printStackTrace(); } } // producer.shutdown(); } } ``` 延迟消息、批量消息不支持事务机制 #### 概念 如何将mq中内存的数据持久化到盘: 同步刷盘、异步刷盘 Master-Slaver之间的复制方式:同步复制、异步复制(全局配置文件 异步:响应速度快 同步:保证数据的一致性 RocketMQ中的nameserver能否用zookeeper等协调框架取代? 可以!不过 rocketmq来说 协调功能的需求 比较简单,nameserver完全能够满足,没有必须使用 相对重量级的zookeeper。 nameserver来说,如果有多个nameserver集群,他们之间彼此独立,没有master\slaver之分,因此不需选择机制或其他 协调框架的高级功能。 nameserver需要完成的大部分任务: topicQueue、brokerAddress、brokerLive、clusterAddress 配置刷盘方式、复制方式:broker.conf,各个具体的配置文件中conf/2m-2s-async/Xxx.properties

方法重载-静态类型作为重载的判断依据

`/** * 面向对象的特征:封装、继承、多态 * 而多态主要体现在 "重载" Overload、"重写"Override */ abstract class Person{}; class Man extends Person{ }; class Woman extends Person{}; public class FenPai { //静态分配 方法重载 public void sayHello(Person person){ System.out.println("hello person"); } public void sayHello(Man man){ System.out.println("hello man"); } public void sayHello(Woman woman){ System.out.println("hello woman"); } ` public static void main(String[] args) { Person man=new Man(); Person woman=new Woman(); FenPai fenPai=new FenPai(); fenPai.sayHello(man); fenPai.sayHello(woman); /** * 运行结果: * hello person * hello person */ /** * 分析原因: * Person p = new Man() * 静态类型:是指在编译期间就可知的类型(Person) * 实际类型:是指在运行期间才可以确定的类型(new Man()) * 区别:静态和实际类型在程序中都可以发生一些变化, * 但二者区别在于静态类型的变化仅仅在使用时发生,变量本身的静态类型不会改变,并且最终的静态类型是在编译期间可知。 * 实际类型变化的结果是在运行期间才可以确定。 * 首先调用sayHello,使用那个重载版本,完全取决于参数的数量和参数类型 * 上面的例子特点:是静态类型相同但实际类型不同 * 又因为虚拟机在重载时,通过参数的静态类型决定的而不是实际类型 * 因此结果输出都是 "hello person" */ } }

高可用高性能MySQL架构学习

### mysql数据库架构 结构图:![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-30b88d4bfb-ace3-497e-a7e3-d218ad13ebad.png) ### MyCat安装 ###### 基于主从同步mycat的作用: 操作Mycat:和sql92基本一致;端口号8066 /9066 ​ SQL92:语法严格的SQL:insert into student (id,name) values(1,'zs'); SQL99:语法宽松:insert into student values(1,'zs'); 1. ###### 分库分表: - 水平拆分:用户数据(用户数据1、用户数据2、用户数据3)【mycat主要】 - 垂直拆分:系统 (用户数据库,订单数据库)【mycat次要】 2. ###### 读写分离: 读-->服务器节点(数据库) ​ 写-->服务器节点(数据库) 下载mycat: Mycat-server-1.6.6.1-release-20181031195535-linux.tar.gz 安装mycat: ​ 上传mycat到一个新的服务器上bigdata02;cd /usr/local/ 中 ​ 解压缩: tar -zxvf 配置mycat: (cd /usr/local/mycat/conf) ​ 主要是 rule.xml、schema.xml、server.xml 进入server : vi server.xml; server.xml: 在<user >中配置mycat登录的name和密码 520WANGjian ​ schemas:mydb--逻辑库 <user name="root" defaultAccount="true"><!--主要修改--> <property name="password">520WANGjian</property><!--主要修改--> <!-- 逻辑库多个用,隔开 --> <property name="schemas">myDB</property> <!-- 表级 DML 权限设置 --> <!-- <privileges check="false"> <schema name="TESTDB" dml="0110" > <table name="tb01" dml="0000"></table> <table name="tb02" dml="1111"></table> </schema> </privileges> --> </user> <user name="user"> <property name="password">user</property> <property name="schemas">TESTDB</property> <property name="readOnly">true</property> </user> 修改schema.xml(读写分离,分库分表都在这里配): ​ 先进入schema.xml : cd /usr/local/mycat/conf/ vi schema.xml dataHost balance: 读请求的负载均衡 ​ 0:不开启读写分离,所以的读操作都发送writehost中 ​ 1:全部的readlyhost 和 stand by writehost(待命的写节点) 都参与 读操作的负载均衡(除了被激活的写节点) ​ 2:全部读请求 随机 发送给readhost、 writehost ​ 3: 读请求随机发送给writehost中的readhos (writehost不参与读操作) writeType:写请求的负载均衡 ​ 0:写请求先发送给schema.xml中的第一个writehos,当第一个writehost挂掉,再自动切换到其他的writehost中。切换的记录,会被记录在一个日志文件中。(conf/dnindex.projperties中) ​ 1:写请求随机发送给所有的writehost中 switchType:是否允许 “读操作” 在readhost 和writehost中自动切换(解决延迟问题:当从readhost中读取数据中出现网络延迟等问题时,自动从writehost中读取数据) ​ -1:不允许切换 ​ 1:默认值 允许切换 ​ 2:根据“主从同步的状态” 自动选择是否 切换。 ​ 主从之间 会持续发送心跳。 当心跳检测机制发送IO延迟,则readhost自动切换writehost, 否则不切换(推荐) 。 必须将心跳设置为sql语句 show slave status (否则没法知道slave是否io延迟) 修改: ``` <!--name该为逻辑数据库名字--> <schema name="myDB" checkSQLschema="false" sqlMaxLimit="100"> <!-- auto sharding by id (long) name分表名 datanode:拆分后的数据切片的位置dn1 dn2 role:拆分规则myrule() --> <table name="student" dataNode="dn1,dn2" rule="myrule" /> .. ... <!--分成两个库节点 --> <dataNode name="dn1" dataHost="localhost1" database="mydb01" /> <dataNode name="dn2" dataHost="localhost1" database="mydb02" /> <!--配置节点1 --> <dataHost name="localhost1" maxCon="1000" minCon="10" balance="3" writeType="0" dbType="mysql" dbDriver="native" switchType="2" slaveThreshold="100"> <heartbeat>show slave status</heartbeat> <!-- can have multi write hosts master-slave 读写分离的写在主从同步的master 读在slave --> <writeHost host="hostM1" url="192.168.2.2:3306" user="root" password="520WANGjian"> <!-- can have multi read hosts --> <readHost host="hostS2" url="192.168.2.128:3306" user="root" password="520WANGjian" /> </writeHost> <writeHost host="hostS1" url="localhost:3316" user="root" password="123456" /> <!-- <writeHost host="hostM2" url="localhost:3316" user="root" password="123456"/> --> </dataHost> ``` rule.xml:配置分库分表的规则 修改:选择算法 ``` <!--平均拆分 name改为 myrule--> <tableRule name="myrule"> <rule> <columns>id</columns> <algorithm>mod-long</algorithm> </rule> </tableRule> <!--平均分算法--> <function name="mod-long" class="io.mycat.route.function.PartitionByMod"> <!-- how many data nodes 拆分后的切片个数--> <property name="count">2</property> </function> ``` 数据准备: 在主机和从机创建数据库:mydb01 mydb02 create database mydb01; create database mydb02; 由于前面配置了主从同步,所以在主机中创建就会同步到从机,前提是把主机从机中my.cnf中指定同步数据库去掉 创建表:用学生表 : create table student (id int(4),name varchar(20)); 验证读写分离: ​ 写: bigdata[windows](master) 验证逻辑:向mycat周插入数据-->是否会自动写入的writehost(bigdata-windows) 具体步骤:登录mycat: ​ 先执行以下命令: cd /usr/local/mycat /bin 1. ​ 进入mycat的bin目录: cd bin 2. ​ 查看bin目录:ls 3. ​ 进入bin中的mycat目录: cd .. 4. ​ 开启mycat: bin/mycat start (关闭: bin/mycat stop ) 出现了运行几秒就关闭的情况 : 输入 tail -f /usr/local/mycat/logs/wrapper.log 查看日志: > ``` > INFO | jvm 1 | 2019/10/29 18:32:33 | at org.tanukisoftware.wrapper.WrapperSimpleApp.run(WrapperSimpleApp.java:240) > INFO | jvm 1 | 2019/10/29 18:32:33 | at java.lang.Thread.run(Thread.java:748) > INFO | jvm 1 | 2019/10/29 18:32:33 | Caused by: io.mycat.config.util.ConfigException: SelfCheck### schema TESTDB refered by user user is not exist! > INFO | jvm 1 | 2019/10/29 18:32:33 | at io.mycat.config.ConfigInitializer.selfChecking0(ConfigInitializer.java:142) > INFO | jvm 1 | 2019/10/29 18:32:33 | at io.mycat.config.ConfigInitializer.<init>(ConfigInitializer.java:118) > INFO | jvm 1 | 2019/10/29 18:32:33 | at io.mycat.config.MycatConfig.<init>(MycatConfig.java:72) > INFO | jvm 1 | 2019/10/29 18:32:33 | at io.mycat.MycatServer.<init>(MycatServer.java:158) > INFO | jvm 1 | 2019/10/29 18:32:33 | at io.mycat.MycatServer.<clinit>(MycatServer.java:101) > INFO | jvm 1 | 2019/10/29 18:32:33 | ... 7 more > STATUS | wrapper | 2019/10/29 18:32:36 | <-- Wrapper Stopped > ``` 可以看到配置异常 应该就是server.xml中有user 配置且user中配置的testdb在schema.xml中不存在 解决方法在server.xml中删除user配置就可以正常运行; 5. 查看是否开启:bin/mycat status 登录:mysql -uroot -p pw: 520WANGjian 提示:不能直接在mycat的节点登录mysql(需要安装mysql)或者远程登录 通过主机mysql远程登录mycat : mysql -uroot -p520WANGjian -h192.168.2.129 -P8066 登录成功后 show databases 查看是否有逻辑库 myDB ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-30a76faa28-c969-4a21-ac7a-3300a0f23cbf.png) ​ 然后通过mycat插入数据: ​ 先查看是否有student表 <img src="C:\Users\WJ\AppData\Roaming\Typora\typora-user-images\image-20191029110524761.png" alt="image-20191029110524761" style="zoom:50%;" /> 插入数据: insert into student(id,name) values(1,'zs'); 出现异常查看日志:tail -f /usr/local/mycat/logs/mycat.log 去解决(一般是授权问题或者schema.xml中的问题)可以重启虚拟机再查bug; insert into student(id,name) values(1,'zs'); ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-3084485776-752b-4dae-9e7c-72c366d56ff1.png) 成功插入; 在主机中查看到底mycat把第一条数据写在那个库里面: ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-300a2e0395-e482-443a-be18-9fc54bb2ec01.png) 第一条数据插入在mydb02 在插入一条:insert into student(id,name) values(2,'ls'); 再查询: ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-302463852a-f4c1-480e-aae1-7ff6b5d7ae75.png) 发现写在了mydb01里面; 可以看出已经实现了分库写入 ;由于采用的是平均分算法使用两个库个一般; 验证:从master中查看数据是否写入 ​ 读:bigdata1[linux](slave) ​ 验证读: 在从库里mydb01插入一条数据: insert into student values(3,'ww'); ​ 然后在mycat里面查看是否能查到,因为读写分离 ,配置是读库是 bigdata1中如果查出来证明读写已经分离: ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-3061e8cf84-2201-47de-8d5d-3b869388af00.png) 可以看到读取到了从库添加的数据和主库插入的数据(注:主从同步,只能主同步到从,从不能同步到主) 常见bug : Invalid DataSource:0(亲测搞了很久,最终查日志 修改重启才可以):防火墙没关、mysql数据库权限问题(可以主从交换远程登录测试是否有权限操作数据库)、ip、端口、 日志: mycat/log:执行操作错误日志 tail -f /usr/local/mycat/logs/mycat.log ​ wrapper.log:启动错误日志 tail -f /usr/local/mycat/logs/wrapper.log ###### mysql放权: ```数据库 use mysql ; update user set `Select_priv` = 'Y', `Insert_priv` = 'Y',`Update_priv` = 'Y',`Delete_priv` = 'Y', `Create_priv` = 'Y',`Drop_priv` = 'Y',`Reload_priv` = 'Y',`Shutdown_priv` = 'Y', `Process_priv` = 'Y',`File_priv` = 'Y',`Grant_priv` = 'Y',`References_priv` = 'Y', `Index_priv` = 'Y',`Alter_priv` = 'Y',`Show_db_priv` = 'Y',`Super_priv` = 'Y', `Create_tmp_table_priv` = 'Y',`Lock_tables_priv` = 'Y',`Execute_priv` = 'Y', `Repl_slave_priv` = 'Y',`Repl_client_priv` = 'Y',`Create_view_priv` = 'Y', `Show_view_priv` = 'Y',`Create_routine_priv` = 'Y',`Alter_routine_priv` = 'Y', `Create_user_priv` = 'Y',`Event_priv` = 'Y',`Trigger_priv` = 'Y',`Create_tablespace_priv` = 'Y' where user="root" ; flush privileges; ``` 再配置一台mycat防止宕机:启动bigdata03 再次搭建mycathe bigdata02组成集群 然后启动bigdata02/03上的mycat ##### haproxy 再在bigdata04安装haproxy,用于整合(2个)多个mycat 在线安装haproxy: 1. 查看可以的haproxy版本:yum list | grep haproxy 版本: haproxy.x86_64 2. 在线下载:yum -y install haproxy.x86_64 3. 设置:用户名默认haproxy : chown -R haproxy:haproxy /etc/haproxy/ 4. 修改配置文件(日志): vi /etc/rsyslog.conf ``` # Provides UDP syslog reception #$ModLoad imudp #$UDPServerRun 514 打两句话打开即可: # Provides UDP syslog reception $ModLoad imudp $UDPServerRun 514 #设置日志文件的路径: # Save boot messages also to boot.log local7.* /var/log/haproxy.log 重启日志服务: systemctl restart rsyslog.service ``` 5. 配置文件(haproxy) 1. yum安装后的配置文件默认在etc里面 2. vi /etc/haproxy/haproxy.cfg(里面的日志文件名 和上面配置的日志保持一致) ``` #--------------------------------------------------------------------- # Example configuration for a possible web application. See the # full configuration options online. # # http://haproxy.1wt.eu/download/1.4/doc/configuration.txt # #--------------------------------------------------------------------- #--------------------------------------------------------------------- # Global settings #--------------------------------------------------------------------- global # to have these messages end up in /var/log/haproxy.log you will # need to: # # 1) configure syslog to accept network log events. This is done # by adding the '-r' option to the SYSLOGD_OPTIONS in # /etc/sysconfig/syslog # # 2) configure local2 events to go to the /var/log/haproxy.log # file. A line like the following can be added to # /etc/sysconfig/syslog # # local2.* /var/log/haproxy.log # log 127.0.0.1 local2 chroot /var/lib/haproxy pidfile /var/run/haproxy.pid maxconn 4000 user haproxy group haproxy daemon # turn on stats unix socket stats socket /var/lib/haproxy/stats #--------------------------------------------------------------------- # common defaults that all the 'listen' and 'backend' sections will # use if not designated in their block #--------------------------------------------------------------------- defaults mode tcp log global option httplog option dontlognull option http-server-close option forwardfor except 127.0.0.0/8 option redispatch retries 3 timeout http-request 10s timeout queue 1m timeout connect 10s timeout client 1m timeout server 1m timeout http-keep-alive 10s timeout check 10s maxconn 3000 #--------------------------------------------------------------------- # main frontend which proxys to the backends #--------------------------------------------------------------------- frontend mycat bind 0.0.0.0:8066 bind 0.0.0.0:9066 mode tcp log global default_backend mycat_server #acl url_static path_beg -i /static /images /javascript /stylesheets # acl url_static path_end -i .jpg .gif .png .css .js # use_backend static if url_static # default_backend app #--------------------------------------------------------------------- # static backend for serving up images, stylesheets and such #--------------------------------------------------------------------- backend mycat_server balance roundrobin server mycat1 192.168.2.129:8066 check inter 5s rise 2 fall 3 server mycat2 192.168.2.130:8066 check inter 5s rise 2 fall 3 server mycatadmin1 192.168.2.129:9066 check inter 5s rise 2 fall 3 server mycatadmin2 192.168.2.130:9066 check inter 5s rise 2 fall 3 listen stats mode http bind 0.0.0.0:5000 stats enable stats hide-version stats uri /haproxy stats realm Haproxy \ Statistics #--账号密码 stats auth admin:admin stats admin if TRUE #--------------------------------------------------------------------- # round robin balancing between the various backends #--------------------------------------------------------------------- ``` 访问web的界面: /haproxy 启动并使用haproxy: 启动:systemctl start haproxy.service systemctl status haproxy.service 异常: haproxy.service: main process exited, code=exited, status=1/FAILURE cannot bind sockey:网络阻塞 解决:权限问题:setenforce 0 重新执行:systemctl start haproxy.service systemctl status haproxy.service <img src="C:\Users\WJ\AppData\Roaming\Typora\typora-user-images\image-20191029184333747.png" alt="image-20191029184333747" style="zoom:50%;" /> 启动成功后可以通过web来访问: http://192.168.2.131:5000/haproxy 可以看到你配置的mycat: ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-309f0fdb3f-956d-4e6d-a8a0-87999f9640fd.png) 如果status是down说明你的mycat已经离线 以上已经完成haproxy 配置 根据以上方法在配置一台 启动成功后可以通过web来访问: http://192.168.2.132:5000/haproxy 可以看到你配置的mycat: haproxy重启后 setenforce 0 systemctl start haproxy.service #### keepaylived:防止单点故障 可以再配一台虚拟机单独安装或者安装在和haproxy一起 在bigdata04/05在线安装keepaylived: 1. 查看可以的haproxy版本:yum list | grep keepaylived 版本:keepalived.x86_64 2. 在线下载:yum -y install keepalived.x86_64 3. 修改配置: vi /etc/keepalived/keepalived.conf(可以直接在虚拟机中改) ``` global_defs { router_id NodeA } vrrp_script chk_haproxy { script "/etc/check_haproxy.sh" interval 4 weight 3 } vrrp_instance VI_1 { state MASTER interface ens33#网卡名字 在虚拟机中输入:cd /etc/sysconfig/network-scripts/ ls virtual_router_id 10 priority 100 advert_int 1 track_script { chk_haproxy } authentication { auth_type PASS auth_pass 1234 } virtual_ipaddress { 192.168.2.222/24 } } ``` 然后在新建一个 check_haproxy.sh文件: ``` #!/bin/bash A='ps-C haproxy --no-header |wc -1' if [$A -eq 0];then systemctl start haproxy.service fi ``` 然后bigdata05和上面进行同样的操作 然后修改: router_id:NodeB priority:90 使用keepalived: ​ 启动: systemctl start keepalived.service ​ 开机启动:systemctl enable keepalived.service 查看状态:systemctl status keepalived.service 查看vip :ip a 开启 haproxy

SQL优化及主从同步4-主从同步

## MySQL主从同步: 数据准备: ​ windows:mysql主数据库 ​ linux :mysql 从数据库 安装window版 mysql 安装图形化客户端 Navicat Windows 连接Linux mysql 需要远程连接 如果要远程连接数据库,则需要授权远程访问。 授权远程访问:(A->B,则需要在B计算机mysql中执行以下命令) linux mysql命令:GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION; FLUSH PRIVILEGES; 刷新权限 如果还是报错:可能是防火墙没关闭:在B关闭防火墙 service iptables stop; #### 实现主从同步(主从复制) 原理: 1. ​ master 将改变的数据记录在本地的一个二进制日志文件中(binary log);该过程称之为:二进制 2. slave将master中的binary log 拷贝到自己的relay log (中继日志文件)中 3. relay log 通过SQL线程将relay log中的数据读取到自己的数据库中; 4. MySQL主从复制 是异常操作,串行化的,有延迟的 同步核心:主数据库(master)通过一个二进制日志文件, 数据库中的数据改变时会在二进制文件中保存备份,从数据库(slave)的IO线程 去master中的二进制文件读取数据,写回到从IO线程中,IO线程把读取回来的数据放到Relay log中 ,通过SQL线程把数据写到slave中 配置: mysql -u root -h 192.168.2.2 -p Windows(mysql: my.ini): linux(mysql: my.cnf): 配置前 ,为了无无,先将权限、防火墙等处理:防火墙关闭:systemctl stop firewalld Mysql允许远程连接: Windows/linux 都执行mysql命令:GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION; FLUSH PRIVILEGES; 刷新权限 主机操纵/代码: log-bin日志一般放data目录中 my.ini: #在[mysqld] 中添加 server-id=1 #二进制日志文件 log-bin="C:/Program Files/Software/mysql-5.5.62-winx64/data/mysql-bin" #记录错误的文件目录: log-error="C:/Program Files/Software/mysql-5.5.62-winx64/data/mysql-error" #主从同步时 忽略的数据库 binlog-ignore-db=mysql #(可选)指定主从同步时,同步那些数据库 binlog-do-db=test windows主机中的数据库授权那台计算机是 自己的从计算机的数据库: 命令: 授权从计算机的数据库的ip 192.168.2.* GRANT REPLICATION slave,reload,super ON *.* TO 'root'@'192.168.2.%' IDENTIFIED BY 'root'; flush privileges; 然后查看主机数据库的状态(每次在做主从同步前,需要观察,主机状态的最新值) show master status查看一下(mysql-bin. 000001 position 107) ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-28d8af5df1-d332-4ad1-9874-e270f45e6921.png) 从机操作/代码(linuex): 测试是否能远程连接主机: mysql -h 192.168.2.2 -uroot -proot (如果不行则重新授权指定id授权: GRANT REPLICATION SLAVE ON *.* TO 'root'@'192.168.2.128' IDENTIFIED BY 'root';) my.cnf: #[mysqld] server-id=2 log-bin=mysql-bin #从机和主机同步数据库test replicate-do-db=test linux 中数据库 授权那台计算机的数据库是自己的主数据库: CHANGE MASTER TO MASTER_HOST='192.168.2.2',--主机的ip MASTER_USER='root',---从机的账号 MASTER_PASSWORD='root',--从机的密码 MASTER_PORT=3306, master_log_file='mysql-bin.000002', master_log_pos= 107; 如果之前配过主从同步有报错:解决: STOP SLAVE; 开启主从同步: 从机Linux: 开启: start slave; 关闭:stop slave; 检测从机开启的状态: show slave status \G 观察: Slave_IO_Running: Connecting Slave_SQL_Running: Yes 如果不是两个都yes 则看下方的 Last_IO_Error: error connecting to master 'root@192168.2.2:3306' - retry-time: 60 retries: 86400 发现last_IO_error 中连接超时 检测主从中分别查看 serverid: show variables like 'server_id'; ​ 发现Windows 和linux 的serverid都是1; 解决 mysql:set global server_id=2; ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-28c31d14e5-1a0a-42fb-8f2b-2dee82d9a916.png) 测试是否主从同步: ​ 在主机中:create table tb(id int(4) primary key ,name varchar(20) not null); insert into tb values(1,'zs'); 然后在从机(linux中)进行查询: select * from tb; ![](http://www.yunhaiguil.wang:80/static/HeadImages/2019-10-2850b707b1-25a7-45af-b234-ecf809b75877.png)
往期回顾