RedisTemplate实现消息队列并且批量插入数据。

2020-05-22 16:04:31来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

RedisTemplate实现消息队列并且批量插入数据。

早期由于生产环境业务量小。所以日志是一条一条commit的。运行也没出过问题。

后来随着业务扩大并发量上来后,日志写入因为频繁与数据库打交道导致数据库连接池经常占满,直至程序崩溃。

因为日志并非需要实时响应。所以考虑改用异步+批量提交的方式。

为了缓解jvm内存压力。采用redis做消息队列(因为原项目有集成过redis,公司不想使用其他mq增加维护成本)。

所以在网上找了篇springboot整合redistemplate做消息队列的资料。稍微改了一下。

参考资料:https://blog.csdn.net/qq_38553333/article/details/82833273

 

首先是redisConfig。

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;


/**
 * Spring configuration that declares the Redis beans used by the message queue:
 * a JSON-serializing {@link RedisTemplate} and a {@link RedisMessageListenerContainer}.
 */
@Configuration
@EnableCaching // enable Spring's annotation-driven caching
public class RedisConfig {
    /**
     * Builds a {@link RedisTemplate} whose keys and hash keys are serialized as strings and
     * whose values and hash values are serialized as JSON through Jackson.
     *
     * @param factory connection factory the template obtains its connections from
     * @return the fully initialized template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);

        // Serialize values as JSON with Jackson instead of the template's default JDK serialization.
        Jackson2JsonRedisSerializer<Object> jacksonSerializer =
                new Jackson2JsonRedisSerializer<>(Object.class);

        ObjectMapper om = new ObjectMapper();
        // Make every field, getter and setter visible to Jackson regardless of access modifier.
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        // Embed class type information for non-final types so that values can be read back
        // as their original class.
        // NOTE(review): enableDefaultTyping is deprecated in newer Jackson releases and
        // polymorphic typing is a known deserialization risk when Redis content is not
        // trusted; consider activateDefaultTyping with a restrictive validator.
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jacksonSerializer.setObjectMapper(om);

        // Keys are plain strings, values are JSON.
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(jacksonSerializer);

        // Same scheme for hash keys and hash values.
        template.setHashKeySerializer(new StringRedisSerializer());
        template.setHashValueSerializer(jacksonSerializer);
        template.afterPropertiesSet();

        return template;
    }


    /**
     * Creates the listener container that message listeners register themselves with
     * in order to receive Redis pub/sub messages.
     *
     * @param redisConnectionFactory connection factory used for the subscription connection
     * @return the listener container
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        return container;
    }
}

  

消息实体Message

import com.alibaba.fastjson.JSON;
import lombok.Data;

import java.util.UUID;

/**
 * Envelope published on a Redis channel. The business payload is carried as a JSON
 * string in {@code content}; accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class Message {
    private String id;
    private Integer retryCount;
    private String content;
    private Integer status;
    private String topic;

    /** No-argument constructor, needed so the envelope can be rebuilt from JSON. */
    public Message() {
    }

    /**
     * Wraps a payload for the given topic: assigns a fresh id, serializes the payload
     * to JSON and starts with a retry count and status of zero.
     *
     * @param topic  topic the message is published on
     * @param object payload, serialized with fastjson
     */
    public Message(String topic, Object object) {
        this.topic = topic;
        this.id = newId();
        this.content = JSON.toJSONString(object);
        this.retryCount = 0;
        this.status = 0;
    }

    /** Returns a random UUID with the dashes stripped. */
    private static String newId() {
        String uuid = UUID.randomUUID().toString();
        return uuid.replace("-", "");
    }
}

  

Redis订阅管理,采用观察者模式。

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Registry of consumers per topic. Before a message is published, {@link #broadcast}
 * leaves one marker per registered consumer in Redis so that each consumer can claim
 * the message when the pub/sub notification arrives.
 *
 * <p>NOTE(review): the registry is a plain {@link HashMap} and is not synchronized;
 * confirm that consumers are only registered during startup, before concurrent
 * {@code broadcast} calls begin.
 */
@Component
public class TopicSubscriber {
    private final Map<String, Set<String>> subscriberMap = new HashMap<>();

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    /**
     * Registers a consumer for a topic.
     *
     * @param topic    topic name
     * @param consumer consumer name
     * @return {@code true} if the consumer was not yet registered for the topic
     */
    public Boolean addConsumer(String topic, String consumer) {
        return subscriberMap.computeIfAbsent(topic, t -> new HashSet<>()).add(consumer);
    }

    /**
     * Unregisters a consumer from a topic.
     *
     * @param topic    topic name
     * @param comsumer consumer name
     * @return {@code true} if the consumer was registered and has been removed
     */
    public Boolean removeConsumer(String topic, String comsumer) {
        Set<String> consumers = subscriberMap.get(topic);
        return consumers != null && consumers.remove(comsumer);
    }

    /**
     * For every consumer of the topic, stores the message id under
     * {@code topic_consumer_id} and pushes the topic name onto the list
     * {@code topic_consumer}, unless a {@code fail_}-prefixed key already exists
     * for that message.
     *
     * @param topic topic name
     * @param id    message id
     */
    public void broadcast(String topic, String id) {
        Set<String> consumers = subscriberMap.get(topic);
        if (consumers == null) {
            return;
        }
        for (String consumer : consumers) {
            String key = String.join("_", topic, consumer, id);
            // hasKey returns a nullable Boolean (null inside a pipeline or transaction);
            // compare explicitly instead of unboxing to avoid a NullPointerException.
            boolean alreadyFailed = Boolean.TRUE.equals(redisTemplate.hasKey("fail_" + key));
            if (!alreadyFailed) {
                redisTemplate.opsForValue().set(key, id);
                redisTemplate.opsForList().leftPush(topic + "_" + consumer, topic);
            }
        }
    }
}

  

然后是Redis发布者

import com.alibaba.fastjson.JSON;
import com.redis.mq.subscriber.TopicSubscriber;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class RedisPublisher {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    TopicSubscriber subscriber;

    @PostConstruct
    public void init() throws Exception {
        // todo test thread
        /*new Thread(() -> {
            int count = 0;
            try {
                Thread.sleep(3000l);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            while (count < 14) {
                try {
                    Thread.sleep(100l);
                    Generate generate = new Generate();
                    generate.setIdNo("" + count);
                    this.publish("GenerateLog", generate);
                    count++;
                } catch (Exception e) {
                }
            }
        }).start();*/
    }
  
    public void publish(String topic, Object content) {  //消息发布到redis
        Message message = new Message(topic, content);
        subscriber.broadcast(topic, message.getId());
        redisTemplate.getConnectionFactory().getConnection().publish(
                topic.getBytes(CharsetUtil.UTF_8), JSON.toJSONString(message).getBytes()
        );
    }
}

  

Redis消费者。实现MessageListener的onMessage就可以。为了易于扩展,这里使用了泛型。

import com.alibaba.fastjson.JSON;
import com.cache.redis.mq.subscriber.TopicSubscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

import java.lang.reflect.ParameterizedType;
import java.util.concurrent.TimeUnit;

/**
 * Base class for Redis pub/sub consumers. Decodes the received envelope, claims the
 * per-consumer marker written by the publisher side and hands the typed payload to
 * {@link #handler(Object)}.
 *
 * <p>The payload type is resolved reflectively from the first type argument of the
 * concrete class's generic superclass, so the direct superclass of a concrete
 * consumer must be parameterized with the payload type first.
 *
 * @param <T> payload type
 */
public abstract class RedisListener<T> implements MessageListener {

    @Autowired
    protected RedisTemplate<String, Object> redisTemplate;

    @Autowired
    protected RedisMessageListenerContainer messageListenerContainer;

    @Autowired
    protected TopicSubscriber subscriber;

    /**
     * Handles one pub/sub notification.
     *
     * @param message raw Redis message (channel and body)
     * @param bytes   subscription pattern supplied by the container; not used
     */
    @Override
    public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) {
        String name = this.getClass().getSimpleName();
        // The publisher encodes the channel name as UTF-8, so decode it the same way.
        String topic = new String(message.getChannel(), java.nio.charset.StandardCharsets.UTF_8);
        // NOTE(review): the body is decoded with the platform default charset, mirroring
        // the publisher's charset-less getBytes() call; change both sides together.
        String content = new String(message.getBody());
        Message m = JSON.parseObject(content, Message.class);
        String key = String.join("_", topic, name, m.getId());

        // Claim one pending marker for this consumer; markers hold the topic name.
        Object b = redisTemplate.opsForList().rightPop(topic + "_" + name);
        if (b != null && b.equals(m.getTopic())) {
            T t = JSON.parseObject(m.getContent(),
                    ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
            handler(t);
            // The message has been consumed: remove its marker key right away instead of
            // relying on a 1-nanosecond expiry being rounded up by the template.
            redisTemplate.delete(key);
        } else {
            // todo retry
            redisTemplate.opsForValue().set("fail_" + key, content);
        }
    }

    /**
     * Processes one decoded payload.
     *
     * @param t payload parsed from the message content
     */
    protected abstract void handler(T t);
}

 

到这里,基础的redisMq就差不多了。下面涉及具体的业务及批量插入。

 首先,加一个LogHandler接口。

/**
 * Contract for log handlers: exposes a single entry point that starts the
 * handler's processing.
 */
public interface LogHandler {
    /** Starts processing; what that entails is defined by the implementation. */
    void process();
}

 

写一个抽象类继承RedisListener并且实现LogHandler。这里用到了阻塞队列(下文的LogStore,基于JDK的BlockingQueue)的put和poll。

因为使用了mybatisplus又不想重新写mybatis foreach批量插入语句。所以这里偷懒直接用mybatis的SqlSession的单条预编译,批量commit。

import com.cache.redis.mq.RedisListener;
import com.server.log.store.LogStore;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.ChannelTopic;

import javax.annotation.PostConstruct;
import java.lang.reflect.ParameterizedType;
import java.util.List;

@Slf4j
public abstract class AbstractLogHandler<T, M> extends RedisListener<T> implements LogHandler {

    @Autowired
    SqlSessionFactory factory;

    @PostConstruct
    public void addListener() {
        messageListenerContainer.addMessageListener(this, new ChannelTopic(getTopic()));
        subscriber.addConsumer(getTopic(), this.getClass().getSimpleName());
        process();
    }

    @Override
    protected void handler(T t) {
        getStore().put(t);  //阻塞直到能新写入。这里其实可以加个超时时间。避免一直阻塞。
    }

    protected abstract String getTopic();

    protected abstract LogStore<T> getStore();

    protected void commit(List<T> data) {
        if (data == null || data.isEmpty()) return;
        SqlSession session = factory.openSession(ExecutorType.BATCH);
        try {
            M mapper = session.getMapper(
                    (Class<M>) (((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1])
            );
            save(data, mapper);
            session.commit();
        } catch (Exception e) {
            log.error(String.format("topic %s 数据批量写入失败。{}", getTopic()), e);
            session.rollback();
        }finally {
            session.close();
        }

        data.forEach(o -> o = null);
        data.clear();
    }

    protected abstract void save(List<T> data, M m);
}

  

LogStore阻塞队列

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Bounded in-memory buffer backed by a {@link LinkedBlockingQueue}.
 *
 * @param <T> element type
 */
@Slf4j
public class LogStore<T> {

    private static final int QUEUE_CAPACITY = 10000;

    private final BlockingQueue<T> logQueue;

    /** Creates a store with the default capacity of 10000 elements. */
    public LogStore() {
        this(QUEUE_CAPACITY);
    }

    /**
     * Creates a store with the given capacity.
     *
     * @param capacity maximum number of buffered elements
     */
    public LogStore(int capacity) {
        this.logQueue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Adds an element, blocking while the store is full. If the calling thread is
     * interrupted while waiting, the element is dropped, the drop is logged and the
     * thread's interrupt status is restored.
     *
     * @param t element to add
     */
    public void put(T t) {
        try {
            logQueue.put(t);
        } catch (InterruptedException e) {
            // Restore the flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            log.warn("logStore put exception", e);
        }
    }

    /**
     * Removes and returns the head of the store, waiting up to the given time.
     *
     * @param seconds maximum time to wait, in seconds
     * @return the head element, or {@code null} on timeout or interruption
     */
    public T poll(long seconds) {
        try {
            return logQueue.poll(seconds, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) {
            // Deliberately not re-interrupting: a consumer that polls in an
            // unconditional loop would otherwise spin, because every following poll
            // would fail immediately. Interruption is reported as "no element".
            return null;
        }
    }
}

  

 到这里,基础的业务代码就写的差不多了。然后我们看下具体的业务处理类怎么写。

比如我们的注册日志,只要实现抽象类AbstractLogHandler就可以了

import comcommon.constant.Constant;
import com.common.po.RegLog;
import com.dao.mapper.RegLogMapper;
import com.server.log.store.LogStore;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Consumer for registration logs: buffers {@link RegLog} records received from Redis
 * and inserts them through {@link RegLogMapper} in batches.
 */
@Component
public class RegisterLogHandler extends AbstractLogHandler<RegLog, RegLogMapper> {
    // todo make configurable
    /** Number of buffered records that triggers a commit. */
    private static final int BATCH_SIZE = 300;
    /** Seconds to wait for the next record before flushing a partial batch. */
    private static final int WAIT_SECONDS = 2;

    private final LogStore<RegLog> store = new LogStore<>();
    private final String topic = Constant.TOPIC_REGISTER_LOG;

    ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    protected String getTopic() {
        return this.topic;
    }

    @Override
    protected LogStore<RegLog> getStore() {
        return this.store;
    }

    /**
     * Starts the background task that drains the local store and commits the records
     * in batches: a batch is written as soon as it reaches {@code BATCH_SIZE}, or when
     * no new record arrived within {@code WAIT_SECONDS} and records are pending.
     */
    @Override
    public void process() {
        executor.execute(() -> {
            List<RegLog> batch = new ArrayList<>(BATCH_SIZE);
            while (true) {
                RegLog entry = this.store.poll(WAIT_SECONDS);
                if (entry != null) {
                    batch.add(entry);
                    // Flush as soon as the batch is full rather than waiting for the
                    // next record to arrive; commit() empties the list.
                    if (batch.size() >= BATCH_SIZE) {
                        commit(batch);
                    }
                } else if (!batch.isEmpty()) {
                    // Poll timed out: flush the partial tail batch.
                    commit(batch);
                }
            }
        });
    }

    /**
     * Inserts every record through the mapper, assigning a random registration number
     * to records that have none. Nothing is committed here; the caller commits the
     * batch session afterwards.
     *
     * @param data   records to insert
     * @param mapper mapper bound to the batch session
     */
    @Override
    protected void save(List<RegLog> data, RegLogMapper mapper) {
        for (RegLog record : data) {
            if (record.getRegNo() == null) {
                record.setRegNo(UUID.randomUUID().toString());
            }
            // Single-row insert; the statements are only committed by the caller.
            mapper.insert(record);
        }
    }
}

  

调用:

// Usage fragment: inject the publisher into the calling bean.
@Autowired
    protected RedisPublisher publisher;


// Publish the object `log` on the register-log topic.
publisher.publish(Constant.TOPIC_REGISTER_LOG, log);

  


原文链接:https://www.cnblogs.com/braska/p/12936937.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:LeetCode 105. 从前序与中序遍历序列构造二叉树

下一篇:怎么自学JAVA开发?