您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關Canal1.1.4中怎么使用RocketMQ將MySQL同步到Redis,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
Canal結合RocketMQ同步MySQL
略
略
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <!-- 根據個人需要依賴 --> <dependency> <groupId>javax.persistence</groupId> <artifactId>persistence-api</artifactId> </dependency>
SQLType.java
import lombok.AccessLevel; import lombok.NoArgsConstructor; /** * Canal監聽SQL類型 * * @author Yu * @date 2019/09/08 00:18 **/ @NoArgsConstructor(access = AccessLevel.PRIVATE) public class SQLType { /**插入*/ public static final String INSERT = "INSERT"; /**更新*/ public static final String UPDATE = "UPDATE"; /**刪除*/ public static final String DELETE = "DELETE"; }
User.java
import lombok.Data; import javax.persistence.Id; import java.io.Serializable; /** * UserPo對象 * * @author Yu * @date 2019/09/08 14:13 **/ @Data public class User implements Serializable { private static final long serialVersionUID = -6845801275112259322L; @Id private Integer uid; private String username; private String password; private String sex; }
CanalSynService.java
import com.alibaba.otter.canal.protocol.FlatMessage; import java.util.Collection; /** * Canal同步服務 * * @author Yu * @date 2019/09/08 00:00 **/ public interface CanalSynService<T> { /** * 處理數據 * * @param flatMessage CanalMQ數據 */ void process(FlatMessage flatMessage); /** * DDL語句處理 * * @param flatMessage CanalMQ數據 */ void ddl(FlatMessage flatMessage); /** * 插入 * * @param list 新增數據 */ void insert(Collection<T> list); /** * 更新 * * @param list 更新數據 */ void update(Collection<T> list); /** * 刪除 * * @param list 刪除數據 */ void delete(Collection<T> list); }
AbstractCanalMQ2RedisService.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.google.common.collect.Sets; import com.taco.springcloud.canal.constant.SQLType; import com.taco.springcloud.core.component.ApplicationContextHolder; import com.taco.springcloud.core.exception.BizException; import com.taco.springcloud.core.exception.constants.BaseApiCodeEnum; import com.taco.springcloud.core.utils.JsonUtil; import com.taco.springcloud.redis.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.ReflectionUtils; import javax.annotation.Resource; import javax.persistence.Id; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.util.*; /** * 抽象CanalMQ通用處理服務 * * @author Yu * @date 2019/09/08 00:05 **/ @Slf4j public abstract class AbstractCanalMQ2RedisService<T> implements CanalSynService<T> { @Resource private RedisTemplate<String, Object> redisTemplate; @Resource private RedisUtils redisUtils; private Class<T> cache; /** * 獲取Model名稱 * * @return Model名稱 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if(flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set<T> data = getData(flatMessage); if(SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if(SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if(SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL需要同步,刪庫清空,更新字段處理 } @Override public void insert(Collection<T> list) { insertOrUpdate(list); } @Override public void update(Collection<T> list) { insertOrUpdate(list); } private void insertOrUpdate(Collection<T> list) { redisTemplate.executePipelined( (RedisConnection redisConnection) -> { for (T data : list) { String key = getWrapRedisKey(data); RedisSerializer keySerializer = redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection<T> list) { Set<String> keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set<String> keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisUtils.delAll(keys); } /** * 封裝redis的key * * @param t 原對象 * @return key */ protected String getWrapRedisKey(T t) { return new StringBuilder() .append(ApplicationContextHolder.getApplicationName()) .append(":") .append(getModelName()) .append(":") .append(getIdValue(t)) .toString(); } /** * 獲取類泛型 * * @return 泛型Class */ protected Class<T> getTypeArguement() { if(cache == null) { cache = (Class<T>) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return cache; } /** * 獲取Object標有@Id注解的字段值 * * @param t 對象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 獲取Class標有@Id注解的字段名稱 * * @return id字段名稱 */ protected Field getIdField() { Class<T> clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO類未設置@Id注解"); throw new BizException(BaseApiCodeEnum.FAIL); } /** * 轉換Canal的FlatMessage中data成泛型對象 * * @param flatMessage Canal發送MQ信息 * @return 泛型對象集合 */ protected Set<T> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { T t = JsonUtil.mapConvertPojo(map, getTypeArguement()); targetData.add(t); } return targetData; } }
TestUsersConsumer.java
import com.alibaba.otter.canal.protocol.FlatMessage; import com.taco.springcloud.canal.model.User; import com.taco.springcloud.canal.service.AbstractCanalMQ2RedisService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; @Slf4j @Service @RocketMQMessageListener(topic = "test_users", consumerGroup = "users") public class TestUsersConsumer extends AbstractCanalMQ2RedisService<User> implements RocketMQListener<FlatMessage> { @Getter private String modelName = "user"; @Override public void onMessage(FlatMessage s) { process(s); } }
看完上述內容,你們對Canal1.1.4中怎么使用RocketMQ將MySQL同步到Redis有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。