广告位联系
返回顶部
分享到

Redis+Hbase+RocketMQ实际使用问题案例介绍

Redis 来源:互联网 作者:佚名 发布时间:2023-01-27 14:51:08 人浏览
摘要

需求 将Hbase数据,解析后推送到RocketMQ。 redis使用list数据类型,存储了需要推送的数据的RowKey及表名。 简单画个流程图就是: 分析及确定方案 Redis 明确list中元素结构{rowkey:rowkey,table

需求

  • 将Hbase数据,解析后推送到RocketMQ。
  • redis使用list数据类型,存储了需要推送的数据的RowKey及表名。

简单画个流程图就是:

分析及确定方案

Redis

  • 明确list中元素结构{"rowkey":rowkey,"table":table}解析出rowkey;
  • 一次取多个元素加快效率;取了之后放入重试队列,并删除原来的元素;
  • 处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;
  • 明确从list中取值所使用的redis命令;
  • 范围获取LRANGE;
  • 范围删除(留下指定范围的数据)LTRIM;
  • 判断list长度LLEN;
  • 加入listRPUSH;删除LREM等等;
  • 从Hbase获取数据失败和发送到mq失败都令重试次数加一;
  • 每次碰到重试次数不为0的数据都休眠1s;
  • 设置最大重试次数,达到限制后丢弃;
  • 考虑客户redis部署方式,单机、主从、集群、哨兵等;
  • 选择合适的客户端,Jedis、Redisson、Lettuce等;
  • 编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;

Hbase

  • 基本理论知识学习(原来没接触过),rowkey是没条数据的主键,限定符是字段名,列族是多个限定名的集合等;
  • 当时看这个觉得不错https://www.jb51.net/article/230731.htm因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;
  • 确定批量获取数据方式为批量Get,没用scan;
  • 了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的CellUtil.clone相关方法;
  • 考虑所有都没数据时休眠10s;

RocketMQ

  • 有现成的发送代码,公司封装好的;
  • 调整发送的速度、太快了服务端会吃不消(获取Hbase数据速度太快了,最开始没限制一会儿就入了百万数据),设置超时时间(默认3s);
  • 调整服务端的内存、线程数等参数;

实现

配置

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

#server configuration

server.port=8896

#log config

logging.file.path=./logs

#redis-standalone

redis.standalone.host=

redis.standalone.port=6379

redis.standalone.password=

redis.standalone.enable=true

#redis-cluster

redis.cluster.nodes=

redis.cluster.password=

redis.cluster.timeout=30000

redis.cluster.enable=false

# Zookeeper 集群地址,逗号分隔

hbase.zookeeper.quorum=

# Zookeeper 端口

hbase.zookeeper.property.clientPort=2181

# 消息目的rocketmq地址

rocketmq.server.host=

# 发送消息间隔时间,防止发送过快mq受不了

rocketmq.send.interval.millisec=10

# 每次从redis读取数据量限制。

data.access.redisDataSize=100

# 失败数据重试次数,超过的直接丢弃

data.access.retryNum=10

# 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey

data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back

data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back

部分代码

获取配置,其余的直接@Value("${}"):

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

@Setter

@Getter

@Configuration

@ConfigurationProperties(prefix = "data.access")

public class AccessRedisMqConfig {

 

    /**

     * key:topic; value:redis的key

     */

    private Map<String, String> topicKeyMap = new HashMap<>();

 

    /**

     * 一次从redis中读取数据量限制

     */

    private long redisDataSize = 50;

 

    /**

     * 失败数据重试次数

     */

    private int retryNum = 10;

 

}

开启接入:

1

2

3

4

5

6

7

8

9

10

11

12

@Component

public class AdapterRunner implements ApplicationRunner {

 

    @Resource

    private DataAccessService dataAccessService;

 

    @Override

    public void run(ApplicationArguments args) {

        System.out.println("项目已启动,开始接入数据到RocketMQ……");

        dataAccessService.accessData2Mq();

    }

}

其他代码其实也在分析里了。

踩坑

mq发送问题

1

2

3

4

5

6

7

8

9

10

11

12

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout

    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)

    at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)

    at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)

    at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)

    at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)

    at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)

    at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)

    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)

    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)

    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)

    at java.base/java.util.concurrent.ThreadPoolExecutor$Wo

上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。


版权声明 : 本文内容来源于互联网或用户自行发布贡献,该文观点仅代表原作者本人。本站仅提供信息存储空间服务和不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权, 违法违规的内容, 请发送邮件至2530232025#qq.cn(#换@)举报,一经查实,本站将立刻删除。
原文链接 : https://www.cnblogs.com/letscrazy/p/case.html
相关文章
  • 本站所有内容来源于互联网或用户自行发布,本站仅提供信息存储空间服务,不拥有版权,不承担法律责任。如有侵犯您的权益,请您联系站长处理!
  • Copyright © 2017-2022 F11.CN All Rights Reserved. F11站长开发者网 版权所有 | 苏ICP备2022031554号-1 | 51LA统计