成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

記一次Spring Batch完整入門實踐

Baaaan / 1231人閱讀

摘要:什么是作為的子項目,是一款基于的企業(yè)批處理框架。首先,運(yùn)行的基本單位是一個,一個就做一件批處理的事情??偨Y(jié)為我們提供了非常實用的功能,對批處理場景進(jìn)行了完善的抽象,它不僅能實現(xiàn)小數(shù)據(jù)的遷移,也能應(yīng)對大企業(yè)的大數(shù)據(jù)實踐應(yīng)用。

前言

本文將從0到1講解一個Spring Batch是如何搭建并運(yùn)行起來的。
本教程將講解從一個文本文件讀取數(shù)據(jù),然后寫入MySQL。

什么是 Spring Batch

Spring Batch 作為 Spring 的子項目,是一款基于 Spring 的企業(yè)批處理框架。通過它可以構(gòu)建出健壯的企業(yè)批處理應(yīng)用。Spring Batch 不僅提供了統(tǒng)一的讀寫接口、豐富的任務(wù)處理方式、靈活的事務(wù)管理及并發(fā)處理,同時還支持日志、監(jiān)控、任務(wù)重啟與跳過等特性,大大簡化了批處理應(yīng)用開發(fā),將開發(fā)人員從復(fù)雜的任務(wù)配置管理過程中解放出來,使他們可以更多地去關(guān)注核心的業(yè)務(wù)處理過程。

更多的介紹可以參考官網(wǎng):https://spring.io/projects/sp...

環(huán)境搭建

我是用的Intellij Idea,用gradle構(gòu)建。

可以使用Spring Initializr 來創(chuàng)建Spring boot應(yīng)用。地址:https://start.spring.io/

首先選擇Gradle Project,然后選擇Java。填上你的Group和Artifact名字。

最后再搜索你需要用的包,比如Batch是一定要的。另外,由于我寫的Batch項目是使用JPA向MySQL插入數(shù)據(jù),所以也添加了JPA和MySQL。其他可以根據(jù)自己需要添加。

點擊Generate Project,一個項目就創(chuàng)建好了。

Build.gralde文件大概就長這個樣子:

buildscript {
   ext {
      springBootVersion = "2.0.4.RELEASE"
   }
   repositories {
      mavenCentral()
   }
   dependencies {
      classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
   }
}

apply plugin: "java"
apply plugin: "idea"
apply plugin: "org.springframework.boot"
apply plugin: "io.spring.dependency-management"

group = "com.demo"
version = "0.0.1-SNAPSHOT"
sourceCompatibility = 1.8

repositories {
   mavenCentral()
}

dependencies {
   compile("org.springframework.boot:spring-boot-starter-batch")
   compile("org.springframework.boot:spring-boot-starter-jdbc")
   compile("org.springframework.boot:spring-boot-starter-data-jpa")
   compile group: "com.fasterxml.jackson.datatype", name: "jackson-datatype-joda", version: "2.9.4"
   compile group: "org.jadira.usertype", name: "usertype.core", version: "6.0.1.GA"
   compile group: "mysql", name: "mysql-connector-java", version: "6.0.6",
   testCompile("org.springframework.boot:spring-boot-starter-test")
   testCompile("org.springframework.batch:spring-batch-test")
}
Spring Batch 結(jié)構(gòu)

網(wǎng)上有很多Spring Batch結(jié)構(gòu)和原理的講解,我就不詳細(xì)闡述了,我這里只講一下Spring Batch的一個基本層級結(jié)構(gòu)。

首先,Spring Batch運(yùn)行的基本單位是一個Job,一個Job就做一件批處理的事情。
一個Job包含很多Step,step就是每個job要執(zhí)行的單個步驟。

如下圖所示,Step里面,會有Tasklet,Tasklet是一個任務(wù)單元,它是屬于可以重復(fù)利用的東西。
然后是Chunk,chunk就是數(shù)據(jù)塊,你需要定義多大的數(shù)據(jù)量是一個chunk。

Chunk里面就是不斷循環(huán)的一個流程,讀數(shù)據(jù),處理數(shù)據(jù),然后寫數(shù)據(jù)。Spring Batch會不斷的循環(huán)這個流程,直到批處理數(shù)據(jù)完成。

構(gòu)建Spring Batch

首先,我們需要一個全局的Configuration來配置所有的Job和一些全局配置。

代碼如下:

@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
    @Bean
    public ApplicationContextFactory firstJobContext() {
        return new GenericApplicationContextFactory(FirstJobConfiguration.class);
    }
    
    @Bean
    public ApplicationContextFactory secondJobContext() {
        return new GenericApplicationContextFactory(SecondJobConfiguration.class);
    }

}

@EnableBatchProcessing是打開Batch。如果要實現(xiàn)多Job的情況,需要把EnableBatchProcessing注解的modular設(shè)置為true,讓每個Job使用自己的ApplicationConext。

比如上面代碼的就創(chuàng)建了兩個Job。

例子背景

本博客的例子是遷移數(shù)據(jù),數(shù)據(jù)源是一個文本文件,數(shù)據(jù)量是上百萬條,一行就是一條數(shù)據(jù)。然后我們通過Spring Batch幫我們把文本文件的數(shù)據(jù)全部遷移到MySQL數(shù)據(jù)庫對應(yīng)的表里面。

假設(shè)我們遷移的數(shù)據(jù)是Message,那么我們就需要提前創(chuàng)建一個叫Message的和數(shù)據(jù)庫映射的數(shù)據(jù)類。

@Entity
@Table(name = "message")
public class Message {
    @Id
    @Column(name = "object_id", nullable = false)
    private String objectId;

    @Column(name = "content")
    private String content;

    @Column(name = "last_modified_time")
    private LocalDateTime lastModifiedTime;

    @Column(name = "created_time")
    private LocalDateTime createdTime;
}
構(gòu)建Job

首先我們需要一個關(guān)于這個Job的Configuration,它將在SpringBatchConfigration里面被加載。

@Configuration
@EnableAutoConfiguration
@EnableBatchProcessing(modular = true)
public class SpringBatchConfiguration {
    @Bean
    public ApplicationContextFactory messageMigrationJobContext() {
        return new GenericApplicationContextFactory(MessageMigrationJobConfiguration.class);
    }
}

下面的關(guān)于構(gòu)建Job的代碼都將寫在這個MessageMigrationJobConfiguration里面。

public class MessageMigrationJobConfiguration {
}

我們先定義一個Job的Bean。

@Autowired
private JobBuilderFactory jobBuilderFactory;

@Bean
public Job messageMigrationJob(@Qualifier("messageMigrationStep") Step messageMigrationStep) {
    return jobBuilderFactory.get("messageMigrationJob")
            .start(messageMigrationStep)
            .build();
}

jobBuilderFactory是注入進(jìn)來的,get里面的就是job的名字。
這個job只有一個step。

Step

接下來就是創(chuàng)建Step。

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Bean
public Step messageMigrationStep(@Qualifier("jsonMessageReader") FlatFileItemReader jsonMessageReader,
                                 @Qualifier("messageItemWriter") JpaItemWriter messageItemWriter,
                                 @Qualifier("errorWriter") Writer errorWriter) {
    return stepBuilderFactory.get("messageMigrationStep")
            .chunk(CHUNK_SIZE)
            .reader(jsonMessageReader).faultTolerant().skip(JsonParseException.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageItemReadListener(errorWriter))
            .writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
            .listener(new MessageWriteListener())
            .build();
}

stepBuilderFactory是注入進(jìn)來的,然后get里面是Step的名字。
我們的Step中可以構(gòu)建很多東西,比如reader,processer,writer,listener等等。

下面我們就逐個來看看step里面的這些東西是如何使用的。

Chunk

Spring batch在配置Step時采用的是基于Chunk的機(jī)制,即每次讀取一條數(shù)據(jù),再處理一條數(shù)據(jù),累積到一定數(shù)量后再一次性交給writer進(jìn)行寫入操作。這樣可以最大化的優(yōu)化寫入效率,整個事務(wù)也是基于Chunk來進(jìn)行。

比如我們定義chunk size是50,那就意味著,spring batch處理了50條數(shù)據(jù)后,再統(tǒng)一向數(shù)據(jù)庫寫入。
這里有個很重要的點,chunk前面需要定義數(shù)據(jù)輸入類型和輸出類型,由于我們輸入是Message,輸出也是Message,所以兩個都直接寫Message了。
如果不定義這個類型,會報錯。

.chunk(CHUNK_SIZE)
Reader

Reader顧名思義就是從數(shù)據(jù)源讀取數(shù)據(jù)。
Spring Batch給我們提供了很多好用實用的reader,基本能滿足我們所有需求。比如FlatFileItemReader,JdbcCursorItemReader,JpaPagingItemReader等。也可以自己實現(xiàn)Reader。

本例子里面,數(shù)據(jù)源是文本文件,所以我們就使用FlatFileItemReader。FlatFileItemReader是從文件里面一行一行的讀取數(shù)據(jù)。
首先需要設(shè)置文件路徑,也就是設(shè)置resource。
因為我們需要把一行文本映射為Message類,所以我們需要自己設(shè)置并實現(xiàn)LineMapper。

@Bean
public FlatFileItemReader jsonMessageReader() {
    FlatFileItemReader reader = new FlatFileItemReader<>();
    reader.setResource(new FileSystemResource(new File(MESSAGE_FILE)));
    reader.setLineMapper(new MessageLineMapper());
    return reader;
}
Line Mapper

LineMapper的輸入就是獲取一行文本,和行號,然后轉(zhuǎn)換成Message。
在本例子里面,一行文本就是一個json對象,所以我們使用JsonParser來轉(zhuǎn)換成Message。

public class MessageLineMapper implements LineMapper {
    private MappingJsonFactory factory = new MappingJsonFactory();

    @Override
    public Message mapLine(String line, int lineNumber) throws Exception {   
        JsonParser parser = factory.createParser(line);
        Map map = (Map) parser.readValueAs(Map.class);
        Message message = new Message();
        ... // 轉(zhuǎn)換邏輯
        return message;
    }
}
Processor

由于本例子里面,數(shù)據(jù)是一行文本,通過reader變成Message的類,然后writer直接把Message寫入MySQL。所以我們的例子里面就不需要Processor,關(guān)于如何寫Processor其實和reader/writer是一樣的道理。
從它的接口可以看出,需要定義輸入和輸出的類型,把輸入I通過某些邏輯處理之后,返回輸出O。

public interface ItemProcessor {
    O process(I item) throws Exception;
}
Writer

Writer顧名思義就是把數(shù)據(jù)寫入到目標(biāo)數(shù)據(jù)源里面。
Spring Batch同樣給我們提供很多好用實用的writer。比如JpaItemWriter,F(xiàn)latFileItemWriter,HibernateItemWriter,JdbcBatchItemWriter等。同樣也可以自定義。

本例子里面,使用的是JpaItemWriter,可以直接把Message對象寫到數(shù)據(jù)庫里面。但是需要設(shè)置一個EntityManagerFactory,可以注入進(jìn)來。

@Autowired
private EntityManagerFactory entityManager;

@Bean
public JpaItemWriter messageItemWriter() {
    JpaItemWriter writer = new JpaItemWriter<>();
    writer.setEntityManagerFactory(entityManager);
    return writer;
}

另外,你需要配置數(shù)據(jù)庫的連接等東西。由于我使用的spring,所以直接在Application.properties里面配置如下:

spring.datasource.url=jdbc:mysql://database
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.driverClassName=com.mysql.cj.jdbc.Driver
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect
spring.jpa.show-sql=true
spring.jpa.properties.jadira.usertype.autoRegisterUserTypes=true
spring.jackson.serialization.write-dates-as-timestamps=false
spring.batch.initialize-schema=ALWAYS
spring.jpa.hibernate.ddl-auto=update

spring.datasource相關(guān)的設(shè)置都是在配置數(shù)據(jù)庫的連接。
spring.batch.initialize-schema=always表示讓spring batch在數(shù)據(jù)庫里面創(chuàng)建默認(rèn)的數(shù)據(jù)表。
spring.jpa.show-sql=true表示在控制臺輸出hibernate讀寫數(shù)據(jù)庫時候的SQL。
spring.jpa.database-platform=org.hibernate.dialect.MySQLDialect是在指定MySQL的方言。

Listener

Spring Batch同樣實現(xiàn)了非常完善全面的listener,listener很好理解,就是用來監(jiān)聽每個步驟的結(jié)果。比如可以有監(jiān)聽step的,有監(jiān)聽job的,有監(jiān)聽reader的,有監(jiān)聽writer的。沒有你找不到的listener,只有你想不到的listener。

在本例子里面,我只關(guān)心,read的時候有沒有出錯,和write的時候有沒有出錯,所以,我只實現(xiàn)了ReadListener和WriteListener。
在read出錯的時候,把錯誤結(jié)果寫入一個多帶帶的error列表文件中。

public class MessageItemReadListener implements ItemReadListener {
    private Writer errorWriter;

    public MessageItemReadListener(Writer errorWriter) {
        this.errorWriter = errorWriter;
    }

    @Override
    public void beforeRead() {
    }

    @Override
    public void afterRead(Message item) {
    }

    @Override
    public void onReadError(Exception ex) {
         errorWriter.write(format("%s%n", ex.getMessage()));
    }
}

在write出錯的時候,也做同樣的事情,把出錯的原因?qū)懭攵鄮У娜罩局小?/p>

public class MessageWriteListener implements ItemWriteListener {

    @Autowired
    private Writer errorWriter;

    @Override
    public void beforeWrite(List items) {
    }

    @Override
    public void afterWrite(List items) {
    }

    @Override
    public void onWriteError(Exception exception, List items) {
        errorWriter.write(format("%s%n", exception.getMessage()));
        for (Message message : items) {
            errorWriter.write(format("Failed writing message id: %s", message.getObjectId()));
        }
    }
}

前面有說chuck機(jī)制,所以write的listener傳入?yún)?shù)是一個List,因為它是累積到一定的數(shù)量才一起寫入。

Skip

Spring Batch提供了skip的機(jī)制,也就是說,如果出錯了,可以跳過。如果你不設(shè)置skip,那么一條數(shù)據(jù)出錯了,整個job都會掛掉。
設(shè)置skip的時候一定要設(shè)置什么Exception才需要跳過,并且跳過多少條數(shù)據(jù)。如果失敗的數(shù)據(jù)超過你設(shè)置的skip limit,那么job就會失敗。
你可以分別給reader和writer等設(shè)置skip機(jī)制。

writer(messageItemWriter).faultTolerant().skip(Exception.class).skipLimit(SKIP_LIMIT)
Retry

這個和Skip是一樣的原理,就是失敗之后可以重試,你同樣需要設(shè)置重試的次數(shù)。
同樣可以分別給reader,writer等設(shè)置retry機(jī)制。

如果同時設(shè)置了retry和skip,會先重試所有次數(shù),然后再開始skip。比如retry是10次,skip是20,會先重試10次之后,再開始算第一次skip。

運(yùn)行Job

所有東西都準(zhǔn)備好以后,就是如何運(yùn)行了。
運(yùn)行就是在main方法里面用JobLauncher去運(yùn)行你制定的job。

下面是我寫的main方法,main方法的第一個參數(shù)是job的名字,這樣我們就可以通過不同的job名字跑不同的job了。

首先我們通過運(yùn)行起來的Spring application得到j(luò)obRegistry,然后通過job的名字找到對應(yīng)的job。

接著,我們就可以用jobLauncher去運(yùn)行這個job了,運(yùn)行的時候會傳一些參數(shù),比如你job里面需要的文件路徑或者文件日期等,就可以通過這個jobParameters傳進(jìn)去。如果沒有參數(shù),可以默認(rèn)傳當(dāng)前時間進(jìn)去。

public static void main(String[] args) {
    String jobName = args[0];

    try {
        ConfigurableApplicationContext context = SpringApplication.run(ZuociBatchApplication.class, args);
        JobRegistry jobRegistry = context.getBean(JobRegistry.class);
        Job job = jobRegistry.getJob(jobName);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        JobExecution jobExecution = jobLauncher.run(job, createJobParams());
        if (!jobExecution.getExitStatus().equals(ExitStatus.COMPLETED)) {
            throw new RuntimeException(format("%s Job execution failed.", jobName));
        }
    } catch (Exception e) {
        throw new RuntimeException(format("%s Job execution failed.", jobName));
    }
}

private static JobParameters createJobParams() {
    return new JobParametersBuilder().addDate("date", new Date()).toJobParameters();
}

最后,把jar包編譯出來,在命令行執(zhí)行下面的命令,就可以運(yùn)行你的Spring Batch了。

java -jar YOUR_BATCH_NAME.jar YOUR_JOB_NAME
調(diào)試

調(diào)試主要依靠控制臺輸出的log,可以在application.properties里面設(shè)置log輸出的級別,比如你希望輸出INFO信息還是DEBUG信息。
基本上,通過查看log都能定位到問題。

logging.path=build/logs
logging.file=${logging.path}/batch.log
logging.level.com.easystudio=INFO
logging.level.root=INFO
log4j.logger.org.springframework.jdbc=INFO
log4j.logger.org.springframework.batch=INFO
logging.level.org.hibernate.SQL=INFO
Spring Batch數(shù)據(jù)表

如果你的batch最終會寫入數(shù)據(jù)庫,那么Spring Batch會默認(rèn)在你的數(shù)據(jù)庫里面創(chuàng)建一些batch相關(guān)的表,來記錄所有job/step運(yùn)行的狀態(tài)和結(jié)果。

大部分表你都不需要關(guān)心,你只需要關(guān)心幾張表。

batch_job_instance:這張表能看到每次運(yùn)行的job名字。

batch_job_execution:這張表能看到每次運(yùn)行job的開始時間,結(jié)束時間,狀態(tài),以及失敗后的錯誤消息是什么。

batch_step_execution:這張表你能看到更多關(guān)于step的詳細(xì)信息。比如step的開始時間,結(jié)束時間,提交次數(shù),讀寫次數(shù),狀態(tài),以及失敗后的錯誤信息等。

總結(jié)

Spring Batch為我們提供了非常實用的功能,對批處理場景進(jìn)行了完善的抽象,它不僅能實現(xiàn)小數(shù)據(jù)的遷移,也能應(yīng)對大企業(yè)的大數(shù)據(jù)實踐應(yīng)用。它讓我們開發(fā)批處理應(yīng)用可以事半功倍。

最后一個tips,搭建Spring Batch的過程中,會遇到各種各樣的問題。只要善用Google,都能找到答案。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://m.hztianpu.com/yun/76991.html

相關(guān)文章

  • 一次慘烈的阿里面試經(jīng)歷

    摘要:當(dāng)我們的需求出現(xiàn)變動時,工廠模式會需要進(jìn)行相應(yīng)的變化??偨Y(jié)來說,要想成功進(jìn)行一次阿里巴巴的面試,你需要了解甚至掌握以下內(nèi)容語言,尤其是線程原理數(shù)據(jù)庫事務(wù),加鎖,重點分布式設(shè)計模式可以說是涉及范圍非常廣了。 showImg(https://segmentfault.com/img/bV8cSY?w=576&h=432); 前言 今天本是一個陽光明媚,鳥語花香的日子。于是我決定在逛街中感受...

    Eastboat 評論0 收藏0
  • Java相關(guān)

    摘要:本文是作者自己對中線程的狀態(tài)線程間協(xié)作相關(guān)使用的理解與總結(jié),不對之處,望指出,共勉。當(dāng)中的的數(shù)目而不是已占用的位置數(shù)大于集合番一文通版集合番一文通版垃圾回收機(jī)制講得很透徹,深入淺出。 一小時搞明白自定義注解 Annotation(注解)就是 Java 提供了一種元程序中的元素關(guān)聯(lián)任何信息和著任何元數(shù)據(jù)(metadata)的途徑和方法。Annotion(注解) 是一個接口,程序可以通過...

    wangtdgoodluck 評論0 收藏0
  • 2019 Java 全棧工程師進(jìn)階路線圖,一定要收藏

    摘要:結(jié)合我自己的經(jīng)驗,我整理了一份全棧工程師進(jìn)階路線圖,給大家參考。乾坤大挪移第一層第一層心法,主要都是基本語法,程序設(shè)計入門,悟性高者十天半月可成,差一點的到個月也說不準(zhǔn)。 技術(shù)更新日新月異,對于初入職場的同學(xué)來說,經(jīng)常會困惑該往那個方向發(fā)展,這一點松哥是深有體會的。 我剛開始學(xué)習(xí) Java 那會,最大的問題就是不知道該學(xué)什么,以及學(xué)習(xí)的順序,我相信這也是很多初學(xué)者經(jīng)常面臨的問題。?我...

    wangdai 評論0 收藏0
  • 一次使iview庫的Radio可取消的過程

    摘要:概述庫用的是是我們非常常用的組件。有一個特征是選中之后無法取消?,F(xiàn)實中取消的需求是常見且可以理解的。所以看到這個需求之后第一嘗試在組件之上搞一搞,這一搞就入坑了,現(xiàn)在就來理一理我的入坑之路吧。 概述 ui庫用的是iview . radio、radioGroup是我們非常常用的組件。radio有一個特征是選中之后無法取消?,F(xiàn)實中取消radio的需求是常見且可以理解的。所以看到這個需求之后...

    荊兆峰 評論0 收藏0
  • 一次 Laravel 應(yīng)用性能調(diào)優(yōu)經(jīng)歷

    摘要:為了一探究竟,于是開啟了這次應(yīng)用性能調(diào)優(yōu)之旅。使用即時編譯器和都能輕輕松松的讓你的應(yīng)用程序在不用做任何修改的情況下,直接提高或者更高的性能。 這是一份事后的總結(jié)。在經(jīng)歷了調(diào)優(yōu)過程踩的很多坑之后,我們最終完善并實施了初步的性能測試方案,通過真實的測試數(shù)據(jù)歸納出了 Laravel 開發(fā)過程中的一些實踐技巧。 0x00 源起 最近有同事反饋 Laravel 寫的應(yīng)用程序響應(yīng)有點慢、20幾個并...

    warkiz 評論0 收藏0

發(fā)表評論

0條評論

閱讀需要支付1元查看
<