解决思路1可以设计消息主键,每次插入数据时,查询判断此消息是否已经写入,可以排除重复写入数据问题。但是此种手段需要查询HBase,由于写入并发量相对较高>1000条/s。会影响写入速度。由于此时有单点的Redis,由于数据到达的时间相隔不会太长,可以考虑将数据持久化到Redis中,插入时,到Redis中查询,Redis中查询复杂度为常数,基本可以忽略查询对性能的影响。判断是否已经写入。而写入到Redis中的数据可以设置过期时间,可以自动清除。
采用此种方式,需要考虑,新的消费者不能消费数据from beginning。如果新消费者从头开始消费数据,此时Redis缓存已经清除,也会重复写入数据。需要在写入时作相应判断,超过Redis缓存到达的数据不用写入。
1 | public void positionInfoHandler(KafkaRecievedLocationMessage<KafkaRecievedLocationMessageBody> kafkaRecievedLocationMessage, String groupId) { |
设置Redis的Key,方法setValueNx,如果Key设置成功会返回OK,如果设置失败则返回null:
1 | EX seconds -- Set the specified expire time, in seconds. |
1 | /** |