成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

易用的 canal java 客戶端 canal-client

aboutU / 1349人閱讀

摘要:易用的客戶端自身提供了簡(jiǎn)單的客戶端,數(shù)據(jù)格式較為復(fù)雜,處理消費(fèi)數(shù)據(jù)也不太方便,為了方便給業(yè)務(wù)使用,提供一種直接能獲取實(shí)體對(duì)象的方式來進(jìn)行消費(fèi)才更方便。

易用的canaljava 客戶端

canal 自身提供了簡(jiǎn)單的客戶端,數(shù)據(jù)格式較為復(fù)雜,處理消費(fèi)數(shù)據(jù)也不太方便,為了方便給業(yè)務(wù)使用,提供一種直接能獲取實(shí)體對(duì)象的方式來進(jìn)行消費(fèi)才更方便。
先說一下實(shí)現(xiàn)的思路,首先canal 客戶端的消息對(duì)象有兩種,message 和 flatMessage,分別是普通的消息(protobuf格式)和消息隊(duì)列的扁平消息(json格式),現(xiàn)在將這兩種消息轉(zhuǎn)化為我們直接使用的 model 對(duì)象,根據(jù)消息中的數(shù)據(jù)庫表名稱找到對(duì)應(yīng)的的實(shí)體對(duì)象,那么如何根據(jù)數(shù)據(jù)庫表名找到實(shí)體對(duì)象呢?
第一種方式,如果我們的實(shí)體對(duì)象都使用JPA 的 @Table注解來標(biāo)識(shí)表和實(shí)體的對(duì)應(yīng)關(guān)系,可以使用該注解來找到實(shí)體對(duì)象和表名的關(guān)系
第二種方式,可以使用自定義注解的來標(biāo)注實(shí)體和表名的關(guān)系,為解耦各個(gè)表的處理,我們使用策略模式來封裝各個(gè)表的增刪改操作

canal 主要客戶端類 ClientIdentity

canal client和server交互之間的身份標(biāo)識(shí),目前clientId寫死為1001. (目前canal server上的一個(gè)instance只能有一個(gè)client消費(fèi),clientId的設(shè)計(jì)是為1個(gè)instance多client消費(fèi)模式而預(yù)留的)

CanalConnector

SimpleCanalConnector/ClusterCanalConnector : 兩種connector的實(shí)現(xiàn),simple針對(duì)的是簡(jiǎn)單的ip直連模式,cluster針對(duì)多ip的模式,可依賴CanalNodeAccessStrategy進(jìn)行failover控制

CanalNodeAccessStrategy

SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:兩種failover的實(shí)現(xiàn),simple針對(duì)給定的初始ip列表進(jìn)行failover選擇,cluster基于zookeeper上的cluster節(jié)點(diǎn)動(dòng)態(tài)選擇正在運(yùn)行的canal server.

ClientRunningMonitor/ClientRunningListener/ClientRunningData

client running相關(guān)控制,主要為解決client自身的failover機(jī)制。canal client允許同時(shí)啟動(dòng)多個(gè)canal client,通過running機(jī)制,可保證只有一個(gè)client在工作,其他client做為冷備. 當(dāng)運(yùn)行中的client掛了,running會(huì)控制讓冷備中的client轉(zhuǎn)為工作模式,這樣就可以確保canal client也不會(huì)是單點(diǎn). 保證整個(gè)系統(tǒng)的高可用性.

Canal 客戶端類型

canal 客戶端可以主要分以下幾種類型

單一ip 直連模式

這種方式下,可以啟動(dòng)多個(gè)客戶端,連接同一個(gè)canal 服務(wù)端,多個(gè)客戶端只有一個(gè)client 工作,其他的可以作為冷備,當(dāng)一個(gè)client的掛了,其他的客戶端會(huì)有一個(gè)進(jìn)入工作模式
缺點(diǎn):連接同一個(gè)服務(wù)端,如果服務(wù)端掛了將導(dǎo)致不可用

多ip 模式

這種方式下,客戶端連接多個(gè)canal服務(wù)端,一個(gè)客戶端隨機(jī)選擇一個(gè)canal server 消費(fèi),當(dāng)這個(gè)server 掛了,會(huì)選擇另外一個(gè)進(jìn)行消費(fèi)
缺點(diǎn):不支持訂閱消費(fèi)

zookeeper 模式

使用zookeeper來server,client 的狀態(tài),當(dāng)兩個(gè)canal server 連接zookeeper 后,
優(yōu)先連接的節(jié)點(diǎn)作為 活躍節(jié)點(diǎn),client從活躍節(jié)點(diǎn)消費(fèi),當(dāng)server掛了以后,從另外一個(gè)節(jié)點(diǎn)消費(fèi)
缺點(diǎn):不支持訂閱消費(fèi)

消息 隊(duì)列模式

canal 支持消息直接發(fā)送到消息隊(duì)列,從消息隊(duì)列消費(fèi),目前支持的有kafka 和rocketMq,這種方式支持訂閱消費(fèi)

canal 客戶端實(shí)現(xiàn) EntryHandler 實(shí)體消息處理器

首先定義一個(gè)策略接口,定義增加,更新,刪除功能,使用java 8聲明方法為default,讓客戶端選擇實(shí)現(xiàn)其中的方法,提高靈活性,客戶端實(shí)現(xiàn)EntryHandler接口后,會(huì)返回基于handler中的泛型的實(shí)例對(duì)象,在對(duì)應(yīng)的方法中實(shí)現(xiàn)自定義邏輯

public interface EntryHandler {

    default void insert(T t) {

    }


    default void update(T before, T after) {

    }


    default void delete(T t) {

    }
}

定義一個(gè)canalClient 的抽象類,封裝canal 的鏈接開啟關(guān)閉操作,啟動(dòng)一個(gè)線程不斷去消費(fèi)canal 數(shù)據(jù),依賴一個(gè) messageHandler 封裝消息處理的邏輯

public abstract class AbstractCanalClient implements CanalClient {



    @Override
    public void start() {
        log.info("start canal client");
        workThread = new Thread(this::process);
        workThread.setName("canal-client-thread");
        flag = true;
        workThread.start();
    }

    @Override
    public void stop() {
        log.info("stop canal client");
        flag = false;
        if (null != workThread) {
            workThread.interrupt();
        }

    }

    @Override
    public void process() {
        if (flag) {
            try {
                connector.connect();
                connector.subscribe(filter);
                while (flag) {
                    Message message = connector.getWithoutAck(batchSize, timeout, unit);
                    log.info("獲取消息 {}", message);
                    long batchId = message.getId();
                    if (message.getId() != -1 && message.getEntries().size() != 0) {
                        messageHandler.handleMessage(message);
                    }
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                log.error("canal client 異常", e);
            } finally {
                connector.disconnect();
            }
        }
    }

}

基于該抽象類,分別提供各種客戶端的實(shí)現(xiàn)

SimpleCanalClient

ClusterCanalClient

ZookeeperCanalClient

KafkaCanalClient

消息處理器 messageHandler

消息處理器 messageHandler 封裝了消息處理邏輯,其中定義了一個(gè)消息處理方法

public interface MessageHandler {

     void handleMessage(T t);

}

消息處理器可能要適配4種情況,分別是消費(fèi)message,flatMessage和兩種消息的同步與異步消費(fèi)
消息處理的工作主要有兩個(gè)

獲取增刪改的行數(shù)據(jù),交給行處理器繼續(xù)處理

在上下文對(duì)象中保存其他的數(shù)據(jù),例如庫名,表名,binlog 時(shí)間戳等等數(shù)據(jù)

首先我們封裝一個(gè)抽象的 message 消息處理器,實(shí)現(xiàn)MessageHandler接口

public abstract class AbstractMessageHandler implements MessageHandler {


    @Override
    public void handleMessage(Message message) {
        List entries = message.getEntries();
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA)) {
                try {
                    EntryHandler entryHandler = HandlerUtil.getEntryHandler(entryHandlers, entry.getHeader().getTableName());
                    if(entryHandler!=null){
                        CanalModel model = CanalModel.Builder.builder().id(message.getId()).table(entry.getHeader().getTableName())
                                .executeTime(entry.getHeader().getExecuteTime()).database(entry.getHeader().getSchemaName()).build();
                        CanalContext.setModel(model);
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List rowDataList = rowChange.getRowDatasList();
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        for (CanalEntry.RowData rowData : rowDataList) {
                            rowDataHandler.handlerRowData(rowData,entryHandler,eventType);
                        }
                    }
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }finally {
                   CanalContext.removeModel();
                }

            }
        }
    }
}

分別定義兩個(gè)實(shí)現(xiàn)類,同步與異步實(shí)現(xiàn)類,繼承AbstractMessageHandler抽象類

public class SyncMessageHandlerImpl extends AbstractMessageHandler {


    public SyncMessageHandlerImpl(List entryHandlers, RowDataHandler rowDataHandler) {
        super(entryHandlers, rowDataHandler);
    }

    @Override
    public void handleMessage(Message message) {
        super.handleMessage(message);
    }
}
public class AsyncMessageHandlerImpl extends AbstractMessageHandler {


    private ExecutorService executor;


    public AsyncMessageHandlerImpl(List entryHandlers, RowDataHandler rowDataHandler, ExecutorService executor) {
        super(entryHandlers, rowDataHandler);
        this.executor = executor;
    }

    @Override
    public void handleMessage(Message message) {
        executor.execute(() -> super.handleMessage(message));
    }
}
RowDataHandler 行消息處理器

消息處理器依賴的行消息處理器主要是將原始的column list 轉(zhuǎn)為 實(shí)體對(duì)象,并將相應(yīng)的增刪改消息交給相應(yīng)的hangler對(duì)象方法,行消息處理器分別需要處理兩種對(duì)象,一個(gè)是 message的行數(shù)據(jù) 和 flatMessage 的行數(shù)據(jù)

public interface RowDataHandler {


    void handlerRowData(T t, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception;
}

兩個(gè)行處理器的實(shí)現(xiàn)為

public class RowDataHandlerImpl implements RowDataHandler {



    private IModelFactory> modelFactory;




    public RowDataHandlerImpl(IModelFactory modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(CanalEntry.RowData rowData, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception {
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Set updateColumnSet = rowData.getAfterColumnsList().stream().filter(CanalEntry.Column::getUpdated)
                            .map(CanalEntry.Column::getName).collect(Collectors.toSet());
                    Object before = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList(),updateColumnSet);
                    Object after = modelFactory.newInstance(entryHandler, rowData.getAfterColumnsList());
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, rowData.getBeforeColumnsList());
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
public class MapRowDataHandlerImpl implements RowDataHandler>> {



    private IModelFactory> modelFactory;


    public MapRowDataHandlerImpl(IModelFactory> modelFactory) {
        this.modelFactory = modelFactory;
    }

    @Override
    public void handlerRowData(List> list, EntryHandler entryHandler, CanalEntry.EventType eventType) throws Exception{
        if (entryHandler != null) {
            switch (eventType) {
                case INSERT:
                    Object object = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.insert(object);
                    break;
                case UPDATE:
                    Object before = modelFactory.newInstance(entryHandler, list.get(1));
                    Object after = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.update(before, after);
                    break;
                case DELETE:
                    Object o = modelFactory.newInstance(entryHandler, list.get(0));
                    entryHandler.delete(o);
                    break;
                default:
                    break;
            }
        }
    }
}
IModelFactory bean實(shí)例創(chuàng)建工廠

行消息處理的依賴的工廠 主要是是通過反射創(chuàng)建與表名稱對(duì)應(yīng)的bean實(shí)例

public interface IModelFactory {


    Object newInstance(EntryHandler entryHandler, T t) throws Exception;


    default Object newInstance(EntryHandler entryHandler, T t, Set updateColumn) throws Exception {
        return null;
    }
}
CanalContext canal 消息上下文

目前主要用于保存bean實(shí)例以外的其他數(shù)據(jù),使用threadLocal實(shí)現(xiàn)

代碼已在github開源canal-client

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/74866.html

相關(guān)文章

  • 使用canal+Kafka進(jìn)行數(shù)據(jù)庫同步實(shí)踐

    摘要:比如,服務(wù)數(shù)據(jù)庫的數(shù)據(jù)來源于服務(wù)的數(shù)據(jù)庫服務(wù)的數(shù)據(jù)有變更操作時(shí),需要同步到服務(wù)中。第二種解決方案通過數(shù)據(jù)庫的進(jìn)行同步。并且,我們還用這套架構(gòu)進(jìn)行緩存失效的同步。目前這套同步架構(gòu)正常運(yùn)行中,后續(xù)有遇到問題再繼續(xù)更新。在微服務(wù)拆分的架構(gòu)中,各服務(wù)擁有自己的數(shù)據(jù)庫,所以常常會(huì)遇到服務(wù)之間數(shù)據(jù)通信的問題。比如,B服務(wù)數(shù)據(jù)庫的數(shù)據(jù)來源于A服務(wù)的數(shù)據(jù)庫;A服務(wù)的數(shù)據(jù)有變更操作時(shí),需要同步到B服務(wù)中。第一...

    Tecode 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<