简单画个流程图就是:
Redis
Hbase
RocketMQ
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
#server configuration server.port=8896 #log config logging.file.path=./logs #redis-standalone redis.standalone.host= redis.standalone.port=6379 redis.standalone.password= redis.standalone.enable=true #redis-cluster redis.cluster.nodes= redis.cluster.password= redis.cluster.timeout=30000 redis.cluster.enable=false # Zookeeper 集群地址,逗号分隔 hbase.zookeeper.quorum= # Zookeeper 端口 hbase.zookeeper.property.clientPort=2181 # 消息目的rocketmq地址 rocketmq.server.host= # 发送消息间隔时间,防止发送过快mq受不了 rocketmq.send.interval.millisec=10 # 每次从redis读取数据量限制。 data.access.redisDataSize=100 # 失败数据重试次数,超过的直接丢弃 data.access.retryNum=10 # 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back |
获取配置,其余的直接@Value("${}"):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
@Setter @Getter @Configuration @ConfigurationProperties(prefix = "data.access") public class AccessRedisMqConfig {
/** * key:topic; value:redis的key */ private Map<String, String> topicKeyMap = new HashMap<>();
/** * 一次从redis中读取数据量限制 */ private long redisDataSize = 50;
/** * 失败数据重试次数 */ private int retryNum = 10;
} |
开启接入:
1 2 3 4 5 6 7 8 9 10 11 12 |
@Component public class AdapterRunner implements ApplicationRunner {
@Resource private DataAccessService dataAccessService;
@Override public void run(ApplicationArguments args) { System.out.println("项目已启动,开始接入数据到RocketMQ……"); dataAccessService.accessData2Mq(); } } |
其他代码其实也在分析里了。
mq发送问题
1 2 3 4 5 6 7 8 9 10 11 12 |
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525) at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523) at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610) at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167) at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572) at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54) at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Wo |
上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。